Send carbons for outgoing messages to other resources

This commit is contained in:
Bohdan Horbeshko 2023-07-08 23:52:30 -04:00
parent 30b3fd1615
commit 959dc061ff
6 changed files with 89 additions and 43 deletions

View file

@ -52,6 +52,7 @@ type Client struct {
jid string jid string
Session *persistence.Session Session *persistence.Session
resources map[string]bool resources map[string]bool
outbox map[string]string
content *config.TelegramContentConfig content *config.TelegramContentConfig
cache *cache.Cache cache *cache.Cache
online bool online bool
@ -59,13 +60,15 @@ type Client struct {
DelayedStatuses map[int64]*DelayedStatus DelayedStatuses map[int64]*DelayedStatus
DelayedStatusesLock sync.Mutex DelayedStatusesLock sync.Mutex
locks clientLocks locks clientLocks
SendMessageLock sync.Mutex
} }
type clientLocks struct { type clientLocks struct {
authorizationReady sync.Mutex authorizationReady sync.Mutex
chatMessageLocks map[int64]*sync.Mutex chatMessageLocks map[int64]*sync.Mutex
resourcesLock sync.Mutex resourcesLock sync.Mutex
outboxLock sync.Mutex
} }
// NewClient instantiates a Telegram App // NewClient instantiates a Telegram App
@ -121,6 +124,7 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component
jid: jid, jid: jid,
Session: session, Session: session,
resources: make(map[string]bool), resources: make(map[string]bool),
outbox: make(map[string]string),
content: &conf.Content, content: &conf.Content,
cache: cache.NewCache(), cache: cache.NewCache(),
options: options, options: options,

View file

@ -513,11 +513,14 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool)
content := c.PrepareOutgoingMessageContent(rawCmdArguments(cmdline, 0)) content := c.PrepareOutgoingMessageContent(rawCmdArguments(cmdline, 0))
if content != nil { if content != nil {
c.client.EditMessageText(&client.EditMessageTextRequest{ _, err = c.client.EditMessageText(&client.EditMessageTextRequest{
ChatId: chatID, ChatId: chatID,
MessageId: message.Id, MessageId: message.Id,
InputMessageContent: content, InputMessageContent: content,
}) })
if err != nil {
return "Message editing error", true
}
} else { } else {
return "Message processing error", true return "Message processing error", true
} }
@ -650,7 +653,7 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool)
} }
if messages != nil && messages.Messages != nil { if messages != nil && messages.Messages != nil {
for _, message := range messages.Messages { for _, message := range messages.Messages {
c.ProcessIncomingMessage(targetChatId, message) c.ProcessIncomingMessage(targetChatId, message, "")
} }
} }
// print vCard // print vCard

View file

@ -203,26 +203,30 @@ func (c *Client) updateChatLastMessage(update *client.UpdateChatLastMessage) {
// message received // message received
func (c *Client) updateNewMessage(update *client.UpdateNewMessage) { func (c *Client) updateNewMessage(update *client.UpdateNewMessage) {
go func() { chatId := update.Message.ChatId
chatId := update.Message.ChatId
// guarantee sequential message delivering per chat c.SendMessageLock.Lock()
lock := c.getChatMessageLock(chatId) c.SendMessageLock.Unlock()
xmppId, err := gateway.IdsDB.GetByTgIds(c.Session.Login, c.jid, chatId, update.Message.Id)
var ignoredResource string
if err == nil {
ignoredResource = c.popFromOutbox(xmppId)
} else {
log.Infof("Couldn't retrieve XMPP message ids for %v, an echo may happen", update.Message.Id)
}
log.Warnf("xmppId: %v, ignoredResource: %v", xmppId, ignoredResource)
// guarantee sequential message delivering per chat
lock := c.getChatMessageLock(chatId)
go func() {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
// ignore self outgoing messages
if update.Message.IsOutgoing &&
update.Message.SendingState != nil &&
update.Message.SendingState.MessageSendingStateType() == client.TypeMessageSendingStatePending {
return
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"chat_id": chatId, "chat_id": chatId,
}).Warn("New message from chat") }).Warn("New message from chat")
c.ProcessIncomingMessage(chatId, update.Message) c.ProcessIncomingMessage(chatId, update.Message, ignoredResource)
}() }()
} }

View file

