This commit is contained in:
fmodf 2024-08-18 19:20:55 +02:00
parent da104ec43e
commit e8d39f6ab6
6 changed files with 160 additions and 88 deletions

View file

@ -0,0 +1,47 @@
import Combine
import Foundation
import GRDB
import Martin
final class ClientMartinCarbonsManager {
private var cancellables: Set<AnyCancellable> = []
init(_ xmppConnection: XMPPClient) {
// subscribe to carbons
xmppConnection.module(MessageCarbonsModule.self).carbonsPublisher
.sink { [weak self] carbon in
self?.handleMessage(carbon)
}
.store(in: &cancellables)
// enable carbons if available
xmppConnection.module(.messageCarbons).$isAvailable.filter { $0 }
.sink(receiveValue: { [weak xmppConnection] _ in
xmppConnection?.module(.messageCarbons).enable()
})
.store(in: &cancellables)
}
private func handleMessage(_ received: Martin.MessageCarbonsModule.CarbonReceived) {
let message = received.message
let action = received.action
let onJid = received.jid
#if DEBUG
print("---")
print("Carbons message received: \(message)")
print("Action: \(action)")
print("On JID: \(onJid)")
print("---")
#endif
if let msg = Message.map(message) {
Task {
do {
try await msg.save()
} catch {
logIt(.error, "Error saving message: \(error)")
}
}
}
}
}

View file

@ -0,0 +1,42 @@
import Combine
import Foundation
import GRDB
import Martin
final class ClientMartinMAM {
private var cancellables: Set<AnyCancellable> = []
init(_ xmppConnection: XMPPClient) {
// subscribe to archived messages
xmppConnection.module(.mam).archivedMessagesPublisher
.sink(receiveValue: { [weak self] archived in
let message = archived.message
message.attribute("archived_date", newValue: "\(archived.timestamp.timeIntervalSince1970)")
self?.handleMessage(archived)
})
.store(in: &cancellables)
}
private func handleMessage(_ received: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) {
let message = received.message
let date = received.timestamp
#if DEBUG
print("---")
print("Archive message received: \(message)")
print("Date: \(date)")
print("---")
#endif
if let msg = Message.map(message) {
Task {
do {
var msg = msg
msg.date = received.timestamp
try await msg.save()
} catch {
logIt(.error, "Error saving message: \(error)")
}
}
}
}
}

View file

