parent
a8ee61b34c
commit
0968da1ff7
|
@ -126,7 +126,7 @@ public class ChatInteraction : StreamInteractionModule, Object {
|
||||||
|
|
||||||
private class ReceivedMessageListener : MessageListener {
|
private class ReceivedMessageListener : MessageListener {
|
||||||
|
|
||||||
public string[] after_actions_const = new string[]{ };
|
public string[] after_actions_const = new string[]{ "DEDUPLICATE" };
|
||||||
public override string action_group { get { return "OTHER_NODES"; } }
|
public override string action_group { get { return "OTHER_NODES"; } }
|
||||||
public override string[] after_actions { get { return after_actions_const; } }
|
public override string[] after_actions { get { return after_actions_const; } }
|
||||||
|
|
||||||
|
|
|
@ -53,30 +53,39 @@ public class CounterpartInteractionManager : StreamInteractionModule, Object {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void on_chat_marker_received(Account account, Jid jid, string marker, string stanza_id) {
|
private void on_chat_marker_received(Account account, Jid jid, string marker, string stanza_id) {
|
||||||
foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations(jid, account)) {
|
bool own_marker = account.bare_jid.to_string() == jid.bare_jid.to_string();
|
||||||
|
if (own_marker) {
|
||||||
|
Conversation? conversation = stream_interactor.get_module(MessageStorage.IDENTITY).get_conversation_for_stanza_id(account, stanza_id);
|
||||||
|
if (conversation == null) return;
|
||||||
Entities.Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(stanza_id, conversation);
|
Entities.Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(stanza_id, conversation);
|
||||||
if (message != null) {
|
if (message == null) return;
|
||||||
switch (marker) {
|
conversation.read_up_to = message;
|
||||||
case Xep.ChatMarkers.MARKER_RECEIVED:
|
} else {
|
||||||
received_message_received(account, jid, message);
|
foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations(jid, account)) {
|
||||||
message.marked = Entities.Message.Marked.RECEIVED;
|
Entities.Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(stanza_id, conversation);
|
||||||
break;
|
if (message != null) {
|
||||||
case Xep.ChatMarkers.MARKER_DISPLAYED:
|
switch (marker) {
|
||||||
received_message_displayed(account, jid, message);
|
case Xep.ChatMarkers.MARKER_RECEIVED:
|
||||||
Gee.List<Entities.Message> messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation);
|
received_message_received(account, jid, message);
|
||||||
foreach (Entities.Message m in messages) {
|
message.marked = Entities.Message.Marked.RECEIVED;
|
||||||
if (m.equals(message)) break;
|
break;
|
||||||
if (m.marked == Entities.Message.Marked.RECEIVED) m.marked = Entities.Message.Marked.READ;
|
case Xep.ChatMarkers.MARKER_DISPLAYED:
|
||||||
}
|
received_message_displayed(account, jid, message);
|
||||||
message.marked = Entities.Message.Marked.READ;
|
Gee.List<Entities.Message> messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation);
|
||||||
break;
|
foreach (Entities.Message m in messages) {
|
||||||
}
|
if (m.equals(message)) break;
|
||||||
} else {
|
if (m.marked == Entities.Message.Marked.RECEIVED) m.marked = Entities.Message.Marked.READ;
|
||||||
if (marker_wo_message.has_key(stanza_id) &&
|
}
|
||||||
|
message.marked = Entities.Message.Marked.READ;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (marker_wo_message.has_key(stanza_id) &&
|
||||||
marker_wo_message[stanza_id] == Xep.ChatMarkers.MARKER_DISPLAYED && marker == Xep.ChatMarkers.MARKER_RECEIVED) {
|
marker_wo_message[stanza_id] == Xep.ChatMarkers.MARKER_DISPLAYED && marker == Xep.ChatMarkers.MARKER_RECEIVED) {
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
marker_wo_message[stanza_id] = marker;
|
||||||
}
|
}
|
||||||
marker_wo_message[stanza_id] = marker;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ public class MessageProcessor : StreamInteractionModule, Object {
|
||||||
public signal void build_message_stanza(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation);
|
public signal void build_message_stanza(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation);
|
||||||
public signal void pre_message_send(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation);
|
public signal void pre_message_send(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation);
|
||||||
public signal void message_sent(Entities.Message message, Conversation conversation);
|
public signal void message_sent(Entities.Message message, Conversation conversation);
|
||||||
|
public signal void history_synced(Account account);
|
||||||
|
|
||||||
public MessageListenerHolder received_pipeline = new MessageListenerHolder();
|
public MessageListenerHolder received_pipeline = new MessageListenerHolder();
|
||||||
|
|
||||||
|
@ -66,7 +67,9 @@ public class MessageProcessor : StreamInteractionModule, Object {
|
||||||
});
|
});
|
||||||
stream_interactor.module_manager.get_module(account, Xmpp.Xep.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => {
|
stream_interactor.module_manager.get_module(account, Xmpp.Xep.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => {
|
||||||
DateTime start_time = account.mam_earliest_synced.to_unix() > 60 ? account.mam_earliest_synced.add_minutes(-1) : account.mam_earliest_synced;
|
DateTime start_time = account.mam_earliest_synced.to_unix() > 60 ? account.mam_earliest_synced.add_minutes(-1) : account.mam_earliest_synced;
|
||||||
stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).query_archive(stream, null, start_time, null);
|
stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).query_archive(stream, null, start_time, null, () => {
|
||||||
|
history_synced(account);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -78,6 +78,16 @@ public class MessageStorage : StreamInteractionModule, Object {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Conversation? get_conversation_for_stanza_id(Account account, string stanza_id) {
|
||||||
|
foreach (Conversation conversation in messages.keys) {
|
||||||
|
if (!conversation.account.equals(account)) continue;
|
||||||
|
foreach (Message message in messages[conversation]) {
|
||||||
|
if (message.stanza_id == stanza_id) return conversation;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private void init_conversation(Conversation conversation) {
|
private void init_conversation(Conversation conversation) {
|
||||||
if (!messages.has_key(conversation)) {
|
if (!messages.has_key(conversation)) {
|
||||||
messages[conversation] = new Gee.TreeSet<Message>((a, b) => {
|
messages[conversation] = new Gee.TreeSet<Message>((a, b) => {
|
||||||
|
|
|
@ -14,6 +14,9 @@ public class NotificationEvents : StreamInteractionModule, Object {
|
||||||
|
|
||||||
private StreamInteractor stream_interactor;
|
private StreamInteractor stream_interactor;
|
||||||
|
|
||||||
|
private HashMap<Account, HashMap<Conversation, Entities.Message>> mam_potential_new = new HashMap<Account, HashMap<Conversation, Entities.Message>>(Account.hash_func, Account.equals_func);
|
||||||
|
private Gee.List<Account> synced_accounts = new ArrayList<Account>();
|
||||||
|
|
||||||
public static void start(StreamInteractor stream_interactor) {
|
public static void start(StreamInteractor stream_interactor) {
|
||||||
NotificationEvents m = new NotificationEvents(stream_interactor);
|
NotificationEvents m = new NotificationEvents(stream_interactor);
|
||||||
stream_interactor.add_module(m);
|
stream_interactor.add_module(m);
|
||||||
|
@ -24,9 +27,29 @@ public class NotificationEvents : StreamInteractionModule, Object {
|
||||||
|
|
||||||
stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(on_message_received);
|
stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(on_message_received);
|
||||||
stream_interactor.get_module(PresenceManager.IDENTITY).received_subscription_request.connect(on_received_subscription_request);
|
stream_interactor.get_module(PresenceManager.IDENTITY).received_subscription_request.connect(on_received_subscription_request);
|
||||||
|
stream_interactor.get_module(MessageProcessor.IDENTITY).history_synced.connect((account) => {
|
||||||
|
synced_accounts.add(account);
|
||||||
|
if (!mam_potential_new.has_key(account)) return;
|
||||||
|
foreach (Conversation c in mam_potential_new[account].keys) {
|
||||||
|
Entities.Message m = mam_potential_new[account][c];
|
||||||
|
Entities.Message last_message = stream_interactor.get_module(MessageStorage.IDENTITY).get_last_message(c);
|
||||||
|
if (m.equals(last_message) && !c.read_up_to.equals(m)) {
|
||||||
|
on_message_received(m, c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mam_potential_new[account].clear();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void on_message_received(Entities.Message message, Conversation conversation) {
|
private void on_message_received(Entities.Message message, Conversation conversation) {
|
||||||
|
if (!synced_accounts.contains(conversation.account)) {
|
||||||
|
if (!mam_potential_new.has_key(conversation.account)) {
|
||||||
|
mam_potential_new[conversation.account] = new HashMap<Conversation, Entities.Message>(Conversation.hash_func, Conversation.equals_func);
|
||||||
|
}
|
||||||
|
mam_potential_new[conversation.account][conversation] = message;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!should_notify_message(message, conversation)) return;
|
||||||
if (!should_notify_message(message, conversation)) return;
|
if (!should_notify_message(message, conversation)) return;
|
||||||
if (stream_interactor.get_module(ChatInteraction.IDENTITY).is_active_focus()) return;
|
if (stream_interactor.get_module(ChatInteraction.IDENTITY).is_active_focus()) return;
|
||||||
notify_message(message, conversation);
|
notify_message(message, conversation);
|
||||||
|
|
|
@ -14,7 +14,8 @@ public class Module : XmppStreamModule {
|
||||||
|
|
||||||
private ReceivedPipelineListener received_pipeline_listener = new ReceivedPipelineListener();
|
private ReceivedPipelineListener received_pipeline_listener = new ReceivedPipelineListener();
|
||||||
|
|
||||||
public void query_archive(XmppStream stream, string? jid, DateTime? start, DateTime? end) {
|
public delegate void OnFinished(XmppStream stream);
|
||||||
|
public void query_archive(XmppStream stream, string? jid, DateTime? start, DateTime? end, owned OnFinished? on_finished = null) {
|
||||||
if (stream.get_flag(Flag.IDENTITY) == null) return;
|
if (stream.get_flag(Flag.IDENTITY) == null) return;
|
||||||
|
|
||||||
DataForms.DataForm data_form = new DataForms.DataForm();
|
DataForms.DataForm data_form = new DataForms.DataForm();
|
||||||
|
@ -38,7 +39,7 @@ public class Module : XmppStreamModule {
|
||||||
}
|
}
|
||||||
StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node());
|
StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node());
|
||||||
Iq.Stanza iq = new Iq.Stanza.set(query_node);
|
Iq.Stanza iq = new Iq.Stanza.set(query_node);
|
||||||
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, page_through_results);
|
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { page_through_results(stream, iq, on_finished); });
|
||||||
}
|
}
|
||||||
|
|
||||||
public override void attach(XmppStream stream) {
|
public override void attach(XmppStream stream) {
|
||||||
|
@ -53,10 +54,11 @@ public class Module : XmppStreamModule {
|
||||||
public override string get_ns() { return NS_URI; }
|
public override string get_ns() { return NS_URI; }
|
||||||
public override string get_id() { return IDENTITY.id; }
|
public override string get_id() { return IDENTITY.id; }
|
||||||
|
|
||||||
private static void page_through_results(XmppStream stream, Iq.Stanza iq) {
|
private static void page_through_results(XmppStream stream, Iq.Stanza iq, owned OnFinished? on_finished = null) {
|
||||||
string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last");
|
string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last");
|
||||||
if (last == null) {
|
if (last == null) {
|
||||||
stream.get_flag(Flag.IDENTITY).cought_up = true;
|
stream.get_flag(Flag.IDENTITY).cought_up = true;
|
||||||
|
if (on_finished != null) on_finished(stream);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +69,7 @@ public class Module : XmppStreamModule {
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, page_through_results);
|
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, (stream, iq) => { page_through_results(stream, iq, on_finished); });
|
||||||
}
|
}
|
||||||
|
|
||||||
private void query_availability(XmppStream stream) {
|
private void query_availability(XmppStream stream) {
|
||||||
|
|
Loading…
Reference in a new issue