From 959dc061ff30ba1cf5c699adc0f7d1d991d7afa5 Mon Sep 17 00:00:00 2001 From: Bohdan Horbeshko Date: Sat, 8 Jul 2023 23:52:30 -0400 Subject: [PATCH] Send carbons for outgoing messages to other resources --- telegram/client.go | 6 +++- telegram/commands.go | 7 ++-- telegram/handlers.go | 28 +++++++++------- telegram/utils.go | 74 +++++++++++++++++++++++++++++------------ xmpp/gateway/gateway.go | 14 ++++---- xmpp/handlers.go | 3 ++ 6 files changed, 89 insertions(+), 43 deletions(-) diff --git a/telegram/client.go b/telegram/client.go index 71d8125..61d46aa 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -52,6 +52,7 @@ type Client struct { jid string Session *persistence.Session resources map[string]bool + outbox map[string]string content *config.TelegramContentConfig cache *cache.Cache online bool @@ -59,13 +60,15 @@ type Client struct { DelayedStatuses map[int64]*DelayedStatus DelayedStatusesLock sync.Mutex - locks clientLocks + locks clientLocks + SendMessageLock sync.Mutex } type clientLocks struct { authorizationReady sync.Mutex chatMessageLocks map[int64]*sync.Mutex resourcesLock sync.Mutex + outboxLock sync.Mutex } // NewClient instantiates a Telegram App @@ -121,6 +124,7 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component jid: jid, Session: session, resources: make(map[string]bool), + outbox: make(map[string]string), content: &conf.Content, cache: cache.NewCache(), options: options, diff --git a/telegram/commands.go b/telegram/commands.go index 2f879b1..53b5d75 100644 --- a/telegram/commands.go +++ b/telegram/commands.go @@ -513,11 +513,14 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool) content := c.PrepareOutgoingMessageContent(rawCmdArguments(cmdline, 0)) if content != nil { - c.client.EditMessageText(&client.EditMessageTextRequest{ + _, err = c.client.EditMessageText(&client.EditMessageTextRequest{ ChatId: chatID, MessageId: message.Id, InputMessageContent: content, }) + if err != nil { + return "Message editing error", true + } } else { return "Message processing error", true } @@ -650,7 +653,7 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool) } if messages != nil && messages.Messages != nil { for _, message := range messages.Messages { - c.ProcessIncomingMessage(targetChatId, message) + c.ProcessIncomingMessage(targetChatId, message, "") } } // print vCard diff --git a/telegram/handlers.go b/telegram/handlers.go index 65e5f2f..8173ecd 100644 --- a/telegram/handlers.go +++ b/telegram/handlers.go @@ -203,26 +203,30 @@ func (c *Client) updateChatLastMessage(update *client.UpdateChatLastMessage) { // message received func (c *Client) updateNewMessage(update *client.UpdateNewMessage) { - go func() { - chatId := update.Message.ChatId + chatId := update.Message.ChatId - // guarantee sequential message delivering per chat - lock := c.getChatMessageLock(chatId) + c.SendMessageLock.Lock() + c.SendMessageLock.Unlock() + xmppId, err := gateway.IdsDB.GetByTgIds(c.Session.Login, c.jid, chatId, update.Message.Id) + var ignoredResource string + if err == nil { + ignoredResource = c.popFromOutbox(xmppId) + } else { + log.Infof("Couldn't retrieve XMPP message ids for %v, an echo may happen", update.Message.Id) + } + log.Warnf("xmppId: %v, ignoredResource: %v", xmppId, ignoredResource) + + // guarantee sequential message delivering per chat + lock := c.getChatMessageLock(chatId) + go func() { lock.Lock() defer lock.Unlock() - // ignore self outgoing messages - if update.Message.IsOutgoing && - update.Message.SendingState != nil && - update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending { - return - } - log.WithFields(log.Fields{ "chat_id": chatId, }).Warn("New message from chat") - c.ProcessIncomingMessage(chatId, update.Message) + c.ProcessIncomingMessage(chatId, update.Message, ignoredResource) }() } diff --git a/telegram/utils.go b/telegram/utils.go index a9e0bc8..9664857 100644 --- a/telegram/utils.go +++ b/telegram/utils.go @@ -890,9 +890,34 @@ func (c *Client) ensureDownloadFile(file *client.File) *client.File { } // ProcessIncomingMessage transfers a message to XMPP side and marks it as read on Telegram side -func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) { - var text, oob, auxText string +func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message, ignoredResource string) { + var jids []string + var isPM bool var err error + if gateway.MessageOutgoingPermission && c.Session.Carbons { + isPM, err = c.IsPM(chatId) + if err != nil { + log.Errorf("Could not determine if chat is PM: %v", err) + } + } + isOutgoing := message.IsOutgoing + isCarbon := isPM && isOutgoing + log.Warnf("isOutgoing: %v", isOutgoing) + if isOutgoing { + for resource := range c.resourcesRange() { + if ignoredResource == "" || resource != ignoredResource { + jids = append(jids, c.jid+"/"+resource) + } + } + if len(jids) == 0 { + log.Info("The only resource is ignored, aborting") + return + } + } else { + jids = []string{c.jid} + } + + var text, oob, auxText string reply, replyMsg := c.getMessageReply(message) @@ -965,27 +990,10 @@ func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) { sId := strconv.FormatInt(message.Id, 10) sChatId := strconv.FormatInt(chatId, 10) - var jids []string - var isPM bool - if gateway.MessageOutgoingPermission && c.Session.Carbons { - isPM, err = c.IsPM(chatId) - if err != nil { - log.Errorf("Could not determine if chat is PM: %v", err) - } - } - isOutgoing := isPM && message.IsOutgoing - if isOutgoing { - for resource := range c.resourcesRange() { - jids = append(jids, c.jid+"/"+resource) - } - } else { - jids = []string{c.jid} - } - for _, jid := range jids { - gateway.SendMessageWithOOB(jid, sChatId, text, sId, c.xmpp, reply, oob, isOutgoing) + gateway.SendMessageWithOOB(jid, sChatId, text, sId, c.xmpp, reply, oob, isCarbon) if auxText != "" { - gateway.SendMessage(jid, sChatId, auxText, sId, c.xmpp, reply, isOutgoing) + gateway.SendMessage(jid, sChatId, auxText, sId, c.xmpp, reply, isCarbon) } } } @@ -1172,9 +1180,12 @@ func (c *Client) resourcesRange() chan string { // resend statuses to (to another resource, for example) func (c *Client) roster(resource string) { + c.locks.resourcesLock.Lock() if _, ok := c.resources[resource]; ok { + c.locks.resourcesLock.Unlock() return // we know it } + c.locks.resourcesLock.Unlock() log.Warnf("Sending roster for %v", resource) @@ -1347,3 +1358,24 @@ func (c *Client) UpdateChatNicknames() { } } } + +// AddToOutbox remembers the resource from which a message with given ID was sent +func (c *Client) AddToOutbox(xmppId, resource string) { + c.locks.outboxLock.Lock() + defer c.locks.outboxLock.Unlock() + + c.outbox[xmppId] = resource +} + +func (c *Client) popFromOutbox(xmppId string) string { + c.locks.outboxLock.Lock() + defer c.locks.outboxLock.Unlock() + + resource, ok := c.outbox[xmppId] + if ok { + delete(c.outbox, xmppId) + } else { + log.Warnf("No %v xmppId in outbox", xmppId) + } + return resource +} diff --git a/xmpp/gateway/gateway.go b/xmpp/gateway/gateway.go index 29c8a07..7e54ee5 100644 --- a/xmpp/gateway/gateway.go +++ b/xmpp/gateway/gateway.go @@ -42,8 +42,8 @@ var DirtySessions = false var MessageOutgoingPermission = false // SendMessage creates and sends a message stanza -func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isOutgoing bool) { - sendMessageWrapper(to, from, body, id, component, reply, "", isOutgoing) +func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isCarbon bool) { + sendMessageWrapper(to, from, body, id, component, reply, "", isCarbon) } // SendServiceMessage creates and sends a simple message stanza from transport @@ -57,11 +57,11 @@ func SendTextMessage(to string, from string, body string, component *xmpp.Compon } // SendMessageWithOOB creates and sends a message stanza with OOB URL -func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isOutgoing bool) { - sendMessageWrapper(to, from, body, id, component, reply, oob, isOutgoing) +func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) { + sendMessageWrapper(to, from, body, id, component, reply, oob, isCarbon) } -func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isOutgoing bool) { +func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) { toJid, err := stanza.NewJid(to) if err != nil { log.WithFields(log.Fields{ @@ -83,7 +83,7 @@ func sendMessageWrapper(to string, from string, body string, id string, componen logFrom = from messageFrom = from + "@" + componentJid } - if isOutgoing { + if isCarbon { messageTo = messageFrom messageFrom = bareTo + "/" + Jid.Resource } else { @@ -120,7 +120,7 @@ func sendMessageWrapper(to string, from string, body string, id string, componen } } - if isOutgoing { + if isCarbon { carbonMessage := extensions.ClientMessage{ Attrs: stanza.Attrs{ From: bareTo, diff --git a/xmpp/handlers.go b/xmpp/handlers.go index 1286914..e6671bc 100644 --- a/xmpp/handlers.go +++ b/xmpp/handlers.go @@ -167,6 +167,8 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { } } + session.SendMessageLock.Lock() + defer session.SendMessageLock.Unlock() tgMessageId := session.ProcessOutgoingMessage(toID, text, msg.From, replyId, replaceId) if tgMessageId != 0 { if replaceId != 0 { @@ -181,6 +183,7 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) { log.Errorf("Failed to save ids %v/%v %v", toID, tgMessageId, msg.Id) } } + session.AddToOutbox(msg.Id, resource) } else { /* // if a message failed to edit on Telegram side, match new XMPP ID with old Telegram ID anyway