Implement file sending via Jingle

This is still disabled by default until prioritization is implemented;
otherwise this could be preferred to HTTP uploads.

File sending only works via Jingle In-Band-Bytestreams right now, more
transports are going to be implemented.

To test this, uncomment the line with `JingleFileTransfer` in
libdino/src/application.vala.
This commit is contained in:
hrxi 2019-06-23 14:53:18 +02:00
parent 6c480b862e
commit 877c46628f
12 changed files with 743 additions and 4 deletions

View file

@ -35,6 +35,7 @@ SOURCES
src/service/database.vala
src/service/entity_capabilities_storage.vala
src/service/file_manager.vala
src/service/jingle_file_manager.vala
src/service/message_processor.vala
src/service/message_storage.vala
src/service/module_manager.vala

View file

@ -37,6 +37,7 @@ public interface Dino.Application : GLib.Application {
RosterManager.start(stream_interactor, db);
ChatInteraction.start(stream_interactor);
FileManager.start(stream_interactor, db);
//JingleFileManager.start(stream_interactor); // TODO(hrxi): Activate
ContentItemStore.start(stream_interactor, db);
NotificationEvents.start(stream_interactor);
SearchProcessor.start(stream_interactor, db);

View file

@ -53,6 +53,7 @@ public class FileTransfer : Object {
}
public string path { get; set; }
public string? mime_type { get; set; }
// TODO(hrxi): expand to 64 bit
public int size { get; set; }
public State state { get; set; }

View file

@ -65,6 +65,7 @@ public class FileManager : StreamInteractionModule, Object {
foreach (FileSender file_sender in file_senders) {
if (file_sender.can_send(conversation, file_transfer)) {
// TODO(hrxi): Currently, this tries to send the file with every transfer available, but it should probably only select one.
file_sender.send_file(conversation, file_transfer);
}
}

View file

@ -0,0 +1,52 @@
using Gdk;
using Gee;
using Xmpp;
using Dino.Entities;
namespace Dino {
public class JingleFileManager : StreamInteractionModule, FileSender, Object {
public static ModuleIdentity<JingleFileManager> IDENTITY = new ModuleIdentity<JingleFileManager>("jingle_files");
public string id { get { return IDENTITY.id; } }
private StreamInteractor stream_interactor;
public static void start(StreamInteractor stream_interactor) {
JingleFileManager m = new JingleFileManager(stream_interactor);
stream_interactor.add_module(m);
}
private JingleFileManager(StreamInteractor stream_interactor) {
this.stream_interactor = stream_interactor;
stream_interactor.get_module(FileManager.IDENTITY).add_sender(this);
}
public bool is_upload_available(Conversation conversation) {
// TODO(hrxi) Here and in `send_file`: What should happen if `stream == null`?
XmppStream? stream = stream_interactor.get_stream(conversation.account);
foreach (Jid full_jid in stream.get_flag(Presence.Flag.IDENTITY).get_resources(conversation.counterpart)) {
if (stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) {
return true;
}
}
return false;
}
public bool can_send(Conversation conversation, FileTransfer file_transfer) {
return file_transfer.encryption != Encryption.OMEMO;
}
public void send_file(Conversation conversation, FileTransfer file_transfer) {
XmppStream? stream = stream_interactor.get_stream(file_transfer.account);
foreach (Jid full_jid in stream.get_flag(Presence.Flag.IDENTITY).get_resources(conversation.counterpart)) {
// TODO(hrxi): Prioritization of transports (and resources?).
if (!stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) {
continue;
}
stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size);
return;
}
}
}
}

View file

@ -78,6 +78,10 @@ public class ModuleManager {
module_map[account].add(new StreamError.Module());
module_map[account].add(new Xep.InBandRegistration.Module());
module_map[account].add(new Xep.HttpFileUpload.Module());
module_map[account].add(new Xep.InBandBytestreams.Module());
module_map[account].add(new Xep.Jingle.Module());
module_map[account].add(new Xep.JingleInBandBytestreams.Module());
module_map[account].add(new Xep.JingleFileTransfer.Module());
initialize_account_modules(account, module_map[account]);
}
}

View file

