Fetch MAM pages when scrolling

This commit is contained in:
Konstantin Kuznetsov 2024-03-18 12:53:45 +03:00 committed by Maxim Logaev
parent 1532d181c4
commit 825cc5836c
6 changed files with 94 additions and 15 deletions

View file

@ -6,11 +6,14 @@ using Xmpp;
namespace Dino {
const int HISTORY_SYNC_MAM_PAGES = 10;
public class ContentItemStore : StreamInteractionModule, Object {
public static ModuleIdentity<ContentItemStore> IDENTITY = new ModuleIdentity<ContentItemStore>("content_item_store");
public string id { get { return IDENTITY.id; } }
public signal void new_item(ContentItem item, Conversation conversation);
public signal void history_loaded(Conversation conversation, ContentItem item, int count);
private StreamInteractor stream_interactor;
private Database db;
@ -241,8 +244,10 @@ public class ContentItemStore : StreamInteractionModule, Object {
// return ret;
// }
public Gee.List<ContentItem> get_before(Conversation conversation, ContentItem item, int count) {
public Gee.List<ContentItem> get_before(Conversation conversation, ContentItem item, int count, bool request_from_server = true) {
debug("Fetching earlier messages from the db");
long time = (long) item.time.to_unix();
QueryBuilder select = db.content_item.select()
.where(@"time < ? OR (time = ? AND id < ?)", { time.to_string(), time.to_string(), item.id.to_string() })
.with(db.content_item.conversation_id, "=", conversation.id)
@ -251,7 +256,18 @@ public class ContentItemStore : StreamInteractionModule, Object {
.order_by(db.content_item.id, "DESC")
.limit(count);
return get_items_from_query(select, conversation);
var items = get_items_from_query(select, conversation);
if (items.size == 0 && request_from_server) {
// Async request to get earlier messages from the server
var history_sync = stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync;
history_sync.fetch_data.begin(conversation.account, conversation.counterpart.bare_jid, item.time, HISTORY_SYNC_MAM_PAGES, (_, res) => {
history_sync.fetch_data.end(res);
debug("History loaded");
history_loaded(conversation, item, count);
});
}
return items;
}
public Gee.List<ContentItem> get_after(Conversation conversation, ContentItem item, int count) {

View file

@ -120,6 +120,36 @@ public class Dino.HistorySync {
}
}
private async PageRequestResult fetch_pages(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int pages) {
debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : "");
PageRequestResult? page_result = null;
int processed_pages = 0;
do {
page_result = yield get_mam_page(account, query_params, page_result, null);
processed_pages++;
debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string());
if (processed_pages == pages) {
break;
}
if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.query_result.first == null) {
return page_result;
}
} while (page_result.page_result == PageResult.MorePagesAvailable);
return page_result;
}
public async void fetch_data(Account account, Jid target, DateTime latest, int pages) {
debug("Fetch history for %s", target.to_string());
var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(target, latest, null);
yield fetch_pages(account, query_params, pages);
}
public async void fetch_history(Account account, Jid target, Cancellable? cancellable = null) {
debug("Fetch history for %s", target.to_string());

View file

@ -134,10 +134,14 @@ public class MessageProcessor : StreamInteractionModule, Object {
Entities.Message message = yield parse_message_stanza(account, message_stanza);
Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message);
if (conversation == null) return;
if (conversation == null) {
return;
}
bool abort = yield received_pipeline.run(message, message_stanza, conversation);
if (abort) return;
if (abort) {
return;
}
if (message.direction == Entities.Message.DIRECTION_RECEIVED) {
message_received(message, conversation);
@ -245,6 +249,7 @@ public class MessageProcessor : StreamInteractionModule, Object {
// If the message is a duplicate
if (builder.count() > 0) {
warning("deduplicate by server id");
history_sync.on_server_id_duplicate(account, stanza, message);
return true;
}
@ -271,6 +276,11 @@ public class MessageProcessor : StreamInteractionModule, Object {
}
}
bool duplicate = builder.single().row().is_present();
if (duplicate) {
warning("deduplicate by uuid");
}
return duplicate;
}
@ -291,7 +301,13 @@ public class MessageProcessor : StreamInteractionModule, Object {
} else {
builder.with_null(db.message.counterpart_resource);
}
return builder.count() > 0;
bool duplicate = builder.count() > 0;
if (duplicate) {
warning("deduplicate by content and metadata");
}
return duplicate;
}
private class DeduplicateMessageListener : MessageListener {
@ -357,7 +373,10 @@ public class MessageProcessor : StreamInteractionModule, Object {
}
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
if (message.body == null) return true;
if (message.body == null) {
return true;
}
stream_interactor.get_module(ContentItemStore.IDENTITY).insert_message(message, conversation);
return false;
}

View file

@ -106,8 +106,8 @@ public class MucManager : StreamInteractionModule, Object {
if (can_do_mam) {
var history_sync = stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync;
if (conversation == null) {
// We never joined the conversation before, just fetch the latest MAM page
yield history_sync.fetch_latest_page(account, jid.bare_jid, null, new DateTime.from_unix_utc(0), cancellable);
// We never joined the conversation before, fetch latest MAM pages
yield history_sync.fetch_data(account, jid.bare_jid, new DateTime.now(), 10);
} else {
// Fetch everything up to the last time the user actively joined
if (!mucs_sync_cancellables.has_key(account)) {

View file

@ -41,9 +41,9 @@ public class ContentProvider : ContentItemCollection, Object {
return ret;
}
public Gee.List<ContentMetaItem> populate_before(Conversation conversation, ContentItem before_item, int n) {
public Gee.List<ContentMetaItem> populate_before(Conversation conversation, ContentItem before_item, int n, bool request_from_server = true) {
Gee.List<ContentMetaItem> ret = new ArrayList<ContentMetaItem>();
Gee.List<ContentItem> items = stream_interactor.get_module(ContentItemStore.IDENTITY).get_before(conversation, before_item, n);
Gee.List<ContentItem> items = stream_interactor.get_module(ContentItemStore.IDENTITY).get_before(conversation, before_item, n, request_from_server);
foreach (ContentItem item in items) {
ret.add(create_content_meta_item(item));
}

View file

@ -44,6 +44,12 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug
ContentMetaItem? current_meta_item = null;
double last_y = -1;
private void on_history_loaded(Conversation conversation, ContentItem item, int count) {
// We received new messages from the server
// Load them from the DB, but do not make new request to the server
load_earlier_messages(false);
}
construct {
this.layout_manager = new BinLayout();
@ -78,6 +84,9 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug
scrolled.vadjustment.notify["page-size"].connect(on_upper_notify);
scrolled.vadjustment.notify["value"].connect(on_value_notify);
var content_item_store = stream_interactor.get_module(ContentItemStore.IDENTITY);
content_item_store.history_loaded.connect(on_history_loaded);
content_populator = new ContentProvider(stream_interactor);
subscription_notification = new SubscriptionNotitication(stream_interactor);
@ -552,17 +561,22 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug
}
}
private void load_earlier_messages() {
private void load_earlier_messages(bool request_from_server = true) {
was_value = scrolled.vadjustment.value;
if (!reloading_mutex.trylock()) return;
debug("loading earlier messages");
if (!reloading_mutex.trylock()) {
return;
}
if (content_items.size > 0) {
Gee.List<ContentMetaItem> items = content_populator.populate_before(conversation, ((ContentMetaItem) content_items.first()).content_item, 20);
Gee.List<ContentMetaItem> items = content_populator.populate_before(conversation, ((ContentMetaItem) content_items.first()).content_item, 20, request_from_server);
debug("inserting new messages, size: %d", items.size);
foreach (ContentMetaItem item in items) {
do_insert_item(item);
}
} else {
reloading_mutex.unlock();
}
reloading_mutex.unlock();
}
private void load_later_messages() {