import Combine
import Foundation
import Martin

/// Bridges the application store and the XMPP layer.
///
/// On creation it subscribes to the publishers exposed by `XMPPService` and forwards
/// connection-state changes, incoming messages and server features to `store` as
/// `AppAction`s, always on the main queue. In the other direction,
/// `middleware(state:action:)` turns a subset of `AppAction`s into calls on the service
/// and reports each outcome as a follow-up action.
final class XMPPMiddleware {
    static let shared = XMPPMiddleware()

    private let service = XMPPService(manager: Database.shared)

    /// Keeps the service subscriptions created in `init` alive.
    private var cancellables: Set<AnyCancellable> = []

    /// IDs of messages whose attachment upload has been started and has not yet reported back.
    // NOTE(review): if `ThreadSafeSet` is generic, its element type (the type of
    // `message.id`) has to be spelled out here — confirm against its declaration.
    private var uploadingMessageIDs = ThreadSafeSet()

    private init() {
        // Connection state of every client -> `.clientConnectionChanged`.
        service.clientState
            .sink { client, state in
                let jid = client.userBareJid.stringValue
                let status = ConnectionStatus.from(state)
                XMPPMiddleware.dispatchOnMain(.xmppAction(.clientConnectionChanged(jid: jid, state: status)))
            }
            .store(in: &cancellables)

        // Incoming messages -> `.xmppMessageReceived`; messages that `Message.map` rejects are dropped.
        service.clientMessages
            .sink { _, martinMessage in
                guard let message = Message.map(martinMessage) else { return }
                XMPPMiddleware.dispatchOnMain(.xmppAction(.xmppMessageReceived(message)))
            }
            .store(in: &cancellables)

        // Server features -> `.serverFeaturesLoaded`.
        service.clientFeatures
            .sink { client, features in
                let jid = client.userBareJid.stringValue
                XMPPMiddleware.dispatchOnMain(.xmppAction(.serverFeaturesLoaded(jid: jid, features: features)))
            }
            .store(in: &cancellables)
    }

    /// Dispatches `action` to `store` on the main queue.
    ///
    /// Static on purpose: the subscriptions made in `init` can use it without capturing `self`.
    private static func dispatchOnMain(_ action: AppAction) {
        DispatchQueue.main.async {
            store.dispatch(action)
        }
    }