@ -49,8 +49,9 @@ SOURCES
"src/module/xep/0045_muc/flag.vala"
"src/module/xep/0045_muc/module.vala"
"src/module/xep/0045_muc/status_code.vala"
"src/module/xep/0048_bookmarks/module.vala"
"src/module/xep/0047_in_band_bytestreams.vala"
"src/module/xep/0048_bookmarks/conference.vala"
"src/module/xep/0048_bookmarks/module.vala"
"src/module/xep/0049_private_xml_storage.vala"
"src/module/xep/0054_vcard/module.vala"
"src/module/xep/0060_pubsub.vala"
@ -60,11 +61,14 @@ SOURCES
"src/module/xep/0084_user_avatars.vala"
"src/module/xep/0085_chat_state_notifications.vala"
"src/module/xep/0115_entitiy_capabilities.vala"
"src/module/xep/0166_jingle.vala"
"src/module/xep/0184_message_delivery_receipts.vala"
"src/module/xep/0191_blocking_command.vala"
"src/module/xep/0198_stream_management.vala"
"src/module/xep/0199_ping.vala"
"src/module/xep/0184_message_delivery_receipts.vala"
"src/module/xep/0203_delayed_delivery.vala"
"src/module/xep/0234_jingle_file_transfer.vala"
"src/module/xep/0261_jingle_in_band_bytestreams.vala"
"src/module/xep/0280_message_carbons.vala"
"src/module/xep/0313_message_archive_management.vala"
"src/module/xep/0333_chat_markers.vala"

View file

@ -0,0 +1,172 @@
using Gee;
using Xmpp;
using Xmpp.Xep;
namespace Xmpp.Xep.InBandBytestreams {
private const string NS_URI = "http://jabber.org/protocol/ibb";
private const int SEQ_MODULUS = 65536;
public class Module : XmppStreamModule {
public static Xmpp.ModuleIdentity<Module> IDENTITY = new Xmpp.ModuleIdentity<Module>(NS_URI, "0047_in_band_bytestreams");
public override void attach(XmppStream stream) {
stream.add_flag(new Flag());
}
public override void detach(XmppStream stream) { }
public void on_iq_set(XmppStream stream, Iq.Stanza iq) {
StanzaNode? data = iq.stanza.get_subnode("data", NS_URI);
string? sid = data != null ? data.get_attribute("sid") : null;
if (data == null || sid == null) {
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing data node or sid")));
return;
}
Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid);
if (conn == null) {
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found()));
return;
}
int seq = data.get_attribute_int("seq");
// TODO(hrxi): return an error on malformed base64 (need to do this
// according to the xep)
uint8[] content = Base64.decode(data.get_string_content());
if (seq < 0 || seq != conn.remote_seq) {
// TODO(hrxi): send an error and close the connection
return;
}
conn.remote_seq = (conn.remote_seq + 1) % SEQ_MODULUS;
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
conn.on_data(stream, content);
}
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
}
public class Connection {
// TODO(hrxi): implement half-open states
public enum State {
UNCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING,
DISCONNECTED,
ERROR,
}
State state = UNCONNECTED;
Jid receiver_full_jid;
public string sid { get; private set; }
int block_size;
int local_seq = 0;
int remote_ack = 0;
internal int remote_seq = 0;
public signal void on_error(XmppStream stream, string error);
public signal void on_data(XmppStream stream, uint8[] data);
public signal void on_ready(XmppStream stream);
public Connection(Jid receiver_full_jid, string sid, int block_size) {
this.receiver_full_jid = receiver_full_jid;
this.sid = sid;
this.block_size = block_size;
}
public void connect(XmppStream stream) {
assert(state == UNCONNECTED);
state = CONNECTING;
StanzaNode open = new StanzaNode.build("open", NS_URI)
.add_self_xmlns()
.put_attribute("block-size", block_size.to_string())
.put_attribute("sid", sid);
Iq.Stanza iq = new Iq.Stanza.set(open) { to=receiver_full_jid };
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
assert(state == CONNECTING);
if (!iq.is_error()) {
state = CONNECTED;
stream.get_flag(Flag.IDENTITY).add_connection(this);
on_ready(stream);
} else {
set_error(stream, "connection failed");
}
});
}
void set_error(XmppStream stream, string error) {
// TODO(hrxi): Send disconnect?
state = ERROR;
on_error(stream, error);
}
public void send(XmppStream stream, uint8[] bytes) {
assert(state == CONNECTED);
// TODO(hrxi): rate-limiting/merging?
int seq = local_seq;
local_seq = (local_seq + 1) % SEQ_MODULUS;
StanzaNode data = new StanzaNode.build("data", NS_URI)
.add_self_xmlns()
.put_attribute("sid", sid)
.put_attribute("seq", seq.to_string())
.put_node(new StanzaNode.text(Base64.encode(bytes)));
Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid };
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
if (iq.is_error()) {
set_error(stream, "sending failed");
return;
}
if (remote_ack != seq) {
set_error(stream, "out of order acks");
return;
}
remote_ack = (remote_ack + 1) % SEQ_MODULUS;
if (local_seq == remote_ack) {
on_ready(stream);
}
});
}
public void close(XmppStream stream) {
assert(state == CONNECTED);
state = DISCONNECTING;
// TODO(hrxi): should not do this, might still receive data
stream.get_flag(Flag.IDENTITY).remove_connection(this);
StanzaNode close = new StanzaNode.build("close", NS_URI)
.add_self_xmlns()
.put_attribute("sid", sid);
Iq.Stanza iq = new Iq.Stanza.set(close) { to=receiver_full_jid };
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
assert(state == DISCONNECTING);
if (iq.is_error()) {
set_error(stream, "disconnecting failed");
return;
}
state = DISCONNECTED;
});
}
}
public class Flag : XmppStreamFlag {
public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "in_band_bytestreams");
private HashMap<string, Connection> active = new HashMap<string, Connection>();
public void add_connection(Connection conn) {
active[conn.sid] = conn;
}
public Connection? get_connection(string sid) {
return active.has_key(sid) ? active[sid] : null;
}
public void remove_connection(Connection conn) {
active.unset(conn.sid);
}
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
}
}

