From 3f531d6b91edab6c79fa232143db828bad13853c Mon Sep 17 00:00:00 2001 From: fiaxh Date: Sat, 11 Nov 2017 21:29:13 +0100 Subject: [PATCH] Read+(write) stream async --- libdino/src/service/connection_manager.vala | 51 +++---- .../add_conference_dialog.vala | 2 +- .../conference_details_fragment.vala | 7 +- .../ui/add_conversation/conference_list.vala | 14 +- main/src/ui/add_conversation/roster_list.vala | 4 +- main/src/ui/application.vala | 2 +- main/src/ui/contact_details/dialog.vala | 11 +- .../muc_config_form_provider.vala | 11 +- .../conversation_row.vala | 7 +- main/src/ui/conversation_selector/list.vala | 38 ++--- .../chat_state_populator.vala | 4 +- .../conversation_item_skeleton.vala | 2 +- .../message_populator.vala | 8 +- .../ui/conversation_titlebar/file_entry.vala | 9 +- main/src/ui/conversation_titlebar/view.vala | 9 +- main/src/ui/manage_accounts/account_row.vala | 14 +- main/src/ui/manage_accounts/dialog.vala | 21 +-- main/src/ui/occupant_menu/list.vala | 4 +- main/src/ui/unified_window.vala | 8 +- plugins/omemo/src/stream_module.vala | 137 ++++++++++-------- plugins/openpgp/src/stream_module.vala | 100 ++++++++----- xmpp-vala/src/core/stanza_reader.vala | 121 ++++++++-------- xmpp-vala/src/core/stanza_writer.vala | 2 +- xmpp-vala/src/core/xmpp_stream.vala | 26 ++-- xmpp-vala/src/module/message/module.vala | 17 ++- xmpp-vala/src/module/util.vala | 59 ++++++++ .../xep/0085_chat_state_notifications.vala | 23 ++- .../xep/0184_message_delivery_receipts.vala | 26 ++-- .../src/module/xep/0203_delayed_delivery.vala | 20 ++- .../src/module/xep/0280_message_carbons.vala | 47 +++--- .../xep/0313_message_archive_management.vala | 44 +++--- .../src/module/xep/0333_chat_markers.vala | 13 +- 32 files changed, 470 insertions(+), 391 deletions(-) diff --git a/libdino/src/service/connection_manager.vala b/libdino/src/service/connection_manager.vala index d0a11cd5..5fcd66a2 100644 --- a/libdino/src/service/connection_manager.vala +++ b/libdino/src/service/connection_manager.vala @@ -20,7 +20,6 @@ public class ConnectionManager { private ArrayList connection_todo = new ArrayList(Account.equals_func); private HashMap connections = new HashMap(Account.hash_func, Account.equals_func); private HashMap connection_errors = new HashMap(Account.hash_func, Account.equals_func); - private HashMap connection_mutexes = new HashMap(Account.hash_func, Account.equals_func); private NetworkManager? network_manager; private Login1Manager? login1; @@ -57,12 +56,6 @@ public class ConnectionManager { } } - private class RecMutexWrap { - public RecMutex mutex = RecMutex(); - public void unlock() { mutex.unlock(); } - public bool trylock() { return mutex.trylock(); } - } - public ConnectionManager(ModuleManager module_manager) { this.module_manager = module_manager; network_manager = get_network_manager(); @@ -120,7 +113,6 @@ public class ConnectionManager { } public Core.XmppStream? connect(Account account) { - if (!connection_mutexes.has_key(account)) connection_mutexes[account] = new RecMutexWrap(); if (!connection_todo.contains(account)) connection_todo.add(account); if (!connections.has_key(account)) { return connect_(account); @@ -155,8 +147,6 @@ public class ConnectionManager { } private Core.XmppStream? connect_(Account account, string? resource = null) { - if (!connection_mutexes[account].trylock()) return null; - if (connections.has_key(account)) connections[account].stream.detach_modules(); connection_errors.unset(account); if (resource == null) resource = account.resourcepart; @@ -180,31 +170,30 @@ public class ConnectionManager { stream.received_node.connect(() => { connections[account].last_activity = new DateTime.now_utc(); }); - new Thread (null, () => { - try { - stream.connect(account.domainpart); - } catch (Error e) { - stderr.printf("Stream Error: %s\n", e.message); - change_connection_state(account, ConnectionState.DISCONNECTED); - if (!connection_todo.contains(account)) { - connections.unset(account); - return null; - } - StreamError.Flag? flag = stream.get_flag(StreamError.Flag.IDENTITY); - if (flag != null) { - set_connection_error(account, ConnectionError.Source.STREAM_ERROR, flag.error_type); - } - interpret_connection_error(account); - } - connection_mutexes[account].unlock(); - return null; - }); + connect_async.begin(account, stream); stream_opened(account, stream); - connection_mutexes[account].unlock(); return stream; } + private async void connect_async(Account account, Core.XmppStream stream) { + try { + yield stream.connect(account.domainpart); + } catch (Error e) { + stderr.printf("Stream Error: %s\n", e.message); + change_connection_state(account, ConnectionState.DISCONNECTED); + if (!connection_todo.contains(account)) { + connections.unset(account); + return; + } + StreamError.Flag? flag = stream.get_flag(StreamError.Flag.IDENTITY); + if (flag != null) { + set_connection_error(account, ConnectionError.Source.STREAM_ERROR, flag.error_type); + } + interpret_connection_error(account); + } + } + private void interpret_connection_error(Account account) { ConnectionError? error = connection_errors[account]; int wait_sec = 5; @@ -243,7 +232,6 @@ public class ConnectionManager { } private void check_reconnect(Account account) { - if (!connection_mutexes[account].trylock()) return; bool acked = false; Core.XmppStream stream = connections[account].stream; @@ -261,7 +249,6 @@ public class ConnectionManager { connections[account].stream.disconnect(); } catch (Error e) { } connect_(account); - connection_mutexes[account].unlock(); return false; }); } diff --git a/main/src/ui/add_conversation/add_conference_dialog.vala b/main/src/ui/add_conversation/add_conference_dialog.vala index 5e5698fb..d840ff2a 100644 --- a/main/src/ui/add_conversation/add_conference_dialog.vala +++ b/main/src/ui/add_conversation/add_conference_dialog.vala @@ -36,7 +36,7 @@ public class AddConferenceDialog : Gtk.Dialog { setup_conference_details_view(); show_jid_add_view(); - stream_interactor.get_module(MucManager.IDENTITY).joined.connect((account, jid, nick) => { Idle.add(() => { on_joined(account, jid, nick); return false; } ); }); + stream_interactor.get_module(MucManager.IDENTITY).joined.connect(on_joined); } private void show_jid_add_view() { diff --git a/main/src/ui/add_conversation/conference_details_fragment.vala b/main/src/ui/add_conversation/conference_details_fragment.vala index 064d1053..fbe9245d 100644 --- a/main/src/ui/add_conversation/conference_details_fragment.vala +++ b/main/src/ui/add_conversation/conference_details_fragment.vala @@ -99,12 +99,7 @@ protected class ConferenceDetailsFragment : Box { jid_entry.key_release_event.connect(() => { done = true; return false; }); // just for notifying nick_entry.key_release_event.connect(() => { done = true; return false; }); - stream_interactor.get_module(MucManager.IDENTITY).enter_error.connect((account, jid, error) => { - Idle.add(() => { - on_enter_error(account, jid, error); - return false; - }); - }); + stream_interactor.get_module(MucManager.IDENTITY).enter_error.connect(on_enter_error); notification_button.clicked.connect(() => { notification_revealer.set_reveal_child(false); }); ok_button.clicked.connect(() => { ok_button.label = _("Joining..."); diff --git a/main/src/ui/add_conversation/conference_list.vala b/main/src/ui/add_conversation/conference_list.vala index 570166b1..8338558c 100644 --- a/main/src/ui/add_conversation/conference_list.vala +++ b/main/src/ui/add_conversation/conference_list.vala @@ -21,11 +21,8 @@ protected class ConferenceList : FilterableList { set_sort_func(sort); stream_interactor.get_module(MucManager.IDENTITY).bookmarks_updated.connect((account, conferences) => { - Idle.add(() => { - lists[account] = conferences; - refresh_conferences(); - return false; - }); + lists[account] = conferences; + refresh_conferences(); }); foreach (Account account in stream_interactor.get_accounts()) { @@ -43,11 +40,8 @@ protected class ConferenceList : FilterableList { } private void on_conference_bookmarks_received(Core.XmppStream stream, Account account, Gee.List conferences) { - Idle.add(() => { - lists[account] = conferences; - refresh_conferences(); - return false; - }); + lists[account] = conferences; + refresh_conferences(); } private void header(ListBoxRow row, ListBoxRow? before_row) { diff --git a/main/src/ui/add_conversation/roster_list.vala b/main/src/ui/add_conversation/roster_list.vala index 70e4bc14..3107dc47 100644 --- a/main/src/ui/add_conversation/roster_list.vala +++ b/main/src/ui/add_conversation/roster_list.vala @@ -25,12 +25,12 @@ protected class RosterList : FilterableList { handler_ids += stream_interactor.get_module(RosterManager.IDENTITY).removed_roster_item.connect( (account, jid, roster_item) => { if (accounts.contains(account)) { - Idle.add(() => { on_removed_roster_item(account, jid, roster_item); return false;}); + on_removed_roster_item(account, jid, roster_item); } }); handler_ids += stream_interactor.get_module(RosterManager.IDENTITY).updated_roster_item.connect( (account, jid, roster_item) => { if (accounts.contains(account)) { - Idle.add(() => { on_updated_roster_item(account, jid, roster_item); return false;}); + on_updated_roster_item(account, jid, roster_item); } }); destroy.connect(() => { diff --git a/main/src/ui/application.vala b/main/src/ui/application.vala index e4fdd9eb..b7809890 100644 --- a/main/src/ui/application.vala +++ b/main/src/ui/application.vala @@ -23,7 +23,7 @@ public class Dino.Ui.Application : Gtk.Application, Dino.Application { provider.load_from_resource("/im/dino/theme.css"); StyleContext.add_provider_for_screen(Gdk.Screen.get_default(), provider, STYLE_PROVIDER_PRIORITY_APPLICATION); - activate.connect(() => { + startup.connect(() => { if (window == null) { create_set_app_menu(); window = new UnifiedWindow(this, stream_interactor); diff --git a/main/src/ui/contact_details/dialog.vala b/main/src/ui/contact_details/dialog.vala index 2f1fc81e..61044857 100644 --- a/main/src/ui/contact_details/dialog.vala +++ b/main/src/ui/contact_details/dialog.vala @@ -111,13 +111,10 @@ public class Dialog : Gtk.Dialog { row.add(widget); categories[category].add(list_row); - Idle.add(() => { - int pref_height, pref_width; - get_content_area().get_preferred_height(null, out pref_height); - get_preferred_width(out pref_width, null); - resize(pref_width, int.min(500, pref_height)); - return false; - }); + int pref_height, pref_width; + get_content_area().get_preferred_height(null, out pref_height); + get_preferred_width(out pref_width, null); + resize(pref_width, int.min(500, pref_height)); } private void add_category(string category) { diff --git a/main/src/ui/contact_details/muc_config_form_provider.vala b/main/src/ui/contact_details/muc_config_form_provider.vala index 47bbbe28..d4598265 100644 --- a/main/src/ui/contact_details/muc_config_form_provider.vala +++ b/main/src/ui/contact_details/muc_config_form_provider.vala @@ -21,13 +21,10 @@ public class MucConfigFormProvider : Plugins.ContactDetailsProvider, Object { if (stream == null) return; stream_interactor.get_module(MucManager.IDENTITY).get_config_form(conversation.account, conversation.counterpart, (jid, data_form) => { contact_details.save.connect(() => { data_form.submit(); }); - Idle.add(() => { - for (int i = 0; i < data_form.fields.size; i++) { - DataForms.DataForm.Field field = data_form.fields[i]; - add_field(field, contact_details); - } - return false; - }); + for (int i = 0; i < data_form.fields.size; i++) { + DataForms.DataForm.Field field = data_form.fields[i]; + add_field(field, contact_details); + } }); } } diff --git a/main/src/ui/conversation_selector/conversation_row.vala b/main/src/ui/conversation_selector/conversation_row.vala index 13f8a8f7..64b2aeaf 100644 --- a/main/src/ui/conversation_selector/conversation_row.vala +++ b/main/src/ui/conversation_selector/conversation_row.vala @@ -70,13 +70,10 @@ public abstract class ConversationRow : ListBoxRow { bool counterpart_online = stream_interactor.get_module(PresenceManager.IDENTITY).get_full_jids(conversation.counterpart, conversation.account) != null; bool greyscale = !self_online || !counterpart_online; - Idle.add(() => { - Pixbuf pixbuf = ((new AvatarGenerator(AVATAR_SIZE, AVATAR_SIZE, image.scale_factor)) + Pixbuf pixbuf = ((new AvatarGenerator(AVATAR_SIZE, AVATAR_SIZE, image.scale_factor)) .set_greyscale(greyscale) .draw_conversation(stream_interactor, conversation)); - Util.image_set_from_scaled_pixbuf(image, pixbuf, image.get_scale_factor()); - return false; - }); + Util.image_set_from_scaled_pixbuf(image, pixbuf, image.get_scale_factor()); } protected void update_name_label(string? new_name = null) { diff --git a/main/src/ui/conversation_selector/list.vala b/main/src/ui/conversation_selector/list.vala index dbad72a8..08958cb5 100644 --- a/main/src/ui/conversation_selector/list.vala +++ b/main/src/ui/conversation_selector/list.vala @@ -22,35 +22,21 @@ public class List : ListBox { set_header_func(header); set_sort_func(sort); - stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect((conversation) => { - Idle.add(() => { add_conversation(conversation); return false; }); - }); - stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect((conversation) => { - Idle.add(() => { remove_conversation(conversation); return false; }); - }); - stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect((message, conversation) => { - Idle.add(() => { on_message_received(message, conversation); return false; }); - }); - stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect((message, conversation) => { - Idle.add(() => { on_message_received(message, conversation); return false; }); - }); + stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect(add_conversation); + stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect(remove_conversation); + stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(on_message_received); + stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect(on_message_received); stream_interactor.get_module(PresenceManager.IDENTITY).show_received.connect((show, jid, account) => { - Idle.add(() => { - foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations_for_presence(show, account)) { - if (rows.has_key(conversation)) rows[conversation].on_show_received(show); - } - return false; - }); + foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations_for_presence(show, account)) { + if (rows.has_key(conversation)) rows[conversation].on_show_received(show); + } }); stream_interactor.get_module(AvatarManager.IDENTITY).received_avatar.connect((avatar, jid, account) => { - Idle.add(() => { - Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jid, account); - if (conversation != null && rows.has_key(conversation)) { - ChatRow row = rows[conversation] as ChatRow; - if (row != null) row.update_avatar(); - } - return false; - }); + Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jid, account); + if (conversation != null && rows.has_key(conversation)) { + ChatRow row = rows[conversation] as ChatRow; + if (row != null) row.update_avatar(); + } }); Timeout.add_seconds(60, () => { foreach (ConversationRow row in rows.values) row.update(); diff --git a/main/src/ui/conversation_summary/chat_state_populator.vala b/main/src/ui/conversation_summary/chat_state_populator.vala index e491fe44..6f397249 100644 --- a/main/src/ui/conversation_summary/chat_state_populator.vala +++ b/main/src/ui/conversation_summary/chat_state_populator.vala @@ -21,12 +21,12 @@ class ChatStatePopulator : Plugins.ConversationItemPopulator, Object { stream_interactor.get_module(CounterpartInteractionManager.IDENTITY).received_state.connect((account, jid, state) => { if (current_conversation != null && current_conversation.account.equals(account) && current_conversation.counterpart.equals_bare(jid)) { - Idle.add(() => { update_chat_state(account, jid, state); return false; }); + update_chat_state(account, jid, state); } }); stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect((message, conversation) => { if (conversation.equals(current_conversation)) { - Idle.add(() => { update_chat_state(conversation.account, conversation.counterpart); return false; }); + update_chat_state(conversation.account, conversation.counterpart); } }); } diff --git a/main/src/ui/conversation_summary/conversation_item_skeleton.vala b/main/src/ui/conversation_summary/conversation_item_skeleton.vala index 03114227..1eb76840 100644 --- a/main/src/ui/conversation_summary/conversation_item_skeleton.vala +++ b/main/src/ui/conversation_summary/conversation_item_skeleton.vala @@ -39,7 +39,7 @@ public class ConversationItemSkeleton : Grid { } else { set_title_widget(widget); } - item.notify["mark"].connect_after(() => { Idle.add(() => { update_received(); return false; }); }); + item.notify["mark"].connect_after(update_received); update_received(); } diff --git a/main/src/ui/conversation_summary/message_populator.vala b/main/src/ui/conversation_summary/message_populator.vala index f6d55a92..dc4b5770 100644 --- a/main/src/ui/conversation_summary/message_populator.vala +++ b/main/src/ui/conversation_summary/message_populator.vala @@ -19,12 +19,8 @@ public class MessagePopulator : Object { app.plugin_registry.register_message_display(new SlashmeMessageDisplay(stream_interactor)); - stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect((message, conversation) => { - Idle.add(() => { handle_message(message, conversation); return false; }); - }); - stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect((message, conversation) => { - Idle.add(() => { handle_message(message, conversation); return false; }); - }); + stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(handle_message); + stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect(handle_message); } public void init(Conversation conversation, Plugins.ConversationItemCollection item_collection) { diff --git a/main/src/ui/conversation_titlebar/file_entry.vala b/main/src/ui/conversation_titlebar/file_entry.vala index cb7d0807..0f0eb6d6 100644 --- a/main/src/ui/conversation_titlebar/file_entry.vala +++ b/main/src/ui/conversation_titlebar/file_entry.vala @@ -55,12 +55,9 @@ public class FileWidget : Button, Plugins.ConversationTitlebarWidget { } public void on_upload_available(Account account) { - Idle.add(() => { - if (conversation != null && conversation.account.equals(account)) { - visible = true; - } - return false; - }); + if (conversation != null && conversation.account.equals(account)) { + visible = true; + } } public new void set_conversation(Conversation conversation) { diff --git a/main/src/ui/conversation_titlebar/view.vala b/main/src/ui/conversation_titlebar/view.vala index bd8fe8c9..baa036a8 100644 --- a/main/src/ui/conversation_titlebar/view.vala +++ b/main/src/ui/conversation_titlebar/view.vala @@ -35,12 +35,9 @@ public class ConversationTitlebar : Gtk.HeaderBar { stream_interactor.get_module(MucManager.IDENTITY).subject_set.connect((account, jid, subject) => { - Idle.add(() => { - if (conversation != null && conversation.counterpart.equals_bare(jid) && conversation.account.equals(account)) { - update_subtitle(subject); - } - return false; - }); + if (conversation != null && conversation.counterpart.equals_bare(jid) && conversation.account.equals(account)) { + update_subtitle(subject); + } }); } diff --git a/main/src/ui/manage_accounts/account_row.vala b/main/src/ui/manage_accounts/account_row.vala index 911d48f4..8ac2c213 100644 --- a/main/src/ui/manage_accounts/account_row.vala +++ b/main/src/ui/manage_accounts/account_row.vala @@ -21,16 +21,14 @@ public class AccountRow : Gtk.ListBoxRow { jid_label.set_label(account.bare_jid.to_string()); stream_interactor.connection_manager.connection_error.connect((account, error) => { - Idle.add(() => { - if (account.equals(this.account)) update_warning_icon(); - return false; - }); + if (account.equals(this.account)) { + update_warning_icon(); + } }); stream_interactor.connection_manager.connection_state_changed.connect((account, state) => { - Idle.add(() => { - if (account.equals(this.account)) update_warning_icon(); - return false; - }); + if (account.equals(this.account)) { + update_warning_icon(); + } }); } diff --git a/main/src/ui/manage_accounts/dialog.vala b/main/src/ui/manage_accounts/dialog.vala index 6fdce712..f82e90c0 100644 --- a/main/src/ui/manage_accounts/dialog.vala +++ b/main/src/ui/manage_accounts/dialog.vala @@ -84,23 +84,16 @@ public class Dialog : Gtk.Dialog { add_account(account); } - stream_interactor.get_module(AvatarManager.IDENTITY).received_avatar.connect((pixbuf, jid, account) => { - Idle.add(() => { - on_received_avatar(pixbuf, jid, account); - return false; - }); - }); + stream_interactor.get_module(AvatarManager.IDENTITY).received_avatar.connect(on_received_avatar); stream_interactor.connection_manager.connection_error.connect((account, error) => { - Idle.add(() => { - if (account.equals(selected_account)) update_status_label(account); - return false; - }); + if (account.equals(selected_account)) { + update_status_label(account); + } }); stream_interactor.connection_manager.connection_state_changed.connect((account, state) => { - Idle.add(() => { - if (account.equals(selected_account)) update_status_label(account); - return false; - }); + if (account.equals(selected_account)) { + update_status_label(account); + } }); if (account_list.get_row_at_index(0) != null) account_list.select_row(account_list.get_row_at_index(0)); diff --git a/main/src/ui/occupant_menu/list.vala b/main/src/ui/occupant_menu/list.vala index 47adccb6..904ab3a7 100644 --- a/main/src/ui/occupant_menu/list.vala +++ b/main/src/ui/occupant_menu/list.vala @@ -25,9 +25,7 @@ public class List : Box { list_box.set_filter_func(filter); search_entry.search_changed.connect(search_changed); - stream_interactor.get_module(PresenceManager.IDENTITY).show_received.connect((show, jid, account) => { - Idle.add(() => { on_show_received(show, jid, account); return false; }); - }); + stream_interactor.get_module(PresenceManager.IDENTITY).show_received.connect(on_show_received); stream_interactor.get_module(RosterManager.IDENTITY).updated_roster_item.connect(on_updated_roster_item); initialize_for_conversation(conversation); diff --git a/main/src/ui/unified_window.vala b/main/src/ui/unified_window.vala index 69b530d7..be5b0805 100644 --- a/main/src/ui/unified_window.vala +++ b/main/src/ui/unified_window.vala @@ -40,12 +40,8 @@ public class UnifiedWindow : Window { stream_interactor.account_added.connect((account) => { check_stack(true); }); stream_interactor.account_removed.connect((account) => { check_stack(); }); - stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect( (conversation) => { - Idle.add( () => { check_stack(); return false; }); - }); - stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect( (conversation) => { - Idle.add( () => { check_stack(); return false; }); - }); + stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect(() => check_stack()); + stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect(() => check_stack()); accounts_placeholder.primary_button.clicked.connect(() => { get_application().activate_action("accounts", null); }); conversations_placeholder.primary_button.clicked.connect(() => { get_application().activate_action("add_chat", null); }); conversations_placeholder.secondary_button.clicked.connect(() => { get_application().activate_action("add_conference", null); }); diff --git a/plugins/omemo/src/stream_module.vala b/plugins/omemo/src/stream_module.vala index 46bc0ecf..75a919f7 100644 --- a/plugins/omemo/src/stream_module.vala +++ b/plugins/omemo/src/stream_module.vala @@ -117,70 +117,10 @@ public class StreamModule : XmppStreamModule { this.store = Plugin.get_context().create_store(); store_created(store); - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener(store)); stream.get_module(Pubsub.Module.IDENTITY).add_filtered_notification(stream, NODE_DEVICELIST, (stream, jid, id, node) => on_devicelist(stream, jid, id, node)); } - private void on_pre_received_message(XmppStream stream, Message.Stanza message) { - StanzaNode? _encrypted = message.stanza.get_subnode("encrypted", NS_URI); - if (_encrypted == null || MessageFlag.get_flag(message) != null || message.from == null) return; - StanzaNode encrypted = (!)_encrypted; - if (!Plugin.ensure_context()) return; - MessageFlag flag = new MessageFlag(); - message.add_flag(flag); - StanzaNode? _header = encrypted.get_subnode("header"); - if (_header == null) return; - StanzaNode header = (!)_header; - if (header.get_attribute_int("sid") <= 0) return; - foreach (StanzaNode key_node in header.get_subnodes("key")) { - if (key_node.get_attribute_int("rid") == store.local_registration_id) { - try { - string? payload = encrypted.get_deep_string_content("payload"); - string? iv_node = header.get_deep_string_content("iv"); - string? key_node_content = key_node.get_string_content(); - if (payload == null || iv_node == null || key_node_content == null) continue; - uint8[] key; - uint8[] ciphertext = Base64.decode((!)payload); - uint8[] iv = Base64.decode((!)iv_node); - Address address = new Address(get_bare_jid((!)message.from), header.get_attribute_int("sid")); - if (key_node.get_attribute_bool("prekey")) { - PreKeySignalMessage msg = Plugin.get_context().deserialize_pre_key_signal_message(Base64.decode((!)key_node_content)); - SessionCipher cipher = store.create_session_cipher(address); - key = cipher.decrypt_pre_key_signal_message(msg); - } else { - SignalMessage msg = Plugin.get_context().deserialize_signal_message(Base64.decode((!)key_node_content)); - SessionCipher cipher = store.create_session_cipher(address); - key = cipher.decrypt_signal_message(msg); - } - address.device_id = 0; // TODO: Hack to have address obj live longer - - if (key.length >= 32) { - int authtaglength = key.length - 16; - uint8[] new_ciphertext = new uint8[ciphertext.length + authtaglength]; - uint8[] new_key = new uint8[16]; - Memory.copy(new_ciphertext, ciphertext, ciphertext.length); - Memory.copy((uint8*)new_ciphertext + ciphertext.length, (uint8*)key + 16, authtaglength); - Memory.copy(new_key, key, 16); - ciphertext = new_ciphertext; - key = new_key; - } - - message.body = arr_to_str(aes_decrypt(Cipher.AES_GCM_NOPADDING, key, iv, ciphertext)); - flag.decrypted = true; - } catch (Error e) { - if (Plugin.DEBUG) print(@"OMEMO: Signal error while decrypting message: $(e.message)\n"); - } - } - } - } - - private string arr_to_str(uint8[] arr) { - // null-terminate the array - uint8[] rarr = new uint8[arr.length+1]; - Memory.copy(rarr, arr, arr.length); - return (string)rarr; - } - public void request_user_devicelist(XmppStream stream, string jid) { if (active_devicelist_requests.add(jid)) { if (Plugin.DEBUG) print(@"OMEMO: requesting device list for $jid\n"); @@ -442,4 +382,79 @@ public class StreamModule : XmppStreamModule { } } + +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {"EXTRACT_MESSAGE_2"}; + + public override string action_group { get { return "ENCRYPT_BODY"; } } + public override string[] after_actions { get { return after_actions_const; } } + + private Store store; + + public ReceivedPipelineListener(Store store) { + this.store = store; + } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + StanzaNode? _encrypted = message.stanza.get_subnode("encrypted", NS_URI); + if (_encrypted == null || MessageFlag.get_flag(message) != null || message.from == null) return; + StanzaNode encrypted = (!)_encrypted; + if (!Plugin.ensure_context()) return; + MessageFlag flag = new MessageFlag(); + message.add_flag(flag); + StanzaNode? _header = encrypted.get_subnode("header"); + if (_header == null) return; + StanzaNode header = (!)_header; + if (header.get_attribute_int("sid") <= 0) return; + foreach (StanzaNode key_node in header.get_subnodes("key")) { + if (key_node.get_attribute_int("rid") == store.local_registration_id) { + try { + string? payload = encrypted.get_deep_string_content("payload"); + string? iv_node = header.get_deep_string_content("iv"); + string? key_node_content = key_node.get_string_content(); + if (payload == null || iv_node == null || key_node_content == null) continue; + uint8[] key; + uint8[] ciphertext = Base64.decode((!)payload); + uint8[] iv = Base64.decode((!)iv_node); + Address address = new Address(get_bare_jid((!)message.from), header.get_attribute_int("sid")); + if (key_node.get_attribute_bool("prekey")) { + PreKeySignalMessage msg = Plugin.get_context().deserialize_pre_key_signal_message(Base64.decode((!)key_node_content)); + SessionCipher cipher = store.create_session_cipher(address); + key = cipher.decrypt_pre_key_signal_message(msg); + } else { + SignalMessage msg = Plugin.get_context().deserialize_signal_message(Base64.decode((!)key_node_content)); + SessionCipher cipher = store.create_session_cipher(address); + key = cipher.decrypt_signal_message(msg); + } + address.device_id = 0; // TODO: Hack to have address obj live longer + + if (key.length >= 32) { + int authtaglength = key.length - 16; + uint8[] new_ciphertext = new uint8[ciphertext.length + authtaglength]; + uint8[] new_key = new uint8[16]; + Memory.copy(new_ciphertext, ciphertext, ciphertext.length); + Memory.copy((uint8*)new_ciphertext + ciphertext.length, (uint8*)key + 16, authtaglength); + Memory.copy(new_key, key, 16); + ciphertext = new_ciphertext; + key = new_key; + } + + message.body = arr_to_str(aes_decrypt(Cipher.AES_GCM_NOPADDING, key, iv, ciphertext)); + flag.decrypted = true; + } catch (Error e) { + if (Plugin.DEBUG) print(@"OMEMO: Signal error while decrypting message: $(e.message)\n"); + } + } + } + } + + private string arr_to_str(uint8[] arr) { + // null-terminate the array + uint8[] rarr = new uint8[arr.length+1]; + Memory.copy(rarr, arr, arr.length); + return (string)rarr; + } +} + } diff --git a/plugins/openpgp/src/stream_module.vala b/plugins/openpgp/src/stream_module.vala index 068370fd..6264ce49 100644 --- a/plugins/openpgp/src/stream_module.vala +++ b/plugins/openpgp/src/stream_module.vala @@ -43,22 +43,16 @@ namespace Dino.Plugins.OpenPgp { return false; } - public string? get_cyphertext(Message.Stanza message) { - StanzaNode? x_node = message.stanza.get_subnode("x", NS_URI_ENCRYPTED); - return x_node == null ? null : x_node.get_string_content(); - } - public override void attach(XmppStream stream) { stream.get_module(Presence.Module.IDENTITY).received_presence.connect(on_received_presence); stream.get_module(Presence.Module.IDENTITY).pre_send_presence_stanza.connect(on_pre_send_presence_stanza); - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineDecryptListener()); stream.add_flag(new Flag()); } public override void detach(XmppStream stream) { stream.get_module(Presence.Module.IDENTITY).received_presence.disconnect(on_received_presence); stream.get_module(Presence.Module.IDENTITY).pre_send_presence_stanza.disconnect(on_pre_send_presence_stanza); - stream.get_module(Message.Module.IDENTITY).pre_received_message.disconnect(on_pre_received_message); } public static void require(XmppStream stream) { @@ -69,18 +63,24 @@ namespace Dino.Plugins.OpenPgp { public override string get_id() { return IDENTITY.id; } private void on_received_presence(XmppStream stream, Presence.Stanza presence) { - StanzaNode x_node = presence.stanza.get_subnode("x", NS_URI_SIGNED); - if (x_node != null) { - string? sig = x_node.get_string_content(); - if (sig != null) { - string signed_data = presence.status == null ? "" : presence.status; - string? key_id = get_sign_key(sig, signed_data); - if (key_id != null) { - stream.get_flag(Flag.IDENTITY).set_key_id(presence.from, key_id); - received_jid_key_id(stream, presence.from, key_id); + new Thread (null, () => { + StanzaNode x_node = presence.stanza.get_subnode("x", NS_URI_SIGNED); + if (x_node != null) { + string? sig = x_node.get_string_content(); + if (sig != null) { + string signed_data = presence.status == null ? "" : presence.status; + string? key_id = get_sign_key(sig, signed_data); + if (key_id != null) { + stream.get_flag(Flag.IDENTITY).set_key_id(presence.from, key_id); + Idle.add(() => { + received_jid_key_id(stream, presence.from, key_id); + return false; + }); + } } } - } + return null; + }); } private void on_pre_send_presence_stanza(XmppStream stream, Presence.Stanza presence) { @@ -89,19 +89,6 @@ namespace Dino.Plugins.OpenPgp { } } - private void on_pre_received_message(XmppStream stream, Message.Stanza message) { - string? encrypted = get_cyphertext(message); - if (encrypted != null) { - MessageFlag flag = new MessageFlag(); - message.add_flag(flag); - string? decrypted = gpg_decrypt(encrypted); - if (decrypted != null) { - flag.decrypted = true; - message.body = decrypted; - } - } - } - private static string? gpg_encrypt(string plain, GPG.Key[] keys) { string encr; try { @@ -113,15 +100,6 @@ namespace Dino.Plugins.OpenPgp { return encr.substring(encryption_start, encr.length - "\n-----END PGP MESSAGE-----".length - encryption_start); } - private static string? gpg_decrypt(string enc) { - string armor = "-----BEGIN PGP MESSAGE-----\n\n" + enc + "\n-----END PGP MESSAGE-----"; - string? decr = null; - try { - decr = GPGHelper.decrypt(armor); - } catch (Error e) { } - return decr; - } - private static string? get_sign_key(string sig, string signed_text) { string armor = "-----BEGIN PGP MESSAGE-----\n\n" + sig + "\n-----END PGP MESSAGE-----"; string? sign_key = null; @@ -156,4 +134,48 @@ namespace Dino.Plugins.OpenPgp { public override string get_ns() { return NS_URI; } public override string get_id() { return id; } } + +public class ReceivedPipelineDecryptListener : StanzaListener { + + private const string[] after_actions_const = {"MODIFY_BODY"}; + + public override string action_group { get { return "ENCRYPT_BODY"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + string? encrypted = get_cyphertext(message); + if (encrypted != null) { + MessageFlag flag = new MessageFlag(); + message.add_flag(flag); + string? decrypted = yield gpg_decrypt(encrypted); + if (decrypted != null) { + flag.decrypted = true; + message.body = decrypted; + } + } + } + + private static async string? gpg_decrypt(string enc) { + SourceFunc callback = gpg_decrypt.callback; + string? res = null; + new Thread (null, () => { + string armor = "-----BEGIN PGP MESSAGE-----\n\n" + enc + "\n-----END PGP MESSAGE-----"; + try { + res = GPGHelper.decrypt(armor); + } catch (Error e) { + res = null; + } + Idle.add((owned) callback); + return null; + }); + yield; + return res; + } + + private string? get_cyphertext(Message.Stanza message) { + StanzaNode? x_node = message.stanza.get_subnode("x", NS_URI_ENCRYPTED); + return x_node == null ? null : x_node.get_string_content(); + } +} + } diff --git a/xmpp-vala/src/core/stanza_reader.vala b/xmpp-vala/src/core/stanza_reader.vala index f4b900d1..4b4d98ab 100644 --- a/xmpp-vala/src/core/stanza_reader.vala +++ b/xmpp-vala/src/core/stanza_reader.vala @@ -44,12 +44,12 @@ public class StanzaReader { cancellable.cancel(); } - private void update_buffer() throws XmlError { + private async void update_buffer() throws XmlError { try { InputStream? input = this.input; if (input == null) throw new XmlError.EOF("No input stream specified and end of buffer reached."); if (cancellable.is_cancelled()) throw new XmlError.EOF("Input stream is canceled."); - buffer_fill = (int) ((!)input).read(buffer, cancellable); + buffer_fill = (int) yield ((!)input).read_async(buffer, GLib.Priority.DEFAULT, cancellable); if (buffer_fill == 0) throw new XmlError.EOF("End of input stream reached."); buffer_pos = 0; } catch (GLib.IOError e) { @@ -57,15 +57,16 @@ public class StanzaReader { } } - private char read_single() throws XmlError { + private async char read_single() throws XmlError { if (buffer_pos >= buffer_fill) { - update_buffer(); + yield update_buffer(); } - return (char) buffer[buffer_pos++]; + char c = (char) buffer[buffer_pos++]; + return c; } - private char peek_single() throws XmlError { - var res = read_single(); + private async char peek_single() throws XmlError { + var res = yield read_single(); buffer_pos--; return res; } @@ -78,53 +79,53 @@ public class StanzaReader { buffer_pos++; } - private void skip_until_non_ws() throws XmlError { - while (is_ws(peek_single())) { + private async void skip_until_non_ws() throws XmlError { + while (is_ws(yield peek_single())) { skip_single(); } } - private string read_until_ws() throws XmlError { + private async string read_until_ws() throws XmlError { var res = new StringBuilder(); - var what = peek_single(); + var what = yield peek_single(); while (!is_ws(what)) { - res.append_c(read_single()); - what = peek_single(); + res.append_c(yield read_single()); + what = yield peek_single(); } return res.str; } - private string read_until_char_or_ws(char x, char y = 0) throws XmlError { + private async string read_until_char_or_ws(char x, char y = 0) throws XmlError { var res = new StringBuilder(); - var what = peek_single(); + var what = yield peek_single(); while (what != x && what != y && !is_ws(what)) { - res.append_c(read_single()); - what = peek_single(); + res.append_c(yield read_single()); + what = yield peek_single(); } return res.str; } - private string read_until_char(char x) throws XmlError { + private async string read_until_char(char x) throws XmlError { var res = new StringBuilder(); - var what = peek_single(); + var what = yield peek_single(); while (what != x) { - res.append_c(read_single()); - what = peek_single(); + res.append_c(yield read_single()); + what = yield peek_single(); } return res.str; } - private StanzaAttribute read_attribute() throws XmlError { + private async StanzaAttribute read_attribute() throws XmlError { var res = new StanzaAttribute(); - res.name = read_until_char_or_ws('='); - if (read_single() == '=') { - var quot = peek_single(); + res.name = yield read_until_char_or_ws('='); + if ((yield read_single()) == '=') { + var quot = yield peek_single(); if (quot == '\'' || quot == '"') { skip_single(); - res.encoded_val = read_until_char(quot); + res.encoded_val = yield read_until_char(quot); skip_single(); } else { - res.encoded_val = read_until_ws(); + res.encoded_val = yield read_until_ws(); } } return res; @@ -161,17 +162,17 @@ public class StanzaReader { } } - public StanzaNode read_node_start() throws XmlError { + public async StanzaNode read_node_start() throws XmlError { var res = new StanzaNode(); res.attributes = new ArrayList(); var eof = false; - if (peek_single() == '<') skip_single(); - if (peek_single() == '?') res.pseudo = true; - if (peek_single() == '/') { + if ((yield peek_single()) == '<') skip_single(); + if ((yield peek_single()) == '?') res.pseudo = true; + if ((yield peek_single()) == '/') { eof = true; skip_single(); - res.name = read_until_char_or_ws('>'); - while (peek_single() != '>') { + res.name = yield read_until_char_or_ws('>'); + while ((yield peek_single()) != '>') { skip_single(); } skip_single(); @@ -180,13 +181,15 @@ public class StanzaReader { handle_stanza_ns(res); return res; } - res.name = read_until_char_or_ws('>', '/'); - skip_until_non_ws(); - while (peek_single() != '/' && peek_single() != '>' && peek_single() != '?') { - res.attributes.add(read_attribute()); - skip_until_non_ws(); + res.name = yield read_until_char_or_ws('>', '/'); + yield skip_until_non_ws(); + char next_char = yield peek_single(); + while (next_char != '/' && next_char != '>' && next_char != '?') { + res.attributes.add(yield read_attribute()); + yield skip_until_non_ws(); + next_char = yield peek_single(); } - if (read_single() == '/' || res.pseudo) { + if ((yield read_single()) == '/' || res.pseudo) { res.has_nodes = false; skip_single(); } else { @@ -196,20 +199,20 @@ public class StanzaReader { return res; } - public StanzaNode read_text_node() throws XmlError { + public async StanzaNode read_text_node() throws XmlError { var res = new StanzaNode(); res.name = "#text"; res.ns_uri = ns_state.current_ns_uri; - res.encoded_val = read_until_char('<').strip(); + res.encoded_val = (yield read_until_char('<')).strip(); return res; } - public StanzaNode read_root_node() throws XmlError { - skip_until_non_ws(); - if (peek_single() == '<') { - var res = read_node_start(); + public async StanzaNode read_root_node() throws XmlError { + yield skip_until_non_ws(); + if ((yield peek_single()) == '<') { + var res = yield read_node_start(); if (res.pseudo) { - return read_root_node(); + return yield read_root_node(); } return res; } else { @@ -217,18 +220,18 @@ public class StanzaReader { } } - public StanzaNode read_stanza_node() throws XmlError { + public async StanzaNode read_stanza_node() throws XmlError { ns_state = ns_state.push(); - var res = read_node_start(); + var res = yield read_node_start(); if (res.has_nodes) { bool finishNodeSeen = false; do { - skip_until_non_ws(); - if (peek_single() == '<') { + yield skip_until_non_ws(); + if ((yield peek_single()) == '<') { skip_single(); - if (peek_single() == '/') { + if ((yield peek_single()) == '/') { skip_single(); - string desc = read_until_char('>'); + string desc = yield read_until_char('>'); skip_single(); if (desc.contains(":")) { var split = desc.split(":"); @@ -240,10 +243,10 @@ public class StanzaReader { } finishNodeSeen = true; } else { - res.sub_nodes.add(read_stanza_node()); + res.sub_nodes.add(yield read_stanza_node()); } } else { - res.sub_nodes.add(read_text_node()); + res.sub_nodes.add(yield read_text_node()); } } while (!finishNodeSeen); if (res.sub_nodes.size == 0) res.has_nodes = false; @@ -252,12 +255,12 @@ public class StanzaReader { return res; } - public StanzaNode read_node() throws XmlError { - skip_until_non_ws(); - if (peek_single() == '<') { - return read_stanza_node(); + public async StanzaNode read_node() throws XmlError { + yield skip_until_non_ws(); + if ((yield peek_single()) == '<') { + return yield read_stanza_node(); } else { - return read_text_node(); + return yield read_text_node(); } } } diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index 26524d7b..e67920db 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -24,4 +24,4 @@ public class StanzaWriter { } } } -} \ No newline at end of file +} diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index 0a1f4120..fc4e7fd7 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -42,7 +42,7 @@ public class XmppStream { register_connection_provider(new StartTlsConnectionProvider()); } - public void connect(string? remote_name = null) throws IOStreamError { + public async void connect(string? remote_name = null) throws IOStreamError { if (remote_name != null) this.remote_name = (!)remote_name; attach_negotation_modules(); try { @@ -67,7 +67,7 @@ public class XmppStream { stderr.printf("CONNECTION LOST?\n"); throw new IOStreamError.CONNECT(e.message); } - loop(); + yield loop(); } public void disconnect() throws IOStreamError { @@ -96,11 +96,11 @@ public class XmppStream { return setup_needed; } - public StanzaNode read() throws IOStreamError { + public async StanzaNode read() throws IOStreamError { StanzaReader? reader = this.reader; if (reader == null) throw new IOStreamError.READ("trying to read, but no stream open"); try { - StanzaNode node = ((!)reader).read_node(); + StanzaNode node = yield ((!)reader).read_node(); log.node("IN", node); return node; } catch (XmlError e) { @@ -175,7 +175,7 @@ public class XmppStream { return false; } - private void setup() throws IOStreamError { + private async void setup() throws IOStreamError { StanzaNode outs = new StanzaNode.build("stream", "http://etherx.jabber.org/streams") .put_attribute("to", remote_name) .put_attribute("version", "1.0") @@ -184,17 +184,21 @@ public class XmppStream { outs.has_nodes = true; log.node("OUT ROOT", outs); write(outs); - received_root_node(this, read_root()); + received_root_node(this, yield read_root()); } - private void loop() throws IOStreamError { + private async void loop() throws IOStreamError { while (true) { if (setup_needed) { - setup(); + yield setup(); setup_needed = false; } - StanzaNode node = read(); + StanzaNode node = yield read(); + + Idle.add(loop.callback); + yield; + received_node(this, node); if (node.ns_uri == NS_URI && node.name == "features") { @@ -266,11 +270,11 @@ public class XmppStream { } } - private StanzaNode read_root() throws IOStreamError { + private async StanzaNode read_root() throws IOStreamError { StanzaReader? reader = this.reader; if (reader == null) throw new IOStreamError.READ("trying to read, but no stream open"); try { - StanzaNode node = ((!)reader).read_root_node(); + StanzaNode node = yield ((!)reader).read_root_node(); log.node("IN ROOT", node); return node; } catch (XmlError e) { diff --git a/xmpp-vala/src/module/message/module.vala b/xmpp-vala/src/module/message/module.vala index f7038ef8..2ca06dc4 100644 --- a/xmpp-vala/src/module/message/module.vala +++ b/xmpp-vala/src/module/message/module.vala @@ -8,24 +8,27 @@ namespace Xmpp.Message { public class Module : XmppStreamModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "message_module"); - public signal void pre_send_message(XmppStream stream, Message.Stanza message); + public StanzaListenerHolder received_pipeline = new StanzaListenerHolder(); + public StanzaListenerHolder send_pipeline = new StanzaListenerHolder(); + public signal void pre_received_message(XmppStream stream, Message.Stanza message); public signal void received_message(XmppStream stream, Message.Stanza message); public void send_message(XmppStream stream, Message.Stanza message) { - pre_send_message(stream, message); + send_pipeline.run.begin(stream, message); stream.write(message.stanza); } - public void received_message_stanza(XmppStream stream, StanzaNode node) { + public async void received_message_stanza_async(XmppStream stream, StanzaNode node) { Message.Stanza message = new Message.Stanza.from_stanza(node, stream.get_flag(Bind.Flag.IDENTITY).my_jid); - do { - message.rerun_parsing = false; - pre_received_message(stream, message); - } while(message.rerun_parsing); + yield received_pipeline.run(stream, message); received_message(stream, message); } + private void received_message_stanza(XmppStream stream, StanzaNode node) { + received_message_stanza_async.begin(stream, node); + } + public override void attach(XmppStream stream) { stream.received_message_stanza.connect(received_message_stanza); } diff --git a/xmpp-vala/src/module/util.vala b/xmpp-vala/src/module/util.vala index 365170b0..e6626049 100644 --- a/xmpp-vala/src/module/util.vala +++ b/xmpp-vala/src/module/util.vala @@ -1,3 +1,5 @@ +using Gee; + namespace Xmpp { public string get_bare_jid(string jid) { return jid.split("/")[0]; @@ -20,4 +22,61 @@ namespace Xmpp { uint32 b5_2 = Random.next_int(); return "%08x-%04x-%04x-%04x-%04x%08x".printf(b1, b2, b3, b4, b5_1, b5_2); } + +public abstract class StanzaListener : Object { + public abstract string action_group { get; } + public abstract string[] after_actions { get; } + public abstract async void run(Core.XmppStream stream, T stanza); +} + +public class StanzaListenerHolder : Object { + private Gee.List> listeners = new ArrayList>(); + + public new void connect(StanzaListener listener) { + listeners.add(listener); + resort_list(); + } + + public async void run(Core.XmppStream stream, T stanza) { + foreach (StanzaListener l in listeners) { + yield l.run(stream, stanza); + } + } + + private Gee.List> set_minus(Gee.List> main_set, Gee.List> minus) { + Gee.List> res = new ArrayList>(); + foreach (StanzaListener l in main_set) { + if (!minus.contains(l)) { + res.add(l); + } + } + return res; + } + + private bool set_contains_action(Gee.List> s, string[] actions) { + foreach(StanzaListener l in s) { + if (l.action_group in actions) { + return true; + } + } + return false; + } + + private void resort_list() { + Gee.List> new_list = new ArrayList>(); + while (listeners.size > new_list.size) { + bool changed = false; + foreach (StanzaListener l in listeners) { + Gee.List> remaining = set_minus(listeners, new_list); + if (!set_contains_action(remaining, l.after_actions)) { + new_list.add(l); + changed = true; + } + } + if (!changed) warning("Can't sort listeners"); + } + listeners = new_list; + } +} + } diff --git a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala index 5de504a2..3ca97282 100644 --- a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala +++ b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala @@ -31,24 +31,17 @@ public class Module : XmppStreamModule { public override void attach(XmppStream stream) { stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); - stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message); + stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener()); stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message); } public override void detach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message); stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message); } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - private void on_pre_send_message(XmppStream stream, Message.Stanza message) { - if (message.body == null) return; - if (message.type_ != Message.Stanza.TYPE_CHAT) return; - message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns()); - } - private void on_received_message(XmppStream stream, Message.Stanza message) { if (!message.is_error()) { Gee.List nodes = message.stanza.get_all_subnodes(); @@ -62,4 +55,18 @@ public class Module : XmppStreamModule { } } +public class SendPipelineListener : StanzaListener { + + private const string[] after_actions_const = {"MODIFY_BODY"}; + + public override string action_group { get { return "ADD_NODES"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + if (message.body == null) return; + if (message.type_ != Message.Stanza.TYPE_CHAT) return; + message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns()); + } +} + } diff --git a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala index 71a864c3..c9cb2d40 100644 --- a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala +++ b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala @@ -22,12 +22,11 @@ namespace Xmpp.Xep.MessageDeliveryReceipts { public override void attach(XmppStream stream) { stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); stream.get_module(Message.Module.IDENTITY).received_message.connect(received_message); - stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(pre_send_message); + stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener()); } public override void detach(XmppStream stream) { stream.get_module(Message.Module.IDENTITY).received_message.disconnect(received_message); - stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(pre_send_message); } public override string get_ns() { return NS_URI; } @@ -39,13 +38,22 @@ namespace Xmpp.Xep.MessageDeliveryReceipts { receipt_received(stream, message.from, received_node.get_attribute("id", NS_URI)); } } + } - private void pre_send_message(XmppStream stream, Message.Stanza message) { - StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); - if (received_node != null) return; - if (message.body == null) return; - if (message.type_ == Message.Stanza.TYPE_GROUPCHAT) return; - message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns()); - } +public class SendPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "ADD_NODES"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); + if (received_node != null) return; + if (message.body == null) return; + if (message.type_ == Message.Stanza.TYPE_GROUPCHAT) return; + message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns()); } } + +} diff --git a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala index 8ca300c9..89c761f2 100644 --- a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala +++ b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala @@ -27,20 +27,28 @@ namespace Xmpp.Xep.DelayedDelivery { } public override void attach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener()); } public override void detach(XmppStream stream) { } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - - private void on_pre_received_message(XmppStream stream, Message.Stanza message) { - DateTime? datetime = get_time_for_message(message); - if (datetime != null) message.add_flag(new MessageFlag(datetime)); - } } +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "ADD_NODE"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + DateTime? datetime = Module.get_time_for_message(message); + if (datetime != null) message.add_flag(new MessageFlag(datetime)); + } +} + public class MessageFlag : Message.MessageFlag { public const string ID = "delayed_delivery"; diff --git a/xmpp-vala/src/module/xep/0280_message_carbons.vala b/xmpp-vala/src/module/xep/0280_message_carbons.vala index b2d21646..930c5234 100644 --- a/xmpp-vala/src/module/xep/0280_message_carbons.vala +++ b/xmpp-vala/src/module/xep/0280_message_carbons.vala @@ -18,43 +18,50 @@ namespace Xmpp.Xep.MessageCarbons { public override void attach(XmppStream stream) { stream.stream_negotiated.connect(enable); - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener()); stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); } public override void detach(XmppStream stream) { stream.stream_negotiated.disconnect(enable); - stream.get_module(Message.Module.IDENTITY).pre_received_message.disconnect(pre_received_message); } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } + } - private void pre_received_message(XmppStream stream, Message.Stanza message) { - StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); - StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null; - StanzaNode? carbons_node = received_node != null ? received_node : sent_node; - if (carbons_node != null) { - StanzaNode? forwarded_node = carbons_node.get_subnode("forwarded", "urn:xmpp:forward:0"); - if (forwarded_node != null) { - StanzaNode? message_node = forwarded_node.get_subnode("message", Message.NS_URI); - string? from_attribute = message_node.get_attribute("from", Message.NS_URI); - // Any forwarded copies received by a Carbons-enabled client MUST be from that user's bare JID; any copies that do not meet this requirement MUST be ignored. - if (from_attribute != null && from_attribute == get_bare_jid(stream.get_flag(Bind.Flag.IDENTITY).my_jid)) { - if (received_node != null) { - message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED)); - } else if (sent_node != null) { - message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT)); - } - message.stanza = message_node; - message.rerun_parsing = true; +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {"EXTRACT_MESSAGE_1"}; + + public override string action_group { get { return "EXTRACT_MESSAGE_2"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); + StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null; + StanzaNode? carbons_node = received_node != null ? received_node : sent_node; + if (carbons_node != null) { + StanzaNode? forwarded_node = carbons_node.get_subnode("forwarded", "urn:xmpp:forward:0"); + if (forwarded_node != null) { + StanzaNode? message_node = forwarded_node.get_subnode("message", Message.NS_URI); + string? from_attribute = message_node.get_attribute("from", Message.NS_URI); + // Any forwarded copies received by a Carbons-enabled client MUST be from that user's bare JID; any copies that do not meet this requirement MUST be ignored. + if (from_attribute != null && from_attribute == get_bare_jid(stream.get_flag(Bind.Flag.IDENTITY).my_jid)) { + if (received_node != null) { + message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED)); + } else if (sent_node != null) { + message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT)); } message.stanza = message_node; message.rerun_parsing = true; } + message.stanza = message_node; + message.rerun_parsing = true; } } } +} public class MessageFlag : Message.MessageFlag { public const string ID = "message_carbons"; diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 522f6dca..ac68e190 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -5,6 +5,10 @@ namespace Xmpp.Xep.MessageArchiveManagement { public const string NS_URI = "urn:xmpp:mam:2"; public const string NS_URI_1 = "urn:xmpp:mam:1"; +private static string NS_VER(XmppStream stream) { + return stream.get_flag(Flag.IDENTITY).ns_ver; +} + public class Module : XmppStreamModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "0313_message_archive_management"); @@ -38,7 +42,7 @@ public class Module : XmppStreamModule { } public override void attach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener()); stream.stream_negotiated.connect(query_availability); } @@ -47,21 +51,6 @@ public class Module : XmppStreamModule { public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - private void on_pre_received_message(XmppStream stream, Message.Stanza message) { -// if (message.from != stream.remote_name) return; - if (stream.get_flag(Flag.IDENTITY) == null) return; - - StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message"); - if (message_node != null) { - StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay"); - DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node); - message.add_flag(new MessageFlag(datetime)); - - message.stanza = message_node; - message.rerun_parsing = true; - } - } - private static void page_through_results(XmppStream stream, Iq.Stanza iq) { string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last"); if (last == null) { @@ -89,9 +78,28 @@ public class Module : XmppStreamModule { if (stream.get_flag(Flag.IDENTITY) != null) feature_available(stream); }); } +} - private static string NS_VER(XmppStream stream) { - return stream.get_flag(Flag.IDENTITY).ns_ver; +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "EXTRACT_MESSAGE_1"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + // if (message.from != stream.remote_name) return; + if (stream.get_flag(Flag.IDENTITY) == null) return; + + StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message"); + if (message_node != null) { + StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay"); + DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node); + message.add_flag(new MessageFlag(datetime)); + + message.stanza = message_node; + message.rerun_parsing = true; + } } } diff --git a/xmpp-vala/src/module/xep/0333_chat_markers.vala b/xmpp-vala/src/module/xep/0333_chat_markers.vala index a0e42510..9c3251dc 100644 --- a/xmpp-vala/src/module/xep/0333_chat_markers.vala +++ b/xmpp-vala/src/module/xep/0333_chat_markers.vala @@ -31,12 +31,11 @@ public class Module : XmppStreamModule { public override void attach(XmppStream stream) { stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); - stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message); + stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener()); stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message); } public override void detach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message); stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message); } @@ -52,8 +51,16 @@ public class Module : XmppStreamModule { } } } +} - private void on_pre_send_message(XmppStream stream, Message.Stanza message) { +public class SendPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "ADD_NODES"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); if (received_node != null) return; if (message.body == null) return;