Improve history sync
- Ensure we fully fetch desired history if possible (previously, duplicates from offline message queue could hinder MAM sync) - Early drop illegal MAM messages so they don't pile up in the pending queue waiting for their query to end (which it never will if they were not requested in first place). Fixes #1386
This commit is contained in:
parent
acf9c69470
commit
89b9110fcb
|
@ -163,7 +163,7 @@ public class Dino.HistorySync {
|
||||||
if (current_row[db.mam_catchup.from_end]) return;
|
if (current_row[db.mam_catchup.from_end]) return;
|
||||||
|
|
||||||
debug("[%s] Fetching between ranges %s - %s", mam_server.to_string(), previous_row[db.mam_catchup.to_time].to_string(), current_row[db.mam_catchup.from_time].to_string());
|
debug("[%s] Fetching between ranges %s - %s", mam_server.to_string(), previous_row[db.mam_catchup.to_time].to_string(), current_row[db.mam_catchup.from_time].to_string());
|
||||||
current_row = yield fetch_between_ranges(account, mam_server, previous_row, current_row);
|
current_row = yield fetch_between_ranges(account, mam_server, previous_row, current_row, cancellable);
|
||||||
if (current_row == null) return;
|
if (current_row == null) return;
|
||||||
|
|
||||||
RowOption previous_row_opt = db.mam_catchup.select()
|
RowOption previous_row_opt = db.mam_catchup.select()
|
||||||
|
@ -214,13 +214,11 @@ public class Dino.HistorySync {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we get PageResult.Duplicate, we still want to update the db row to the latest message.
|
|
||||||
|
|
||||||
// Catchup finished within first page. Update latest db entry.
|
// Catchup finished within first page. Update latest db entry.
|
||||||
if (latest_row_id != -1 &&
|
if (latest_row_id != -1 &&
|
||||||
page_result.page_result in new PageResult[] { PageResult.TargetReached, PageResult.NoMoreMessages, PageResult.Duplicate }) {
|
page_result.page_result in new PageResult[] { PageResult.TargetReached, PageResult.NoMoreMessages }) {
|
||||||
|
|
||||||
if (page_result.stanzas == null || page_result.stanzas.is_empty) return null;
|
if (page_result.stanzas == null) return null;
|
||||||
|
|
||||||
string latest_mam_id = page_result.query_result.last;
|
string latest_mam_id = page_result.query_result.last;
|
||||||
long latest_mam_time = (long) mam_times[account][latest_mam_id].to_unix();
|
long latest_mam_time = (long) mam_times[account][latest_mam_id].to_unix();
|
||||||
|
@ -272,7 +270,7 @@ public class Dino.HistorySync {
|
||||||
** Merges the `earlier_range` db row into the `later_range` db row.
|
** Merges the `earlier_range` db row into the `later_range` db row.
|
||||||
** @return The resulting range comprising `earlier_range`, `later_rage`, and everything in between. null if fetching/merge failed.
|
** @return The resulting range comprising `earlier_range`, `later_rage`, and everything in between. null if fetching/merge failed.
|
||||||
**/
|
**/
|
||||||
private async Row? fetch_between_ranges(Account account, Jid mam_server, Row earlier_range, Row later_range) {
|
private async Row? fetch_between_ranges(Account account, Jid mam_server, Row earlier_range, Row later_range, Cancellable? cancellable = null) {
|
||||||
int later_range_id = (int) later_range[db.mam_catchup.id];
|
int later_range_id = (int) later_range[db.mam_catchup.id];
|
||||||
DateTime earliest_time = new DateTime.from_unix_utc(earlier_range[db.mam_catchup.to_time]);
|
DateTime earliest_time = new DateTime.from_unix_utc(earlier_range[db.mam_catchup.to_time]);
|
||||||
DateTime latest_time = new DateTime.from_unix_utc(later_range[db.mam_catchup.from_time]);
|
DateTime latest_time = new DateTime.from_unix_utc(later_range[db.mam_catchup.from_time]);
|
||||||
|
@ -282,9 +280,9 @@ public class Dino.HistorySync {
|
||||||
earliest_time, earlier_range[db.mam_catchup.to_id],
|
earliest_time, earlier_range[db.mam_catchup.to_id],
|
||||||
latest_time, later_range[db.mam_catchup.from_id]);
|
latest_time, later_range[db.mam_catchup.from_id]);
|
||||||
|
|
||||||
PageRequestResult page_result = yield fetch_query(account, query_params, later_range_id);
|
PageRequestResult page_result = yield fetch_query(account, query_params, later_range_id, cancellable);
|
||||||
|
|
||||||
if (page_result.page_result == PageResult.TargetReached) {
|
if (page_result.page_result == PageResult.TargetReached || page_result.page_result == PageResult.NoMoreMessages) {
|
||||||
debug("[%s | %s] Merging range %i into %i", account.bare_jid.to_string(), mam_server.to_string(), earlier_range[db.mam_catchup.id], later_range_id);
|
debug("[%s | %s] Merging range %i into %i", account.bare_jid.to_string(), mam_server.to_string(), earlier_range[db.mam_catchup.id], later_range_id);
|
||||||
// Merge earlier range into later one.
|
// Merge earlier range into later one.
|
||||||
db.mam_catchup.update()
|
db.mam_catchup.update()
|
||||||
|
@ -330,9 +328,9 @@ public class Dino.HistorySync {
|
||||||
PageRequestResult? page_result = null;
|
PageRequestResult? page_result = null;
|
||||||
do {
|
do {
|
||||||
page_result = yield get_mam_page(account, query_params, page_result, cancellable);
|
page_result = yield get_mam_page(account, query_params, page_result, cancellable);
|
||||||
debug("Page result %s %b", page_result.page_result.to_string(), page_result.stanzas == null);
|
debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string());
|
||||||
|
|
||||||
if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.stanzas == null) return page_result;
|
if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.query_result.first == null) return page_result;
|
||||||
|
|
||||||
string earliest_mam_id = page_result.query_result.first;
|
string earliest_mam_id = page_result.query_result.first;
|
||||||
long earliest_mam_time = (long)mam_times[account][earliest_mam_id].to_unix();
|
long earliest_mam_time = (long)mam_times[account][earliest_mam_id].to_unix();
|
||||||
|
@ -357,7 +355,6 @@ public class Dino.HistorySync {
|
||||||
MorePagesAvailable,
|
MorePagesAvailable,
|
||||||
TargetReached,
|
TargetReached,
|
||||||
NoMoreMessages,
|
NoMoreMessages,
|
||||||
Duplicate,
|
|
||||||
Error,
|
Error,
|
||||||
Cancelled
|
Cancelled
|
||||||
}
|
}
|
||||||
|
@ -399,23 +396,25 @@ public class Dino.HistorySync {
|
||||||
string query_id = query_params.query_id;
|
string query_id = query_params.query_id;
|
||||||
string? after_id = query_params.start_id;
|
string? after_id = query_params.start_id;
|
||||||
|
|
||||||
|
var stanzas_for_query = stanzas.has_key(query_id) && !stanzas[query_id].is_empty ? stanzas[query_id] : null;
|
||||||
if (cancellable != null && cancellable.is_cancelled()) {
|
if (cancellable != null && cancellable.is_cancelled()) {
|
||||||
return new PageRequestResult(PageResult.Cancelled, query_result, stanzas[query_id]);
|
stanzas.unset(query_id);
|
||||||
|
return new PageRequestResult(PageResult.Cancelled, query_result, stanzas_for_query);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (stanzas.has_key(query_id) && !stanzas[query_id].is_empty) {
|
if (stanzas_for_query != null) {
|
||||||
|
|
||||||
// Check it we reached our target (from_id)
|
// Check it we reached our target (from_id)
|
||||||
foreach (Xmpp.MessageStanza message in stanzas[query_id]) {
|
foreach (Xmpp.MessageStanza message in stanzas_for_query) {
|
||||||
Xmpp.MessageArchiveManagement.MessageFlag? mam_message_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message);
|
Xmpp.MessageArchiveManagement.MessageFlag? mam_message_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message);
|
||||||
if (mam_message_flag != null && mam_message_flag.mam_id != null) {
|
if (mam_message_flag != null && mam_message_flag.mam_id != null) {
|
||||||
if (after_id != null && mam_message_flag.mam_id == after_id) {
|
if (after_id != null && mam_message_flag.mam_id == after_id) {
|
||||||
// Successfully fetched the whole range
|
// Successfully fetched the whole range
|
||||||
yield send_messages_back_into_pipeline(account, query_id, cancellable);
|
yield send_messages_back_into_pipeline(account, query_id, cancellable);
|
||||||
if (cancellable != null && cancellable.is_cancelled()) {
|
if (cancellable != null && cancellable.is_cancelled()) {
|
||||||
return new PageRequestResult(PageResult.Cancelled, query_result, stanzas[query_id]);
|
return new PageRequestResult(PageResult.Cancelled, query_result, stanzas_for_query);
|
||||||
}
|
}
|
||||||
return new PageRequestResult(PageResult.TargetReached, query_result, stanzas[query_id]);
|
return new PageRequestResult(PageResult.TargetReached, query_result, stanzas_for_query);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -423,37 +422,9 @@ public class Dino.HistorySync {
|
||||||
// Message got filtered out by xmpp-vala, but succesful range fetch nevertheless
|
// Message got filtered out by xmpp-vala, but succesful range fetch nevertheless
|
||||||
yield send_messages_back_into_pipeline(account, query_id);
|
yield send_messages_back_into_pipeline(account, query_id);
|
||||||
if (cancellable != null && cancellable.is_cancelled()) {
|
if (cancellable != null && cancellable.is_cancelled()) {
|
||||||
return new PageRequestResult(PageResult.Cancelled, query_result, stanzas[query_id]);
|
return new PageRequestResult(PageResult.Cancelled, query_result, stanzas_for_query);
|
||||||
}
|
}
|
||||||
return new PageRequestResult(PageResult.TargetReached, query_result, stanzas[query_id]);
|
return new PageRequestResult(PageResult.TargetReached, query_result, stanzas_for_query);
|
||||||
}
|
|
||||||
|
|
||||||
// Check for duplicates. Go through all messages and build a db query.
|
|
||||||
foreach (Xmpp.MessageStanza message in stanzas[query_id]) {
|
|
||||||
Xmpp.MessageArchiveManagement.MessageFlag? mam_message_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message);
|
|
||||||
if (mam_message_flag != null && mam_message_flag.mam_id != null) {
|
|
||||||
if (selection == null) {
|
|
||||||
selection = @"$(db.message.server_id) = ?";
|
|
||||||
} else {
|
|
||||||
selection += @" OR $(db.message.server_id) = ?";
|
|
||||||
}
|
|
||||||
selection_args += mam_message_flag.mam_id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var duplicates_qry = db.message.select()
|
|
||||||
.with(db.message.account_id, "=", account.id)
|
|
||||||
.where(selection, selection_args);
|
|
||||||
// We don't want messages from different MAM servers to interfere with each other.
|
|
||||||
if (!query_params.mam_server.equals_bare(account.bare_jid)) {
|
|
||||||
duplicates_qry.with(db.message.counterpart_id, "=", db.get_jid_id(query_params.mam_server));
|
|
||||||
} else {
|
|
||||||
duplicates_qry.with(db.message.type_, "=", Message.Type.CHAT);
|
|
||||||
}
|
|
||||||
var duplicates_count = duplicates_qry.count();
|
|
||||||
if (duplicates_count > 0) {
|
|
||||||
// We got a duplicate although we thought we have to catch up.
|
|
||||||
// There was a server bug where prosody would send all messages if it didn't know the after ID that was given
|
|
||||||
page_result = PageResult.Duplicate;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,7 +432,7 @@ public class Dino.HistorySync {
|
||||||
if (cancellable != null && cancellable.is_cancelled()) {
|
if (cancellable != null && cancellable.is_cancelled()) {
|
||||||
page_result = PageResult.Cancelled;
|
page_result = PageResult.Cancelled;
|
||||||
}
|
}
|
||||||
return new PageRequestResult(page_result, query_result, stanzas.has_key(query_id) ? stanzas[query_id] : null);
|
return new PageRequestResult(page_result, query_result, stanzas_for_query);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async void send_messages_back_into_pipeline(Account account, string query_id, Cancellable? cancellable = null) {
|
private async void send_messages_back_into_pipeline(Account account, string query_id, Cancellable? cancellable = null) {
|
||||||
|
|
|
@ -11,8 +11,8 @@ public class QueryResult {
|
||||||
public bool error { get; set; default=false; }
|
public bool error { get; set; default=false; }
|
||||||
public bool malformed { get; set; default=false; }
|
public bool malformed { get; set; default=false; }
|
||||||
public bool complete { get; set; default=false; }
|
public bool complete { get; set; default=false; }
|
||||||
public string first { get; set; }
|
public string? first { get; set; }
|
||||||
public string last { get; set; }
|
public string? last { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
public class Module : XmppStreamModule {
|
public class Module : XmppStreamModule {
|
||||||
|
@ -65,16 +65,17 @@ public class Module : XmppStreamModule {
|
||||||
}
|
}
|
||||||
|
|
||||||
StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node());
|
StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node());
|
||||||
if (queryid != null) {
|
query_node.put_attribute("queryid", queryid);
|
||||||
query_node.put_attribute("queryid", queryid);
|
|
||||||
}
|
|
||||||
return query_node;
|
return query_node;
|
||||||
}
|
}
|
||||||
|
|
||||||
internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node, Cancellable? cancellable = null) {
|
internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node, Cancellable? cancellable = null) {
|
||||||
var res = new QueryResult();
|
|
||||||
|
|
||||||
if (stream.get_flag(Flag.IDENTITY) == null) { res.error = true; return res; }
|
var res = new QueryResult();
|
||||||
|
Flag? flag = stream.get_flag(Flag.IDENTITY);
|
||||||
|
string? query_id = query_node.get_attribute("queryid");
|
||||||
|
if (flag == null || query_id == null) { res.error = true; return res; }
|
||||||
|
flag.active_query_ids.add(query_id);
|
||||||
|
|
||||||
// Build and send query
|
// Build and send query
|
||||||
Iq.Stanza iq = new Iq.Stanza.set(query_node) { to=mam_server };
|
Iq.Stanza iq = new Iq.Stanza.set(query_node) { to=mam_server };
|
||||||
|
@ -93,6 +94,11 @@ public class Module : XmppStreamModule {
|
||||||
if ((res.first == null) != (res.last == null)) { res.malformed = true; return res; }
|
if ((res.first == null) != (res.last == null)) { res.malformed = true; return res; }
|
||||||
res.complete = fin_node.get_attribute_bool("complete", false, ns);
|
res.complete = fin_node.get_attribute_bool("complete", false, ns);
|
||||||
|
|
||||||
|
Idle.add(() => {
|
||||||
|
flag.active_query_ids.remove(query_id);
|
||||||
|
return Source.REMOVE;
|
||||||
|
}, Priority.LOW);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +110,8 @@ public class ReceivedPipelineListener : StanzaListener<MessageStanza> {
|
||||||
public override string[] after_actions { get { return after_actions_const; } }
|
public override string[] after_actions { get { return after_actions_const; } }
|
||||||
|
|
||||||
public override async bool run(XmppStream stream, MessageStanza message) {
|
public override async bool run(XmppStream stream, MessageStanza message) {
|
||||||
if (stream.get_flag(Flag.IDENTITY) == null) return false;
|
Flag? flag = stream.get_flag(Flag.IDENTITY);
|
||||||
|
if (flag == null) return false;
|
||||||
|
|
||||||
StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", StanzaForwarding.NS_URI + ":forwarded", Xmpp.NS_URI + ":message");
|
StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", StanzaForwarding.NS_URI + ":forwarded", Xmpp.NS_URI + ":message");
|
||||||
if (message_node != null) {
|
if (message_node != null) {
|
||||||
|
@ -112,6 +119,28 @@ public class ReceivedPipelineListener : StanzaListener<MessageStanza> {
|
||||||
DateTime? datetime = DelayedDelivery.get_time_for_node(forward_node);
|
DateTime? datetime = DelayedDelivery.get_time_for_node(forward_node);
|
||||||
string? mam_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":id");
|
string? mam_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":id");
|
||||||
string? query_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":queryid");
|
string? query_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":queryid");
|
||||||
|
|
||||||
|
if (query_id == null) {
|
||||||
|
warning("Received MAM message without queryid from %s, ignoring", message.from.to_string());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!flag.active_query_ids.contains(query_id)) {
|
||||||
|
warning("Received MAM message from %s with unknown query id %s, ignoring", message.from.to_string(), query_id ?? "<none>");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Jid? inner_from = null;
|
||||||
|
try {
|
||||||
|
inner_from = new Jid(message_node.get_attribute("from"));
|
||||||
|
} catch (InvalidJidError e) {
|
||||||
|
warning("Received MAM message with invalid from attribute in forwarded message from %s, ignoring", message.from.to_string());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!message.from.equals(stream.get_flag(Bind.Flag.IDENTITY).my_jid.bare_jid) && !message.from.equals_bare(inner_from)) {
|
||||||
|
warning("Received MAM message from %s illegally impersonating %s, ignoring", message.from.to_string(), inner_from.to_string());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
message.add_flag(new MessageFlag(message.from, datetime, mam_id, query_id));
|
message.add_flag(new MessageFlag(message.from, datetime, mam_id, query_id));
|
||||||
|
|
||||||
message.stanza = message_node;
|
message.stanza = message_node;
|
||||||
|
@ -124,6 +153,7 @@ public class ReceivedPipelineListener : StanzaListener<MessageStanza> {
|
||||||
public class Flag : XmppStreamFlag {
|
public class Flag : XmppStreamFlag {
|
||||||
public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "message_archive_management");
|
public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "message_archive_management");
|
||||||
public bool cought_up { get; set; default=false; }
|
public bool cought_up { get; set; default=false; }
|
||||||
|
public Gee.Set<string> active_query_ids { get; set; default = new HashSet<string>(); }
|
||||||
public string ns_ver;
|
public string ns_ver;
|
||||||
|
|
||||||
public Flag(string ns_ver) {
|
public Flag(string ns_ver) {
|
||||||
|
|
Loading…
Reference in a new issue