View file

@ -0,0 +1,328 @@
using Gee;
using Xmpp.Xep;
using Xmpp;
namespace Xmpp.Xep.Jingle {
private const string NS_URI = "urn:xmpp:jingle:1";
private const string ERROR_NS_URI = "urn:xmpp:jingle:errors:1";
public errordomain CreateConnectionError {
BAD_REQUEST,
NOT_ACCEPTABLE,
}
public errordomain Error {
GENERAL,
BAD_REQUEST,
INVALID_PARAMETERS,
UNSUPPORTED_TRANSPORT,
NO_SHARED_PROTOCOLS,
TRANSPORT_ERROR,
}
public class Module : XmppStreamModule, Iq.Handler {
public static Xmpp.ModuleIdentity<Module> IDENTITY = new Xmpp.ModuleIdentity<Module>(NS_URI, "0166_jingle");
public override void attach(XmppStream stream) {
stream.add_flag(new Flag());
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this);
}
public override void detach(XmppStream stream) { }
public void add_transport(XmppStream stream, Transport transport) {
stream.get_flag(Flag.IDENTITY).add_transport(transport);
}
public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) {
return stream.get_flag(Flag.IDENTITY).select_transport(stream, type, receiver_full_jid);
}
private bool is_jingle_available(XmppStream stream, Jid full_jid) {
bool? has_jingle = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI);
return has_jingle != null && has_jingle;
}
public bool is_available(XmppStream stream, TransportType type, Jid full_jid) {
return is_jingle_available(stream, full_jid) && select_transport(stream, type, full_jid) != null;
}
public Session create_session(XmppStream stream, TransportType type, Jid receiver_full_jid, Senders senders, string content_name, StanzaNode description) throws Error {
if (!is_jingle_available(stream, receiver_full_jid)) {
throw new Error.NO_SHARED_PROTOCOLS("No Jingle support");
}
Transport? transport = select_transport(stream, type, receiver_full_jid);
if (transport == null) {
throw new Error.NO_SHARED_PROTOCOLS("No suitable transports");
}
Jid? my_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid;
if (my_jid == null) {
throw new Error.GENERAL("Couldn't determine own JID");
}
Session session = new Session(random_uuid(), type, receiver_full_jid);
StanzaNode content = new StanzaNode.build("content", NS_URI)
.put_attribute("creator", "initiator")
.put_attribute("name", content_name)
.put_attribute("senders", senders.to_string())
.put_node(description)
.put_node(transport.to_transport_stanza_node());
StanzaNode jingle = new StanzaNode.build("jingle", NS_URI)
.add_self_xmlns()
.put_attribute("action", "session-initiate")
.put_attribute("initiator", my_jid.to_string())
.put_attribute("sid", session.sid)
.put_node(content);
Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=receiver_full_jid };
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
stream.get_flag(Flag.IDENTITY).add_session(session);
});
return session;
}
public void on_iq_set(XmppStream stream, Iq.Stanza iq) {
StanzaNode? jingle = iq.stanza.get_subnode("jingle", NS_URI);
string? sid = jingle != null ? jingle.get_attribute("sid") : null;
string? action = jingle != null ? jingle.get_attribute("action") : null;
if (jingle == null || sid == null || action == null) {
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing jingle node, sid or action")));
return;
}
Session? session = stream.get_flag(Flag.IDENTITY).get_session(sid);
if (session == null) {
StanzaNode unknown_session = new StanzaNode.build("unknown-session", ERROR_NS_URI).add_self_xmlns();
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found(unknown_session)));
return;
}
session.handle_iq_set(stream, action, jingle, iq);
}
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
}
public enum TransportType {
DATAGRAM,
STREAMING,
}
public enum Senders {
BOTH,
INITIATOR,
NONE,
RESPONDER;
public string to_string() {
switch (this) {
case BOTH: return "both";
case INITIATOR: return "initiator";
case NONE: return "none";
case RESPONDER: return "responder";
}
assert_not_reached();
}
}
public interface Transport : Object {
public abstract bool is_transport_available(XmppStream stream, Jid full_jid);
public abstract TransportType transport_type();
public abstract StanzaNode to_transport_stanza_node();
public abstract Connection? create_transport_connection(XmppStream stream, Jid peer_full_jid, StanzaNode content) throws CreateConnectionError;
}
public class Session {
public enum State {
PENDING,
ACTIVE,
ENDED,
}
public State state { get; private set; }
Connection? conn;
public string sid { get; private set; }
public Type type_ { get; private set; }
public Jid peer_full_jid { get; private set; }
public Session(string sid, Type type, Jid peer_full_jid) {
this.state = PENDING;
this.conn = null;
this.sid = sid;
this.type_ = type;
this.peer_full_jid = peer_full_jid;
}
public signal void on_error(XmppStream stream, Error error);
public signal void on_data(XmppStream stream, uint8[] data);
// Signals that the stream is ready to send (more) data.
public signal void on_ready(XmppStream stream);
private void handle_error(XmppStream stream, Error error) {
if (state == PENDING || state == ACTIVE) {
StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("general-error", NS_URI)) // TODO(hrxi): Is this the right error?
.put_node(new StanzaNode.build("text", NS_URI)
.put_node(new StanzaNode.text(error.message))
);
terminate(stream, reason);
}
}
delegate void SendIq(Iq.Stanza iq);
public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) {
SendIq send_iq = (iq) => stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq);
if (state != PENDING || action != "session-accept") {
return;
}
StanzaNode? content = jingle.get_subnode("content");
if (content == null) {
// TODO(hrxi): here and below, should we terminate the session?
send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request("no content element")));
return;
}
string? responder_str = jingle.get_attribute("responder");
Jid responder;
if (responder_str != null) {
responder = Jid.parse(responder_str) ?? iq.from;
} else {
responder = iq.from; // TODO(hrxi): and above, can we assume iq.from != null
// TODO(hrxi): more sanity checking, perhaps replace who we're talking to
}
if (!responder.is_full()) {
send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request("invalid responder JID")));
return;
}
try {
conn = stream.get_flag(Flag.IDENTITY).create_connection(stream, type_, peer_full_jid, content);
} catch (CreateConnectionError e) {
if (e is CreateConnectionError.BAD_REQUEST) {
send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request(e.message)));
} else if (e is CreateConnectionError.NOT_ACCEPTABLE) {
send_iq(new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable(e.message)));
}
return;
}
send_iq(new Iq.Stanza.result(iq));
if (conn == null) {
terminate(stream, new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("unsupported-transports", NS_URI)));
return;
}
conn.on_error.connect((stream, error) => on_error(stream, error));
conn.on_data.connect((stream, data) => on_data(stream, data));
conn.on_ready.connect((stream) => on_ready(stream));
on_error.connect((stream, error) => handle_error(stream, error));
conn.connect(stream);
state = ACTIVE;
}
public void send(XmppStream stream, uint8[] data) {
if (state != ACTIVE) {
return; // TODO(hrxi): what to do?
}
conn.send(stream, data);
}
public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) {
StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("failed-application", NS_URI));
if (application_reason != null) {
reason.put_node(application_reason);
}
terminate(stream, reason);
}
public void close_connection(XmppStream stream) {
if (state != ACTIVE) {
return; // TODO(hrxi): what to do?
}
conn.close(stream);
}
public void terminate(XmppStream stream, StanzaNode reason) {
if (state != PENDING && state != ACTIVE) {
// TODO(hrxi): what to do?
return;
}
if (conn != null) {
conn.close(stream);
}
StanzaNode jingle = new StanzaNode.build("jingle", NS_URI)
.add_self_xmlns()
.put_attribute("action", "session-terminate")
.put_attribute("sid", sid)
.put_node(reason);
Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid };
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq);
state = ENDED;
// Immediately remove the session from the open sessions as per the
// XEP, don't wait for confirmation.
stream.get_flag(Flag.IDENTITY).remove_session(sid);
}
}
public abstract class Connection {
public Jid? peer_full_jid { get; private set; }
public Connection(Jid peer_full_jid) {
this.peer_full_jid = peer_full_jid;
}
public signal void on_error(XmppStream stream, Error error);
public signal void on_data(XmppStream stream, uint8[] data);
public signal void on_ready(XmppStream stream);
public abstract void connect(XmppStream stream);
public abstract void send(XmppStream stream, uint8[] data);
public abstract void close(XmppStream stream);
}
public class Flag : XmppStreamFlag {
public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "jingle");
private Gee.List<Transport> transports = new ArrayList<Transport>();
private HashMap<string, Session> sessions = new HashMap<string, Session>();
public void add_transport(Transport transport) { transports.add(transport); }
public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) {
foreach (Transport transport in transports) {
if (transport.transport_type() != type) {
continue;
}
// TODO(hrxi): prioritization
if (transport.is_transport_available(stream, receiver_full_jid)) {
return transport;
}
}
return null;
}
public void add_session(Session session) {
sessions[session.sid] = session;
}
public Connection? create_connection(XmppStream stream, Type type, Jid peer_full_jid, StanzaNode content) throws CreateConnectionError {
foreach (Transport transport in transports) {
if (transport.transport_type() != type) {
continue;
}
Connection? conn = transport.create_transport_connection(stream, peer_full_jid, content);
if (conn != null) {
return conn;
}
}
return null;
}
public Session? get_session(string sid) {
return sessions.has_key(sid) ? sessions[sid] : null;
}
public void remove_session(string sid) {
sessions.unset(sid);
}
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
}
}

