add message carbon processing

This commit is contained in:
Daniel Gultsch 2023-01-24 14:08:50 +01:00
parent fe32526de8
commit ddcab5fb58
No known key found for this signature in database
GPG key ID: F43D18AD2A0982C2
18 changed files with 208 additions and 62 deletions

View file

@ -58,6 +58,10 @@ dependencies {
implementation "androidx.security:security-crypto:1.0.0"
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'com.github.tony19:logback-android:2.0.1'
testImplementation 'junit:junit:4.13.2'
testImplementation 'org.robolectric:robolectric:4.9'

4
proguard-rules.pro vendored
View file

@ -13,6 +13,10 @@
-keep class org.openintents.openpgp.*
-keep class org.webrtc.** { *; }
# Logger
-keep class org.slf4j.** {*;}
-keep class ch.qos.** {*;}
-dontwarn javax.mail.internet.MimeMessage
-dontwarn javax.mail.internet.MimeBodyPart
-dontwarn javax.mail.internet.SharedInputStream

View file

@ -0,0 +1,16 @@
<configuration xmlns="https://tony19.github.io/logback-android/xml"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://tony19.github.io/logback-android/xml https://cdn.jsdelivr.net/gh/tony19/logback-android/logback.xsd">
<appender name="logcat" class="ch.qos.logback.classic.android.LogcatAppender">
<tagEncoder>
<pattern>conversations</pattern>
</tagEncoder>
<encoder>
<pattern>%logger{12}: %msg</pattern>
</encoder>
</appender>
<root level="DEBUG">
<appender-ref ref="logcat" />
</root>
</configuration>

View file

@ -79,4 +79,5 @@ public final class Namespace {
"http://gultsch.de/xmpp/drafts/omemo/dlts-srtp-verification";
public static final String UNIFIED_PUSH = "http://gultsch.de/xmpp/drafts/unified-push";
public static final String JABBER_CLIENT = "jabber:client";
public static final String FORWARD = "urn:xmpp:forward:0";
}

View file

@ -3,8 +3,9 @@ package im.conversations.android.database.dao;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.room.Dao;
import androidx.room.Insert;
import androidx.room.OnConflictStrategy;
import androidx.room.Query;
import androidx.room.Upsert;
import eu.siacs.conversations.xmpp.Jid;
import im.conversations.android.database.entity.PresenceEntity;
import im.conversations.android.database.model.Account;
@ -26,7 +27,7 @@ public abstract class PresenceDao {
+ " resource=:resource")
abstract void deletePresence(long account, Jid address, String resource);
@Upsert
@Insert(onConflict = OnConflictStrategy.REPLACE)
abstract void insert(PresenceEntity entity);
public void set(

View file

@ -12,6 +12,6 @@ public enum PresenceType {
if (typeAttribute == null) {
return null;
}
return of(typeAttribute.toUpperCase(Locale.ROOT));
return valueOf(typeAttribute.toUpperCase(Locale.ROOT));
}
}

View file

@ -1,7 +1,5 @@
package im.conversations.android.xml;
import android.util.Log;
import eu.siacs.conversations.Config;
import eu.siacs.conversations.xml.Element;
import eu.siacs.conversations.xml.Tag;
import im.conversations.android.xmpp.model.StreamElement;
@ -11,9 +9,13 @@ import java.io.OutputStreamWriter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TagWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(TagWriter.class);
private OutputStreamWriter outputStream;
private boolean finished = false;
private final LinkedBlockingQueue<StreamElement> writeQueue = new LinkedBlockingQueue<>();
@ -83,7 +85,7 @@ public class TagWriter {
public void writeStanzaAsync(StreamElement stanza) {
if (finished) {
Log.d(Config.LOGTAG, "attempting to write stanza to finished TagWriter");
LOGGER.info("attempting to write stanza to finished TagWriter");
} else {
if (!asyncStanzaWriter.isAlive()) {
try {

View file

@ -4,7 +4,6 @@ import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
import android.content.Context;
import android.os.SystemClock;
import android.util.Log;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@ -28,9 +27,13 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionPool {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPool.class);
private static volatile ConnectionPool INSTANCE;
private final Context context;
@ -137,7 +140,7 @@ public class ConnectionPool {
if (connection.getStatus() == ConnectionState.ONLINE) {
synchronized (lowPingTimeoutMode) {
if (lowPingTimeoutMode.remove(account.address)) {
Log.d(Config.LOGTAG, account.address + ": leaving low ping timeout mode");
LOGGER.debug("{}: leaving low ping timeout mode", account.address);
}
}
ConversationsDatabase.getInstance(context)
@ -154,11 +157,9 @@ public class ConnectionPool {
// resetSendingToWaiting(account);
if (isInLowPingTimeoutMode(account)) {
Log.d(
Config.LOGTAG,
account.address
+ ": went into offline state during low ping mode."
+ " reconnecting now");
LOGGER.debug(
"{}: went into offline state during low ping mode. reconnecting now",
account.address);
reconnectAccount(connection);
} else {
final int timeToReconnect = SECURE_RANDOM.nextInt(10) + 2;
@ -173,24 +174,20 @@ public class ConnectionPool {
final int next = connection.getTimeToNextAttempt();
final boolean lowPingTimeoutMode = isInLowPingTimeoutMode(account);
if (next <= 0) {
Log.d(
Config.LOGTAG,
account.address
+ ": error connecting account. reconnecting now."
+ " lowPingTimeout="
+ lowPingTimeoutMode);
LOGGER.debug(
"{}: error connecting account. reconnecting now. lowPingTimeout={}",
account.address,
lowPingTimeoutMode);
reconnectAccount(connection);
} else {
final int attempt = connection.getAttempt() + 1;
Log.d(
Config.LOGTAG,
account.address
+ ": error connecting account. try again in "
+ next
+ "s for the "
+ attempt
+ " time. lowPingTimeout="
+ lowPingTimeoutMode);
LOGGER.debug(
"{}: error connecting account. try again in {}s for the {} time."
+ " lowPingTimeout={}",
account.address,
next,
attempt,
lowPingTimeoutMode);
scheduleWakeUpCall(next);
}
}
@ -246,9 +243,7 @@ public class ConnectionPool {
final Account account = xmppConnection.getAccount();
final boolean lowTimeout = isInLowPingTimeoutMode(account);
xmppConnection.sendPing();
Log.d(
Config.LOGTAG,
account.address + " send ping (lowTimeout=" + lowTimeout + ")");
LOGGER.debug("{}: send ping (lowTimeout={})", account.address, lowTimeout);
scheduleWakeUpCall(lowTimeout ? Config.LOW_PING_TIMEOUT : Config.PING_TIMEOUT);
}
}
@ -277,7 +272,7 @@ public class ConnectionPool {
(lastSent + pingTimeout) - SystemClock.elapsedRealtime();
if (lastSent > lastReceived) {
if (pingTimeoutIn < 0) {
Log.d(Config.LOGTAG, account.address + ": ping timeout");
LOGGER.debug("{}: ping timeout", account.address);
this.reconnectAccount(connection);
} else {
this.scheduleWakeUpCall(Ints.saturatedCast(pingTimeoutIn / 1000));
@ -287,18 +282,14 @@ public class ConnectionPool {
if (isAccountPushed) {
pingNow = true;
if (lowPingTimeoutMode.add(account.address)) {
Log.d(
Config.LOGTAG,
account.address + ": entering low ping timeout mode");
LOGGER.debug("{}: entering low ping timeout mode", account.address);
}
} else if (msToNextPing <= 0) {
pingNow = true;
} else {
this.scheduleWakeUpCall(Ints.saturatedCast(msToNextPing / 1000));
if (lowPingTimeoutMode.remove(account.address)) {
Log.d(
Config.LOGTAG,
account.address + ": leaving low ping timeout mode");
LOGGER.debug("{}: leaving low ping timeout mode", account.address);
}
}
}
@ -310,13 +301,10 @@ public class ConnectionPool {
(SystemClock.elapsedRealtime() - connection.getLastConnect()) / 1000;
long timeout = Config.CONNECT_TIMEOUT - secondsSinceLastConnect;
if (timeout < 0) {
Log.d(
Config.LOGTAG,
account.address
+ ": time out during connect reconnecting"
+ " (secondsSinceLast="
+ secondsSinceLastConnect
+ ")");
LOGGER.debug(
"{}: time out during connect reconnecting (secondsSinceLast={})",
account.address,
secondsSinceLastConnect);
connection.resetAttemptCount(false);
reconnectAccount(connection);
}

View file

@ -2259,9 +2259,15 @@ public class XmppConnection implements Runnable {
public boolean fromAccount(final Stanza stanza) {
final Jid from = stanza.getFrom();
// TODO null is valid too?!
return from != null && from.asBareJid().equals(connectionAddress.asBareJid());
}
public boolean toAccount(final Stanza stanza) {
final Jid to = stanza.getTo();
return to == null || to.asBareJid().equals(connectionAddress.asBareJid());
}
public boolean supportsClientStateIndication() {
return this.streamFeatures != null && this.streamFeatures.clientStateIndication();
}

View file

@ -1,18 +1,26 @@
package im.conversations.android.xmpp.manager;
import android.content.Context;
import android.util.Log;
import eu.siacs.conversations.Config;
import im.conversations.android.xmpp.XmppConnection;
import im.conversations.android.xmpp.model.carbons.Enable;
import im.conversations.android.xmpp.model.carbons.Received;
import im.conversations.android.xmpp.model.carbons.Sent;
import im.conversations.android.xmpp.model.stanza.IQ;
import im.conversations.android.xmpp.processor.MessageProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CarbonsManager extends AbstractManager {
private static final Logger LOGGER = LoggerFactory.getLogger(CarbonsManager.class);
private final MessageProcessor messageProcessor;
private boolean enabled = false;
public CarbonsManager(Context context, XmppConnection connection) {
super(context, connection);
this.messageProcessor = new MessageProcessor(context, connection, false);
}
public void enable() {
@ -22,14 +30,11 @@ public class CarbonsManager extends AbstractManager {
iq,
result -> {
if (result.getType() == IQ.Type.RESULT) {
Log.d(
Config.LOGTAG,
getAccount().address + ": successfully enabled carbons");
LOGGER.info("{}: successfully enabled carbons", getAccount().address);
this.enabled = true;
} else {
Log.d(
Config.LOGTAG,
getAccount().address + ": could not enable carbons " + result);
LOGGER.warn(
"{}: could not enable carbons {}", getAccount().address, result);
}
});
}
@ -41,4 +46,30 @@ public class CarbonsManager extends AbstractManager {
public boolean isEnabled() {
return this.enabled;
}
public void handleReceived(final Received received) {
final var forwarded = received.getForwarded();
final var message = forwarded == null ? null : forwarded.getMessage();
if (message == null) {
LOGGER.warn("Received carbon copy did not contain forwarded message");
} else if (connection.toAccount(message)) {
// all received, forwarded messages must be addressed to us
this.messageProcessor.accept(message);
} else {
LOGGER.warn("Received carbon copy had invalid `to` attribute {}", message.getTo());
}
}
public void handleSent(final Sent sent) {
final var forwarded = sent.getForwarded();
final var message = forwarded == null ? null : forwarded.getMessage();
if (message == null) {
LOGGER.warn("Sent carbon copy did not contain forwarded message");
} else if (connection.fromAccount(message)) {
// all sent, forwarded messages must be addressed from us
this.messageProcessor.accept(message);
} else {
LOGGER.warn("Sent carbon copy had invalid `from` attribute {}", message.getFrom());
}
}
}

View file

@ -1,18 +1,20 @@
package im.conversations.android.xmpp.manager;
import android.content.Context;
import android.util.Log;
import com.google.common.base.Strings;
import com.google.common.collect.Collections2;
import eu.siacs.conversations.Config;
import im.conversations.android.xmpp.XmppConnection;
import im.conversations.android.xmpp.model.roster.Item;
import im.conversations.android.xmpp.model.roster.Query;
import im.conversations.android.xmpp.model.stanza.IQ;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RosterManager extends AbstractManager {
private static final Logger LOGGER = LoggerFactory.getLogger(RosterManager.class);
public RosterManager(final Context context, final XmppConnection connection) {
super(context, connection);
}
@ -31,9 +33,9 @@ public class RosterManager extends AbstractManager {
final Query rosterQuery = new Query();
iqPacket.addChild(rosterQuery);
if (Strings.isNullOrEmpty(rosterVersion)) {
Log.d(Config.LOGTAG, account.address + ": fetching roster");
LOGGER.info("{}: fetching roster", account.address);
} else {
Log.d(Config.LOGTAG, account.address + ": fetching roster version " + rosterVersion);
LOGGER.info("{}: fetching roster version {}", account.address, rosterVersion);
rosterQuery.setVersion(rosterVersion);
}
connection.sendIqPacket(iqPacket, this::handleFetchResult);
@ -52,7 +54,7 @@ public class RosterManager extends AbstractManager {
final var database = getDatabase();
final var version = query.getVersion();
final var items = query.getExtensions(Item.class);
// In a roster result (Section 2.1.4), the client MUST ignore values of the c'subscription'
// In a roster result (Section 2.1.4), the client MUST ignore values of the 'subscription'
// attribute other than "none", "to", "from", or "both".
final var validItems =
Collections2.filter(

View file

@ -0,0 +1,17 @@
package im.conversations.android.xmpp.model.carbons;
import im.conversations.android.annotation.XmlElement;
import im.conversations.android.xmpp.model.Extension;
import im.conversations.android.xmpp.model.forward.Forwarded;
@XmlElement
public class Received extends Extension {
public Received() {
super(Received.class);
}
public Forwarded getForwarded() {
return this.getExtension(Forwarded.class);
}
}

View file

@ -0,0 +1,17 @@
package im.conversations.android.xmpp.model.carbons;
import im.conversations.android.annotation.XmlElement;
import im.conversations.android.xmpp.model.Extension;
import im.conversations.android.xmpp.model.forward.Forwarded;
@XmlElement
public class Sent extends Extension {
public Sent() {
super(Sent.class);
}
public Forwarded getForwarded() {
return this.getExtension(Forwarded.class);
}
}

View file

@ -0,0 +1,18 @@
package im.conversations.android.xmpp.model.forward;
import eu.siacs.conversations.xml.Namespace;
import im.conversations.android.annotation.XmlElement;
import im.conversations.android.xmpp.model.Extension;
import im.conversations.android.xmpp.model.stanza.Message;
@XmlElement(namespace = Namespace.FORWARD)
public class Forwarded extends Extension {
public Forwarded() {
super(Forwarded.class);
}
public Message getMessage() {
return this.getExtension(Message.class);
}
}

View file

@ -32,7 +32,7 @@ public class Item extends Extension {
public Subscription getSubscription() {
final String value = this.getAttribute("subscription");
try {
return value == null ? null : Subscription.valueOf(value.toLowerCase(Locale.ROOT));
return value == null ? null : Subscription.valueOf(value.toUpperCase(Locale.ROOT));
} catch (final IllegalArgumentException e) {
return null;
}

View file

@ -8,4 +8,8 @@ public class Message extends Stanza {
public Message() {
super(Message.class);
}
public String getBody() {
return this.findChildContent("body");
}
}

View file

@ -9,6 +9,7 @@ import im.conversations.android.xmpp.manager.BlockingManager;
import im.conversations.android.xmpp.manager.BookmarkManager;
import im.conversations.android.xmpp.manager.DiscoManager;
import im.conversations.android.xmpp.manager.RosterManager;
import im.conversations.android.xmpp.model.stanza.Presence;
import java.util.function.Consumer;
public class BindProcessor extends XmppConnection.Delegate implements Consumer<Jid> {
@ -46,6 +47,8 @@ public class BindProcessor extends XmppConnection.Delegate implements Consumer<J
getManager(BookmarkManager.class).fetch();
connection.sendPresencePacket(new Presence());
// TODO send initial presence
}
}

View file

@ -1,14 +1,46 @@
package im.conversations.android.xmpp.processor;
import android.content.Context;
import com.google.common.base.Strings;
import im.conversations.android.xmpp.XmppConnection;
import im.conversations.android.xmpp.manager.CarbonsManager;
import im.conversations.android.xmpp.model.carbons.Received;
import im.conversations.android.xmpp.model.carbons.Sent;
import im.conversations.android.xmpp.model.stanza.Message;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageProcessor implements Consumer<Message> {
public class MessageProcessor extends XmppConnection.Delegate implements Consumer<Message> {
public MessageProcessor(final Context context, final XmppConnection connection) {}
private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class);
private final boolean isRoot;
public MessageProcessor(final Context context, final XmppConnection connection) {
this(context, connection, true);
}
public MessageProcessor(
final Context context, final XmppConnection connection, final boolean isRoot) {
super(context, connection);
this.isRoot = isRoot;
}
@Override
public void accept(final Message messagePacket) {}
public void accept(final Message message) {
if (isRoot && connection.fromServer(message) && message.hasExtension(Received.class)) {
getManager(CarbonsManager.class).handleReceived(message.getExtension(Received.class));
}
if (isRoot && connection.fromServer(message) && message.hasExtension(Sent.class)) {
getManager(CarbonsManager.class).handleSent(message.getExtension(Sent.class));
}
final String body = message.getBody();
if (!Strings.isNullOrEmpty(body)) {
LOGGER.info("'{}' from {}", body, message.getFrom());
}
}
}