Don't continue reading after stream was disconnected, make disconnecting async

fixes #636
This commit is contained in:
fiaxh 2019-11-27 18:46:29 +01:00
parent c4325473fb
commit 1985fe1d7b
5 changed files with 36 additions and 24 deletions

View file

@ -117,7 +117,7 @@ public interface Dino.Application : GLib.Application {
protected void remove_connection(Account account) { protected void remove_connection(Account account) {
if ((get_flags() & ApplicationFlags.IS_SERVICE) == ApplicationFlags.IS_SERVICE) release(); if ((get_flags() & ApplicationFlags.IS_SERVICE) == ApplicationFlags.IS_SERVICE) release();
stream_interactor.disconnect_account(account); stream_interactor.disconnect_account.begin(account);
} }
private void restore() { private void restore() {

View file

@ -131,11 +131,11 @@ public class ConnectionManager : Object {
connections[account].stream.get_module(Presence.Module.IDENTITY).send_presence(connections[account].stream, presence); connections[account].stream.get_module(Presence.Module.IDENTITY).send_presence(connections[account].stream, presence);
} }
public void disconnect_account(Account account) { public async void disconnect_account(Account account) {
if (connections.has_key(account)) { if (connections.has_key(account)) {
make_offline(account); make_offline(account);
try { try {
connections[account].stream.disconnect(); yield connections[account].stream.disconnect();
} catch (Error e) { } catch (Error e) {
debug("Error disconnecting stream: %s", e.message); debug("Error disconnecting stream: %s", e.message);
} }
@ -217,6 +217,8 @@ public class ConnectionManager : Object {
} }
private void check_reconnect(Account account) { private void check_reconnect(Account account) {
if (!connections.has_key(account)) return;
bool acked = false; bool acked = false;
DateTime? last_activity_was = connections[account].last_activity; DateTime? last_activity_was = connections[account].last_activity;
@ -228,6 +230,7 @@ public class ConnectionManager : Object {
}); });
Timeout.add_seconds(10, () => { Timeout.add_seconds(10, () => {
if (!connections.has_key(account)) return false;
if (connections[account].stream != stream) return false; if (connections[account].stream != stream) return false;
if (acked) return false; if (acked) return false;
if (connections[account].last_activity != last_activity_was) return false; if (connections[account].last_activity != last_activity_was) return false;
@ -235,11 +238,15 @@ public class ConnectionManager : Object {
// Reconnect. Nothing gets through the stream. // Reconnect. Nothing gets through the stream.
debug("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream); debug("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream);
change_connection_state(account, ConnectionState.DISCONNECTED); change_connection_state(account, ConnectionState.DISCONNECTED);
try {
connections[account].stream.disconnect(); connections[account].stream.disconnect.begin((_, res) => {
} catch (Error e) { try {
debug("Error disconnecting stream: %s", e.message); connections[account].stream.disconnect.end(res);
} } catch (Error e) {
debug("Error disconnecting stream: %s", e.message);
}
});
connect_(account); connect_(account);
return false; return false;
}); });
@ -267,7 +274,7 @@ public class ConnectionManager : Object {
} }
} }
private void on_prepare_for_sleep(bool suspend) { private async void on_prepare_for_sleep(bool suspend) {
foreach (Account account in connection_todo) { foreach (Account account in connection_todo) {
change_connection_state(account, ConnectionState.DISCONNECTED); change_connection_state(account, ConnectionState.DISCONNECTED);
} }
@ -276,7 +283,7 @@ public class ConnectionManager : Object {
foreach (Account account in connection_todo) { foreach (Account account in connection_todo) {
try { try {
make_offline(account); make_offline(account);
connections[account].stream.disconnect(); yield connections[account].stream.disconnect();
} catch (Error e) { } catch (Error e) {
debug("Error disconnecting stream %p: %s", connections[account].stream, e.message); debug("Error disconnecting stream %p: %s", connections[account].stream, e.message);
} }

View file

@ -37,7 +37,7 @@ public class Register : StreamInteractionModule, Object{
if (connected_account.equals(account)) { if (connected_account.equals(account)) {
ret = error.source; ret = error.source;
} }
stream_interactor.disconnect_account(account); stream_interactor.disconnect_account.begin(account);
Idle.add((owned)callback); Idle.add((owned)callback);
}); });
@ -91,7 +91,7 @@ public class Register : StreamInteractionModule, Object{
yield; yield;
try { try {
stream.disconnect(); yield stream.disconnect();
} catch (Error e) {} } catch (Error e) {}
return ret; return ret;
} }
@ -130,7 +130,7 @@ public class Register : StreamInteractionModule, Object{
form = yield stream.get_module(Xep.InBandRegistration.Module.IDENTITY).get_from_server(stream, jid); form = yield stream.get_module(Xep.InBandRegistration.Module.IDENTITY).get_from_server(stream, jid);
} }
try { try {
stream.disconnect(); yield stream.disconnect();
} catch (Error e) {} } catch (Error e) {}
return form; return form;

View file

@ -29,8 +29,8 @@ public class StreamInteractor : Object {
connection_manager.connect_account(account); connection_manager.connect_account(account);
} }
public void disconnect_account(Account account) { public async void disconnect_account(Account account) {
connection_manager.disconnect_account(account); yield connection_manager.disconnect_account(account);
account_removed(account); account_removed(account);
} }

View file

@ -27,6 +27,7 @@ public class XmppStream {
public bool negotiation_complete { get; set; default=false; } public bool negotiation_complete { get; set; default=false; }
private bool setup_needed = false; private bool setup_needed = false;
private bool non_negotiation_modules_attached = false; private bool non_negotiation_modules_attached = false;
private bool disconnected = false;
public signal void received_node(XmppStream stream, StanzaNode node); public signal void received_node(XmppStream stream, StanzaNode node);
public signal void received_root_node(XmppStream stream, StanzaNode node); public signal void received_root_node(XmppStream stream, StanzaNode node);
@ -75,15 +76,15 @@ public class XmppStream {
yield loop(); yield loop();
} }
public void disconnect() throws IOStreamError { public async void disconnect() throws IOStreamError, XmlError, IOError {
StanzaWriter? writer = this.writer; disconnected = true;
StanzaReader? reader = this.reader; if (writer == null || reader == null || stream == null) {
IOStream? stream = this.stream; throw new IOStreamError.DISCONNECT("trying to disconnect, but no stream open");
if (writer == null || reader == null || stream == null) throw new IOStreamError.DISCONNECT("trying to disconnect, but no stream open"); }
log.str("OUT", "</stream:stream>"); log.str("OUT", "</stream:stream>");
((!)writer).write.begin("</stream:stream>"); yield writer.write("</stream:stream>");
((!)reader).cancel(); reader.cancel();
((!)stream).close_async.begin(); yield stream.close_async();
} }
public void reset_stream(IOStream stream) { public void reset_stream(IOStream stream) {
@ -217,6 +218,8 @@ public class XmppStream {
Idle.add(loop.callback); Idle.add(loop.callback);
yield; yield;
if (disconnected) break;
received_node(this, node); received_node(this, node);
if (node.ns_uri == NS_URI && node.name == "features") { if (node.ns_uri == NS_URI && node.name == "features") {
@ -224,7 +227,9 @@ public class XmppStream {
received_features_node(this); received_features_node(this);
} else if (node.ns_uri == NS_URI && node.name == "stream" && node.pseudo) { } else if (node.ns_uri == NS_URI && node.name == "stream" && node.pseudo) {
debug("[%p] Server closed stream", this); debug("[%p] Server closed stream", this);
disconnect(); try {
yield disconnect();
} catch (Error e) {}
return; return;
} else if (node.ns_uri == JABBER_URI) { } else if (node.ns_uri == JABBER_URI) {
if (node.name == "message") { if (node.name == "message") {