Make the chats/users cache thread-safe

This commit is contained in:
bodqhrohro 2019-12-28 04:35:40 +02:00
parent fdc8397b93
commit 536451f648
6 changed files with 99 additions and 22 deletions

84
telegram/cache/cache.go vendored Normal file
View file

@ -0,0 +1,84 @@
package cache
import (
"sync"
"github.com/zelenin/go-tdlib/client"
)
// Cache allows operating the chats and users cache in
// a thread-safe manner
type Cache struct {
chats map[int64]*client.Chat
users map[int32]*client.User
chatsLock sync.Mutex
usersLock sync.Mutex
}
// NewCache initializes a cache
func NewCache() *Cache {
return &Cache{
chats: map[int64]*client.Chat{},
users: map[int32]*client.User{},
}
}
// ChatsKeys grabs chat ids synchronously to avoid lockups
// while they are used
func (cache *Cache) ChatsKeys() []int64 {
cache.chatsLock.Lock()
defer cache.chatsLock.Unlock()
var keys []int64
for id := range cache.chats {
keys = append(keys, id)
}
return keys
}
// UsersKeys grabs user ids synchronously to avoid lockups
// while they are used
func (cache *Cache) UsersKeys() []int32 {
cache.usersLock.Lock()
defer cache.usersLock.Unlock()
var keys []int32
for id := range cache.users {
keys = append(keys, id)
}
return keys
}
// GetChat retrieves chat by id if it's present in the cache
func (cache *Cache) GetChat(id int64) (*client.Chat, bool) {
cache.chatsLock.Lock()
defer cache.chatsLock.Unlock()
chat, ok := cache.chats[id]
return chat, ok
}
// GetUser retrieves user by id if it's present in the cache
func (cache *Cache) GetUser(id int32) (*client.User, bool) {
cache.usersLock.Lock()
defer cache.usersLock.Unlock()
user, ok := cache.users[id]
return user, ok
}
// SetChat stores a chat in the cache
func (cache *Cache) SetChat(id int64, chat *client.Chat) {
cache.chatsLock.Lock()
defer cache.chatsLock.Unlock()
cache.chats[id] = chat
}
// SetUser stores a user in the cache
func (cache *Cache) SetUser(id int32, user *client.User) {
cache.usersLock.Lock()
defer cache.usersLock.Unlock()
cache.users[id] = user
}

View file

@ -9,6 +9,7 @@ import (
"dev.narayana.im/narayana/telegabber/config"
"dev.narayana.im/narayana/telegabber/persistence"
"dev.narayana.im/narayana/telegabber/telegram/cache"
"github.com/zelenin/go-tdlib/client"
"gosrc.io/xmpp"
@ -24,11 +25,6 @@ var logConstants = map[string]int32{
":all": 1023,
}
type cache struct {
chats map[int64]*client.Chat
users map[int32]*client.User
}
func stringToLogConstant(c string) int32 {
level, ok := logConstants[c]
if !ok {
@ -51,7 +47,7 @@ type Client struct {
jid string
Session *persistence.Session
content *config.TelegramContentConfig
cache *cache
cache *cache.Cache
online bool
locks clientLocks
@ -109,11 +105,8 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component
jid: jid,
Session: session,
content: &conf.Content,
cache: &cache{
chats: map[int64]*client.Chat{},
users: map[int32]*client.User{},
},
options: options,
locks: clientLocks{},
cache: cache.NewCache(),
options: options,
locks: clientLocks{},
}, nil
}

View file

@ -206,7 +206,7 @@ func (c *Client) ProcessTransportCommand(cmdline string) string {
return notOnline
}
for id := range c.cache.chats {
for _, id := range c.cache.ChatsKeys() {
c.unsubscribe(id)
}

View file

@ -135,7 +135,7 @@ func (c *Client) Disconnect() {
log.Warn("Disconnecting from Telegram network...")
// we're offline (unsubscribe if logout)
for id := range c.cache.chats {
for _, id := range c.cache.ChatsKeys() {
gateway.SendPresence(
c.xmpp,
c.jid,
@ -200,7 +200,7 @@ func (c *Client) interactor() {
Limit: chatsLimit,
})
if err != nil {
log.Error("Could not retrieve chats")
log.Errorf("Could not retrieve chats: %v", err)
}
gateway.SendPresence(c.xmpp, c.jid, gateway.SPStatus("Logged in "+c.Session.Login))

View file

@ -108,7 +108,7 @@ func (c *Client) updateHandler() {
// new user discovered
func (c *Client) updateUser(update *client.UpdateUser) {
c.cache.users[update.User.Id] = update.User
c.cache.SetUser(update.User.Id, update.User)
show, status := c.userStatusToText(update.User.Status)
go c.processStatusUpdate(int64(update.User.Id), status, show)
}
@ -133,7 +133,7 @@ func (c *Client) updateNewChat(update *client.UpdateNewChat) {
}
}
c.cache.chats[update.Chat.Id] = update.Chat
c.cache.SetChat(update.Chat.Id, update.Chat)
var isChannel = false
if update.Chat.Type.ChatTypeType() == client.TypeChatTypeSupergroup {

View file

@ -58,7 +58,7 @@ func (c *Client) GetContactByID(id int64, chat *client.Chat) (*client.Chat, *cli
if id <= math.MaxInt32 && id >= math.MinInt32 {
userID := int32(id)
user, ok = c.cache.users[userID]
user, ok = c.cache.GetUser(userID)
if !ok && userID > 0 {
user, err = c.client.GetUser(&client.GetUserRequest{
UserId: userID,
@ -67,11 +67,11 @@ func (c *Client) GetContactByID(id int64, chat *client.Chat) (*client.Chat, *cli
return nil, nil, err
}
c.cache.users[userID] = user
c.cache.SetUser(userID, user)
}
}
cacheChat, ok = c.cache.chats[id]
cacheChat, ok = c.cache.GetChat(id)
if !ok {
if chat == nil {
cacheChat, err = c.client.GetChat(&client.GetChatRequest{
@ -86,9 +86,9 @@ func (c *Client) GetContactByID(id int64, chat *client.Chat) (*client.Chat, *cli
return nil, nil, err
}
c.cache.chats[id] = cacheChat
c.cache.SetChat(id, cacheChat)
} else {
c.cache.chats[id] = chat
c.cache.SetChat(id, chat)
}
}
if chat == nil {