Read+(write) stream async
This commit is contained in:
parent
1d0745177e
commit
3f531d6b91
|
@ -20,7 +20,6 @@ public class ConnectionManager {
|
|||
private ArrayList<Account> connection_todo = new ArrayList<Account>(Account.equals_func);
|
||||
private HashMap<Account, Connection> connections = new HashMap<Account, Connection>(Account.hash_func, Account.equals_func);
|
||||
private HashMap<Account, ConnectionError> connection_errors = new HashMap<Account, ConnectionError>(Account.hash_func, Account.equals_func);
|
||||
private HashMap<Account, RecMutexWrap> connection_mutexes = new HashMap<Account, RecMutexWrap>(Account.hash_func, Account.equals_func);
|
||||
|
||||
private NetworkManager? network_manager;
|
||||
private Login1Manager? login1;
|
||||
|
@ -57,12 +56,6 @@ public class ConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
private class RecMutexWrap {
|
||||
public RecMutex mutex = RecMutex();
|
||||
public void unlock() { mutex.unlock(); }
|
||||
public bool trylock() { return mutex.trylock(); }
|
||||
}
|
||||
|
||||
public ConnectionManager(ModuleManager module_manager) {
|
||||
this.module_manager = module_manager;
|
||||
network_manager = get_network_manager();
|
||||
|
@ -120,7 +113,6 @@ public class ConnectionManager {
|
|||
}
|
||||
|
||||
public Core.XmppStream? connect(Account account) {
|
||||
if (!connection_mutexes.has_key(account)) connection_mutexes[account] = new RecMutexWrap();
|
||||
if (!connection_todo.contains(account)) connection_todo.add(account);
|
||||
if (!connections.has_key(account)) {
|
||||
return connect_(account);
|
||||
|
@ -155,8 +147,6 @@ public class ConnectionManager {
|
|||
}
|
||||
|
||||
private Core.XmppStream? connect_(Account account, string? resource = null) {
|
||||
if (!connection_mutexes[account].trylock()) return null;
|
||||
|
||||
if (connections.has_key(account)) connections[account].stream.detach_modules();
|
||||
connection_errors.unset(account);
|
||||
if (resource == null) resource = account.resourcepart;
|
||||
|
@ -180,15 +170,21 @@ public class ConnectionManager {
|
|||
stream.received_node.connect(() => {
|
||||
connections[account].last_activity = new DateTime.now_utc();
|
||||
});
|
||||
new Thread<void*> (null, () => {
|
||||
connect_async.begin(account, stream);
|
||||
stream_opened(account, stream);
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
private async void connect_async(Account account, Core.XmppStream stream) {
|
||||
try {
|
||||
stream.connect(account.domainpart);
|
||||
yield stream.connect(account.domainpart);
|
||||
} catch (Error e) {
|
||||
stderr.printf("Stream Error: %s\n", e.message);
|
||||
change_connection_state(account, ConnectionState.DISCONNECTED);
|
||||
if (!connection_todo.contains(account)) {
|
||||
connections.unset(account);
|
||||
return null;
|
||||
return;
|
||||
}
|
||||
StreamError.Flag? flag = stream.get_flag(StreamError.Flag.IDENTITY);
|
||||
if (flag != null) {
|
||||
|
@ -196,13 +192,6 @@ public class ConnectionManager {
|
|||
}
|
||||
interpret_connection_error(account);
|
||||
}
|
||||
connection_mutexes[account].unlock();
|
||||
return null;
|
||||
});
|
||||
stream_opened(account, stream);
|
||||
|
||||
connection_mutexes[account].unlock();
|
||||
return stream;
|
||||
}
|
||||
|
||||
private void interpret_connection_error(Account account) {
|
||||
|
@ -243,7 +232,6 @@ public class ConnectionManager {
|
|||
}
|
||||
|
||||
private void check_reconnect(Account account) {
|
||||
if (!connection_mutexes[account].trylock()) return;
|
||||
bool acked = false;
|
||||
|
||||
Core.XmppStream stream = connections[account].stream;
|
||||
|
@ -261,7 +249,6 @@ public class ConnectionManager {
|
|||
connections[account].stream.disconnect();
|
||||
} catch (Error e) { }
|
||||
connect_(account);
|
||||
connection_mutexes[account].unlock();
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ public class AddConferenceDialog : Gtk.Dialog {
|
|||
setup_conference_details_view();
|
||||
show_jid_add_view();
|
||||
|
||||
stream_interactor.get_module(MucManager.IDENTITY).joined.connect((account, jid, nick) => { Idle.add(() => { on_joined(account, jid, nick); return false; } ); });
|
||||
stream_interactor.get_module(MucManager.IDENTITY).joined.connect(on_joined);
|
||||
}
|
||||
|
||||
private void show_jid_add_view() {
|
||||
|
|
|
@ -99,12 +99,7 @@ protected class ConferenceDetailsFragment : Box {
|
|||
jid_entry.key_release_event.connect(() => { done = true; return false; }); // just for notifying
|
||||
nick_entry.key_release_event.connect(() => { done = true; return false; });
|
||||
|
||||
stream_interactor.get_module(MucManager.IDENTITY).enter_error.connect((account, jid, error) => {
|
||||
Idle.add(() => {
|
||||
on_enter_error(account, jid, error);
|
||||
return false;
|
||||
});
|
||||
});
|
||||
stream_interactor.get_module(MucManager.IDENTITY).enter_error.connect(on_enter_error);
|
||||
notification_button.clicked.connect(() => { notification_revealer.set_reveal_child(false); });
|
||||
ok_button.clicked.connect(() => {
|
||||
ok_button.label = _("Joining...");
|
||||
|
|
|
@ -21,11 +21,8 @@ protected class ConferenceList : FilterableList {
|
|||
set_sort_func(sort);
|
||||
|
||||
stream_interactor.get_module(MucManager.IDENTITY).bookmarks_updated.connect((account, conferences) => {
|
||||
Idle.add(() => {
|
||||
lists[account] = conferences;
|
||||
refresh_conferences();
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
foreach (Account account in stream_interactor.get_accounts()) {
|
||||
|
@ -43,11 +40,8 @@ protected class ConferenceList : FilterableList {
|
|||
}
|
||||
|
||||
private void on_conference_bookmarks_received(Core.XmppStream stream, Account account, Gee.List<Xep.Bookmarks.Conference> conferences) {
|
||||
Idle.add(() => {
|
||||
lists[account] = conferences;
|
||||
refresh_conferences();
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
private void header(ListBoxRow row, ListBoxRow? before_row) {
|
||||
|
|
|
@ -25,12 +25,12 @@ protected class RosterList : FilterableList {
|
|||
|
||||
handler_ids += stream_interactor.get_module(RosterManager.IDENTITY).removed_roster_item.connect( (account, jid, roster_item) => {
|
||||
if (accounts.contains(account)) {
|
||||
Idle.add(() => { on_removed_roster_item(account, jid, roster_item); return false;});
|
||||
on_removed_roster_item(account, jid, roster_item);
|
||||
}
|
||||
});
|
||||
handler_ids += stream_interactor.get_module(RosterManager.IDENTITY).updated_roster_item.connect( (account, jid, roster_item) => {
|
||||
if (accounts.contains(account)) {
|
||||
Idle.add(() => { on_updated_roster_item(account, jid, roster_item); return false;});
|
||||
on_updated_roster_item(account, jid, roster_item);
|
||||
}
|
||||
});
|
||||
destroy.connect(() => {
|
||||
|
|
|
@ -23,7 +23,7 @@ public class Dino.Ui.Application : Gtk.Application, Dino.Application {
|
|||
provider.load_from_resource("/im/dino/theme.css");
|
||||
StyleContext.add_provider_for_screen(Gdk.Screen.get_default(), provider, STYLE_PROVIDER_PRIORITY_APPLICATION);
|
||||
|
||||
activate.connect(() => {
|
||||
startup.connect(() => {
|
||||
if (window == null) {
|
||||
create_set_app_menu();
|
||||
window = new UnifiedWindow(this, stream_interactor);
|
||||
|
|
|
@ -111,13 +111,10 @@ public class Dialog : Gtk.Dialog {
|
|||
row.add(widget);
|
||||
categories[category].add(list_row);
|
||||
|
||||
Idle.add(() => {
|
||||
int pref_height, pref_width;
|
||||
get_content_area().get_preferred_height(null, out pref_height);
|
||||
get_preferred_width(out pref_width, null);
|
||||
resize(pref_width, int.min(500, pref_height));
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
private void add_category(string category) {
|
||||
|
|
|
@ -21,13 +21,10 @@ public class MucConfigFormProvider : Plugins.ContactDetailsProvider, Object {
|
|||
if (stream == null) return;
|
||||
stream_interactor.get_module(MucManager.IDENTITY).get_config_form(conversation.account, conversation.counterpart, (jid, data_form) => {
|
||||
contact_details.save.connect(() => { data_form.submit(); });
|
||||
Idle.add(() => {
|
||||
for (int i = 0; i < data_form.fields.size; i++) {
|
||||
DataForms.DataForm.Field field = data_form.fields[i];
|
||||
add_field(field, contact_details);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,13 +70,10 @@ public abstract class ConversationRow : ListBoxRow {
|
|||
bool counterpart_online = stream_interactor.get_module(PresenceManager.IDENTITY).get_full_jids(conversation.counterpart, conversation.account) != null;
|
||||
bool greyscale = !self_online || !counterpart_online;
|
||||
|
||||
Idle.add(() => {
|
||||
Pixbuf pixbuf = ((new AvatarGenerator(AVATAR_SIZE, AVATAR_SIZE, image.scale_factor))
|
||||
.set_greyscale(greyscale)
|
||||
.draw_conversation(stream_interactor, conversation));
|
||||
Util.image_set_from_scaled_pixbuf(image, pixbuf, image.get_scale_factor());
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
protected void update_name_label(string? new_name = null) {
|
||||
|
|
|
@ -22,35 +22,21 @@ public class List : ListBox {
|
|||
set_header_func(header);
|
||||
set_sort_func(sort);
|
||||
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect((conversation) => {
|
||||
Idle.add(() => { add_conversation(conversation); return false; });
|
||||
});
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect((conversation) => {
|
||||
Idle.add(() => { remove_conversation(conversation); return false; });
|
||||
});
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect((message, conversation) => {
|
||||
Idle.add(() => { on_message_received(message, conversation); return false; });
|
||||
});
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect((message, conversation) => {
|
||||
Idle.add(() => { on_message_received(message, conversation); return false; });
|
||||
});
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect(add_conversation);
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect(remove_conversation);
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(on_message_received);
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect(on_message_received);
|
||||
stream_interactor.get_module(PresenceManager.IDENTITY).show_received.connect((show, jid, account) => {
|
||||
Idle.add(() => {
|
||||
foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations_for_presence(show, account)) {
|
||||
if (rows.has_key(conversation)) rows[conversation].on_show_received(show);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
stream_interactor.get_module(AvatarManager.IDENTITY).received_avatar.connect((avatar, jid, account) => {
|
||||
Idle.add(() => {
|
||||
Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jid, account);
|
||||
if (conversation != null && rows.has_key(conversation)) {
|
||||
ChatRow row = rows[conversation] as ChatRow;
|
||||
if (row != null) row.update_avatar();
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
Timeout.add_seconds(60, () => {
|
||||
foreach (ConversationRow row in rows.values) row.update();
|
||||
|
|
|
@ -21,12 +21,12 @@ class ChatStatePopulator : Plugins.ConversationItemPopulator, Object {
|
|||
|
||||
stream_interactor.get_module(CounterpartInteractionManager.IDENTITY).received_state.connect((account, jid, state) => {
|
||||
if (current_conversation != null && current_conversation.account.equals(account) && current_conversation.counterpart.equals_bare(jid)) {
|
||||
Idle.add(() => { update_chat_state(account, jid, state); return false; });
|
||||
update_chat_state(account, jid, state);
|
||||
}
|
||||
});
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect((message, conversation) => {
|
||||
if (conversation.equals(current_conversation)) {
|
||||
Idle.add(() => { update_chat_state(conversation.account, conversation.counterpart); return false; });
|
||||
update_chat_state(conversation.account, conversation.counterpart);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ public class ConversationItemSkeleton : Grid {
|
|||
} else {
|
||||
set_title_widget(widget);
|
||||
}
|
||||
item.notify["mark"].connect_after(() => { Idle.add(() => { update_received(); return false; }); });
|
||||
item.notify["mark"].connect_after(update_received);
|
||||
update_received();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,12 +19,8 @@ public class MessagePopulator : Object {
|
|||
app.plugin_registry.register_message_display(new SlashmeMessageDisplay(stream_interactor));
|
||||
|
||||
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect((message, conversation) => {
|
||||
Idle.add(() => { handle_message(message, conversation); return false; });
|
||||
});
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect((message, conversation) => {
|
||||
Idle.add(() => { handle_message(message, conversation); return false; });
|
||||
});
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(handle_message);
|
||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect(handle_message);
|
||||
}
|
||||
|
||||
public void init(Conversation conversation, Plugins.ConversationItemCollection item_collection) {
|
||||
|
|
|
@ -55,12 +55,9 @@ public class FileWidget : Button, Plugins.ConversationTitlebarWidget {
|
|||
}
|
||||
|
||||
public void on_upload_available(Account account) {
|
||||
Idle.add(() => {
|
||||
if (conversation != null && conversation.account.equals(account)) {
|
||||
visible = true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
public new void set_conversation(Conversation conversation) {
|
||||
|
|
|
@ -35,12 +35,9 @@ public class ConversationTitlebar : Gtk.HeaderBar {
|
|||
|
||||
|
||||
stream_interactor.get_module(MucManager.IDENTITY).subject_set.connect((account, jid, subject) => {
|
||||
Idle.add(() => {
|
||||
if (conversation != null && conversation.counterpart.equals_bare(jid) && conversation.account.equals(account)) {
|
||||
update_subtitle(subject);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -21,16 +21,14 @@ public class AccountRow : Gtk.ListBoxRow {
|
|||
jid_label.set_label(account.bare_jid.to_string());
|
||||
|
||||
stream_interactor.connection_manager.connection_error.connect((account, error) => {
|
||||
Idle.add(() => {
|
||||
if (account.equals(this.account)) update_warning_icon();
|
||||
return false;
|
||||
});
|
||||
if (account.equals(this.account)) {
|
||||
update_warning_icon();
|
||||
}
|
||||
});
|
||||
stream_interactor.connection_manager.connection_state_changed.connect((account, state) => {
|
||||
Idle.add(() => {
|
||||
if (account.equals(this.account)) update_warning_icon();
|
||||
return false;
|
||||
});
|
||||
if (account.equals(this.account)) {
|
||||
update_warning_icon();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -84,23 +84,16 @@ public class Dialog : Gtk.Dialog {
|
|||
add_account(account);
|
||||
}
|
||||
|
||||
stream_interactor.get_module(AvatarManager.IDENTITY).received_avatar.connect((pixbuf, jid, account) => {
|
||||
Idle.add(() => {
|
||||
on_received_avatar(pixbuf, jid, account);
|
||||
return false;
|
||||
});
|
||||
});
|
||||
stream_interactor.get_module(AvatarManager.IDENTITY).received_avatar.connect(on_received_avatar);
|
||||
stream_interactor.connection_manager.connection_error.connect((account, error) => {
|
||||
Idle.add(() => {
|
||||
if (account.equals(selected_account)) update_status_label(account);
|
||||
return false;
|
||||
});
|
||||
if (account.equals(selected_account)) {
|
||||
update_status_label(account);
|
||||
}
|
||||
});
|
||||
stream_interactor.connection_manager.connection_state_changed.connect((account, state) => {
|
||||
Idle.add(() => {
|
||||
if (account.equals(selected_account)) update_status_label(account);
|
||||
return false;
|
||||
});
|
||||
if (account.equals(selected_account)) {
|
||||
update_status_label(account);
|
||||
}
|
||||
});
|
||||
|
||||
if (account_list.get_row_at_index(0) != null) account_list.select_row(account_list.get_row_at_index(0));
|
||||
|
|
|
@ -25,9 +25,7 @@ public class List : Box {
|
|||
list_box.set_filter_func(filter);
|
||||
search_entry.search_changed.connect(search_changed);
|
||||
|
||||
stream_interactor.get_module(PresenceManager.IDENTITY).show_received.connect((show, jid, account) => {
|
||||
Idle.add(() => { on_show_received(show, jid, account); return false; });
|
||||
});
|
||||
stream_interactor.get_module(PresenceManager.IDENTITY).show_received.connect(on_show_received);
|
||||
stream_interactor.get_module(RosterManager.IDENTITY).updated_roster_item.connect(on_updated_roster_item);
|
||||
|
||||
initialize_for_conversation(conversation);
|
||||
|
|
|
@ -40,12 +40,8 @@ public class UnifiedWindow : Window {
|
|||
|
||||
stream_interactor.account_added.connect((account) => { check_stack(true); });
|
||||
stream_interactor.account_removed.connect((account) => { check_stack(); });
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect( (conversation) => {
|
||||
Idle.add( () => { check_stack(); return false; });
|
||||
});
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect( (conversation) => {
|
||||
Idle.add( () => { check_stack(); return false; });
|
||||
});
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_activated.connect(() => check_stack());
|
||||
stream_interactor.get_module(ConversationManager.IDENTITY).conversation_deactivated.connect(() => check_stack());
|
||||
accounts_placeholder.primary_button.clicked.connect(() => { get_application().activate_action("accounts", null); });
|
||||
conversations_placeholder.primary_button.clicked.connect(() => { get_application().activate_action("add_chat", null); });
|
||||
conversations_placeholder.secondary_button.clicked.connect(() => { get_application().activate_action("add_conference", null); });
|
||||
|
|
|
@ -117,70 +117,10 @@ public class StreamModule : XmppStreamModule {
|
|||
|
||||
this.store = Plugin.get_context().create_store();
|
||||
store_created(store);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message);
|
||||
stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener(store));
|
||||
stream.get_module(Pubsub.Module.IDENTITY).add_filtered_notification(stream, NODE_DEVICELIST, (stream, jid, id, node) => on_devicelist(stream, jid, id, node));
|
||||
}
|
||||
|
||||
private void on_pre_received_message(XmppStream stream, Message.Stanza message) {
|
||||
StanzaNode? _encrypted = message.stanza.get_subnode("encrypted", NS_URI);
|
||||
if (_encrypted == null || MessageFlag.get_flag(message) != null || message.from == null) return;
|
||||
StanzaNode encrypted = (!)_encrypted;
|
||||
if (!Plugin.ensure_context()) return;
|
||||
MessageFlag flag = new MessageFlag();
|
||||
message.add_flag(flag);
|
||||
StanzaNode? _header = encrypted.get_subnode("header");
|
||||
if (_header == null) return;
|
||||
StanzaNode header = (!)_header;
|
||||
if (header.get_attribute_int("sid") <= 0) return;
|
||||
foreach (StanzaNode key_node in header.get_subnodes("key")) {
|
||||
if (key_node.get_attribute_int("rid") == store.local_registration_id) {
|
||||
try {
|
||||
string? payload = encrypted.get_deep_string_content("payload");
|
||||
string? iv_node = header.get_deep_string_content("iv");
|
||||
string? key_node_content = key_node.get_string_content();
|
||||
if (payload == null || iv_node == null || key_node_content == null) continue;
|
||||
uint8[] key;
|
||||
uint8[] ciphertext = Base64.decode((!)payload);
|
||||
uint8[] iv = Base64.decode((!)iv_node);
|
||||
Address address = new Address(get_bare_jid((!)message.from), header.get_attribute_int("sid"));
|
||||
if (key_node.get_attribute_bool("prekey")) {
|
||||
PreKeySignalMessage msg = Plugin.get_context().deserialize_pre_key_signal_message(Base64.decode((!)key_node_content));
|
||||
SessionCipher cipher = store.create_session_cipher(address);
|
||||
key = cipher.decrypt_pre_key_signal_message(msg);
|
||||
} else {
|
||||
SignalMessage msg = Plugin.get_context().deserialize_signal_message(Base64.decode((!)key_node_content));
|
||||
SessionCipher cipher = store.create_session_cipher(address);
|
||||
key = cipher.decrypt_signal_message(msg);
|
||||
}
|
||||
address.device_id = 0; // TODO: Hack to have address obj live longer
|
||||
|
||||
if (key.length >= 32) {
|
||||
int authtaglength = key.length - 16;
|
||||
uint8[] new_ciphertext = new uint8[ciphertext.length + authtaglength];
|
||||
uint8[] new_key = new uint8[16];
|
||||
Memory.copy(new_ciphertext, ciphertext, ciphertext.length);
|
||||
Memory.copy((uint8*)new_ciphertext + ciphertext.length, (uint8*)key + 16, authtaglength);
|
||||
Memory.copy(new_key, key, 16);
|
||||
ciphertext = new_ciphertext;
|
||||
key = new_key;
|
||||
}
|
||||
|
||||
message.body = arr_to_str(aes_decrypt(Cipher.AES_GCM_NOPADDING, key, iv, ciphertext));
|
||||
flag.decrypted = true;
|
||||
} catch (Error e) {
|
||||
if (Plugin.DEBUG) print(@"OMEMO: Signal error while decrypting message: $(e.message)\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private string arr_to_str(uint8[] arr) {
|
||||
// null-terminate the array
|
||||
uint8[] rarr = new uint8[arr.length+1];
|
||||
Memory.copy(rarr, arr, arr.length);
|
||||
return (string)rarr;
|
||||
}
|
||||
|
||||
public void request_user_devicelist(XmppStream stream, string jid) {
|
||||
if (active_devicelist_requests.add(jid)) {
|
||||
if (Plugin.DEBUG) print(@"OMEMO: requesting device list for $jid\n");
|
||||
|
@ -442,4 +382,79 @@ public class StreamModule : XmppStreamModule {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public class ReceivedPipelineListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {"EXTRACT_MESSAGE_2"};
|
||||
|
||||
public override string action_group { get { return "ENCRYPT_BODY"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
private Store store;
|
||||
|
||||
public ReceivedPipelineListener(Store store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
StanzaNode? _encrypted = message.stanza.get_subnode("encrypted", NS_URI);
|
||||
if (_encrypted == null || MessageFlag.get_flag(message) != null || message.from == null) return;
|
||||
StanzaNode encrypted = (!)_encrypted;
|
||||
if (!Plugin.ensure_context()) return;
|
||||
MessageFlag flag = new MessageFlag();
|
||||
message.add_flag(flag);
|
||||
StanzaNode? _header = encrypted.get_subnode("header");
|
||||
if (_header == null) return;
|
||||
StanzaNode header = (!)_header;
|
||||
if (header.get_attribute_int("sid") <= 0) return;
|
||||
foreach (StanzaNode key_node in header.get_subnodes("key")) {
|
||||
if (key_node.get_attribute_int("rid") == store.local_registration_id) {
|
||||
try {
|
||||
string? payload = encrypted.get_deep_string_content("payload");
|
||||
string? iv_node = header.get_deep_string_content("iv");
|
||||
string? key_node_content = key_node.get_string_content();
|
||||
if (payload == null || iv_node == null || key_node_content == null) continue;
|
||||
uint8[] key;
|
||||
uint8[] ciphertext = Base64.decode((!)payload);
|
||||
uint8[] iv = Base64.decode((!)iv_node);
|
||||
Address address = new Address(get_bare_jid((!)message.from), header.get_attribute_int("sid"));
|
||||
if (key_node.get_attribute_bool("prekey")) {
|
||||
PreKeySignalMessage msg = Plugin.get_context().deserialize_pre_key_signal_message(Base64.decode((!)key_node_content));
|
||||
SessionCipher cipher = store.create_session_cipher(address);
|
||||
key = cipher.decrypt_pre_key_signal_message(msg);
|
||||
} else {
|
||||
SignalMessage msg = Plugin.get_context().deserialize_signal_message(Base64.decode((!)key_node_content));
|
||||
SessionCipher cipher = store.create_session_cipher(address);
|
||||
key = cipher.decrypt_signal_message(msg);
|
||||
}
|
||||
address.device_id = 0; // TODO: Hack to have address obj live longer
|
||||
|
||||
if (key.length >= 32) {
|
||||
int authtaglength = key.length - 16;
|
||||
uint8[] new_ciphertext = new uint8[ciphertext.length + authtaglength];
|
||||
uint8[] new_key = new uint8[16];
|
||||
Memory.copy(new_ciphertext, ciphertext, ciphertext.length);
|
||||
Memory.copy((uint8*)new_ciphertext + ciphertext.length, (uint8*)key + 16, authtaglength);
|
||||
Memory.copy(new_key, key, 16);
|
||||
ciphertext = new_ciphertext;
|
||||
key = new_key;
|
||||
}
|
||||
|
||||
message.body = arr_to_str(aes_decrypt(Cipher.AES_GCM_NOPADDING, key, iv, ciphertext));
|
||||
flag.decrypted = true;
|
||||
} catch (Error e) {
|
||||
if (Plugin.DEBUG) print(@"OMEMO: Signal error while decrypting message: $(e.message)\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private string arr_to_str(uint8[] arr) {
|
||||
// null-terminate the array
|
||||
uint8[] rarr = new uint8[arr.length+1];
|
||||
Memory.copy(rarr, arr, arr.length);
|
||||
return (string)rarr;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -43,22 +43,16 @@ namespace Dino.Plugins.OpenPgp {
|
|||
return false;
|
||||
}
|
||||
|
||||
public string? get_cyphertext(Message.Stanza message) {
|
||||
StanzaNode? x_node = message.stanza.get_subnode("x", NS_URI_ENCRYPTED);
|
||||
return x_node == null ? null : x_node.get_string_content();
|
||||
}
|
||||
|
||||
public override void attach(XmppStream stream) {
|
||||
stream.get_module(Presence.Module.IDENTITY).received_presence.connect(on_received_presence);
|
||||
stream.get_module(Presence.Module.IDENTITY).pre_send_presence_stanza.connect(on_pre_send_presence_stanza);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message);
|
||||
stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineDecryptListener());
|
||||
stream.add_flag(new Flag());
|
||||
}
|
||||
|
||||
public override void detach(XmppStream stream) {
|
||||
stream.get_module(Presence.Module.IDENTITY).received_presence.disconnect(on_received_presence);
|
||||
stream.get_module(Presence.Module.IDENTITY).pre_send_presence_stanza.disconnect(on_pre_send_presence_stanza);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_received_message.disconnect(on_pre_received_message);
|
||||
}
|
||||
|
||||
public static void require(XmppStream stream) {
|
||||
|
@ -69,6 +63,7 @@ namespace Dino.Plugins.OpenPgp {
|
|||
public override string get_id() { return IDENTITY.id; }
|
||||
|
||||
private void on_received_presence(XmppStream stream, Presence.Stanza presence) {
|
||||
new Thread<void*> (null, () => {
|
||||
StanzaNode x_node = presence.stanza.get_subnode("x", NS_URI_SIGNED);
|
||||
if (x_node != null) {
|
||||
string? sig = x_node.get_string_content();
|
||||
|
@ -77,10 +72,15 @@ namespace Dino.Plugins.OpenPgp {
|
|||
string? key_id = get_sign_key(sig, signed_data);
|
||||
if (key_id != null) {
|
||||
stream.get_flag(Flag.IDENTITY).set_key_id(presence.from, key_id);
|
||||
Idle.add(() => {
|
||||
received_jid_key_id(stream, presence.from, key_id);
|
||||
return false;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private void on_pre_send_presence_stanza(XmppStream stream, Presence.Stanza presence) {
|
||||
|
@ -89,19 +89,6 @@ namespace Dino.Plugins.OpenPgp {
|
|||
}
|
||||
}
|
||||
|
||||
private void on_pre_received_message(XmppStream stream, Message.Stanza message) {
|
||||
string? encrypted = get_cyphertext(message);
|
||||
if (encrypted != null) {
|
||||
MessageFlag flag = new MessageFlag();
|
||||
message.add_flag(flag);
|
||||
string? decrypted = gpg_decrypt(encrypted);
|
||||
if (decrypted != null) {
|
||||
flag.decrypted = true;
|
||||
message.body = decrypted;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static string? gpg_encrypt(string plain, GPG.Key[] keys) {
|
||||
string encr;
|
||||
try {
|
||||
|
@ -113,15 +100,6 @@ namespace Dino.Plugins.OpenPgp {
|
|||
return encr.substring(encryption_start, encr.length - "\n-----END PGP MESSAGE-----".length - encryption_start);
|
||||
}
|
||||
|
||||
private static string? gpg_decrypt(string enc) {
|
||||
string armor = "-----BEGIN PGP MESSAGE-----\n\n" + enc + "\n-----END PGP MESSAGE-----";
|
||||
string? decr = null;
|
||||
try {
|
||||
decr = GPGHelper.decrypt(armor);
|
||||
} catch (Error e) { }
|
||||
return decr;
|
||||
}
|
||||
|
||||
private static string? get_sign_key(string sig, string signed_text) {
|
||||
string armor = "-----BEGIN PGP MESSAGE-----\n\n" + sig + "\n-----END PGP MESSAGE-----";
|
||||
string? sign_key = null;
|
||||
|
@ -156,4 +134,48 @@ namespace Dino.Plugins.OpenPgp {
|
|||
public override string get_ns() { return NS_URI; }
|
||||
public override string get_id() { return id; }
|
||||
}
|
||||
|
||||
public class ReceivedPipelineDecryptListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {"MODIFY_BODY"};
|
||||
|
||||
public override string action_group { get { return "ENCRYPT_BODY"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
string? encrypted = get_cyphertext(message);
|
||||
if (encrypted != null) {
|
||||
MessageFlag flag = new MessageFlag();
|
||||
message.add_flag(flag);
|
||||
string? decrypted = yield gpg_decrypt(encrypted);
|
||||
if (decrypted != null) {
|
||||
flag.decrypted = true;
|
||||
message.body = decrypted;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static async string? gpg_decrypt(string enc) {
|
||||
SourceFunc callback = gpg_decrypt.callback;
|
||||
string? res = null;
|
||||
new Thread<void*> (null, () => {
|
||||
string armor = "-----BEGIN PGP MESSAGE-----\n\n" + enc + "\n-----END PGP MESSAGE-----";
|
||||
try {
|
||||
res = GPGHelper.decrypt(armor);
|
||||
} catch (Error e) {
|
||||
res = null;
|
||||
}
|
||||
Idle.add((owned) callback);
|
||||
return null;
|
||||
});
|
||||
yield;
|
||||
return res;
|
||||
}
|
||||
|
||||
private string? get_cyphertext(Message.Stanza message) {
|
||||
StanzaNode? x_node = message.stanza.get_subnode("x", NS_URI_ENCRYPTED);
|
||||
return x_node == null ? null : x_node.get_string_content();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -44,12 +44,12 @@ public class StanzaReader {
|
|||
cancellable.cancel();
|
||||
}
|
||||
|
||||
private void update_buffer() throws XmlError {
|
||||
private async void update_buffer() throws XmlError {
|
||||
try {
|
||||
InputStream? input = this.input;
|
||||
if (input == null) throw new XmlError.EOF("No input stream specified and end of buffer reached.");
|
||||
if (cancellable.is_cancelled()) throw new XmlError.EOF("Input stream is canceled.");
|
||||
buffer_fill = (int) ((!)input).read(buffer, cancellable);
|
||||
buffer_fill = (int) yield ((!)input).read_async(buffer, GLib.Priority.DEFAULT, cancellable);
|
||||
if (buffer_fill == 0) throw new XmlError.EOF("End of input stream reached.");
|
||||
buffer_pos = 0;
|
||||
} catch (GLib.IOError e) {
|
||||
|
@ -57,15 +57,16 @@ public class StanzaReader {
|
|||
}
|
||||
}
|
||||
|
||||
private char read_single() throws XmlError {
|
||||
private async char read_single() throws XmlError {
|
||||
if (buffer_pos >= buffer_fill) {
|
||||
update_buffer();
|
||||
yield update_buffer();
|
||||
}
|
||||
return (char) buffer[buffer_pos++];
|
||||
char c = (char) buffer[buffer_pos++];
|
||||
return c;
|
||||
}
|
||||
|
||||
private char peek_single() throws XmlError {
|
||||
var res = read_single();
|
||||
private async char peek_single() throws XmlError {
|
||||
var res = yield read_single();
|
||||
buffer_pos--;
|
||||
return res;
|
||||
}
|
||||
|
@ -78,53 +79,53 @@ public class StanzaReader {
|
|||
buffer_pos++;
|
||||
}
|
||||
|
||||
private void skip_until_non_ws() throws XmlError {
|
||||
while (is_ws(peek_single())) {
|
||||
private async void skip_until_non_ws() throws XmlError {
|
||||
while (is_ws(yield peek_single())) {
|
||||
skip_single();
|
||||
}
|
||||
}
|
||||
|
||||
private string read_until_ws() throws XmlError {
|
||||
private async string read_until_ws() throws XmlError {
|
||||
var res = new StringBuilder();
|
||||
var what = peek_single();
|
||||
var what = yield peek_single();
|
||||
while (!is_ws(what)) {
|
||||
res.append_c(read_single());
|
||||
what = peek_single();
|
||||
res.append_c(yield read_single());
|
||||
what = yield peek_single();
|
||||
}
|
||||
return res.str;
|
||||
}
|
||||
|
||||
private string read_until_char_or_ws(char x, char y = 0) throws XmlError {
|
||||
private async string read_until_char_or_ws(char x, char y = 0) throws XmlError {
|
||||
var res = new StringBuilder();
|
||||
var what = peek_single();
|
||||
var what = yield peek_single();
|
||||
while (what != x && what != y && !is_ws(what)) {
|
||||
res.append_c(read_single());
|
||||
what = peek_single();
|
||||
res.append_c(yield read_single());
|
||||
what = yield peek_single();
|
||||
}
|
||||
return res.str;
|
||||
}
|
||||
|
||||
private string read_until_char(char x) throws XmlError {
|
||||
private async string read_until_char(char x) throws XmlError {
|
||||
var res = new StringBuilder();
|
||||
var what = peek_single();
|
||||
var what = yield peek_single();
|
||||
while (what != x) {
|
||||
res.append_c(read_single());
|
||||
what = peek_single();
|
||||
res.append_c(yield read_single());
|
||||
what = yield peek_single();
|
||||
}
|
||||
return res.str;
|
||||
}
|
||||
|
||||
private StanzaAttribute read_attribute() throws XmlError {
|
||||
private async StanzaAttribute read_attribute() throws XmlError {
|
||||
var res = new StanzaAttribute();
|
||||
res.name = read_until_char_or_ws('=');
|
||||
if (read_single() == '=') {
|
||||
var quot = peek_single();
|
||||
res.name = yield read_until_char_or_ws('=');
|
||||
if ((yield read_single()) == '=') {
|
||||
var quot = yield peek_single();
|
||||
if (quot == '\'' || quot == '"') {
|
||||
skip_single();
|
||||
res.encoded_val = read_until_char(quot);
|
||||
res.encoded_val = yield read_until_char(quot);
|
||||
skip_single();
|
||||
} else {
|
||||
res.encoded_val = read_until_ws();
|
||||
res.encoded_val = yield read_until_ws();
|
||||
}
|
||||
}
|
||||
return res;
|
||||
|
@ -161,17 +162,17 @@ public class StanzaReader {
|
|||
}
|
||||
}
|
||||
|
||||
public StanzaNode read_node_start() throws XmlError {
|
||||
public async StanzaNode read_node_start() throws XmlError {
|
||||
var res = new StanzaNode();
|
||||
res.attributes = new ArrayList<StanzaAttribute>();
|
||||
var eof = false;
|
||||
if (peek_single() == '<') skip_single();
|
||||
if (peek_single() == '?') res.pseudo = true;
|
||||
if (peek_single() == '/') {
|
||||
if ((yield peek_single()) == '<') skip_single();
|
||||
if ((yield peek_single()) == '?') res.pseudo = true;
|
||||
if ((yield peek_single()) == '/') {
|
||||
eof = true;
|
||||
skip_single();
|
||||
res.name = read_until_char_or_ws('>');
|
||||
while (peek_single() != '>') {
|
||||
res.name = yield read_until_char_or_ws('>');
|
||||
while ((yield peek_single()) != '>') {
|
||||
skip_single();
|
||||
}
|
||||
skip_single();
|
||||
|
@ -180,13 +181,15 @@ public class StanzaReader {
|
|||
handle_stanza_ns(res);
|
||||
return res;
|
||||
}
|
||||
res.name = read_until_char_or_ws('>', '/');
|
||||
skip_until_non_ws();
|
||||
while (peek_single() != '/' && peek_single() != '>' && peek_single() != '?') {
|
||||
res.attributes.add(read_attribute());
|
||||
skip_until_non_ws();
|
||||
res.name = yield read_until_char_or_ws('>', '/');
|
||||
yield skip_until_non_ws();
|
||||
char next_char = yield peek_single();
|
||||
while (next_char != '/' && next_char != '>' && next_char != '?') {
|
||||
res.attributes.add(yield read_attribute());
|
||||
yield skip_until_non_ws();
|
||||
next_char = yield peek_single();
|
||||
}
|
||||
if (read_single() == '/' || res.pseudo) {
|
||||
if ((yield read_single()) == '/' || res.pseudo) {
|
||||
res.has_nodes = false;
|
||||
skip_single();
|
||||
} else {
|
||||
|
@ -196,20 +199,20 @@ public class StanzaReader {
|
|||
return res;
|
||||
}
|
||||
|
||||
public StanzaNode read_text_node() throws XmlError {
|
||||
public async StanzaNode read_text_node() throws XmlError {
|
||||
var res = new StanzaNode();
|
||||
res.name = "#text";
|
||||
res.ns_uri = ns_state.current_ns_uri;
|
||||
res.encoded_val = read_until_char('<').strip();
|
||||
res.encoded_val = (yield read_until_char('<')).strip();
|
||||
return res;
|
||||
}
|
||||
|
||||
public StanzaNode read_root_node() throws XmlError {
|
||||
skip_until_non_ws();
|
||||
if (peek_single() == '<') {
|
||||
var res = read_node_start();
|
||||
public async StanzaNode read_root_node() throws XmlError {
|
||||
yield skip_until_non_ws();
|
||||
if ((yield peek_single()) == '<') {
|
||||
var res = yield read_node_start();
|
||||
if (res.pseudo) {
|
||||
return read_root_node();
|
||||
return yield read_root_node();
|
||||
}
|
||||
return res;
|
||||
} else {
|
||||
|
@ -217,18 +220,18 @@ public class StanzaReader {
|
|||
}
|
||||
}
|
||||
|
||||
public StanzaNode read_stanza_node() throws XmlError {
|
||||
public async StanzaNode read_stanza_node() throws XmlError {
|
||||
ns_state = ns_state.push();
|
||||
var res = read_node_start();
|
||||
var res = yield read_node_start();
|
||||
if (res.has_nodes) {
|
||||
bool finishNodeSeen = false;
|
||||
do {
|
||||
skip_until_non_ws();
|
||||
if (peek_single() == '<') {
|
||||
yield skip_until_non_ws();
|
||||
if ((yield peek_single()) == '<') {
|
||||
skip_single();
|
||||
if (peek_single() == '/') {
|
||||
if ((yield peek_single()) == '/') {
|
||||
skip_single();
|
||||
string desc = read_until_char('>');
|
||||
string desc = yield read_until_char('>');
|
||||
skip_single();
|
||||
if (desc.contains(":")) {
|
||||
var split = desc.split(":");
|
||||
|
@ -240,10 +243,10 @@ public class StanzaReader {
|
|||
}
|
||||
finishNodeSeen = true;
|
||||
} else {
|
||||
res.sub_nodes.add(read_stanza_node());
|
||||
res.sub_nodes.add(yield read_stanza_node());
|
||||
}
|
||||
} else {
|
||||
res.sub_nodes.add(read_text_node());
|
||||
res.sub_nodes.add(yield read_text_node());
|
||||
}
|
||||
} while (!finishNodeSeen);
|
||||
if (res.sub_nodes.size == 0) res.has_nodes = false;
|
||||
|
@ -252,12 +255,12 @@ public class StanzaReader {
|
|||
return res;
|
||||
}
|
||||
|
||||
public StanzaNode read_node() throws XmlError {
|
||||
skip_until_non_ws();
|
||||
if (peek_single() == '<') {
|
||||
return read_stanza_node();
|
||||
public async StanzaNode read_node() throws XmlError {
|
||||
yield skip_until_non_ws();
|
||||
if ((yield peek_single()) == '<') {
|
||||
return yield read_stanza_node();
|
||||
} else {
|
||||
return read_text_node();
|
||||
return yield read_text_node();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ public class XmppStream {
|
|||
register_connection_provider(new StartTlsConnectionProvider());
|
||||
}
|
||||
|
||||
public void connect(string? remote_name = null) throws IOStreamError {
|
||||
public async void connect(string? remote_name = null) throws IOStreamError {
|
||||
if (remote_name != null) this.remote_name = (!)remote_name;
|
||||
attach_negotation_modules();
|
||||
try {
|
||||
|
@ -67,7 +67,7 @@ public class XmppStream {
|
|||
stderr.printf("CONNECTION LOST?\n");
|
||||
throw new IOStreamError.CONNECT(e.message);
|
||||
}
|
||||
loop();
|
||||
yield loop();
|
||||
}
|
||||
|
||||
public void disconnect() throws IOStreamError {
|
||||
|
@ -96,11 +96,11 @@ public class XmppStream {
|
|||
return setup_needed;
|
||||
}
|
||||
|
||||
public StanzaNode read() throws IOStreamError {
|
||||
public async StanzaNode read() throws IOStreamError {
|
||||
StanzaReader? reader = this.reader;
|
||||
if (reader == null) throw new IOStreamError.READ("trying to read, but no stream open");
|
||||
try {
|
||||
StanzaNode node = ((!)reader).read_node();
|
||||
StanzaNode node = yield ((!)reader).read_node();
|
||||
log.node("IN", node);
|
||||
return node;
|
||||
} catch (XmlError e) {
|
||||
|
@ -175,7 +175,7 @@ public class XmppStream {
|
|||
return false;
|
||||
}
|
||||
|
||||
private void setup() throws IOStreamError {
|
||||
private async void setup() throws IOStreamError {
|
||||
StanzaNode outs = new StanzaNode.build("stream", "http://etherx.jabber.org/streams")
|
||||
.put_attribute("to", remote_name)
|
||||
.put_attribute("version", "1.0")
|
||||
|
@ -184,17 +184,21 @@ public class XmppStream {
|
|||
outs.has_nodes = true;
|
||||
log.node("OUT ROOT", outs);
|
||||
write(outs);
|
||||
received_root_node(this, read_root());
|
||||
received_root_node(this, yield read_root());
|
||||
}
|
||||
|
||||
private void loop() throws IOStreamError {
|
||||
private async void loop() throws IOStreamError {
|
||||
while (true) {
|
||||
if (setup_needed) {
|
||||
setup();
|
||||
yield setup();
|
||||
setup_needed = false;
|
||||
}
|
||||
|
||||
StanzaNode node = read();
|
||||
StanzaNode node = yield read();
|
||||
|
||||
Idle.add(loop.callback);
|
||||
yield;
|
||||
|
||||
received_node(this, node);
|
||||
|
||||
if (node.ns_uri == NS_URI && node.name == "features") {
|
||||
|
@ -266,11 +270,11 @@ public class XmppStream {
|
|||
}
|
||||
}
|
||||
|
||||
private StanzaNode read_root() throws IOStreamError {
|
||||
private async StanzaNode read_root() throws IOStreamError {
|
||||
StanzaReader? reader = this.reader;
|
||||
if (reader == null) throw new IOStreamError.READ("trying to read, but no stream open");
|
||||
try {
|
||||
StanzaNode node = ((!)reader).read_root_node();
|
||||
StanzaNode node = yield ((!)reader).read_root_node();
|
||||
log.node("IN ROOT", node);
|
||||
return node;
|
||||
} catch (XmlError e) {
|
||||
|
|
|
@ -8,24 +8,27 @@ namespace Xmpp.Message {
|
|||
public class Module : XmppStreamModule {
|
||||
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "message_module");
|
||||
|
||||
public signal void pre_send_message(XmppStream stream, Message.Stanza message);
|
||||
public StanzaListenerHolder<Message.Stanza> received_pipeline = new StanzaListenerHolder<Message.Stanza>();
|
||||
public StanzaListenerHolder<Message.Stanza> send_pipeline = new StanzaListenerHolder<Message.Stanza>();
|
||||
|
||||
public signal void pre_received_message(XmppStream stream, Message.Stanza message);
|
||||
public signal void received_message(XmppStream stream, Message.Stanza message);
|
||||
|
||||
public void send_message(XmppStream stream, Message.Stanza message) {
|
||||
pre_send_message(stream, message);
|
||||
send_pipeline.run.begin(stream, message);
|
||||
stream.write(message.stanza);
|
||||
}
|
||||
|
||||
public void received_message_stanza(XmppStream stream, StanzaNode node) {
|
||||
public async void received_message_stanza_async(XmppStream stream, StanzaNode node) {
|
||||
Message.Stanza message = new Message.Stanza.from_stanza(node, stream.get_flag(Bind.Flag.IDENTITY).my_jid);
|
||||
do {
|
||||
message.rerun_parsing = false;
|
||||
pre_received_message(stream, message);
|
||||
} while(message.rerun_parsing);
|
||||
yield received_pipeline.run(stream, message);
|
||||
received_message(stream, message);
|
||||
}
|
||||
|
||||
private void received_message_stanza(XmppStream stream, StanzaNode node) {
|
||||
received_message_stanza_async.begin(stream, node);
|
||||
}
|
||||
|
||||
public override void attach(XmppStream stream) {
|
||||
stream.received_message_stanza.connect(received_message_stanza);
|
||||
}
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
using Gee;
|
||||
|
||||
namespace Xmpp {
|
||||
public string get_bare_jid(string jid) {
|
||||
return jid.split("/")[0];
|
||||
|
@ -20,4 +22,61 @@ namespace Xmpp {
|
|||
uint32 b5_2 = Random.next_int();
|
||||
return "%08x-%04x-%04x-%04x-%04x%08x".printf(b1, b2, b3, b4, b5_1, b5_2);
|
||||
}
|
||||
|
||||
public abstract class StanzaListener<T> : Object {
|
||||
public abstract string action_group { get; }
|
||||
public abstract string[] after_actions { get; }
|
||||
public abstract async void run(Core.XmppStream stream, T stanza);
|
||||
}
|
||||
|
||||
public class StanzaListenerHolder<T> : Object {
|
||||
private Gee.List<StanzaListener<T>> listeners = new ArrayList<StanzaListener<T>>();
|
||||
|
||||
public new void connect(StanzaListener<T> listener) {
|
||||
listeners.add(listener);
|
||||
resort_list();
|
||||
}
|
||||
|
||||
public async void run(Core.XmppStream stream, T stanza) {
|
||||
foreach (StanzaListener<T> l in listeners) {
|
||||
yield l.run(stream, stanza);
|
||||
}
|
||||
}
|
||||
|
||||
private Gee.List<StanzaListener<T>> set_minus(Gee.List<StanzaListener<T>> main_set, Gee.List<StanzaListener<T>> minus) {
|
||||
Gee.List<StanzaListener<T>> res = new ArrayList<StanzaListener<T>>();
|
||||
foreach (StanzaListener<T> l in main_set) {
|
||||
if (!minus.contains(l)) {
|
||||
res.add(l);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
private bool set_contains_action(Gee.List<StanzaListener<T>> s, string[] actions) {
|
||||
foreach(StanzaListener<T> l in s) {
|
||||
if (l.action_group in actions) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void resort_list() {
|
||||
Gee.List<StanzaListener<T>> new_list = new ArrayList<StanzaListener<T>>();
|
||||
while (listeners.size > new_list.size) {
|
||||
bool changed = false;
|
||||
foreach (StanzaListener<T> l in listeners) {
|
||||
Gee.List<StanzaListener<T>> remaining = set_minus(listeners, new_list);
|
||||
if (!set_contains_action(remaining, l.after_actions)) {
|
||||
new_list.add(l);
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
if (!changed) warning("Can't sort listeners");
|
||||
}
|
||||
listeners = new_list;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,24 +31,17 @@ public class Module : XmppStreamModule {
|
|||
|
||||
public override void attach(XmppStream stream) {
|
||||
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message);
|
||||
stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener());
|
||||
stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message);
|
||||
}
|
||||
|
||||
public override void detach(XmppStream stream) {
|
||||
stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message);
|
||||
stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message);
|
||||
}
|
||||
|
||||
public override string get_ns() { return NS_URI; }
|
||||
public override string get_id() { return IDENTITY.id; }
|
||||
|
||||
private void on_pre_send_message(XmppStream stream, Message.Stanza message) {
|
||||
if (message.body == null) return;
|
||||
if (message.type_ != Message.Stanza.TYPE_CHAT) return;
|
||||
message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns());
|
||||
}
|
||||
|
||||
private void on_received_message(XmppStream stream, Message.Stanza message) {
|
||||
if (!message.is_error()) {
|
||||
Gee.List<StanzaNode> nodes = message.stanza.get_all_subnodes();
|
||||
|
@ -62,4 +55,18 @@ public class Module : XmppStreamModule {
|
|||
}
|
||||
}
|
||||
|
||||
public class SendPipelineListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {"MODIFY_BODY"};
|
||||
|
||||
public override string action_group { get { return "ADD_NODES"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
if (message.body == null) return;
|
||||
if (message.type_ != Message.Stanza.TYPE_CHAT) return;
|
||||
message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,12 +22,11 @@ namespace Xmpp.Xep.MessageDeliveryReceipts {
|
|||
public override void attach(XmppStream stream) {
|
||||
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
|
||||
stream.get_module(Message.Module.IDENTITY).received_message.connect(received_message);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(pre_send_message);
|
||||
stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener());
|
||||
}
|
||||
|
||||
public override void detach(XmppStream stream) {
|
||||
stream.get_module(Message.Module.IDENTITY).received_message.disconnect(received_message);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(pre_send_message);
|
||||
}
|
||||
|
||||
public override string get_ns() { return NS_URI; }
|
||||
|
@ -39,13 +38,22 @@ namespace Xmpp.Xep.MessageDeliveryReceipts {
|
|||
receipt_received(stream, message.from, received_node.get_attribute("id", NS_URI));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void pre_send_message(XmppStream stream, Message.Stanza message) {
|
||||
public class SendPipelineListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {};
|
||||
|
||||
public override string action_group { get { return "ADD_NODES"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
|
||||
if (received_node != null) return;
|
||||
if (message.body == null) return;
|
||||
if (message.type_ == Message.Stanza.TYPE_GROUPCHAT) return;
|
||||
message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,19 +27,27 @@ namespace Xmpp.Xep.DelayedDelivery {
|
|||
}
|
||||
|
||||
public override void attach(XmppStream stream) {
|
||||
stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message);
|
||||
stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener());
|
||||
}
|
||||
|
||||
public override void detach(XmppStream stream) { }
|
||||
|
||||
public override string get_ns() { return NS_URI; }
|
||||
public override string get_id() { return IDENTITY.id; }
|
||||
}
|
||||
|
||||
private void on_pre_received_message(XmppStream stream, Message.Stanza message) {
|
||||
DateTime? datetime = get_time_for_message(message);
|
||||
public class ReceivedPipelineListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {};
|
||||
|
||||
public override string action_group { get { return "ADD_NODE"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
DateTime? datetime = Module.get_time_for_message(message);
|
||||
if (datetime != null) message.add_flag(new MessageFlag(datetime));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class MessageFlag : Message.MessageFlag {
|
||||
public const string ID = "delayed_delivery";
|
||||
|
|
|
@ -18,19 +18,26 @@ namespace Xmpp.Xep.MessageCarbons {
|
|||
|
||||
public override void attach(XmppStream stream) {
|
||||
stream.stream_negotiated.connect(enable);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(pre_received_message);
|
||||
stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener());
|
||||
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
|
||||
}
|
||||
|
||||
public override void detach(XmppStream stream) {
|
||||
stream.stream_negotiated.disconnect(enable);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_received_message.disconnect(pre_received_message);
|
||||
}
|
||||
|
||||
public override string get_ns() { return NS_URI; }
|
||||
public override string get_id() { return IDENTITY.id; }
|
||||
}
|
||||
|
||||
private void pre_received_message(XmppStream stream, Message.Stanza message) {
|
||||
public class ReceivedPipelineListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {"EXTRACT_MESSAGE_1"};
|
||||
|
||||
public override string action_group { get { return "EXTRACT_MESSAGE_2"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
|
||||
StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null;
|
||||
StanzaNode? carbons_node = received_node != null ? received_node : sent_node;
|
||||
|
@ -54,7 +61,7 @@ namespace Xmpp.Xep.MessageCarbons {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class MessageFlag : Message.MessageFlag {
|
||||
public const string ID = "message_carbons";
|
||||
|
|
|
@ -5,6 +5,10 @@ namespace Xmpp.Xep.MessageArchiveManagement {
|
|||
public const string NS_URI = "urn:xmpp:mam:2";
|
||||
public const string NS_URI_1 = "urn:xmpp:mam:1";
|
||||
|
||||
private static string NS_VER(XmppStream stream) {
|
||||
return stream.get_flag(Flag.IDENTITY).ns_ver;
|
||||
}
|
||||
|
||||
public class Module : XmppStreamModule {
|
||||
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "0313_message_archive_management");
|
||||
|
||||
|
@ -38,7 +42,7 @@ public class Module : XmppStreamModule {
|
|||
}
|
||||
|
||||
public override void attach(XmppStream stream) {
|
||||
stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message);
|
||||
stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener());
|
||||
stream.stream_negotiated.connect(query_availability);
|
||||
}
|
||||
|
||||
|
@ -47,21 +51,6 @@ public class Module : XmppStreamModule {
|
|||
public override string get_ns() { return NS_URI; }
|
||||
public override string get_id() { return IDENTITY.id; }
|
||||
|
||||
private void on_pre_received_message(XmppStream stream, Message.Stanza message) {
|
||||
// if (message.from != stream.remote_name) return;
|
||||
if (stream.get_flag(Flag.IDENTITY) == null) return;
|
||||
|
||||
StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message");
|
||||
if (message_node != null) {
|
||||
StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay");
|
||||
DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node);
|
||||
message.add_flag(new MessageFlag(datetime));
|
||||
|
||||
message.stanza = message_node;
|
||||
message.rerun_parsing = true;
|
||||
}
|
||||
}
|
||||
|
||||
private static void page_through_results(XmppStream stream, Iq.Stanza iq) {
|
||||
string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last");
|
||||
if (last == null) {
|
||||
|
@ -89,9 +78,28 @@ public class Module : XmppStreamModule {
|
|||
if (stream.get_flag(Flag.IDENTITY) != null) feature_available(stream);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static string NS_VER(XmppStream stream) {
|
||||
return stream.get_flag(Flag.IDENTITY).ns_ver;
|
||||
public class ReceivedPipelineListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {};
|
||||
|
||||
public override string action_group { get { return "EXTRACT_MESSAGE_1"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
// if (message.from != stream.remote_name) return;
|
||||
if (stream.get_flag(Flag.IDENTITY) == null) return;
|
||||
|
||||
StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message");
|
||||
if (message_node != null) {
|
||||
StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay");
|
||||
DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node);
|
||||
message.add_flag(new MessageFlag(datetime));
|
||||
|
||||
message.stanza = message_node;
|
||||
message.rerun_parsing = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,12 +31,11 @@ public class Module : XmppStreamModule {
|
|||
|
||||
public override void attach(XmppStream stream) {
|
||||
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
|
||||
stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message);
|
||||
stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener());
|
||||
stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message);
|
||||
}
|
||||
|
||||
public override void detach(XmppStream stream) {
|
||||
stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message);
|
||||
stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message);
|
||||
}
|
||||
|
||||
|
@ -52,8 +51,16 @@ public class Module : XmppStreamModule {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void on_pre_send_message(XmppStream stream, Message.Stanza message) {
|
||||
public class SendPipelineListener : StanzaListener<Message.Stanza> {
|
||||
|
||||
private const string[] after_actions_const = {};
|
||||
|
||||
public override string action_group { get { return "ADD_NODES"; } }
|
||||
public override string[] after_actions { get { return after_actions_const; } }
|
||||
|
||||
public override async void run(Core.XmppStream stream, Message.Stanza message) {
|
||||
StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
|
||||
if (received_node != null) return;
|
||||
if (message.body == null) return;
|
||||
|
|
Loading…
Reference in a new issue