Wait for messages instead of MAM pages

This commit is contained in:
Konstantin Kuznetsov 2024-03-22 13:06:02 +03:00 committed by Maxim Logaev
parent ce9d848388
commit 3a0d2d2ffe
3 changed files with 31 additions and 15 deletions

View file

@ -2,7 +2,7 @@ using Xmpp;
namespace Dino.Entities { namespace Dino.Entities {
const int HISTORY_SYNC_MAM_PAGES = 10; const int HISTORY_SYNC_MAM_MESSAGES = 20;
public class Conversation : Object { public class Conversation : Object {
@ -18,10 +18,6 @@ public class Conversation : Object {
} }
} }
public int syncSpeed() {
return HISTORY_SYNC_MAM_PAGES;
}
public int id { get; set; } public int id { get; set; }
public Type type_ { get; set; } public Type type_ { get; set; }
public Account account { get; private set; } public Account account { get; private set; }

View file

@ -22,6 +22,7 @@ public class Dino.HistorySync {
public HashMap<Account, DateTime> catchup_until_time = new HashMap<Account, DateTime>(Account.hash_func, Account.equals_func); public HashMap<Account, DateTime> catchup_until_time = new HashMap<Account, DateTime>(Account.hash_func, Account.equals_func);
private HashMap<string, Gee.List<Xmpp.MessageStanza>> stanzas = new HashMap<string, Gee.List<Xmpp.MessageStanza>>(); private HashMap<string, Gee.List<Xmpp.MessageStanza>> stanzas = new HashMap<string, Gee.List<Xmpp.MessageStanza>>();
private HashMap<string, int> messages_processed = new HashMap<string, int>();
public class HistorySync(Database db, StreamInteractor stream_interactor) { public class HistorySync(Database db, StreamInteractor stream_interactor) {
this.stream_interactor = stream_interactor; this.stream_interactor = stream_interactor;
@ -120,7 +121,7 @@ public class Dino.HistorySync {
} }
} }
private async PageRequestResult fetch_pages(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int pages) { private async PageRequestResult fetch_messages(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params) {
debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : ""); debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : "");
PageRequestResult? page_result = null; PageRequestResult? page_result = null;
@ -129,11 +130,13 @@ public class Dino.HistorySync {
page_result = yield get_mam_page(account, query_params, page_result, null); page_result = yield get_mam_page(account, query_params, page_result, null);
processed_pages++; processed_pages++;
debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string()); debug("%d messages left to process, current page is %d", messages_processed[query_params.query_id], processed_pages);
if (processed_pages == pages) { if (messages_processed[query_params.query_id] <= 1) {
debug("Done processing new messages");
break; break;
} }
debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string());
if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.query_result.first == null) { if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.query_result.first == null) {
return page_result; return page_result;
} }
@ -147,7 +150,10 @@ public class Dino.HistorySync {
debug("Fetch history for %s", target.to_string()); debug("Fetch history for %s", target.to_string());
var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(target, latest, null); var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(target, latest, null);
yield fetch_pages(account, query_params, HISTORY_SYNC_MAM_PAGES); string query_id = query_params.query_id;
messages_processed[query_id] = HISTORY_SYNC_MAM_MESSAGES;
yield fetch_messages(account, query_params);
} }
public async void fetch_conversation_data(Conversation? conversation, DateTime latest) { public async void fetch_conversation_data(Conversation? conversation, DateTime latest) {
@ -166,7 +172,10 @@ public class Dino.HistorySync {
query_params.with = target; query_params.with = target;
} }
yield fetch_pages(account, query_params, conversation.syncSpeed()); string query_id = query_params.query_id;
messages_processed[query_id] = HISTORY_SYNC_MAM_MESSAGES;
yield fetch_messages(account, query_params);
} }
public async void fetch_history(Account account, Jid target, Cancellable? cancellable = null) { public async void fetch_history(Account account, Jid target, Cancellable? cancellable = null) {
@ -497,6 +506,7 @@ public class Dino.HistorySync {
} }
} }
} }
if (hitted_range.has_key(query_id) && hitted_range[query_id] == -2) { if (hitted_range.has_key(query_id) && hitted_range[query_id] == -2) {
// Message got filtered out by xmpp-vala, but succesful range fetch nevertheless // Message got filtered out by xmpp-vala, but succesful range fetch nevertheless
yield send_messages_back_into_pipeline(account, query_id); yield send_messages_back_into_pipeline(account, query_id);
@ -511,15 +521,24 @@ public class Dino.HistorySync {
if (cancellable != null && cancellable.is_cancelled()) { if (cancellable != null && cancellable.is_cancelled()) {
page_result = PageResult.Cancelled; page_result = PageResult.Cancelled;
} }
return new PageRequestResult(page_result, query_result, stanzas_for_query); return new PageRequestResult(page_result, query_result, stanzas_for_query);
} }
private async void send_messages_back_into_pipeline(Account account, string query_id, Cancellable? cancellable = null) { private async void send_messages_back_into_pipeline(Account account, string query_id, Cancellable? cancellable = null) {
if (!stanzas.has_key(query_id)) return; if (!stanzas.has_key(query_id)) {
return;
}
foreach (Xmpp.MessageStanza message in stanzas[query_id]) { foreach (Xmpp.MessageStanza message in stanzas[query_id]) {
if (cancellable != null && cancellable.is_cancelled()) break; if (cancellable != null && cancellable.is_cancelled()) break;
yield stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce(account, message); bool result = yield stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce(account, message);
if (result && messages_processed.has_key(query_id)) {
int count = messages_processed[query_id];
count = count - 1;
messages_processed[query_id] = count;
}
} }
stanzas.unset(query_id); stanzas.unset(query_id);
} }

View file

@ -130,17 +130,17 @@ public class MessageProcessor : StreamInteractionModule, Object {
run_pipeline_announce.begin(account, message_stanza); run_pipeline_announce.begin(account, message_stanza);
} }
public async void run_pipeline_announce(Account account, Xmpp.MessageStanza message_stanza) { public async bool run_pipeline_announce(Account account, Xmpp.MessageStanza message_stanza) {
Entities.Message message = yield parse_message_stanza(account, message_stanza); Entities.Message message = yield parse_message_stanza(account, message_stanza);
Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message); Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message);
if (conversation == null) { if (conversation == null) {
return; return false;
} }
bool abort = yield received_pipeline.run(message, message_stanza, conversation); bool abort = yield received_pipeline.run(message, message_stanza, conversation);
if (abort) { if (abort) {
return; return false;
} }
if (message.direction == Entities.Message.DIRECTION_RECEIVED) { if (message.direction == Entities.Message.DIRECTION_RECEIVED) {
@ -150,6 +150,7 @@ public class MessageProcessor : StreamInteractionModule, Object {
} }
message_sent_or_received(message, conversation); message_sent_or_received(message, conversation);
return true;
} }
public async Entities.Message parse_message_stanza(Account account, Xmpp.MessageStanza message) { public async Entities.Message parse_message_stanza(Account account, Xmpp.MessageStanza message) {