receiver refactoring

This commit is contained in:
Aleksandr Zelenin 2022-01-04 15:22:29 +03:00
parent 794e63a0b4
commit b0f08e7265
2 changed files with 95 additions and 77 deletions

View file

@ -3,18 +3,14 @@ package client
import (
"context"
"errors"
"log"
"sync"
"sync/atomic"
"time"
)
var clients = &sync.Map{}
type Client struct {
jsonClient *JsonClient
extraGenerator ExtraGenerator
catcher chan *Response
responses chan *Response
listenerStore *listenerStore
catchersStore *sync.Map
updatesTimeout time.Duration
@ -35,12 +31,6 @@ func WithCatchTimeout(timeout time.Duration) Option {
}
}
func WithUpdatesTimeout(timeout time.Duration) Option {
return func(client *Client) {
client.updatesTimeout = timeout
}
}
func WithProxy(req *AddProxyRequest) Option {
return func(client *Client) {
client.AddProxy(req)
@ -54,27 +44,23 @@ func WithLogVerbosity(req *SetLogVerbosityLevelRequest) Option {
}
func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...Option) (*Client, error) {
catchersListener := make(chan *Response, 1000)
client := &Client{
jsonClient: NewJsonClient(),
catcher: catchersListener,
responses: make(chan *Response, 1000),
listenerStore: newListenerStore(),
catchersStore: &sync.Map{},
}
clients.Store(client.jsonClient.id, client)
client.extraGenerator = UuidV4Generator()
client.catchTimeout = 60 * time.Second
client.updatesTimeout = 60 * time.Second
for _, option := range options {
option(client)
}
go receive(client)
go client.catch(catchersListener)
tdlibInstance.addClient(client)
go client.receiver()
err := Authorize(client, authorizationStateHandler)
if err != nil {
@ -84,31 +70,16 @@ func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...O
return client, nil
}
var alreadyRunning uint32
func receive(client *Client) {
if atomic.LoadUint32(&alreadyRunning) != 0 {
return
func (client *Client) receiver() {
for response := range client.responses {
if response.Extra != "" {
value, ok := client.catchersStore.Load(response.Extra)
if ok {
value.(chan *Response) <- response
}
atomic.StoreUint32(&alreadyRunning, 1)
for {
resp, err := Receive(client.updatesTimeout)
if err != nil {
continue
}
value, ok := clients.Load(resp.ClientId)
if !ok {
log.Printf("Response with wrong clientId: %d", resp.ClientId)
continue
}
client := value.(*Client)
client.catcher <- resp
typ, err := UnmarshalType(resp.Data)
typ, err := UnmarshalType(response.Data)
if err != nil {
continue
}
@ -127,17 +98,6 @@ func receive(client *Client) {
}
}
func (client *Client) catch(updates chan *Response) {
for update := range updates {
if update.Extra != "" {
value, ok := client.catchersStore.Load(update.Extra)
if ok {
value.(chan *Response) <- update
}
}
}
}
func (client *Client) Send(req Request) (*Response, error) {
req.Extra = client.extraGenerator()

View file

@ -10,11 +10,93 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"strconv"
"sync"
"time"
"unsafe"
)
var tdlibInstance *tdlib
func init() {
tdlibInstance = &tdlib{
timeout: 60 * time.Second,
clients: map[int]*Client{},
}
}
type tdlib struct {
once sync.Once
timeout time.Duration
mu sync.Mutex
clients map[int]*Client
}
func (instance *tdlib) addClient(client *Client) {
instance.mu.Lock()
defer instance.mu.Unlock()
instance.clients[client.jsonClient.id] = client
instance.once.Do(func() {
go instance.receiver()
})
}
func (instance *tdlib) getClient(id int) (*Client, error) {
instance.mu.Lock()
defer instance.mu.Unlock()
client, ok := instance.clients[id]
if !ok {
return nil, fmt.Errorf("client [id: %d] does not exist", id)
}
return client, nil
}
func (instance *tdlib) receiver() {
for {
resp, err := instance.receive(instance.timeout)
if err != nil {
continue
}
client, err := instance.getClient(resp.ClientId)
if err != nil {
log.Print(err)
continue
}
client.responses <- resp
}
}
// Receives incoming updates and request responses from the TDLib client. May be called from any thread, but
// shouldn't be called simultaneously from two different threads.
// Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute
// in the same thread, so it can't be used after that.
func (instance *tdlib) receive(timeout time.Duration) (*Response, error) {
result := C.td_receive(C.double(float64(timeout) / float64(time.Second)))
if result == nil {
return nil, errors.New("update receiving timeout")
}
data := []byte(C.GoString(result))
var resp Response
err := json.Unmarshal(data, &resp)
if err != nil {
return nil, err
}
resp.Data = data
return &resp, nil
}
type JsonClient struct {
id int
}
@ -35,30 +117,6 @@ func (jsonClient *JsonClient) Send(req Request) {
C.td_send(C.int(jsonClient.id), query)
}
// Receives incoming updates and request responses from the TDLib client. May be called from any thread, but
// shouldn't be called simultaneously from two different threads.
// Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute
// in the same thread, so it can't be used after that.
func Receive(timeout time.Duration) (*Response, error) {
result := C.td_receive(C.double(float64(timeout) / float64(time.Second)))
if result == nil {
return nil, errors.New("update receiving timeout")
}
data := []byte(C.GoString(result))
var resp Response
err := json.Unmarshal(data, &resp)
if err != nil {
return nil, err
}
resp.Data = data
return &resp, nil
}
// Synchronously executes TDLib request. May be called from any thread.
// Only a few requests can be executed synchronously.
// Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute