Deduplicate messages before storing in database

This commit is contained in:
Marvin W 2023-02-06 14:39:59 +01:00
parent d76e12b215
commit f74c1f18b1
No known key found for this signature in database
GPG key ID: 072E9235DB996F2A

View file

@ -34,9 +34,9 @@ public class MessageProcessor : StreamInteractionModule, Object {
this.db = db; this.db = db;
this.history_sync = new HistorySync(db, stream_interactor); this.history_sync = new HistorySync(db, stream_interactor);
received_pipeline.connect(new DeduplicateMessageListener(this, db)); received_pipeline.connect(new DeduplicateMessageListener(this));
received_pipeline.connect(new FilterMessageListener()); received_pipeline.connect(new FilterMessageListener());
received_pipeline.connect(new StoreMessageListener(stream_interactor)); received_pipeline.connect(new StoreMessageListener(this, stream_interactor));
received_pipeline.connect(new StoreContentItemListener(stream_interactor)); received_pipeline.connect(new StoreContentItemListener(stream_interactor));
received_pipeline.connect(new MamMessageListener(stream_interactor)); received_pipeline.connect(new MamMessageListener(stream_interactor));
@ -233,24 +233,9 @@ public class MessageProcessor : StreamInteractionModule, Object {
return Entities.Message.Type.CHAT; return Entities.Message.Type.CHAT;
} }
private class DeduplicateMessageListener : MessageListener { private bool is_duplicate(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
public string[] after_actions_const = new string[]{ "FILTER_EMPTY", "MUC" };
public override string action_group { get { return "DEDUPLICATE"; } }
public override string[] after_actions { get { return after_actions_const; } }
private MessageProcessor outer;
private Database db;
public DeduplicateMessageListener(MessageProcessor outer, Database db) {
this.outer = outer;
this.db = db;
}
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
Account account = conversation.account; Account account = conversation.account;
// Deduplicate by server_id // Deduplicate by server_id
if (message.server_id != null) { if (message.server_id != null) {
QueryBuilder builder = db.message.select() QueryBuilder builder = db.message.select()
@ -260,7 +245,7 @@ public class MessageProcessor : StreamInteractionModule, Object {
// If the message is a duplicate // If the message is a duplicate
if (builder.count() > 0) { if (builder.count() > 0) {
outer.history_sync.on_server_id_duplicate(account, stanza, message); history_sync.on_server_id_duplicate(account, stanza, message);
return true; return true;
} }
} }
@ -308,6 +293,22 @@ public class MessageProcessor : StreamInteractionModule, Object {
} }
return builder.count() > 0; return builder.count() > 0;
} }
private class DeduplicateMessageListener : MessageListener {
public string[] after_actions_const = new string[]{ "FILTER_EMPTY", "MUC" };
public override string action_group { get { return "DEDUPLICATE"; } }
public override string[] after_actions { get { return after_actions_const; } }
private MessageProcessor outer;
public DeduplicateMessageListener(MessageProcessor outer) {
this.outer = outer;
}
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
return outer.is_duplicate(message, stanza, conversation);
}
} }
private class FilterMessageListener : MessageListener { private class FilterMessageListener : MessageListener {
@ -327,14 +328,17 @@ public class MessageProcessor : StreamInteractionModule, Object {
public override string action_group { get { return "STORE"; } } public override string action_group { get { return "STORE"; } }
public override string[] after_actions { get { return after_actions_const; } } public override string[] after_actions { get { return after_actions_const; } }
private MessageProcessor outer;
private StreamInteractor stream_interactor; private StreamInteractor stream_interactor;
public StoreMessageListener(StreamInteractor stream_interactor) { public StoreMessageListener(MessageProcessor outer, StreamInteractor stream_interactor) {
this.outer = outer;
this.stream_interactor = stream_interactor; this.stream_interactor = stream_interactor;
} }
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
if (message.body == null) return true; if (message.body == null || outer.is_duplicate(message, stanza, conversation)) return true;
stream_interactor.get_module(MessageStorage.IDENTITY).add_message(message, conversation); stream_interactor.get_module(MessageStorage.IDENTITY).add_message(message, conversation);
return false; return false;
} }