From 3a0d2d2ffee84fd92545df3e08ecb6d5a8e71234 Mon Sep 17 00:00:00 2001 From: Konstantin Kuznetsov Date: Fri, 22 Mar 2024 13:06:02 +0300 Subject: [PATCH] Wait for messages instead of MAM pages --- libdino/src/entity/conversation.vala | 6 +--- libdino/src/service/history_sync.vala | 33 +++++++++++++++++----- libdino/src/service/message_processor.vala | 7 +++-- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/libdino/src/entity/conversation.vala b/libdino/src/entity/conversation.vala index 2510f1c7..a1e0081d 100644 --- a/libdino/src/entity/conversation.vala +++ b/libdino/src/entity/conversation.vala @@ -2,7 +2,7 @@ using Xmpp; namespace Dino.Entities { -const int HISTORY_SYNC_MAM_PAGES = 10; +const int HISTORY_SYNC_MAM_MESSAGES = 20; public class Conversation : Object { @@ -18,10 +18,6 @@ public class Conversation : Object { } } - public int syncSpeed() { - return HISTORY_SYNC_MAM_PAGES; - } - public int id { get; set; } public Type type_ { get; set; } public Account account { get; private set; } diff --git a/libdino/src/service/history_sync.vala b/libdino/src/service/history_sync.vala index d8604a6e..e8256c5a 100644 --- a/libdino/src/service/history_sync.vala +++ b/libdino/src/service/history_sync.vala @@ -22,6 +22,7 @@ public class Dino.HistorySync { public HashMap catchup_until_time = new HashMap(Account.hash_func, Account.equals_func); private HashMap> stanzas = new HashMap>(); + private HashMap messages_processed = new HashMap(); public class HistorySync(Database db, StreamInteractor stream_interactor) { this.stream_interactor = stream_interactor; @@ -120,7 +121,7 @@ public class Dino.HistorySync { } } - private async PageRequestResult fetch_pages(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int pages) { + private async PageRequestResult fetch_messages(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params) { debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : ""); PageRequestResult? page_result = null; @@ -129,11 +130,13 @@ public class Dino.HistorySync { page_result = yield get_mam_page(account, query_params, page_result, null); processed_pages++; - debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string()); - if (processed_pages == pages) { + debug("%d messages left to process, current page is %d", messages_processed[query_params.query_id], processed_pages); + if (messages_processed[query_params.query_id] <= 1) { + debug("Done processing new messages"); break; } + debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string()); if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.query_result.first == null) { return page_result; } @@ -147,7 +150,10 @@ public class Dino.HistorySync { debug("Fetch history for %s", target.to_string()); var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(target, latest, null); - yield fetch_pages(account, query_params, HISTORY_SYNC_MAM_PAGES); + string query_id = query_params.query_id; + messages_processed[query_id] = HISTORY_SYNC_MAM_MESSAGES; + + yield fetch_messages(account, query_params); } public async void fetch_conversation_data(Conversation? conversation, DateTime latest) { @@ -165,8 +171,11 @@ public class Dino.HistorySync { query_params.mam_server = account.bare_jid; query_params.with = target; } + + string query_id = query_params.query_id; + messages_processed[query_id] = HISTORY_SYNC_MAM_MESSAGES; - yield fetch_pages(account, query_params, conversation.syncSpeed()); + yield fetch_messages(account, query_params); } public async void fetch_history(Account account, Jid target, Cancellable? cancellable = null) { @@ -497,6 +506,7 @@ public class Dino.HistorySync { } } } + if (hitted_range.has_key(query_id) && hitted_range[query_id] == -2) { // Message got filtered out by xmpp-vala, but succesful range fetch nevertheless yield send_messages_back_into_pipeline(account, query_id); @@ -511,15 +521,24 @@ public class Dino.HistorySync { if (cancellable != null && cancellable.is_cancelled()) { page_result = PageResult.Cancelled; } + return new PageRequestResult(page_result, query_result, stanzas_for_query); } private async void send_messages_back_into_pipeline(Account account, string query_id, Cancellable? cancellable = null) { - if (!stanzas.has_key(query_id)) return; + if (!stanzas.has_key(query_id)) { + return; + } foreach (Xmpp.MessageStanza message in stanzas[query_id]) { if (cancellable != null && cancellable.is_cancelled()) break; - yield stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce(account, message); + bool result = yield stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce(account, message); + + if (result && messages_processed.has_key(query_id)) { + int count = messages_processed[query_id]; + count = count - 1; + messages_processed[query_id] = count; + } } stanzas.unset(query_id); } diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index 00b07c08..ce74659d 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -130,17 +130,17 @@ public class MessageProcessor : StreamInteractionModule, Object { run_pipeline_announce.begin(account, message_stanza); } - public async void run_pipeline_announce(Account account, Xmpp.MessageStanza message_stanza) { + public async bool run_pipeline_announce(Account account, Xmpp.MessageStanza message_stanza) { Entities.Message message = yield parse_message_stanza(account, message_stanza); Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message); if (conversation == null) { - return; + return false; } bool abort = yield received_pipeline.run(message, message_stanza, conversation); if (abort) { - return; + return false; } if (message.direction == Entities.Message.DIRECTION_RECEIVED) { @@ -150,6 +150,7 @@ public class MessageProcessor : StreamInteractionModule, Object { } message_sent_or_received(message, conversation); + return true; } public async Entities.Message parse_message_stanza(Account account, Xmpp.MessageStanza message) {