From 6bd552f6a32ca93826cb491f9b4bd757f9698227 Mon Sep 17 00:00:00 2001 From: Daniel Gultsch Date: Mon, 14 Feb 2022 11:46:57 +0100 Subject: [PATCH] flush stanzas in batches --- .../eu/siacs/conversations/xml/TagWriter.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/main/java/eu/siacs/conversations/xml/TagWriter.java b/src/main/java/eu/siacs/conversations/xml/TagWriter.java index 4f429377a..2c2b8ac2c 100644 --- a/src/main/java/eu/siacs/conversations/xml/TagWriter.java +++ b/src/main/java/eu/siacs/conversations/xml/TagWriter.java @@ -8,12 +8,15 @@ import java.io.OutputStreamWriter; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import eu.siacs.conversations.Config; import eu.siacs.conversations.xmpp.stanzas.AbstractStanza; public class TagWriter { + private static final int FLUSH_DELAY = 400; + private OutputStreamWriter outputStream; private boolean finished = false; private final LinkedBlockingQueue writeQueue = new LinkedBlockingQueue(); @@ -21,6 +24,8 @@ public class TagWriter { private final Thread asyncStanzaWriter = new Thread() { + private final AtomicInteger batchStanzaCount = new AtomicInteger(0); + @Override public void run() { stanzaWriterCountDownLatch = new CountDownLatch(1); @@ -29,12 +34,21 @@ public class TagWriter { break; } try { - AbstractStanza output = writeQueue.take(); - outputStream.write(output.toString()); - if (writeQueue.size() == 0) { + final AbstractStanza stanza = writeQueue.poll(FLUSH_DELAY, TimeUnit.MILLISECONDS); + if (stanza != null) { + batchStanzaCount.incrementAndGet(); + outputStream.write(stanza.toString()); + } else { + final int batch = batchStanzaCount.getAndSet(0); + if (batch > 1) { + Log.d(Config.LOGTAG, "flushing " + batch + " stanzas"); + } outputStream.flush(); + final AbstractStanza nextStanza = writeQueue.take(); + batchStanzaCount.incrementAndGet(); + outputStream.write(nextStanza.toString()); } - } catch (Exception e) { + } catch (final Exception e) { break; } }