From e791d8ba282a5dcebdd40e7a4b02aedba1ec5ee9 Mon Sep 17 00:00:00 2001 From: Aleksandr Zelenin Date: Tue, 11 Sep 2018 01:30:14 +0300 Subject: [PATCH] add update listener --- README.md | 12 +++++---- client/client.go | 32 +++++++++++++++------- client/listener.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 15 deletions(-) create mode 100644 client/listener.go diff --git a/README.md b/README.md index e3e3464..95cbc57 100644 --- a/README.md +++ b/README.md @@ -107,15 +107,17 @@ func main() { ### Receive updates ```go -responses := make(chan client.Type, 100) -tdlibClient, err := client.NewClient(authorizer, client.WithListener(responses)) +tdlibClient, err := client.NewClient(authorizer) if err != nil { log.Fatalf("NewClient error: %s", err) } -for response := range responses { - if response.GetClass() == client.ClassUpdate { - log.Printf("%#v", response) +listener := tdlibClient.GetListener() +defer listener.Close() + +for update := range listener.Updates { + if update.GetClass() == client.ClassUpdate { + log.Printf("%#v", update) } } ``` diff --git a/client/client.go b/client/client.go index dfc0cb7..3022836 100644 --- a/client/client.go +++ b/client/client.go @@ -10,7 +10,7 @@ type Client struct { jsonClient *JsonClient extraGenerator ExtraGenerator catcher chan *Response - listeners []chan Type + listenerStore *listenerStore catchersStore *sync.Map } @@ -22,19 +22,13 @@ func WithExtraGenerator(extraGenerator ExtraGenerator) Option { } } -func WithListener(listener chan Type) Option { - return func(client *Client) { - client.listeners = append(client.listeners, listener) - } -} - func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...Option) (*Client, error) { catchersListener := make(chan *Response, 1000) client := &Client{ jsonClient: NewJsonClient(), catcher: catchersListener, - listeners: []chan Type{}, + listenerStore: newListenerStore(), catchersStore: &sync.Map{}, } @@ -70,8 +64,16 @@ func (client *Client) receive() { continue } - for _, listener := range client.listeners { - listener <- typ + needGc := false + for _, listener := range client.listenerStore.Listeners() { + if listener.IsActive() { + listener.Updates <- typ + } else { + needGc = true + } + } + if needGc { + client.listenerStore.gc() } } } @@ -109,3 +111,13 @@ func (client *Client) Send(req Request) (*Response, error) { return nil, errors.New("timeout") } } + +func (client *Client) GetListener() *Listener { + listener := &Listener{ + isActive: true, + Updates: make(chan Type, 1000), + } + client.listenerStore.Add(listener) + + return listener +} diff --git a/client/listener.go b/client/listener.go new file mode 100644 index 0000000..c57b4ea --- /dev/null +++ b/client/listener.go @@ -0,0 +1,66 @@ +package client + +import ( + "sync" +) + +func newListenerStore() *listenerStore { + return &listenerStore{ + listeners: []*Listener{}, + } +} + +type listenerStore struct { + sync.Mutex + listeners []*Listener +} + +func (store *listenerStore) Add(listener *Listener) { + store.Lock() + defer store.Unlock() + + store.listeners = append(store.listeners, listener) +} + +func (store *listenerStore) Listeners() []*Listener { + store.Lock() + defer store.Unlock() + + return store.listeners +} + +func (store *listenerStore) gc() { + store.Lock() + defer store.Unlock() + + oldListeners := store.listeners + + store.listeners = []*Listener{} + + for _, listener := range oldListeners { + if listener.IsActive() { + store.listeners = append(store.listeners, listener) + } + } +} + +type Listener struct { + mu sync.Mutex + isActive bool + Updates chan Type +} + +func (listener *Listener) Close() { + listener.mu.Lock() + defer listener.mu.Unlock() + + listener.isActive = false + close(listener.Updates) +} + +func (listener *Listener) IsActive() bool { + listener.mu.Lock() + defer listener.mu.Unlock() + + return listener.isActive +}