From b0f08e72656292321223e21942c004f7973ee08a Mon Sep 17 00:00:00 2001 From: Aleksandr Zelenin Date: Tue, 4 Jan 2022 15:22:29 +0300 Subject: [PATCH] receiver refactoring --- client/client.go | 66 ++++++----------------------- client/tdlib.go | 106 ++++++++++++++++++++++++++++++++++++----------- 2 files changed, 95 insertions(+), 77 deletions(-) diff --git a/client/client.go b/client/client.go index 325ad2f..9b3b432 100644 --- a/client/client.go +++ b/client/client.go @@ -3,18 +3,14 @@ package client import ( "context" "errors" - "log" "sync" - "sync/atomic" "time" ) -var clients = &sync.Map{} - type Client struct { jsonClient *JsonClient extraGenerator ExtraGenerator - catcher chan *Response + responses chan *Response listenerStore *listenerStore catchersStore *sync.Map updatesTimeout time.Duration @@ -35,12 +31,6 @@ func WithCatchTimeout(timeout time.Duration) Option { } } -func WithUpdatesTimeout(timeout time.Duration) Option { - return func(client *Client) { - client.updatesTimeout = timeout - } -} - func WithProxy(req *AddProxyRequest) Option { return func(client *Client) { client.AddProxy(req) @@ -54,27 +44,23 @@ func WithLogVerbosity(req *SetLogVerbosityLevelRequest) Option { } func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...Option) (*Client, error) { - catchersListener := make(chan *Response, 1000) - client := &Client{ jsonClient: NewJsonClient(), - catcher: catchersListener, + responses: make(chan *Response, 1000), listenerStore: newListenerStore(), catchersStore: &sync.Map{}, } - clients.Store(client.jsonClient.id, client) - client.extraGenerator = UuidV4Generator() client.catchTimeout = 60 * time.Second - client.updatesTimeout = 60 * time.Second for _, option := range options { option(client) } - go receive(client) - go client.catch(catchersListener) + tdlibInstance.addClient(client) + + go client.receiver() err := Authorize(client, authorizationStateHandler) if err != nil { @@ -84,31 +70,16 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O return client, nil } -var alreadyRunning uint32 - -func receive(client *Client) { - if atomic.LoadUint32(&alreadyRunning) != 0 { - return - } - atomic.StoreUint32(&alreadyRunning, 1) - - for { - resp, err := Receive(client.updatesTimeout) - if err != nil { - continue +func (client *Client) receiver() { + for response := range client.responses { + if response.Extra != "" { + value, ok := client.catchersStore.Load(response.Extra) + if ok { + value.(chan *Response) <- response + } } - value, ok := clients.Load(resp.ClientId) - if !ok { - log.Printf("Response with wrong clientId: %d", resp.ClientId) - continue - } - - client := value.(*Client) - - client.catcher <- resp - - typ, err := UnmarshalType(resp.Data) + typ, err := UnmarshalType(response.Data) if err != nil { continue } @@ -127,17 +98,6 @@ func receive(client *Client) { } } -func (client *Client) catch(updates chan *Response) { - for update := range updates { - if update.Extra != "" { - value, ok := client.catchersStore.Load(update.Extra) - if ok { - value.(chan *Response) <- update - } - } - } -} - func (client *Client) Send(req Request) (*Response, error) { req.Extra = client.extraGenerator() diff --git a/client/tdlib.go b/client/tdlib.go index 190c6f2..6a75b03 100644 --- a/client/tdlib.go +++ b/client/tdlib.go @@ -10,11 +10,93 @@ import ( "encoding/json" "errors" "fmt" + "log" "strconv" + "sync" "time" "unsafe" ) +var tdlibInstance *tdlib + +func init() { + tdlibInstance = &tdlib{ + timeout: 60 * time.Second, + clients: map[int]*Client{}, + } +} + +type tdlib struct { + once sync.Once + timeout time.Duration + mu sync.Mutex + clients map[int]*Client +} + +func (instance *tdlib) addClient(client *Client) { + instance.mu.Lock() + defer instance.mu.Unlock() + + instance.clients[client.jsonClient.id] = client + + instance.once.Do(func() { + go instance.receiver() + }) +} + +func (instance *tdlib) getClient(id int) (*Client, error) { + instance.mu.Lock() + defer instance.mu.Unlock() + + client, ok := instance.clients[id] + if !ok { + return nil, fmt.Errorf("client [id: %d] does not exist", id) + } + + return client, nil +} + +func (instance *tdlib) receiver() { + for { + resp, err := instance.receive(instance.timeout) + if err != nil { + continue + } + + client, err := instance.getClient(resp.ClientId) + if err != nil { + log.Print(err) + continue + } + + client.responses <- resp + } +} + +// Receives incoming updates and request responses from the TDLib client. May be called from any thread, but +// shouldn't be called simultaneously from two different threads. +// Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute +// in the same thread, so it can't be used after that. +func (instance *tdlib) receive(timeout time.Duration) (*Response, error) { + result := C.td_receive(C.double(float64(timeout) / float64(time.Second))) + if result == nil { + return nil, errors.New("update receiving timeout") + } + + data := []byte(C.GoString(result)) + + var resp Response + + err := json.Unmarshal(data, &resp) + if err != nil { + return nil, err + } + + resp.Data = data + + return &resp, nil +} + type JsonClient struct { id int } @@ -35,30 +117,6 @@ func (jsonClient *JsonClient) Send(req Request) { C.td_send(C.int(jsonClient.id), query) } -// Receives incoming updates and request responses from the TDLib client. May be called from any thread, but -// shouldn't be called simultaneously from two different threads. -// Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute -// in the same thread, so it can't be used after that. -func Receive(timeout time.Duration) (*Response, error) { - result := C.td_receive(C.double(float64(timeout) / float64(time.Second))) - if result == nil { - return nil, errors.New("update receiving timeout") - } - - data := []byte(C.GoString(result)) - - var resp Response - - err := json.Unmarshal(data, &resp) - if err != nil { - return nil, err - } - - resp.Data = data - - return &resp, nil -} - // Synchronously executes TDLib request. May be called from any thread. // Only a few requests can be executed synchronously. // Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute