From 825cc5836c90be4e7aa136c0abbf65f16f19f649 Mon Sep 17 00:00:00 2001 From: Konstantin Kuznetsov Date: Mon, 18 Mar 2024 12:53:45 +0300 Subject: [PATCH] Fetch MAM pages when scrolling --- libdino/src/service/content_item_store.vala | 20 +++++++++++-- libdino/src/service/history_sync.vala | 30 +++++++++++++++++++ libdino/src/service/message_processor.vala | 27 ++++++++++++++--- libdino/src/service/muc_manager.vala | 4 +-- .../content_populator.vala | 4 +-- .../conversation_view.vala | 24 +++++++++++---- 6 files changed, 94 insertions(+), 15 deletions(-) diff --git a/libdino/src/service/content_item_store.vala b/libdino/src/service/content_item_store.vala index 740dc2a9..de050633 100644 --- a/libdino/src/service/content_item_store.vala +++ b/libdino/src/service/content_item_store.vala @@ -6,11 +6,14 @@ using Xmpp; namespace Dino { +const int HISTORY_SYNC_MAM_PAGES = 10; + public class ContentItemStore : StreamInteractionModule, Object { public static ModuleIdentity IDENTITY = new ModuleIdentity("content_item_store"); public string id { get { return IDENTITY.id; } } public signal void new_item(ContentItem item, Conversation conversation); + public signal void history_loaded(Conversation conversation, ContentItem item, int count); private StreamInteractor stream_interactor; private Database db; @@ -241,8 +244,10 @@ public class ContentItemStore : StreamInteractionModule, Object { // return ret; // } - public Gee.List get_before(Conversation conversation, ContentItem item, int count) { + public Gee.List get_before(Conversation conversation, ContentItem item, int count, bool request_from_server = true) { + debug("Fetching earlier messages from the db"); long time = (long) item.time.to_unix(); + QueryBuilder select = db.content_item.select() .where(@"time < ? OR (time = ? AND id < ?)", { time.to_string(), time.to_string(), item.id.to_string() }) .with(db.content_item.conversation_id, "=", conversation.id) @@ -251,7 +256,18 @@ public class ContentItemStore : StreamInteractionModule, Object { .order_by(db.content_item.id, "DESC") .limit(count); - return get_items_from_query(select, conversation); + var items = get_items_from_query(select, conversation); + if (items.size == 0 && request_from_server) { + // Async request to get earlier messages from the server + var history_sync = stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync; + history_sync.fetch_data.begin(conversation.account, conversation.counterpart.bare_jid, item.time, HISTORY_SYNC_MAM_PAGES, (_, res) => { + history_sync.fetch_data.end(res); + debug("History loaded"); + history_loaded(conversation, item, count); + }); + } + + return items; } public Gee.List get_after(Conversation conversation, ContentItem item, int count) { diff --git a/libdino/src/service/history_sync.vala b/libdino/src/service/history_sync.vala index 60d78476..170d97a2 100644 --- a/libdino/src/service/history_sync.vala +++ b/libdino/src/service/history_sync.vala @@ -120,6 +120,36 @@ public class Dino.HistorySync { } } + private async PageRequestResult fetch_pages(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int pages) { + debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : ""); + PageRequestResult? page_result = null; + + int processed_pages = 0; + do { + page_result = yield get_mam_page(account, query_params, page_result, null); + processed_pages++; + + debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string()); + if (processed_pages == pages) { + break; + } + + if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.query_result.first == null) { + return page_result; + } + + } while (page_result.page_result == PageResult.MorePagesAvailable); + + return page_result; + } + + public async void fetch_data(Account account, Jid target, DateTime latest, int pages) { + debug("Fetch history for %s", target.to_string()); + + var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(target, latest, null); + yield fetch_pages(account, query_params, pages); + } + public async void fetch_history(Account account, Jid target, Cancellable? cancellable = null) { debug("Fetch history for %s", target.to_string()); diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index baab37ce..00b07c08 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -134,10 +134,14 @@ public class MessageProcessor : StreamInteractionModule, Object { Entities.Message message = yield parse_message_stanza(account, message_stanza); Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message); - if (conversation == null) return; + if (conversation == null) { + return; + } bool abort = yield received_pipeline.run(message, message_stanza, conversation); - if (abort) return; + if (abort) { + return; + } if (message.direction == Entities.Message.DIRECTION_RECEIVED) { message_received(message, conversation); @@ -245,6 +249,7 @@ public class MessageProcessor : StreamInteractionModule, Object { // If the message is a duplicate if (builder.count() > 0) { + warning("deduplicate by server id"); history_sync.on_server_id_duplicate(account, stanza, message); return true; } @@ -271,6 +276,11 @@ public class MessageProcessor : StreamInteractionModule, Object { } } bool duplicate = builder.single().row().is_present(); + + if (duplicate) { + warning("deduplicate by uuid"); + } + return duplicate; } @@ -291,7 +301,13 @@ public class MessageProcessor : StreamInteractionModule, Object { } else { builder.with_null(db.message.counterpart_resource); } - return builder.count() > 0; + + bool duplicate = builder.count() > 0; + if (duplicate) { + warning("deduplicate by content and metadata"); + } + + return duplicate; } private class DeduplicateMessageListener : MessageListener { @@ -357,7 +373,10 @@ public class MessageProcessor : StreamInteractionModule, Object { } public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { - if (message.body == null) return true; + if (message.body == null) { + return true; + } + stream_interactor.get_module(ContentItemStore.IDENTITY).insert_message(message, conversation); return false; } diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala index 119079f0..edfa661e 100644 --- a/libdino/src/service/muc_manager.vala +++ b/libdino/src/service/muc_manager.vala @@ -106,8 +106,8 @@ public class MucManager : StreamInteractionModule, Object { if (can_do_mam) { var history_sync = stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync; if (conversation == null) { - // We never joined the conversation before, just fetch the latest MAM page - yield history_sync.fetch_latest_page(account, jid.bare_jid, null, new DateTime.from_unix_utc(0), cancellable); + // We never joined the conversation before, fetch latest MAM pages + yield history_sync.fetch_data(account, jid.bare_jid, new DateTime.now(), 10); } else { // Fetch everything up to the last time the user actively joined if (!mucs_sync_cancellables.has_key(account)) { diff --git a/main/src/ui/conversation_content_view/content_populator.vala b/main/src/ui/conversation_content_view/content_populator.vala index ef859bde..54e4851b 100644 --- a/main/src/ui/conversation_content_view/content_populator.vala +++ b/main/src/ui/conversation_content_view/content_populator.vala @@ -41,9 +41,9 @@ public class ContentProvider : ContentItemCollection, Object { return ret; } - public Gee.List populate_before(Conversation conversation, ContentItem before_item, int n) { + public Gee.List populate_before(Conversation conversation, ContentItem before_item, int n, bool request_from_server = true) { Gee.List ret = new ArrayList(); - Gee.List items = stream_interactor.get_module(ContentItemStore.IDENTITY).get_before(conversation, before_item, n); + Gee.List items = stream_interactor.get_module(ContentItemStore.IDENTITY).get_before(conversation, before_item, n, request_from_server); foreach (ContentItem item in items) { ret.add(create_content_meta_item(item)); } diff --git a/main/src/ui/conversation_content_view/conversation_view.vala b/main/src/ui/conversation_content_view/conversation_view.vala index 519aa01f..3b502b59 100644 --- a/main/src/ui/conversation_content_view/conversation_view.vala +++ b/main/src/ui/conversation_content_view/conversation_view.vala @@ -44,6 +44,12 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug ContentMetaItem? current_meta_item = null; double last_y = -1; + private void on_history_loaded(Conversation conversation, ContentItem item, int count) { + // We received new messages from the server + // Load them from the DB, but do not make new request to the server + load_earlier_messages(false); + } + construct { this.layout_manager = new BinLayout(); @@ -78,6 +84,9 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug scrolled.vadjustment.notify["page-size"].connect(on_upper_notify); scrolled.vadjustment.notify["value"].connect(on_value_notify); + var content_item_store = stream_interactor.get_module(ContentItemStore.IDENTITY); + content_item_store.history_loaded.connect(on_history_loaded); + content_populator = new ContentProvider(stream_interactor); subscription_notification = new SubscriptionNotitication(stream_interactor); @@ -552,17 +561,22 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug } } - private void load_earlier_messages() { + private void load_earlier_messages(bool request_from_server = true) { was_value = scrolled.vadjustment.value; - if (!reloading_mutex.trylock()) return; + debug("loading earlier messages"); + if (!reloading_mutex.trylock()) { + return; + } + if (content_items.size > 0) { - Gee.List items = content_populator.populate_before(conversation, ((ContentMetaItem) content_items.first()).content_item, 20); + Gee.List items = content_populator.populate_before(conversation, ((ContentMetaItem) content_items.first()).content_item, 20, request_from_server); + debug("inserting new messages, size: %d", items.size); foreach (ContentMetaItem item in items) { do_insert_item(item); } - } else { - reloading_mutex.unlock(); } + + reloading_mutex.unlock(); } private void load_later_messages() {