import Combine
import Foundation
import GRDB
import Martin

/// Bridges the XMPP Message Archive Management (MAM) module to local storage.
///
/// On creation it subscribes to the module's `archivedMessagesPublisher` and hands every
/// archived message to a private `ArchivedMessageProcessor` actor, which buffers the
/// messages and persists them.
final class ClientMartinMAM {
    /// Keeps the publisher subscription alive for the lifetime of this object.
    private var cancellables: Set<AnyCancellable> = []
    private let messageProcessor: ArchivedMessageProcessor

    /// Creates the bridge and starts listening for archived messages.
    ///
    /// - Parameter xmppConnection: The client whose `.mam` module publishes archived messages.
    init(_ xmppConnection: XMPPClient) {
        let processor = ArchivedMessageProcessor()
        messageProcessor = processor

        // Subscribe to archived messages. The closure captures the processor itself rather
        // than `self`, so no weak/strong dance is needed; the subscription still ends when
        // `cancellables` is released together with this object.
        xmppConnection.module(.mam).archivedMessagesPublisher
            .sink(receiveValue: { archived in
                // Hop from the publisher's synchronous callback into the actor.
                Task { await processor.addMessage(archived) }
            })
            .store(in: &cancellables)
    }

    // func requestArchivedMessages(for roster: Roster, before: String? = nil, after: String? = nil, in module: MessageArchiveManagementModule) async {
    //     print(roster, before, after, module)
    //     //
    //     // let endDate = Date()
    //     // let startDate = Calendar.current.date(byAdding: .day, value: -Const.mamRequestDaysLength, to: endDate) ?? Date()
    //     // let response = try? await module.queryItems(
    //     //     componentJid: JID(credentials.bareJid),
    //     //     with: JID(roster.bareJid),
    //     //     start: startDate,
    //     //     end: endDate,
    //     //     queryId: UUID().uuidString
    //     // )
    //     // let query: RSM.Query = .init(before: nil, after: nil, max: nil)
    // }

    // func requestArchivedMessages(for roster: Roster, before: String? = nil, after: String? = nil) async {
    //     assert(before != nil || after != nil, "Either before or after must be provided")
    //     if !discoManager.features.map({ $0.xep }).contains("XEP-0313") {
    //         return
    //     }
    //     let module = connection.module(MessageArchiveManagementModule.self)
    //     await mamManager.requestArchivedMessages(for: roster, before: before, after: after, in: module)
    // }
}

/// Buffers archived messages and persists them in batches.
///
/// A batch is processed as soon as `batchSize` messages have accumulated. Whatever is left
/// in the buffer is flushed after `flushDelayNanoseconds`, so a trailing group smaller than
/// `batchSize` is not left unprocessed.
private actor ArchivedMessageProcessor {
    typealias ArchivedMessage = Martin.MessageArchiveManagementModule.ArchivedMessageReceived

    private var messageBuffer: [ArchivedMessage] = []
    private let batchSize = 20

    /// How long a partial batch may wait before it is flushed (0.5 s).
    private let flushDelayNanoseconds: UInt64 = 500_000_000
    /// `true` while a delayed flush task is waiting to run; prevents scheduling more than one.
    private var isFlushScheduled = false

    /// Queues an archived message for persistence.
    ///
    /// Processes a batch immediately when the buffer reaches `batchSize`; otherwise makes sure
    /// a delayed flush is scheduled for the messages still waiting in the buffer.
    ///
    /// - Parameter message: The archived message delivered by the MAM module.
    func addMessage(_ message: ArchivedMessage) async {
        messageBuffer.append(message)

        if messageBuffer.count >= batchSize {
            await processBatch()
        }
        // The buffer may still hold a partial batch (including messages appended while the
        // call above was suspended), so guarantee that it gets drained eventually.
        if !messageBuffer.isEmpty {
            scheduleFlush()
        }
    }

    /// Starts a single delayed task that drains the buffer, unless one is already pending.
    private func scheduleFlush() {
        guard !isFlushScheduled else { return }
        isFlushScheduled = true

        let delay = flushDelayNanoseconds
        // `self` is captured strongly on purpose: the task lives only for `delay` plus the
        // time needed to drain, and buffered messages must not be dropped if the owner goes away.
        Task {
            try? await Task.sleep(nanoseconds: delay)
            await self.flushRemaining()
        }
    }

    /// Drains everything currently in the buffer, one batch at a time.
    private func flushRemaining() async {
        // Reset first so messages arriving while we are suspended can schedule a new flush.
        isFlushScheduled = false
        while !messageBuffer.isEmpty {
            await processBatch()
        }
    }

    /// Removes up to `batchSize` messages from the front of the buffer and handles them in order.
    private func processBatch() async {
        guard !messageBuffer.isEmpty else { return }

        // Detach the batch from the buffer before the first suspension point; the actor is
        // re-entrant, so another call must never see (and re-process) the same messages.
        let batch = Array(messageBuffer.prefix(batchSize))
        messageBuffer.removeFirst(batch.count)

        for archived in batch {
            await handleMessage(archived)
        }
    }

    /// Persists a single archived message unless a message with the same id is already stored.
    ///
    /// - Parameter received: The archived message together with its archive timestamp.
    private func handleMessage(_ received: ArchivedMessage) async {
        let message = received.message

        #if DEBUG
        print("---")
        print("Archive message received: \(message)")
        print("Date: \(received.timestamp)")
        print("---")
        #endif

        // Skip the archived message if such a message already exists in the database.
        // The lookup result has to be returned out of the `read` closure: a bare `return`
        // inside the closure would only leave the closure, not this function.
        // A failed lookup is treated as "not stored", so the message is still saved (best effort).
        if let archiveId = message.id {
            let alreadyStored = (try? await Database.shared.dbQueue.read { db in
                try Message.fetchOne(db, key: archiveId) != nil
            }) ?? false
            if alreadyStored {
                return
            }
        }

        guard var msg = Message.map(message) else { return }
        do {
            // Store the archive timestamp as the message date.
            msg.date = received.timestamp
            try await msg.save()
        } catch {
            logIt(.error, "Error saving message: \(error)")
        }
    }
}

// Earlier, non-batched draft of ClientMartinMAM (disabled).
//
// final class ClientMartinMAM {
//     private var cancellables: Set<AnyCancellable> = []
//
//     init(_ xmppConnection: XMPPClient) {
//         // subscribe to archived messages
//         xmppConnection.module(.mam).archivedMessagesPublisher
//             .sink(receiveValue: { [weak self] archived in
//                 let message = archived.message
//                 message.attribute("archived_date", newValue: "\(archived.timestamp.timeIntervalSince1970)")
//                 self?.handleMessage(archived)
//             })
//             .store(in: &cancellables)
//     }
//
//     private func handleMessage(_ received: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) {
//         let message = received.message
//         let date = received.timestamp
//         #if DEBUG
//             print("---")
//             print("Archive message received: \(message)")
//             print("Date: \(date)")
//             print("---")
//         #endif
//
//         if let msg = Message.map(message) {
//             Task {
//                 do {
//                     var msg = msg
//                     msg.date = received.timestamp
//                     try await msg.save()
//                 } catch {
//                     logIt(.error, "Error saving message: \(error)")
//                 }
//             }
//         }
//     }
// }