@ -890,9 +890,34 @@ func (c *Client) ensureDownloadFile(file *client.File) *client.File {
} }
// ProcessIncomingMessage transfers a message to XMPP side and marks it as read on Telegram side // ProcessIncomingMessage transfers a message to XMPP side and marks it as read on Telegram side
func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) { func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message, ignoredResource string) {
var text, oob, auxText string var jids []string
var isPM bool
var err error var err error
if gateway.MessageOutgoingPermission && c.Session.Carbons {
isPM, err = c.IsPM(chatId)
if err != nil {
log.Errorf("Could not determine if chat is PM: %v", err)
}
}
isOutgoing := message.IsOutgoing
isCarbon := isPM && isOutgoing
log.Warnf("isOutgoing: %v", isOutgoing)
if isOutgoing {
for resource := range c.resourcesRange() {
if ignoredResource == "" || resource != ignoredResource {
jids = append(jids, c.jid+"/"+resource)
}
}
if len(jids) == 0 {
log.Info("The only resource is ignored, aborting")
return
}
} else {
jids = []string{c.jid}
}
var text, oob, auxText string
reply, replyMsg := c.getMessageReply(message) reply, replyMsg := c.getMessageReply(message)
@ -965,27 +990,10 @@ func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) {
sId := strconv.FormatInt(message.Id, 10) sId := strconv.FormatInt(message.Id, 10)
sChatId := strconv.FormatInt(chatId, 10) sChatId := strconv.FormatInt(chatId, 10)
var jids []string
var isPM bool
if gateway.MessageOutgoingPermission && c.Session.Carbons {
isPM, err = c.IsPM(chatId)
if err != nil {
log.Errorf("Could not determine if chat is PM: %v", err)
}
}
isOutgoing := isPM && message.IsOutgoing
if isOutgoing {
for resource := range c.resourcesRange() {
jids = append(jids, c.jid+"/"+resource)
}
} else {
jids = []string{c.jid}
}
for _, jid := range jids { for _, jid := range jids {
gateway.SendMessageWithOOB(jid, sChatId, text, sId, c.xmpp, reply, oob, isOutgoing) gateway.SendMessageWithOOB(jid, sChatId, text, sId, c.xmpp, reply, oob, isCarbon)
if auxText != "" { if auxText != "" {
gateway.SendMessage(jid, sChatId, auxText, sId, c.xmpp, reply, isOutgoing) gateway.SendMessage(jid, sChatId, auxText, sId, c.xmpp, reply, isCarbon)
} }
} }
} }
@ -1172,9 +1180,12 @@ func (c *Client) resourcesRange() chan string {
// resend statuses to (to another resource, for example) // resend statuses to (to another resource, for example)
func (c *Client) roster(resource string) { func (c *Client) roster(resource string) {
c.locks.resourcesLock.Lock()
if _, ok := c.resources[resource]; ok { if _, ok := c.resources[resource]; ok {
c.locks.resourcesLock.Unlock()
return // we know it return // we know it
} }
c.locks.resourcesLock.Unlock()
log.Warnf("Sending roster for %v", resource) log.Warnf("Sending roster for %v", resource)
@ -1347,3 +1358,24 @@ func (c *Client) UpdateChatNicknames() {
} }
} }
} }
// AddToOutbox remembers the resource from which a message with given ID was sent
func (c *Client) AddToOutbox(xmppId, resource string) {
c.locks.outboxLock.Lock()
defer c.locks.outboxLock.Unlock()
c.outbox[xmppId] = resource
}
func (c *Client) popFromOutbox(xmppId string) string {
c.locks.outboxLock.Lock()
defer c.locks.outboxLock.Unlock()
resource, ok := c.outbox[xmppId]
if ok {
delete(c.outbox, xmppId)
} else {
log.Warnf("No %v xmppId in outbox", xmppId)
}
return resource
}

View file

@ -42,8 +42,8 @@ var DirtySessions = false
var MessageOutgoingPermission = false var MessageOutgoingPermission = false
// SendMessage creates and sends a message stanza // SendMessage creates and sends a message stanza
func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isOutgoing bool) { func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isCarbon bool) {
sendMessageWrapper(to, from, body, id, component, reply, "", isOutgoing) sendMessageWrapper(to, from, body, id, component, reply, "", isCarbon)
} }
// SendServiceMessage creates and sends a simple message stanza from transport // SendServiceMessage creates and sends a simple message stanza from transport
@ -57,11 +57,11 @@ func SendTextMessage(to string, from string, body string, component *xmpp.Compon
} }
// SendMessageWithOOB creates and sends a message stanza with OOB URL // SendMessageWithOOB creates and sends a message stanza with OOB URL
func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isOutgoing bool) { func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) {
sendMessageWrapper(to, from, body, id, component, reply, oob, isOutgoing) sendMessageWrapper(to, from, body, id, component, reply, oob, isCarbon)
} }
func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isOutgoing bool) { func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) {
toJid, err := stanza.NewJid(to) toJid, err := stanza.NewJid(to)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -83,7 +83,7 @@ func sendMessageWrapper(to string, from string, body string, id string, componen
logFrom = from logFrom = from
messageFrom = from + "@" + componentJid messageFrom = from + "@" + componentJid
} }
if isOutgoing { if isCarbon {
messageTo = messageFrom messageTo = messageFrom
messageFrom = bareTo + "/" + Jid.Resource messageFrom = bareTo + "/" + Jid.Resource
} else { } else {
@ -120,7 +120,7 @@ func sendMessageWrapper(to string, from string, body string, id string, componen
} }
} }
if isOutgoing { if isCarbon {
carbonMessage := extensions.ClientMessage{ carbonMessage := extensions.ClientMessage{
Attrs: stanza.Attrs{ Attrs: stanza.Attrs{
From: bareTo, From: bareTo,

View file

@ -167,6 +167,8 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) {
} }
} }
session.SendMessageLock.Lock()
defer session.SendMessageLock.Unlock()
tgMessageId := session.ProcessOutgoingMessage(toID, text, msg.From, replyId, replaceId) tgMessageId := session.ProcessOutgoingMessage(toID, text, msg.From, replyId, replaceId)
if tgMessageId != 0 { if tgMessageId != 0 {
if replaceId != 0 { if replaceId != 0 {
@ -181,6 +183,7 @@ func HandleMessage(s xmpp.Sender, p stanza.Packet) {
log.Errorf("Failed to save ids %v/%v %v", toID, tgMessageId, msg.Id) log.Errorf("Failed to save ids %v/%v %v", toID, tgMessageId, msg.Id)
} }
} }
session.AddToOutbox(msg.Id, resource)
} else { } else {
/* /*
// if a message failed to edit on Telegram side, match new XMPP ID with old Telegram ID anyway // if a message failed to edit on Telegram side, match new XMPP ID with old Telegram ID anyway