mv-experiment #1

Merged
fmodf merged 88 commits from mv-experiment into develop 2024-09-03 15:13:59 +00:00
Showing only changes of commit 087aeba94a - Show all commits

View file

@ -3,54 +3,75 @@ import Foundation
import GRDB import GRDB
import Martin import Martin
private typealias ArchMsg = Martin.MessageArchiveManagementModule.ArchivedMessageReceived
final class ClientMartinMAM { final class ClientMartinMAM {
private var cancellables: Set<AnyCancellable> = [] private var cancellables: Set<AnyCancellable> = []
private var processor = ArchiveMessageProcessor()
private weak var module: MessageArchiveManagementModule?
private var afterAvailable = true
private var beforeAvailable = true
init(_ xmppConnection: XMPPClient) { init(_ xmppConnection: XMPPClient) {
module = xmppConnection.module(.mam)
subscribe()
}
private func subscribe() {
// subscribe to archived messages // subscribe to archived messages
module?.archivedMessagesPublisher xmppConnection.module(.mam).archivedMessagesPublisher
.delay(for: 0.7, scheduler: DispatchQueue.main)
.sink(receiveValue: { [weak self] archived in .sink(receiveValue: { [weak self] archived in
guard let self = self else { return } guard let self = self else { return }
Task { Task {
await self.handleMessage(archived) await self.processor.append(archived)
} }
}) })
.store(in: &cancellables) .store(in: &cancellables)
} }
}
private func handleMessage(_ received: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) async { private actor ArchiveMessageProcessor {
let message = received.message private var accumulator: [ArchMsg] = []
let date = received.timestamp
let msgId = received.messageId
init() {
Task {
while true {
try? await Task.sleep(nanoseconds: 700 * NSEC_PER_MSEC)
await process()
}
}
}
func append(_ msg: ArchMsg) async {
accumulator.append(msg)
if accumulator.count >= Const.mamRequestLimit {
await process()
}
}
func process() async {
if accumulator.isEmpty { return }
await handleMessages(accumulator)
accumulator.removeAll()
}
private func handleMessages(_ received: [ArchMsg]) async {
if received.isEmpty { return }
try? await Database.shared.dbQueue.write { db in try? await Database.shared.dbQueue.write { db in
if try Message.fetchOne(db, key: msgId) != nil { for recv in received {
#if DEBUG let message = recv.message
print("---") let date = recv.timestamp
print("Skipping archived message with id \(message.id ?? "???") (message exists)") if let msgId = message.id {
print("---") if try Message.fetchOne(db, key: msgId) != nil {
#endif #if DEBUG
return print("---")
} else { print("Skipping archived message with id \(msgId) (message exists)")
#if DEBUG print("---")
print("---") #endif
print("Archive message received: \(message)") } else {
print("Date: \(date)") #if DEBUG
print("---") print("---")
#endif print("Archive message received: \(message)")
if var msg = Message.map(message) { print("Date: \(date)")
msg.date = received.timestamp print("---")
try msg.insert(db) #endif
if var msg = Message.map(message) {
msg.date = date
try msg.insert(db)
}
}
} }
} }
} }