    /// Translates a store action into XMPP work.
    ///
    /// - Parameters:
    ///   - state: Application state at the time of the action; only `accountsState.accounts` is read.
    ///   - action: The action to react to.
    /// - Returns: A publisher that emits exactly one follow-up action describing the outcome
    ///   for the handled actions, or an empty publisher for every other action.
    ///   Failures are never sent through the publisher's failure channel; they are reported
    ///   as dedicated error actions instead.
    func middleware(state: AppState, action: AppAction) -> AnyPublisher<AppAction, Never> {
        // Captured directly (not through a weak `self`) so that work hopping to another queue
        // is always issued and its promise is not silently orphaned.
        let service = self.service

        switch action {
        // Both account actions trigger the same client refresh from the current state.
        case .accountsAction(.tryAddAccountWithCredentials),
             .accountsAction(.addAccountError):
            return Future<AppAction, Never> { promise in
                service.updateClients(for: state.accountsState.accounts)
                promise(.success(.info("XMPPMiddleware: clients updated in XMPP service")))
            }
            .eraseToAnyPublisher()

        case .databaseAction(.storedAccountsLoaded(let accounts)):
            return Future<AppAction, Never> { promise in
                // Only active, non-temporary accounts get a client.
                service.updateClients(for: accounts.filter { $0.isActive && !$0.isTemp })
                promise(.success(.info("XMPPMiddleware: clients updated in XMPP service")))
            }
            .eraseToAnyPublisher()

        case .rostersAction(.addRoster(let ownerJID, let contactJID, let name, let groups)):
            return Future<AppAction, Never> { promise in
                guard
                    let client = service.clients.first(where: {
                        $0.connectionConfiguration.userJid.stringValue == ownerJID
                    })
                else {
                    // No client is registered for the owning account.
                    return promise(.success(.rostersAction(.addRosterError(
                        reason: XMPPError.item_not_found.localizedDescription
                    ))))
                }

                let rosterModule = client.modulesManager.module(RosterModule.self)
                rosterModule.addItem(jid: JID(contactJID), name: name, groups: groups, completionHandler: { result in
                    switch result {
                    case .success:
                        promise(.success(.rostersAction(.addRosterDone(jid: contactJID))))
                    case .failure(let error):
                        promise(.success(.rostersAction(.addRosterError(reason: error.localizedDescription))))
                    }
                })
            }
            .eraseToAnyPublisher()

        case .rostersAction(.deleteRoster(let ownerJID, let contactJID)):
            return Future<AppAction, Never> { promise in
                guard
                    let client = service.clients.first(where: {
                        $0.connectionConfiguration.userJid.stringValue == ownerJID
                    })
                else {
                    // No client is registered for the owning account.
                    return promise(.success(.rostersAction(.rosterDeletingFailed(
                        reason: XMPPError.item_not_found.localizedDescription
                    ))))
                }

                let rosterModule = client.modulesManager.module(RosterModule.self)
                rosterModule.removeItem(jid: JID(contactJID), completionHandler: { result in
                    switch result {
                    case .success:
                        promise(.success(.info("XMPPMiddleware: roster \(contactJID) deleted from \(ownerJID)")))
                    case .failure(let error):
                        promise(.success(.rostersAction(.rosterDeletingFailed(reason: error.localizedDescription))))
                    }
                })
            }
            .eraseToAnyPublisher()

        case .xmppAction(.xmppMessageSent(let message)):
            return Future<AppAction, Never> { promise in
                // Sending is started from a global queue, off the caller's queue.
                DispatchQueue.global().async {
                    service.sendMessage(message: message) { done in
                        let outcome: AppAction = done
                            ? .xmppAction(.xmppMessageSendSuccess(msgId: message.id))
                            : .xmppAction(.xmppMessageSendFailed(msgId: message.id))
                        promise(.success(outcome))
                    }
                }
            }
            .eraseToAnyPublisher()

        case .xmppAction(.xmppSharingTryUpload(let message)):
            return Future<AppAction, Never> { [weak self] promise in
                // An ID that is still registered means an upload was already started for it.
                // NOTE(review): `contains` followed by `insert` is not one atomic step; two
                // concurrent calls for the same message could both pass this check — confirm
                // whether `middleware` can be entered concurrently.
                let alreadyUploading = self?.uploadingMessageIDs.contains(message.id) ?? false
                guard !alreadyUploading else {
                    return promise(.success(.info(
                        "XMPPMiddleware: attachment in message \(message.id) is already in uploading process"
                    )))
                }

                self?.uploadingMessageIDs.insert(message.id)
                DispatchQueue.global().async {
                    service.uploadAttachment(message: message) { error, remotePath in
                        // The upload has reported back (either way), so the ID may be retried.
                        self?.uploadingMessageIDs.remove(message.id)
                        if let error {
                            promise(.success(.xmppAction(.xmppSharingUploadFailed(
                                msgId: message.id,
                                reason: error.localizedDescription
                            ))))
                        } else {
                            promise(.success(.xmppAction(.xmppSharingUploadSuccess(
                                msgId: message.id,
                                attachmentRemotePath: remotePath
                            ))))
                        }
                    }
                }
            }
            .eraseToAnyPublisher()

        case .xmppAction(.xmppLoadArchivedMessages(let jid, let to, let fromDate)):
            return Future<AppAction, Never> { promise in
                // Only the request is issued here; nothing is awaited before the promise is fulfilled.
                service.requestArchivedMessages(jid: jid, to: to, fromDate: fromDate)
                promise(.success(.info("XMPPMiddleware: archived messages requested for \(jid) from \(fromDate)")))
            }
            .eraseToAnyPublisher()

        default:
            // Actions this middleware does not handle produce no follow-up.
            return Empty().eraseToAnyPublisher()
        }
    }
}