support resume via sasl 2.0

This commit is contained in:
Daniel Gultsch 2022-08-29 19:22:25 +02:00
parent 928a16d31d
commit f6ab3dd068

View file

@ -506,6 +506,10 @@ public class XmppConnection implements Runnable {
account.getJid().asBareJid() account.getJid().asBareJid()
+ ": SASL 2.0 authorization identifier was " + ": SASL 2.0 authorization identifier was "
+ authorizationIdentifier); + authorizationIdentifier);
final Element resumed = success.findChild("resumed", "urn:xmpp:sm:3");
if (resumed != null && streamId != null) {
processResumed(resumed);
}
} }
if (version == SaslMechanism.Version.SASL) { if (version == SaslMechanism.Version.SASL) {
tagReader.reset(); tagReader.reset();
@ -579,7 +583,7 @@ public class XmppConnection implements Runnable {
tagWriter.writeElement(response); tagWriter.writeElement(response);
} else if (nextTag.isStart("enabled")) { } else if (nextTag.isStart("enabled")) {
final Element enabled = tagReader.readElement(nextTag); final Element enabled = tagReader.readElement(nextTag);
if ("true".equals(enabled.getAttribute("resume"))) { if (enabled.getAttributeAsBoolean("resume")) {
this.streamId = enabled.getAttribute("id"); this.streamId = enabled.getAttribute("id");
Log.d( Log.d(
Config.LOGTAG, Config.LOGTAG,
@ -600,57 +604,8 @@ public class XmppConnection implements Runnable {
final RequestPacket r = new RequestPacket(smVersion); final RequestPacket r = new RequestPacket(smVersion);
tagWriter.writeStanzaAsync(r); tagWriter.writeStanzaAsync(r);
} else if (nextTag.isStart("resumed")) { } else if (nextTag.isStart("resumed")) {
this.inSmacksSession = true;
this.isBound = true;
this.tagWriter.writeStanzaAsync(new RequestPacket(smVersion));
lastPacketReceived = SystemClock.elapsedRealtime();
final Element resumed = tagReader.readElement(nextTag); final Element resumed = tagReader.readElement(nextTag);
final String h = resumed.getAttribute("h"); processResumed(resumed);
try {
ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>();
final boolean acknowledgedMessages;
synchronized (this.mStanzaQueue) {
final int serverCount = Integer.parseInt(h);
if (serverCount < stanzasSent) {
Log.d(
Config.LOGTAG,
account.getJid().asBareJid().toString()
+ ": session resumed with lost packages");
stanzasSent = serverCount;
} else {
Log.d(
Config.LOGTAG,
account.getJid().asBareJid().toString() + ": session resumed");
}
acknowledgedMessages = acknowledgeStanzaUpTo(serverCount);
for (int i = 0; i < this.mStanzaQueue.size(); ++i) {
failedStanzas.add(mStanzaQueue.valueAt(i));
}
mStanzaQueue.clear();
}
if (acknowledgedMessages) {
mXmppConnectionService.updateConversationUi();
}
Log.d(Config.LOGTAG, "resending " + failedStanzas.size() + " stanzas");
for (AbstractAcknowledgeableStanza packet : failedStanzas) {
if (packet instanceof MessagePacket) {
MessagePacket message = (MessagePacket) packet;
mXmppConnectionService.markMessage(
account,
message.getTo().asBareJid(),
message.getId(),
Message.STATUS_UNSEND);
}
sendPacket(packet);
}
} catch (final NumberFormatException ignored) {
}
Log.d(
Config.LOGTAG,
account.getJid().asBareJid()
+ ": online with resource "
+ account.getResource());
changeStatus(Account.State.ONLINE);
} else if (nextTag.isStart("r")) { } else if (nextTag.isStart("r")) {
tagReader.readElement(nextTag); tagReader.readElement(nextTag);
if (Config.EXTENDED_SM_LOGGING) { if (Config.EXTENDED_SM_LOGGING) {
@ -739,6 +694,59 @@ public class XmppConnection implements Runnable {
} }
} }
private void processResumed(final Element resumed) {
this.inSmacksSession = true;
this.isBound = true;
this.tagWriter.writeStanzaAsync(new RequestPacket(smVersion));
lastPacketReceived = SystemClock.elapsedRealtime();
final String h = resumed.getAttribute("h");
try {
ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>();
final boolean acknowledgedMessages;
synchronized (this.mStanzaQueue) {
final int serverCount = Integer.parseInt(h);
if (serverCount < stanzasSent) {
Log.d(
Config.LOGTAG,
account.getJid().asBareJid() + ": session resumed with lost packages");
stanzasSent = serverCount;
} else {
Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": session resumed");
}
acknowledgedMessages = acknowledgeStanzaUpTo(serverCount);
for (int i = 0; i < this.mStanzaQueue.size(); ++i) {
failedStanzas.add(mStanzaQueue.valueAt(i));
}
mStanzaQueue.clear();
}
if (acknowledgedMessages) {
mXmppConnectionService.updateConversationUi();
}
Log.d(
Config.LOGTAG,
account.getJid().asBareJid()
+ ": resending "
+ failedStanzas.size()
+ " stanzas");
for (AbstractAcknowledgeableStanza packet : failedStanzas) {
if (packet instanceof MessagePacket) {
MessagePacket message = (MessagePacket) packet;
mXmppConnectionService.markMessage(
account,
message.getTo().asBareJid(),
message.getId(),
Message.STATUS_UNSEND);
}
sendPacket(packet);
}
} catch (final NumberFormatException ignored) {
}
Log.d(
Config.LOGTAG,
account.getJid().asBareJid() + ": online with resource " + account.getResource());
changeStatus(Account.State.ONLINE);
}
private boolean acknowledgeStanzaUpTo(int serverCount) { private boolean acknowledgeStanzaUpTo(int serverCount) {
if (serverCount > stanzasSent) { if (serverCount > stanzasSent) {
Log.e(Config.LOGTAG, "server acknowledged more stanzas than we sent. serverCount=" + serverCount + ", ourCount=" + stanzasSent); Log.e(Config.LOGTAG, "server acknowledged more stanzas than we sent. serverCount=" + serverCount + ", ourCount=" + stanzasSent);
@ -986,6 +994,12 @@ public class XmppConnection implements Runnable {
+ XmlHelper.printElementNames(this.streamFeatures)); + XmlHelper.printElementNames(this.streamFeatures));
throw new StateChangingException(Account.State.INCOMPATIBLE_SERVER); throw new StateChangingException(Account.State.INCOMPATIBLE_SERVER);
} }
} else {
Log.d(
Config.LOGTAG,
account.getJid().asBareJid()
+ ": received NOP stream features"
+ this.streamFeatures);
} }
} }
@ -1030,7 +1044,14 @@ public class XmppConnection implements Runnable {
if (!Strings.isNullOrEmpty(firstMessage)) { if (!Strings.isNullOrEmpty(firstMessage)) {
authenticate.addChild("initial-response").setContent(firstMessage); authenticate.addChild("initial-response").setContent(firstMessage);
} }
// TODO place to add extensions final Element inline = this.streamFeatures.findChild("inline", Namespace.SASL_2);
final boolean inlineStreamManagement = inline != null && inline.hasChild("sm", "urn:xmpp:sm:3");
if (inlineStreamManagement && streamId != null) {
final ResumePacket resume = new ResumePacket(this.streamId, stanzasReceived, smVersion);
this.mSmCatchupMessageCounter.set(0);
this.mWaitingForSmCatchup.set(true);
authenticate.addChild(resume);
}
} else { } else {
throw new AssertionError("Missing implementation for " + version); throw new AssertionError("Missing implementation for " + version);
} }