View file

@ -0,0 +1,100 @@
using Gee;
using Xmpp;
using Xmpp.Xep;
namespace Xmpp.Xep.JingleFileTransfer {
private const string NS_URI = "urn:xmpp:jingle:apps:file-transfer:5";
public errordomain Error {
FILE_INACCESSIBLE,
}
public class Module : XmppStreamModule {
public static Xmpp.ModuleIdentity<Module> IDENTITY = new Xmpp.ModuleIdentity<Module>(NS_URI, "0234_jingle_file_transfer");
public override void attach(XmppStream stream) {
stream.add_flag(new Flag());
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
}
public override void detach(XmppStream stream) { }
public bool is_available(XmppStream stream, Jid full_jid) {
bool? has_feature = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI);
if (has_feature == null || !(!)has_feature) {
return false;
}
return stream.get_module(Jingle.Module.IDENTITY).is_available(stream, Jingle.TransportType.STREAMING, full_jid);
}
public void offer_file(XmppStream stream, Jid receiver_full_jid, string path) throws Error {
File file = File.new_for_path(path);
FileInputStream input_stream;
int64 size;
try {
input_stream = file.read();
} catch (GLib.Error e) {
throw new Error.FILE_INACCESSIBLE(@"could not open the file \"$path\" for reading: $(e.message)");
}
try {
size = input_stream.query_info(FileAttribute.STANDARD_SIZE).get_size();
} catch (GLib.Error e) {
throw new Error.FILE_INACCESSIBLE(@"could not read the size: $(e.message)");
}
offer_file_stream(stream, receiver_full_jid, input_stream, file.get_basename(), size);
}
public void offer_file_stream(XmppStream stream, Jid receiver_full_jid, InputStream input_stream, string basename, int64 size) {
StanzaNode description = new StanzaNode.build("description", NS_URI)
.add_self_xmlns()
.put_node(new StanzaNode.build("file", NS_URI)
.put_node(new StanzaNode.build("name", NS_URI).put_node(new StanzaNode.text(basename)))
.put_node(new StanzaNode.build("size", NS_URI).put_node(new StanzaNode.text(size.to_string()))));
// TODO(hrxi): Add the mandatory hash field
Jingle.Session? session = stream.get_module(Jingle.Module.IDENTITY)
.create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"?
FileTransfer transfer = new FileTransfer(input_stream);
session.on_ready.connect(transfer.send_data);
stream.get_flag(Flag.IDENTITY).add_file_transfer(transfer);
}
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
}
public class FileTransfer : Object {
InputStream input_stream;
public FileTransfer(InputStream input_stream) {
this.input_stream = input_stream;
}
public void send_data(Jingle.Session session, XmppStream stream) {
uint8 buffer[4096];
ssize_t read;
try {
if((read = input_stream.read(buffer)) != 0) {
session.send(stream, buffer[0:read]);
} else {
session.close_connection(stream);
}
} catch (GLib.IOError e) {
session.set_application_error(stream);
}
// TODO(hrxi): remove file transfer
}
}
public class Flag : XmppStreamFlag {
public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "jingle_file_transfer");
private Gee.List<FileTransfer> transfers = new ArrayList<FileTransfer>();
public void add_file_transfer(FileTransfer transfer) { transfers.add(transfer); }
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
}
}

