Address pull requests comments, fix a few TODOs

This commit is contained in:
hrxi 2019-07-12 01:53:28 +02:00
parent 82e7cf4447
commit 4b6fe6bf7f
3 changed files with 46 additions and 14 deletions

View file

@ -66,7 +66,7 @@ public class FileManager : StreamInteractionModule, Object {
foreach (FileSender file_sender in file_senders) {
if (file_sender.can_send(conversation, file_transfer)) {
file_sender.send_file(conversation, file_transfer);
return;
break;
}
}
received_file(file_transfer, conversation);

View file

@ -45,9 +45,9 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend
file_transfer.local_time = new DateTime.now_utc();
file_transfer.direction = FileTransfer.DIRECTION_RECEIVED;
file_transfer.file_name = jingle_file_transfer.file_name;
file_transfer.size = (int)jingle_file_transfer.size; // TODO(hrxi): remove cast
file_transfer.size = (int)jingle_file_transfer.size;
file_transfer.state = FileTransfer.State.NOT_STARTED;
file_transfer.provider = 0; // TODO(hrxi): what is this?
file_transfer.provider = 1;
file_transfer.info = id;
file_transfers[id] = jingle_file_transfer;
@ -56,7 +56,8 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend
}
async void get_meta_info(FileTransfer file_transfer) {
// TODO(hrxi): what is this function?
// In Jingle, all the metadata is provided up-front, so there's no more
// metadata to get.
}
async void download(FileTransfer file_transfer, File file_) {
// TODO(hrxi) What should happen if `stream == null`?
@ -112,7 +113,7 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend
if (!stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) {
continue;
}
stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size);
stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream.begin(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size);
return;
}
}

View file

@ -127,8 +127,14 @@ public class Connection : IOStream {
XmppStream stream;
int read_callback_priority;
Cancellable? read_callback_cancellable = null;
ulong read_callback_cancellable_id;
SourceFunc? read_callback = null;
int write_callback_priority;
SourceFunc? write_callback = null;
ulong write_callback_cancellable_id;
Cancellable? write_callback_cancellable = null;
// Need `Bytes` instead of `uint8[]` because the latter doesn't work in
// parameter position of `LinkedList`.
LinkedList<Bytes> received = new LinkedList<Bytes>();
@ -144,35 +150,54 @@ public class Connection : IOStream {
output = new Output(this);
}
public void set_read_calllback(SourceFunc callback) throws IOError {
public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (read_callback != null) {
throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream");
}
read_callback = callback;
if (cancellable != null) {
read_callback_cancellable_id = cancellable.connect(trigger_read_callback);
}
public void set_write_calllback(SourceFunc callback) throws IOError {
read_callback = callback;
read_callback_cancellable = cancellable;
read_callback_priority = io_priority;
}
public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (write_callback != null) {
throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream");
}
if (cancellable != null) {
write_callback_cancellable_id = cancellable.connect(trigger_write_callback);
}
write_callback = callback;
write_callback_cancellable = cancellable;
write_callback_priority = io_priority;
}
public void trigger_read_callback() {
if (read_callback != null) {
Idle.add((owned) read_callback);
Idle.add((owned) read_callback, read_callback_priority);
read_callback = null;
if (read_callback_cancellable != null) {
read_callback_cancellable.disconnect(read_callback_cancellable_id);
}
read_callback_cancellable = null;
}
}
public void trigger_write_callback() {
if (write_callback != null) {
Idle.add((owned) write_callback);
Idle.add((owned) write_callback, write_callback_priority);
write_callback = null;
if (write_callback_cancellable != null) {
write_callback_cancellable.disconnect(write_callback_cancellable_id);
}
write_callback_cancellable = null;
}
}
public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
// TODO(hrxi): cancellable?
// TODO(hrxi): io_priority?
while (true) {
if (cancellable != null) {
cancellable.set_error_if_cancelled();
}
if (input_closed) {
return 0;
}
@ -190,14 +215,17 @@ public class Connection : IOStream {
if (state == DISCONNECTED) {
return 0;
}
set_read_calllback(read_async.callback);
set_read_callback(read_async.callback, cancellable, io_priority);
yield;
}
}
public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
while (state == WAITING_FOR_CONNECT || state == CONNECTING) {
set_write_calllback(write_async.callback);
if (cancellable != null) {
cancellable.set_error_if_cancelled();
}
set_write_callback(write_async.callback, cancellable, io_priority);
yield;
}
throw_if_closed();
@ -214,7 +242,7 @@ public class Connection : IOStream {
.put_attribute("seq", seq.to_string())
.put_node(new StanzaNode.text(Base64.encode(buffer)));
Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid };
set_write_calllback(write_async.callback);
set_write_callback(write_async.callback, cancellable, io_priority);
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
if (iq.is_error()) {
set_error("sending failed");
@ -228,6 +256,9 @@ public class Connection : IOStream {
}
});
yield;
if (cancellable != null) {
cancellable.set_error_if_cancelled();
}
throw_if_error();
return buffer.length;
}