multi-instance fix

This commit is contained in:
Aleksandr Zelenin 2021-12-31 22:03:50 +03:00
parent d916d47121
commit e5d2ffd3f9
2 changed files with 26 additions and 8 deletions

View file

@ -3,10 +3,14 @@ package client
import ( import (
"context" "context"
"errors" "errors"
"log"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
var clients = &sync.Map{}
type Client struct { type Client struct {
jsonClient *JsonClient jsonClient *JsonClient
extraGenerator ExtraGenerator extraGenerator ExtraGenerator
@ -59,6 +63,8 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O
catchersStore: &sync.Map{}, catchersStore: &sync.Map{},
} }
clients.Store(client.jsonClient.id, client)
client.extraGenerator = UuidV4Generator() client.extraGenerator = UuidV4Generator()
client.catchTimeout = 60 * time.Second client.catchTimeout = 60 * time.Second
client.updatesTimeout = 60 * time.Second client.updatesTimeout = 60 * time.Second
@ -67,7 +73,7 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O
option(client) option(client)
} }
go client.receive() go receive(client)
go client.catch(catchersListener) go client.catch(catchersListener)
err := Authorize(client, authorizationStateHandler) err := Authorize(client, authorizationStateHandler)
@ -78,12 +84,28 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O
return client, nil return client, nil
} }
func (client *Client) receive() { var alreadyRunning uint32
func receive(client *Client) {
if atomic.LoadUint32(&alreadyRunning) != 0 {
return
}
atomic.StoreUint32(&alreadyRunning, 1)
for { for {
resp, err := client.jsonClient.Receive(client.updatesTimeout) resp, err := Receive(client.updatesTimeout)
if err != nil { if err != nil {
continue continue
} }
value, ok := clients.Load(resp.ClientId)
if !ok {
log.Printf("Response with wrong clientId: %d", resp.ClientId)
continue
}
client := value.(*Client)
client.catcher <- resp client.catcher <- resp
typ, err := UnmarshalType(resp.Data) typ, err := UnmarshalType(resp.Data)

View file

@ -39,7 +39,7 @@ func (jsonClient *JsonClient) Send(req Request) {
// shouldn't be called simultaneously from two different threads. // shouldn't be called simultaneously from two different threads.
// Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute // Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute
// in the same thread, so it can't be used after that. // in the same thread, so it can't be used after that.
func (jsonClient *JsonClient) Receive(timeout time.Duration) (*Response, error) { func Receive(timeout time.Duration) (*Response, error) {
result := C.td_receive(C.double(float64(timeout) / float64(time.Second))) result := C.td_receive(C.double(float64(timeout) / float64(time.Second)))
if result == nil { if result == nil {
return nil, errors.New("update receiving timeout") return nil, errors.New("update receiving timeout")
@ -54,10 +54,6 @@ func (jsonClient *JsonClient) Receive(timeout time.Duration) (*Response, error)
return nil, err return nil, err
} }
if resp.ClientId != jsonClient.id {
return nil, errors.New("wrong @client_id")
}
resp.Data = data resp.Data = data
return &resp, nil return &resp, nil