View file

@ -0,0 +1,75 @@
using Xmpp;
using Xmpp.Xep;
namespace Xmpp.Xep.JingleInBandBytestreams {
private const string NS_URI = "urn:xmpp:jingle:transports:ibb:1";
private const int DEFAULT_BLOCKSIZE = 4096;
public class Module : Jingle.Transport, XmppStreamModule {
public static Xmpp.ModuleIdentity<Module> IDENTITY = new Xmpp.ModuleIdentity<Module>(NS_URI, "0261_jingle_in_band_bytestreams");
public override void attach(XmppStream stream) {
stream.get_module(Jingle.Module.IDENTITY).add_transport(stream, this);
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
}
public override void detach(XmppStream stream) { }
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
public bool is_transport_available(XmppStream stream, Jid full_jid) {
bool? result = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI);
return result != null && result;
}
public Jingle.TransportType transport_type() {
return Jingle.TransportType.STREAMING;
}
public StanzaNode to_transport_stanza_node() {
return new StanzaNode.build("transport", NS_URI)
.add_self_xmlns()
.put_attribute("block-size", DEFAULT_BLOCKSIZE.to_string())
.put_attribute("sid", random_uuid());
}
public Jingle.Connection? create_transport_connection(XmppStream stream, Jid peer_full_jid, StanzaNode content) throws Jingle.CreateConnectionError {
StanzaNode? transport = content.get_subnode("transport", NS_URI);
if (transport == null) {
return null;
}
string? sid = transport.get_attribute("sid");
int block_size = transport.get_attribute_int("block-size");
if (sid == null || block_size <= 0) {
throw new Jingle.CreateConnectionError.BAD_REQUEST("Invalid IBB parameters");
}
if (block_size > DEFAULT_BLOCKSIZE) {
throw new Jingle.CreateConnectionError.NOT_ACCEPTABLE("Invalid IBB parameters: peer increased block size");
}
return new Connection(peer_full_jid, new InBandBytestreams.Connection(peer_full_jid, sid, block_size));
}
}
public class Connection : Jingle.Connection {
InBandBytestreams.Connection inner;
public Connection(Jid full_jid, InBandBytestreams.Connection inner) {
base(full_jid);
inner.on_error.connect((stream, error) => on_error(stream, new Jingle.Error.TRANSPORT_ERROR(error)));
inner.on_data.connect((stream, data) => on_data(stream, data));
inner.on_ready.connect((stream) => on_ready(stream));
this.inner = inner;
}
public override void connect(XmppStream stream) {
inner.connect(stream);
}
public override void send(XmppStream stream, uint8[] data) {
inner.send(stream, data);
}
public override void close(XmppStream stream) {
inner.close(stream);
}
}
}