@ -7,102 +7,30 @@ final class ClientMartinMessagesManager {
private var cancellables: Set<AnyCancellable> = []
init(_ xmppConnection: XMPPClient) {
// subscribe to client messages
xmppConnection.module(MessageModule.self).messagesPublisher
.sink { [weak self] message in
self?.handleMessage(message.message)
}
.store(in: &cancellables)
// subscribe to carbons
xmppConnection.module(MessageCarbonsModule.self).carbonsPublisher
.sink { [weak self] carbon in
self?.handleMessage(carbon.message)
}
.store(in: &cancellables)
// subscribe to archived messages
xmppConnection.module(.mam).archivedMessagesPublisher
.sink(receiveValue: { [weak self] archived in
let message = archived.message
message.attribute("archived_date", newValue: "\(archived.timestamp.timeIntervalSince1970)")
self?.handleMessage(message)
})
.store(in: &cancellables)
// enable carbons if available
xmppConnection.module(.messageCarbons).$isAvailable.filter { $0 }
.sink(receiveValue: { [weak xmppConnection] _ in
xmppConnection?.module(.messageCarbons).enable()
})
}
.store(in: &cancellables)
}
private func handleMessage(_ received: Martin.Message) {
private func handleMessage(_ received: Martin.MessageModule.MessageReceived) {
let message = received.message
let chat = received.chat
#if DEBUG
print("---")
print("Message received: \(received)")
print("Chat: \(chat)")
print("---")
#endif
// Check that the message type is supported
let chatTypes: [StanzaType] = [.chat, .groupchat]
guard let mType = received.type, chatTypes.contains(mType) else {
#if DEBUG
print("Unsupported received type: \(received.type?.rawValue ?? "nil")")
#endif
return
}
// Type
let type = MessageType(rawValue: received.type?.rawValue ?? "") ?? .chat
// Content type
var contentType: MessageContentType = .text
if let oob = received.oob {
contentType = .attachment(.init(
type: oob.attachmentType,
localName: nil,
thumbnailName: nil,
remotePath: oob
))
} else if received.hints.contains(.noStore) {
contentType = .typing
// skip for now
return
}
// From/To
let from = received.from?.bareJid.stringValue ?? ""
let to = received.to?.bareJid.stringValue
// Extract date or set current
var date = Date()
if let timestampStr = received.attribute("archived_date"), let timeInterval = TimeInterval(timestampStr) {
date = Date(timeIntervalSince1970: timeInterval)
}
// Msg
let msg = Message(
id: received.id ?? UUID().uuidString,
type: type,
date: date,
contentType: contentType,
status: .sent,
from: from,
to: to,
body: received.body,
subject: received.subject,
thread: received.thread,
oobUrl: received.oob
)
// Save message
Task {
do {
try await msg.save()
} catch {
logIt(.error, "Error saving message: \(error)")
if let msg = Message.map(message) {
Task {
do {
try await msg.save()
} catch {
logIt(.error, "Error saving message: \(error)")
}
}
}
}

View file

@ -24,15 +24,19 @@ final class Client: ObservableObject {
private var rosterManager = ClientMartinRosterManager()
private var chatsManager = ClientMartinChatsManager()
private var messageManager: ClientMartinMessagesManager
private var discoManager: ClientMartinDiscoManager
private var messageManager: ClientMartinMessagesManager
private var carbonsManager: ClientMartinCarbonsManager
private var mamManager: ClientMartinMAM
init(credentials: Credentials) {
self.credentials = credentials
state = credentials.isActive ? .enabled(.disconnected) : .disabled
connection = Self.prepareConnection(credentials, rosterManager, chatsManager)
messageManager = ClientMartinMessagesManager(connection)
discoManager = ClientMartinDiscoManager(connection)
messageManager = ClientMartinMessagesManager(connection)
carbonsManager = ClientMartinCarbonsManager(connection)
mamManager = ClientMartinMAM(connection)
connectionCancellable = connection.$state
.sink { [weak self] state in
guard let self = self else { return }

View file

@ -57,7 +57,7 @@ struct Message: DBStorable, Equatable {
let id: String
var type: MessageType
let date: Date
var date: Date
var contentType: MessageContentType
var status: MessageStatus
@ -101,3 +101,54 @@ extension Message {
)
}
}
extension Message {
static func map(_ martinMessage: Martin.Message) -> Message? {
// Check that the message type is supported
let chatTypes: [StanzaType] = [.chat, .groupchat]
guard let mType = martinMessage.type, chatTypes.contains(mType) else {
#if DEBUG
print("Unsupported martinMessage type: \(martinMessage.type?.rawValue ?? "nil")")
#endif
return nil
}
// Type
let type = MessageType(rawValue: martinMessage.type?.rawValue ?? "") ?? .chat
// Content type
var contentType: MessageContentType = .text
if let oob = martinMessage.oob {
contentType = .attachment(.init(
type: oob.attachmentType,
localName: nil,
thumbnailName: nil,
remotePath: oob
))
} else if martinMessage.hints.contains(.noStore) {
contentType = .typing
// skip for now
return nil
}
// From/To
let from = martinMessage.from?.bareJid.stringValue ?? ""
let to = martinMessage.to?.bareJid.stringValue
// Msg
let msg = Message(
id: martinMessage.id ?? UUID().uuidString,
type: type,
date: Date(),
contentType: contentType,
status: .sent,
from: from,
to: to,
body: martinMessage.body,
subject: martinMessage.subject,
thread: martinMessage.thread,
oobUrl: martinMessage.oob
)
return msg
}
}

View file

@ -210,7 +210,7 @@ private extension AttachmentsStore {
.sink { _ in
} receiveValue: { [weak self] messages in
let forProcessing = messages
// .filter { $0.status == .pending }
.filter { $0.status != .error }
.filter { self?.processing.contains($0.id) == false }
.filter { $0.contentType.isAttachment }
for message in forProcessing {