mv-experiment #1
|
@ -9,4 +9,6 @@ enum AppError: Error {
|
||||||
case invalidContentType
|
case invalidContentType
|
||||||
case invalidPath
|
case invalidPath
|
||||||
case invalidLocalName
|
case invalidLocalName
|
||||||
|
case moduleNotAvailable
|
||||||
|
case featureNotSupported
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,69 +5,29 @@ import Martin
|
||||||
|
|
||||||
final class ClientMartinMAM {
|
final class ClientMartinMAM {
|
||||||
private var cancellables: Set<AnyCancellable> = []
|
private var cancellables: Set<AnyCancellable> = []
|
||||||
private let messageProcessor: ArchivedMessageProcessor
|
|
||||||
|
private weak var module: MessageArchiveManagementModule?
|
||||||
|
private var afterAvailable = true
|
||||||
|
private var beforeAvailable = true
|
||||||
|
|
||||||
init(_ xmppConnection: XMPPClient) {
|
init(_ xmppConnection: XMPPClient) {
|
||||||
messageProcessor = ArchivedMessageProcessor()
|
module = xmppConnection.module(.mam)
|
||||||
|
subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
private func subscribe() {
|
||||||
// subscribe to archived messages
|
// subscribe to archived messages
|
||||||
xmppConnection.module(.mam).archivedMessagesPublisher
|
module?.archivedMessagesPublisher
|
||||||
|
.delay(for: 0.7, scheduler: DispatchQueue.main)
|
||||||
.sink(receiveValue: { [weak self] archived in
|
.sink(receiveValue: { [weak self] archived in
|
||||||
guard let self = self else { return }
|
guard let self = self else { return }
|
||||||
Task {
|
Task {
|
||||||
await self.messageProcessor.addMessage(archived)
|
await self.handleMessage(archived)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.store(in: &cancellables)
|
.store(in: &cancellables)
|
||||||
}
|
}
|
||||||
|
|
||||||
// func requestArchivedMessages(for roster: Roster, before: String? = nil, after: String? = nil, in module: MessageArchiveManagementModule) async {
|
|
||||||
// print(roster, before, after, module)
|
|
||||||
//
|
|
||||||
// // let endDate = Date()
|
|
||||||
// // let startDate = Calendar.current.date(byAdding: .day, value: -Const.mamRequestDaysLength, to: endDate) ?? Date()
|
|
||||||
// // let response = try? await module.queryItems(
|
|
||||||
// // componentJid: JID(credentials.bareJid),
|
|
||||||
// // with: JID(roster.bareJid),
|
|
||||||
// // start: startDate,
|
|
||||||
// // end: endDate,
|
|
||||||
// // queryId: UUID().uuidString
|
|
||||||
// // )
|
|
||||||
// // let query: RSM.Query = .init(before: nil, after: nil, max: nil)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func requestArchivedMessages(for roster: Roster, before: String? = nil, after: String? = nil) async {
|
|
||||||
// assert(before != nil || after != nil, "Either before or after must be provided")
|
|
||||||
// if !discoManager.features.map({ $0.xep }).contains("XEP-0313") {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// let module = connection.module(MessageArchiveManagementModule.self)
|
|
||||||
// await mamManager.requestArchivedMessages(for: roster, before: before, after: after, in: module)
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
private actor ArchivedMessageProcessor {
|
|
||||||
private var messageBuffer: [Martin.MessageArchiveManagementModule.ArchivedMessageReceived] = []
|
|
||||||
private let batchSize = 20
|
|
||||||
|
|
||||||
func addMessage(_ message: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) async {
|
|
||||||
messageBuffer.append(message)
|
|
||||||
if messageBuffer.count >= batchSize {
|
|
||||||
await processBatch()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func processBatch() async {
|
|
||||||
guard !messageBuffer.isEmpty else { return }
|
|
||||||
|
|
||||||
let batch = messageBuffer.prefix(batchSize)
|
|
||||||
messageBuffer.removeFirst(min(batchSize, messageBuffer.count))
|
|
||||||
|
|
||||||
for archived in batch {
|
|
||||||
await handleMessage(archived)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleMessage(_ received: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) async {
|
private func handleMessage(_ received: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) async {
|
||||||
let message = received.message
|
let message = received.message
|
||||||
let date = received.timestamp
|
let date = received.timestamp
|
||||||
|
@ -98,41 +58,3 @@ private actor ArchivedMessageProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// final class ClientMartinMAM {
|
|
||||||
// private var cancellables: Set<AnyCancellable> = []
|
|
||||||
//
|
|
||||||
// init(_ xmppConnection: XMPPClient) {
|
|
||||||
// // subscribe to archived messages
|
|
||||||
// xmppConnection.module(.mam).archivedMessagesPublisher
|
|
||||||
// .sink(receiveValue: { [weak self] archived in
|
|
||||||
// let message = archived.message
|
|
||||||
// message.attribute("archived_date", newValue: "\(archived.timestamp.timeIntervalSince1970)")
|
|
||||||
// self?.handleMessage(archived)
|
|
||||||
// })
|
|
||||||
// .store(in: &cancellables)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private func handleMessage(_ received: Martin.MessageArchiveManagementModule.ArchivedMessageReceived) {
|
|
||||||
// let message = received.message
|
|
||||||
// let date = received.timestamp
|
|
||||||
// #if DEBUG
|
|
||||||
// print("---")
|
|
||||||
// print("Archive message received: \(message)")
|
|
||||||
// print("Date: \(date)")
|
|
||||||
// print("---")
|
|
||||||
// #endif
|
|
||||||
//
|
|
||||||
// if let msg = Message.map(message) {
|
|
||||||
// Task {
|
|
||||||
// do {
|
|
||||||
// var msg = msg
|
|
||||||
// msg.date = received.timestamp
|
|
||||||
// try await msg.save()
|
|
||||||
// } catch {
|
|
||||||
// logIt(.error, "Error saving message: \(error)")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
|
@ -140,6 +140,14 @@ extension Client {
|
||||||
throw URLError(.badServerResponse)
|
throw URLError(.badServerResponse)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fetchArchiveMessages(for roster: Roster, query: RSM.Query) async throws -> Martin.MessageArchiveManagementModule.QueryResult {
|
||||||
|
if !discoManager.features.map({ $0.xep }).contains("XEP-0313") {
|
||||||
|
throw AppError.featureNotSupported
|
||||||
|
}
|
||||||
|
let module = connection.module(MessageArchiveManagementModule.self)
|
||||||
|
return try await module.queryItems(componentJid: JID(roster.bareJid), with: JID(roster.contactBareJid), queryId: UUID().uuidString, rsm: query)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension Client {
|
extension Client {
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
import AVFoundation
|
|
||||||
import Combine
|
import Combine
|
||||||
import Foundation
|
import Foundation
|
||||||
import GRDB
|
import GRDB
|
||||||
import Photos
|
import Martin
|
||||||
|
|
||||||
@MainActor
|
@MainActor
|
||||||
final class MessagesStore: ObservableObject {
|
final class MessagesStore: ObservableObject {
|
||||||
|
@ -13,6 +12,7 @@ final class MessagesStore: ObservableObject {
|
||||||
private let client: Client
|
private let client: Client
|
||||||
|
|
||||||
private var messagesCancellable: AnyCancellable?
|
private var messagesCancellable: AnyCancellable?
|
||||||
|
private let archiveMessageFetcher = ArchiveMessageFetcher()
|
||||||
|
|
||||||
init(roster: Roster, client: Client) {
|
init(roster: Roster, client: Client) {
|
||||||
self.client = client
|
self.client = client
|
||||||
|
@ -21,6 +21,7 @@ final class MessagesStore: ObservableObject {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MARK: - Send message
|
||||||
extension MessagesStore {
|
extension MessagesStore {
|
||||||
func sendMessage(_ message: String) {
|
func sendMessage(_ message: String) {
|
||||||
Task {
|
Task {
|
||||||
|
@ -49,6 +50,7 @@ extension MessagesStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MARK: - Subscriptions
|
||||||
private extension MessagesStore {
|
private extension MessagesStore {
|
||||||
func subscribe() {
|
func subscribe() {
|
||||||
messagesCancellable = ValueObservation.tracking(Message
|
messagesCancellable = ValueObservation.tracking(Message
|
||||||
|
@ -64,6 +66,102 @@ private extension MessagesStore {
|
||||||
.sink { _ in
|
.sink { _ in
|
||||||
} receiveValue: { [weak self] messages in
|
} receiveValue: { [weak self] messages in
|
||||||
self?.messages = messages
|
self?.messages = messages
|
||||||
|
if messages.isEmpty {
|
||||||
|
self?.requestLastArchivedMessages()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MARK: - Archived messages
|
||||||
|
extension MessagesStore {
|
||||||
|
func requestEarliestArchivedMessages() {
|
||||||
|
guard let beforeId = messages.first?.id else { return }
|
||||||
|
Task {
|
||||||
|
await archiveMessageFetcher.fetchBeforeMessages(roster, client, beforeId: beforeId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func requestLatestArchivedMessages() {
|
||||||
|
guard let afterId = messages.first?.id else { return }
|
||||||
|
Task {
|
||||||
|
await archiveMessageFetcher.fetchAfterMessages(roster, client, afterId: afterId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func requestLastArchivedMessages() {
|
||||||
|
Task {
|
||||||
|
await archiveMessageFetcher.fetchLastMessages(roster, client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private actor ArchiveMessageFetcher {
|
||||||
|
private var afterAvailable = true
|
||||||
|
private var beforeAvailable = true
|
||||||
|
private var isFetching = false
|
||||||
|
private var fetchingIsPossinle = true
|
||||||
|
|
||||||
|
func fetchLastMessages(_ roster: Roster, _ client: Client) async {
|
||||||
|
if !fetchingIsPossinle { return }
|
||||||
|
while isFetching {
|
||||||
|
await Task.yield()
|
||||||
|
}
|
||||||
|
isFetching = true
|
||||||
|
|
||||||
|
let query: RSM.Query = .init(lastItems: Const.mamRequestLimit)
|
||||||
|
do {
|
||||||
|
_ = try await client.fetchArchiveMessages(for: roster, query: query)
|
||||||
|
} catch AppError.featureNotSupported {
|
||||||
|
fetchingIsPossinle = false
|
||||||
|
} catch {
|
||||||
|
logIt(.error, "Error requesting archived messages: \(error)")
|
||||||
|
}
|
||||||
|
|
||||||
|
isFetching = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchBeforeMessages(_ roster: Roster, _ client: Client, beforeId: String) async {
|
||||||
|
if !fetchingIsPossinle || !beforeAvailable { return }
|
||||||
|
while isFetching {
|
||||||
|
await Task.yield()
|
||||||
|
}
|
||||||
|
isFetching = true
|
||||||
|
|
||||||
|
let query: RSM.Query = .init(before: beforeId, max: Const.mamRequestLimit)
|
||||||
|
do {
|
||||||
|
let result = try await client.fetchArchiveMessages(for: roster, query: query)
|
||||||
|
if result.complete {
|
||||||
|
beforeAvailable = false
|
||||||
|
}
|
||||||
|
} catch AppError.featureNotSupported {
|
||||||
|
fetchingIsPossinle = false
|
||||||
|
} catch {
|
||||||
|
logIt(.error, "Error requesting archived messages: \(error)")
|
||||||
|
}
|
||||||
|
|
||||||
|
isFetching = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchAfterMessages(_ roster: Roster, _ client: Client, afterId: String) async {
|
||||||
|
if !fetchingIsPossinle || !afterAvailable { return }
|
||||||
|
while isFetching {
|
||||||
|
await Task.yield()
|
||||||
|
}
|
||||||
|
isFetching = true
|
||||||
|
|
||||||
|
let query: RSM.Query = .init(after: afterId, max: Const.mamRequestLimit)
|
||||||
|
do {
|
||||||
|
let result = try await client.fetchArchiveMessages(for: roster, query: query)
|
||||||
|
if result.complete {
|
||||||
|
afterAvailable = false
|
||||||
|
}
|
||||||
|
} catch AppError.featureNotSupported {
|
||||||
|
fetchingIsPossinle = false
|
||||||
|
} catch {
|
||||||
|
logIt(.error, "Error requesting archived messages: \(error)")
|
||||||
|
}
|
||||||
|
|
||||||
|
isFetching = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -43,6 +43,6 @@ enum Const {
|
||||||
// Size for attachment preview
|
// Size for attachment preview
|
||||||
static let attachmentPreviewSize = UIScreen.main.bounds.width * 0.5
|
static let attachmentPreviewSize = UIScreen.main.bounds.width * 0.5
|
||||||
|
|
||||||
// Lenght in days for MAM request
|
// MAM request page size
|
||||||
static let mamRequestDaysLength = 30
|
static let mamRequestLimit = 30
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ private struct ChatsRow: View {
|
||||||
do {
|
do {
|
||||||
let (messages, attachments) = try await clientsStore.conversationStores(for: chat)
|
let (messages, attachments) = try await clientsStore.conversationStores(for: chat)
|
||||||
router.showScreen(.push) { _ in
|
router.showScreen(.push) { _ in
|
||||||
ConversationScreen(messages: messages, attachments: attachments)
|
ConversationScreen(messagesStore: messages, attachments: attachments)
|
||||||
.navigationBarHidden(true)
|
.navigationBarHidden(true)
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
|
|
|
@ -160,7 +160,7 @@ private struct ContactsScreenRow: View {
|
||||||
do {
|
do {
|
||||||
let (messages, attachments) = try await clientsStore.conversationStores(for: roster)
|
let (messages, attachments) = try await clientsStore.conversationStores(for: roster)
|
||||||
router.showScreen(.push) { _ in
|
router.showScreen(.push) { _ in
|
||||||
ConversationScreen(messages: messages, attachments: attachments)
|
ConversationScreen(messagesStore: messages, attachments: attachments)
|
||||||
.navigationBarHidden(true)
|
.navigationBarHidden(true)
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
|
|
|
@ -5,7 +5,7 @@ import SwiftUI
|
||||||
|
|
||||||
struct ConversationScreen: View {
|
struct ConversationScreen: View {
|
||||||
@Environment(\.router) var router
|
@Environment(\.router) var router
|
||||||
@StateObject var messages: MessagesStore
|
@StateObject var messagesStore: MessagesStore
|
||||||
@StateObject var attachments: AttachmentsStore
|
@StateObject var attachments: AttachmentsStore
|
||||||
|
|
||||||
@State private var autoScroll = true
|
@State private var autoScroll = true
|
||||||
|
@ -31,7 +31,7 @@ struct ConversationScreen: View {
|
||||||
)
|
)
|
||||||
|
|
||||||
// Msg list
|
// Msg list
|
||||||
let messages = messages.messages
|
let messages = messagesStore.messages
|
||||||
if !messages.isEmpty {
|
if !messages.isEmpty {
|
||||||
ScrollViewReader { proxy in
|
ScrollViewReader { proxy in
|
||||||
ScrollView {
|
ScrollView {
|
||||||
|
@ -45,6 +45,9 @@ struct ConversationScreen: View {
|
||||||
firstIsVisible = true
|
firstIsVisible = true
|
||||||
autoScroll = true
|
autoScroll = true
|
||||||
}
|
}
|
||||||
|
if message.id == messages.last?.id {
|
||||||
|
messagesStore.requestEarliestArchivedMessages()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
.onDisappear {
|
.onDisappear {
|
||||||
if message.id == messages.first?.id {
|
if message.id == messages.first?.id {
|
||||||
|
@ -97,11 +100,11 @@ struct ConversationScreen: View {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.environmentObject(messages)
|
.environmentObject(messagesStore)
|
||||||
.environmentObject(attachments)
|
.environmentObject(attachments)
|
||||||
.safeAreaInset(edge: .bottom, spacing: 0) {
|
.safeAreaInset(edge: .bottom, spacing: 0) {
|
||||||
ConversationTextInput(autoScroll: $autoScroll)
|
ConversationTextInput(autoScroll: $autoScroll)
|
||||||
.environmentObject(messages)
|
.environmentObject(messagesStore)
|
||||||
.environmentObject(attachments)
|
.environmentObject(attachments)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue