From a0e74051fd3abb57a1507b791fc89f46a3cdc474 Mon Sep 17 00:00:00 2001 From: Wichert Akkerman Date: Tue, 29 Oct 2019 11:37:49 +0100 Subject: [PATCH] Use a channel based API for SendIQ This makes sending IQ more idiomatic Go, but more importantly it solves a problem with contexts that were not being cancelled correctly with the previous API. As a side-effect of this change `Route.route` must now be invoked in a go-routine to prevent deadlocks. This also allows for stanzas to be processed in parallel, which can result in a nice performance win. --- client.go | 13 ++++++----- router.go | 57 ++++++++++++++++------------------------------ router_test.go | 61 +++++++++++++++++++++----------------------------- 3 files changed, 51 insertions(+), 80 deletions(-) diff --git a/client.go b/client.go index c8000bd..a7e6c7d 100644 --- a/client.go +++ b/client.go @@ -231,18 +231,16 @@ func (c *Client) Send(packet stanza.Packet) error { // forever for an IQ result. For example: // // ctx, _ := context.WithTimeout(context.Background(), 30 * time.Second) -// client.SendIQ(ctx, iq, func(s Sender, p stanza.Packet) { -// // Handle the result here -// }) +// result := <- client.SendIQ(ctx, iq) // -func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ, handler IQResultHandlerFunc) (*IQResultRoute, error) { +func (c *Client) SendIQ(ctx context.Context, iq stanza.IQ) (chan stanza.IQ, error) { if iq.Attrs.Type != "set" && iq.Attrs.Type != "get" { return nil, ErrCanOnlySendGetOrSetIq } if err := c.Send(iq); err != nil { return nil, err } - return c.router.NewIQResultRoute(ctx, iq.Attrs.Id).HandlerFunc(handler), nil + return c.router.NewIQResultRoute(ctx, iq.Attrs.Id), nil } // SendRaw sends an XMPP stanza as a string to the server. @@ -295,7 +293,10 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) (err error) state.Inbound++ } - c.router.route(c, val) + // Do normal route processing in a go-routine so we can immediately + // start receiving other stanzas. This also allows route handlers to + // send and receive more stanzas. + go c.router.route(c, val) } } diff --git a/router.go b/router.go index 05f4b09..23a134e 100644 --- a/router.go +++ b/router.go @@ -51,8 +51,8 @@ func (r *Router) route(s Sender, p stanza.Packet) { r.IQResultRouteLock.Lock() delete(r.IQResultRoutes, iq.Id) r.IQResultRouteLock.Unlock() - close(route.matched) - route.handler.HandleIQ(route.context, s, iq) + route.result <- iq + close(route.result) return } } @@ -91,29 +91,22 @@ func (r *Router) NewRoute() *Route { // NewIQResultRoute register a route that will catch an IQ result stanza with // the given Id. The route will only match ones, after which it will automatically // be unregistered -func (r *Router) NewIQResultRoute(ctx context.Context, id string) *IQResultRoute { - route := &IQResultRoute{ - context: ctx, - matched: make(chan struct{}), - } +func (r *Router) NewIQResultRoute(ctx context.Context, id string) chan stanza.IQ { + route := NewIQResultRoute(ctx) r.IQResultRouteLock.Lock() r.IQResultRoutes[id] = route r.IQResultRouteLock.Unlock() + // Start a go function to make sure the route is unregistered when the context + // is done. go func() { - select { - case <-route.context.Done(): - r.IQResultRouteLock.Lock() - delete(r.IQResultRoutes, id) - r.IQResultRouteLock.Unlock() - if route.timeoutHandler != nil { - route.timeoutHandler(route.context.Err()) - } - case <-route.matched: - } + <-route.context.Done() + r.IQResultRouteLock.Lock() + delete(r.IQResultRoutes, id) + r.IQResultRouteLock.Unlock() }() - return route + return route.result } func (r *Router) Match(p stanza.Packet, match *RouteMatch) bool { @@ -144,28 +137,16 @@ type TimeoutHandlerFunc func(err error) // IQResultRoute is a temporary route to match IQ result stanzas type IQResultRoute struct { - context context.Context - matched chan struct{} - handler IQResultHandler - timeoutHandler TimeoutHandlerFunc + context context.Context + result chan stanza.IQ } -// Handler adds an IQ handler to the route. -func (r *IQResultRoute) Handler(handler IQResultHandler) *IQResultRoute { - r.handler = handler - return r -} - -// HandlerFunc updates the route to call a handler function when an IQ result is received. -func (r *IQResultRoute) HandlerFunc(f IQResultHandlerFunc) *IQResultRoute { - return r.Handler(f) -} - -// TimeoutHandlerFunc registers a function that will be called automatically when -// the IQ result route is cancelled (most likely due to a timeout on the context). -func (r *IQResultRoute) TimeoutHandlerFunc(f TimeoutHandlerFunc) *IQResultRoute { - r.timeoutHandler = f - return r +// NewIQResultRoute creates a new IQResultRoute instance +func NewIQResultRoute(ctx context.Context) *IQResultRoute { + return &IQResultRoute{ + context: ctx, + result: make(chan stanza.IQ), + } } // ============================================================================ diff --git a/router_test.go b/router_test.go index b63553d..f9725ba 100644 --- a/router_test.go +++ b/router_test.go @@ -4,8 +4,8 @@ import ( "bytes" "context" "encoding/xml" - "runtime" "testing" + "time" "gosrc.io/xmpp/stanza" ) @@ -16,52 +16,41 @@ import ( func TestIQResultRoutes(t *testing.T) { t.Parallel() router := NewRouter() + conn := NewSenderMock() if router.IQResultRoutes == nil { t.Fatal("NewRouter does not initialize isResultRoutes") } - // Check other IQ does not matcah - conn := NewSenderMock() - iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "4321"}) - router.NewIQResultRoute(context.Background(), "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) { - _ = s.SendRaw(successFlag) - }) - if conn.String() == successFlag { - t.Fatal("IQ result with wrong ID was matched") - } - // Check if the IQ handler was called - conn = NewSenderMock() - iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) - router.route(conn, iq) - if conn.String() != successFlag { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + iq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) + res := router.NewIQResultRoute(ctx, "1234") + go router.route(conn, iq) + select { + case <-ctx.Done(): t.Fatal("IQ result was not matched") + case <-res: + // Success } - // The match must only happen once, so we if receive the same package again it - // must not be matched. - conn = NewSenderMock() - router.route(conn, iq) - if conn.String() == successFlag { - t.Fatal("IQ result was matched twice") + // The match must only happen once, so the id should no longer be in IQResultRoutes + if _, ok := router.IQResultRoutes[iq.Attrs.Id]; ok { + t.Fatal("IQ ID was not removed from the route map") } - // After cancelling a route it should no longer match - conn = NewSenderMock() - ctx, cancel := context.WithCancel(context.Background()) - iq = stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, Id: "1234"}) - router.NewIQResultRoute(ctx, "1234").HandlerFunc(func(ctx context.Context, s Sender, iq stanza.IQ) { - _ = s.SendRaw(successFlag) - }).TimeoutHandlerFunc(func(err error) { - conn.SendRaw(cancelledFlag) - }) - cancel() - // Yield the processor so the cancellation goroutine is triggered - runtime.Gosched() - router.route(conn, iq) - if conn.String() != cancelledFlag { - t.Fatal("IQ result route was matched after cancellation") + // Check other IQ does not matcah + ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + iq.Attrs.Id = "4321" + res = router.NewIQResultRoute(ctx, "1234") + go router.route(conn, iq) + select { + case <-ctx.Done(): + // Success + case <-res: + t.Fatal("IQ result with wrong ID was matched") } }