From d0488401cea54a78c8190c372d33a63440cc3aeb Mon Sep 17 00:00:00 2001 From: fiaxh Date: Sat, 14 Nov 2020 16:59:21 +0100 Subject: [PATCH] Use WeakMap for message caching --- libdino/src/service/content_item_store.vala | 24 +-- libdino/src/service/message_storage.vala | 176 ++++++++++++++------ plugins/http-files/src/file_provider.vala | 4 +- 3 files changed, 130 insertions(+), 74 deletions(-) diff --git a/libdino/src/service/content_item_store.vala b/libdino/src/service/content_item_store.vala index 7b320071..357d2300 100644 --- a/libdino/src/service/content_item_store.vala +++ b/libdino/src/service/content_item_store.vala @@ -49,24 +49,12 @@ public class ContentItemStore : StreamInteractionModule, Object { DateTime local_time = new DateTime.from_unix_utc(row[db.content_item.local_time]); switch (provider) { case 1: - RowOption row_option = db.message.select().with(db.message.id, "=", foreign_id) - .outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id) - .row(); - if (row_option.is_present()) { - Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(foreign_id, conversation); - if (message == null) { - try { - message = new Message.from_row(db, row_option.inner); - } catch (InvalidJidError e) { - warning("Ignoring message with invalid Jid: %s", e.message); - } - } - if (message != null) { - var message_item = new MessageItem(message, conversation, row[db.content_item.id]); - message_item.display_time = time; - message_item.sort_time = local_time; - items.add(message_item); - } + Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(foreign_id, conversation); + if (message != null) { + var message_item = new MessageItem(message, conversation, row[db.content_item.id]); + message_item.display_time = time; + message_item.sort_time = local_time; + items.add(message_item); } break; case 2: diff --git a/libdino/src/service/message_storage.vala b/libdino/src/service/message_storage.vala index 7bf03b08..c010a876 100644 --- a/libdino/src/service/message_storage.vala +++ b/libdino/src/service/message_storage.vala @@ -1,3 +1,4 @@ +using Xmpp; using Gee; using Qlite; @@ -12,7 +13,12 @@ public class MessageStorage : StreamInteractionModule, Object { private StreamInteractor stream_interactor; private Database db; - private HashMap> messages = new HashMap>(Conversation.hash_func, Conversation.equals_func); + private WeakMap messages_by_db_id = new WeakMap(); + private HashMap> messages_by_stanza_id = new HashMap>(Conversation.hash_func, Conversation.equals_func); + private HashMap> messages_by_server_id = new HashMap>(Conversation.hash_func, Conversation.equals_func); + + // This is to keep the last 300 messages such that we don't have to recreate the newest ones all the time + private LinkedList message_refs = new LinkedList(); public static void start(StreamInteractor stream_interactor, Database db) { MessageStorage m = new MessageStorage(stream_interactor, db); @@ -26,53 +32,48 @@ public class MessageStorage : StreamInteractionModule, Object { public void add_message(Message message, Conversation conversation) { message.persist(db); - init_conversation(conversation); - messages[conversation].add(message); + cache_message(message, conversation); } public Gee.List get_messages(Conversation conversation, int count = 50) { - init_conversation(conversation); - Gee.List ret = new ArrayList(Message.equals_func); - BidirIterator iter = messages[conversation].bidir_iterator(); - iter.last(); - if (messages[conversation].size > 0) { - do { - ret.insert(0, iter.get()); - iter.previous(); - } while (iter.has_previous() && ret.size < count); + var query = db.message.select() + .with(db.message.account_id, "=", conversation.account.id) + .with(db.message.counterpart_id, "=", db.get_jid_id(conversation.counterpart)) + .with(db.message.type_, "=", (int) Util.get_message_type_for_conversation(conversation)) + .order_by(db.message.local_time, "DESC") + .order_by(db.message.time, "DESC") + .outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id) + .limit(count); + + Gee.List ret = new LinkedList(Message.equals_func); + foreach (Row row in query) { + Message? message = messages_by_db_id[row[db.message.id]]; + if (message == null) { + message = create_message_from_row(row, conversation); + } + ret.insert(0, message); } + return ret; } public Message? get_last_message(Conversation conversation) { - init_conversation(conversation); - if (messages[conversation].size > 0) { - return messages[conversation].last(); + Gee.List messages = get_messages(conversation, 1); + + if (messages.size > 0) { + return messages[0]; } + return null; } public Gee.List get_messages_before_message(Conversation? conversation, DateTime before, int id, int count = 20) { -// SortedSet? before = messages[conversation].head_set(message); -// if (before != null && before.size >= count) { -// Gee.List ret = new ArrayList(Message.equals_func); -// Iterator iter = before.iterator(); -// iter.next(); -// for (int from_index = before.size - count; iter.has_next() && from_index > 0; from_index--) iter.next(); -// while(iter.has_next()) { -// Message m = iter.get(); -// ret.add(m); -// iter.next(); -// } -// return ret; -// } else { Gee.List db_messages = db.get_messages(conversation.counterpart, conversation.account, Util.get_message_type_for_conversation(conversation), count, before, null, id); Gee.List ret = new ArrayList(); foreach (Message message in db_messages) { ret.add(new MessageItem(message, conversation, -1)); } return ret; -// } } public Gee.List get_messages_after_message(Conversation? conversation, DateTime after, int id, int count = 20) { @@ -85,43 +86,110 @@ public class MessageStorage : StreamInteractionModule, Object { } public Message? get_message_by_id(int id, Conversation conversation) { - init_conversation(conversation); - foreach (Message message in messages[conversation]) { - if (message.id == id) return message; + Message? message = messages_by_db_id[id]; + if (message != null) { + return message; } - return null; + + RowOption row_option = db.message.select().with(db.message.id, "=", id) + .outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id) + .row(); + + return create_message_from_row_opt(row_option, conversation); } public Message? get_message_by_stanza_id(string stanza_id, Conversation conversation) { - init_conversation(conversation); - foreach (Message message in messages[conversation]) { - if (message.stanza_id == stanza_id) return message; + if (messages_by_stanza_id.has_key(conversation)) { + Message? message = messages_by_stanza_id[conversation][stanza_id]; + if (message != null) { + return message; + } } - return null; + + var query = db.message.select() + .with(db.message.account_id, "=", conversation.account.id) + .with(db.message.counterpart_id, "=", db.get_jid_id(conversation.counterpart)) + .with(db.message.type_, "=", (int) Util.get_message_type_for_conversation(conversation)) + .with(db.message.stanza_id, "=", stanza_id) + .order_by(db.message.local_time, "DESC") + .order_by(db.message.time, "DESC") + .outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id); + + if (conversation.counterpart.resourcepart == null) { + query.with_null(db.message.counterpart_resource); + } else { + query.with(db.message.counterpart_resource, "=", conversation.counterpart.resourcepart); + } + + RowOption row_option = query.single().row(); + + return create_message_from_row_opt(row_option, conversation); } public Message? get_message_by_server_id(string server_id, Conversation conversation) { - init_conversation(conversation); - foreach (Message message in messages[conversation]) { - if (message.server_id == server_id) return message; + if (messages_by_server_id.has_key(conversation)) { + Message? message = messages_by_server_id[conversation][server_id]; + if (message != null) { + return message; + } + } + + var query = db.message.select() + .with(db.message.account_id, "=", conversation.account.id) + .with(db.message.counterpart_id, "=", db.get_jid_id(conversation.counterpart)) + .with(db.message.type_, "=", (int) Util.get_message_type_for_conversation(conversation)) + .with(db.message.server_id, "=", server_id) + .order_by(db.message.local_time, "DESC") + .order_by(db.message.time, "DESC") + .outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id); + + if (conversation.counterpart.resourcepart == null) { + query.with_null(db.message.counterpart_resource); + } else { + query.with(db.message.counterpart_resource, "=", conversation.counterpart.resourcepart); + } + + RowOption row_option = query.single().row(); + + return create_message_from_row_opt(row_option, conversation); + } + + private Message? create_message_from_row_opt(RowOption row_option, Conversation conversation) { + if (!row_option.is_present()) return null; + return create_message_from_row(row_option.inner, conversation); + } + + private Message? create_message_from_row(Row row, Conversation conversation) { + try { + Message message = new Message.from_row(db, row); + cache_message(message, conversation); + return message; + } catch (InvalidJidError e) { + warning("Got message with invalid Jid: %s", e.message); } return null; } - private void init_conversation(Conversation conversation) { - if (!messages.has_key(conversation)) { - messages[conversation] = new Gee.TreeSet((a, b) => { - int res = a.local_time.compare(b.local_time); - if (res == 0) { - res = a.time.compare(b.time); - } - if (res == 0) { - res = a.id - b.id > 0 ? 1 : -1; - } - return res; - }); - Gee.List db_messages = db.get_messages(conversation.counterpart, conversation.account, Util.get_message_type_for_conversation(conversation), 50, null, null, -1); - messages[conversation].add_all(db_messages); + private void cache_message(Message message, Conversation conversation) { + messages_by_db_id[message.id] = message; + + if (message.stanza_id != null) { + if (!messages_by_stanza_id.has_key(conversation)) { + messages_by_stanza_id[conversation] = new WeakMap(); + } + messages_by_stanza_id[conversation][message.stanza_id] = message; + } + + if (message.server_id != null) { + if (!messages_by_server_id.has_key(conversation)) { + messages_by_server_id[conversation] = new WeakMap(); + } + messages_by_server_id[conversation][message.server_id] = message; + } + + message_refs.insert(0, message); + if (message_refs.size > 300) { + message_refs.remove_at(message_refs.size - 1); } } } diff --git a/plugins/http-files/src/file_provider.vala b/plugins/http-files/src/file_provider.vala index fd952b6b..0a68a1fb 100644 --- a/plugins/http-files/src/file_provider.vala +++ b/plugins/http-files/src/file_provider.vala @@ -114,7 +114,7 @@ public class FileProvider : Dino.FileProvider, Object { Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(file_transfer.counterpart.bare_jid, file_transfer.account); if (conversation == null) throw new FileReceiveError.GET_METADATA_FAILED("No conversation"); - Message? message = dino_db.get_message_by_id(int.parse(file_transfer.info)); + Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(int.parse(file_transfer.info), conversation); if (message == null) throw new FileReceiveError.GET_METADATA_FAILED("No message"); var file_meta = new HttpFileMeta(); @@ -132,7 +132,7 @@ public class FileProvider : Dino.FileProvider, Object { Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(file_transfer.counterpart.bare_jid, file_transfer.account); if (conversation == null) return null; - Message? message = dino_db.get_message_by_id(int.parse(file_transfer.info)); + Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(int.parse(file_transfer.info), conversation); if (message == null) return null; var receive_data = new HttpFileReceiveData();