From b05e68c8441c9b226e519a80f99daec297089fb7 Mon Sep 17 00:00:00 2001 From: Mickael Remond Date: Thu, 13 Jun 2019 17:22:39 +0200 Subject: [PATCH] Add router to make it easier to set up routing info - Using the router, the dispatch is not done anymore by receiving from receive channel, but by registering callback functions in routers, with matchers. - Make IQPayload a real interface to make it easier to match namespaces. - The StreamManager Run command is now blocking, waiting for StreamManager to terminate. --- auth.go | 5 +- component.go | 13 ++-- iot_control.go | 9 ++- iq.go | 33 ++++++-- router.go | 193 ++++++++++++++++++++++++++++++++++++++++++++++ router_test.go | 75 ++++++++++++++++++ stream.go | 4 +- stream_manager.go | 18 ++++- 8 files changed, 333 insertions(+), 17 deletions(-) create mode 100644 router.go create mode 100644 router_test.go diff --git a/auth.go b/auth.go index e69f82e..5af154a 100644 --- a/auth.go +++ b/auth.go @@ -102,12 +102,15 @@ type auth struct { } type BindBind struct { - IQPayload XMLName xml.Name `xml:"urn:ietf:params:xml:ns:xmpp-bind bind"` Resource string `xml:"resource,omitempty"` Jid string `xml:"jid,omitempty"` } +func (b *BindBind) Namespace() string { + return b.XMLName.Space +} + // Session is obsolete in RFC 6121. // Added for compliance with RFC 3121. // Remove when ejabberd purely conforms to RFC 6121. diff --git a/component.go b/component.go index 6a6445b..e5fde3b 100644 --- a/component.go +++ b/component.go @@ -48,6 +48,7 @@ type ComponentOptions struct { // in XEP-0114, XEP-0355 and XEP-0356. type Component struct { ComponentOptions + router *Router // TCP level connection conn net.Conn @@ -57,8 +58,8 @@ type Component struct { decoder *xml.Decoder } -func NewComponent(opts ComponentOptions) (*Component, error) { - c := Component{ComponentOptions: opts} +func NewComponent(opts ComponentOptions, r *Router) (*Component, error) { + c := Component{ComponentOptions: opts, router: r} // Create a default channel that developers can override c.RecvChannel = make(chan Packet) return &c, nil @@ -122,10 +123,13 @@ func (c *Component) SetHandler(handler EventHandler) { // Recv abstracts receiving preparsed XMPP packets from a channel. // Channel allow client to receive / dispatch packets in for range loop. // TODO: Deprecate this function in favor of reading directly from the RecvChannel ? +/* func (c *Component) Recv() <-chan Packet { return c.RecvChannel } +*/ +// Receiver Go routine receiver func (c *Component) recv() (err error) { for { val, err := next(c.decoder) @@ -137,12 +141,11 @@ func (c *Component) recv() (err error) { // Handle stream errors switch p := val.(type) { case StreamError: - c.RecvChannel <- val - close(c.RecvChannel) + c.router.Route(c.conn, val) c.streamError(p.Error.Local, p.Text) return errors.New("stream error: " + p.Error.Local) } - c.RecvChannel <- val + c.router.Route(c.conn, val) } } diff --git a/iot_control.go b/iot_control.go index 3079246..5e80957 100644 --- a/iot_control.go +++ b/iot_control.go @@ -5,11 +5,14 @@ import ( ) type ControlSet struct { - IQPayload XMLName xml.Name `xml:"urn:xmpp:iot:control set"` Fields []ControlField `xml:",any"` } +func (c *ControlSet) Namespace() string { + return c.XMLName.Space +} + type ControlGetForm struct { XMLName xml.Name `xml:"urn:xmpp:iot:control getForm"` } @@ -24,3 +27,7 @@ type ControlSetResponse struct { IQPayload XMLName xml.Name `xml:"urn:xmpp:iot:control setResponse"` } + +func (c *ControlSetResponse) Namespace() string { + return c.XMLName.Space +} diff --git a/iq.go b/iq.go index d69df71..9cd334a 100644 --- a/iq.go +++ b/iq.go @@ -16,7 +16,6 @@ TODO support ability to put Raw payload inside IQ // presence or iq stanza. // It is intended to be added in the payload of the erroneous stanza. type Err struct { - IQPayload XMLName xml.Name `xml:"error"` Code int `xml:"code,attr,omitempty"` Type string `xml:"type,attr,omitempty"` @@ -24,6 +23,10 @@ type Err struct { Text string `xml:"urn:ietf:params:xml:ns:xmpp-stanzas text,omitempty"` } +func (x *Err) Namespace() string { + return x.XMLName.Space +} + // UnmarshalXML implements custom parsing for IQs func (x *Err) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { x.XMLName = start.Name @@ -120,6 +123,10 @@ func (x Err) MarshalXML(e *xml.Encoder, start xml.StartElement) (err error) { type IQ struct { // Info/Query XMLName xml.Name `xml:"iq"` PacketAttrs + // FIXME: We can only have one payload: + // "An IQ stanza of type "get" or "set" MUST contain exactly one + // child element, which specifies the semantics of the particular + // request." Payload []IQPayload `xml:",omitempty"` RawXML string `xml:",innerxml"` Error Err `xml:"error,omitempty"` @@ -229,18 +236,23 @@ func (iq *IQ) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { // ============================================================================ // Generic IQ Payload -type IQPayload interface{} +type IQPayload interface { + Namespace() string +} // Node is a generic structure to represent XML data. It is used to parse // unreferenced or custom stanza payload. type Node struct { - IQPayload XMLName xml.Name Attrs []xml.Attr `xml:"-"` Content string `xml:",innerxml"` Nodes []Node `xml:",any"` } +func (n *Node) Namespace() string { + return n.XMLName.Space +} + // Attr represents generic XML attributes, as used on the generic XML Node // representation. type Attr struct { @@ -283,13 +295,16 @@ const ( // Disco Info type DiscoInfo struct { - IQPayload XMLName xml.Name `xml:"http://jabber.org/protocol/disco#info query"` Node string `xml:"node,attr,omitempty"` Identity Identity `xml:"identity"` Features []Feature `xml:"feature"` } +func (d *DiscoInfo) Namespace() string { + return d.XMLName.Space +} + type Identity struct { XMLName xml.Name `xml:"identity,omitempty"` Name string `xml:"name,attr,omitempty"` @@ -304,12 +319,15 @@ type Feature struct { // Disco Items type DiscoItems struct { - IQPayload XMLName xml.Name `xml:"http://jabber.org/protocol/disco#items query"` Node string `xml:"node,attr,omitempty"` Items []DiscoItem `xml:"item"` } +func (d *DiscoItems) Namespace() string { + return d.XMLName.Space +} + type DiscoItem struct { XMLName xml.Name `xml:"item"` Name string `xml:"name,attr,omitempty"` @@ -322,13 +340,16 @@ type DiscoItem struct { // Version type Version struct { - IQPayload XMLName xml.Name `xml:"jabber:iq:version query"` Name string `xml:"name,omitempty"` Version string `xml:"version,omitempty"` OS string `xml:"os,omitempty"` } +func (v *Version) Namespace() string { + return v.XMLName.Space +} + // ============================================================================ // Registry init diff --git a/router.go b/router.go new file mode 100644 index 0000000..8fcba0b --- /dev/null +++ b/router.go @@ -0,0 +1,193 @@ +package xmpp + +import ( + "io" + "strings" +) + +/* +The XMPP router helps client and component developer select which XMPP they would like to process, +and associate processing code depending on the router configuration. + +TODO: Automatically reply to IQ that do not match any route, to comply to XMPP standard. +*/ + +type Router struct { + // Routes to be matched, in order. + routes []*Route +} + +// NewRouter returns a new router instance. +func NewRouter() *Router { + return &Router{} +} + +func (r *Router) Route(w io.Writer, p Packet) { + var match RouteMatch + if r.Match(p, &match) { + match.Handler.HandlePacket(w, p) + } +} + +// NewRoute registers an empty routes +func (r *Router) NewRoute() *Route { + route := &Route{} + r.routes = append(r.routes, route) + return route +} + +func (r *Router) Match(p Packet, match *RouteMatch) bool { + for _, route := range r.routes { + if route.Match(p, match) { + return true + } + } + return false +} + +// Handle registers a new route with a matcher for a given packet name (iq, message, presence) +// See Route.Packet() and Route.Handler(). +func (r *Router) Handle(name string, handler Handler) *Route { + return r.NewRoute().Packet(name).Handler(handler) +} + +// HandleFunc registers a new route with a matcher for for a given packet name (iq, message, presence) +// See Route.Path() and Route.HandlerFunc(). +func (r *Router) HandleFunc(name string, f func(io.Writer, Packet)) *Route { + return r.NewRoute().Packet(name).HandlerFunc(f) +} + +// ============================================================================ +// Route +type Handler interface { + HandlePacket(w io.Writer, p Packet) +} + +type Route struct { + handler Handler + // Matchers are used to "specialize" routes and focus on specific packet features + matchers []matcher +} + +func (r *Route) Handler(handler Handler) *Route { + r.handler = handler + return r +} + +// The HandlerFunc type is an adapter to allow the use of +// ordinary functions as XMPP handlers. If f is a function +// with the appropriate signature, HandlerFunc(f) is a +// Handler that calls f. +type HandlerFunc func(io.Writer, Packet) + +// HandlePacket calls f(w, p) +func (f HandlerFunc) HandlePacket(w io.Writer, p Packet) { + f(w, p) +} + +// HandlerFunc sets a handler function for the route +func (r *Route) HandlerFunc(f HandlerFunc) *Route { + return r.Handler(f) +} + +// addMatcher adds a matcher to the route +func (r *Route) addMatcher(m matcher) *Route { + r.matchers = append(r.matchers, m) + return r +} + +func (r *Route) Match(p Packet, match *RouteMatch) bool { + for _, m := range r.matchers { + if matched := m.Match(p, match); !matched { + return false + } + } + + // We have a match, let's pass info route match info + match.Route = r + match.Handler = r.handler + return true +} + +// -------------------- +// Match on packet name + +type nameMatcher string + +func (n nameMatcher) Match(p Packet, match *RouteMatch) bool { + var name string + // TODO: To avoid type switch everywhere in matching, I think we will need to have + // to move to a concrete type for packets, to make matching and comparison more natural. + // Current code structure is probably too rigid. + // Maybe packet types should even be from an enum. + switch p.(type) { + case Message: + name = "message" + case IQ: + name = "iq" + case Presence: + name = "presence" + } + if name == string(n) { + return true + } + return false +} + +// Packet matches on a packet name (iq, message, presence, ...) +// It matches on the Local part of the xml.Name +func (r *Route) Packet(name string) *Route { + name = strings.ToLower(name) + return r.addMatcher(nameMatcher(name)) +} + +// ------------------------- +// Match on IQ and namespace + +// nsIqMather matches on a list of IQ payload namespaces +type nsIQMatcher []string + +func (m nsIQMatcher) Match(p Packet, match *RouteMatch) bool { + // TODO + iq, ok := p.(IQ) + if !ok { + return false + } + if len(iq.Payload) < 1 { + return false + } + return matchInArray(m, iq.Payload[0].Namespace()) +} + +// IQNamespaces adds an IQ matcher, expecting both an IQ and a +func (r *Route) IQNamespaces(namespaces ...string) *Route { + for k, v := range namespaces { + namespaces[k] = strings.ToLower(v) + } + return r.addMatcher(nsIQMatcher(namespaces)) +} + +// ============================================================================ +// Matchers + +// Matchers are used to "specialize" routes and focus on specific packet features +type matcher interface { + Match(Packet, *RouteMatch) bool +} + +// RouteMatch extracts and gather match information +type RouteMatch struct { + Route *Route + Handler Handler +} + +// matchInArray is a generic matching function to check if a string is a list +// of specific function +func matchInArray(arr []string, value string) bool { + for _, str := range arr { + if str == value { + return true + } + } + return false +} diff --git a/router_test.go b/router_test.go new file mode 100644 index 0000000..09b2ae9 --- /dev/null +++ b/router_test.go @@ -0,0 +1,75 @@ +package xmpp_test + +import ( + "bytes" + "encoding/xml" + "io" + "testing" + + "gosrc.io/xmpp" +) + +var successFlag = []byte("matched") + +func TestNameMatcher(t *testing.T) { + router := xmpp.NewRouter() + router.HandleFunc("message", func(w io.Writer, p xmpp.Packet) { + _, _ = w.Write(successFlag) + }) + + // Check that a message packet is properly matched + var buf bytes.Buffer + // TODO: We want packet creation code to use struct to use default values + msg := xmpp.NewMessage("chat", "", "test@localhost", "1", "") + msg.Body = "Hello" + router.Route(&buf, msg) + if !bytes.Equal(buf.Bytes(), successFlag) { + t.Error("Message was not matched and routed properly") + } + + // Check that an IQ packet is not matched + buf = bytes.Buffer{} + iq := xmpp.NewIQ("get", "", "localhost", "1", "") + iq.Payload = append(iq.Payload, &xmpp.DiscoInfo{}) + router.Route(&buf, iq) + if bytes.Equal(buf.Bytes(), successFlag) { + t.Error("IQ should not have been matched and routed") + } +} + +func TestIQNSMatcher(t *testing.T) { + router := xmpp.NewRouter() + router.NewRoute(). + IQNamespaces(xmpp.NSDiscoInfo, xmpp.NSDiscoItems). + HandlerFunc(func(w io.Writer, p xmpp.Packet) { + _, _ = w.Write(successFlag) + }) + + // Check that an IQ with proper namespace does match + var buf bytes.Buffer + iqDisco := xmpp.NewIQ("get", "", "localhost", "1", "") + // TODO: Add a function to generate payload with proper namespace initialisation + iqDisco.Payload = append(iqDisco.Payload, &xmpp.DiscoInfo{ + XMLName: xml.Name{ + Space: xmpp.NSDiscoInfo, + Local: "query", + }}) + router.Route(&buf, iqDisco) + if !bytes.Equal(buf.Bytes(), successFlag) { + t.Errorf("IQ should have been matched and routed: %v", iqDisco) + } + + // Check that another namespace is not matched + buf = bytes.Buffer{} + iqVersion := xmpp.NewIQ("get", "", "localhost", "1", "") + // TODO: Add a function to generate payload with proper namespace initialisation + iqVersion.Payload = append(iqVersion.Payload, &xmpp.DiscoInfo{ + XMLName: xml.Name{ + Space: "jabber:iq:version", + Local: "query", + }}) + router.Route(&buf, iqVersion) + if bytes.Equal(buf.Bytes(), successFlag) { + t.Errorf("IQ should not have been matched and routed: %v", iqVersion) + } +} diff --git a/stream.go b/stream.go index d25ce58..20759ba 100644 --- a/stream.go +++ b/stream.go @@ -6,7 +6,9 @@ import ( // ============================================================================ // StreamFeatures Packet -// Reference: https://xmpp.org/registrar/stream-features.html +// Reference: The active stream features are published on +// https://xmpp.org/registrar/stream-features.html +// Note: That page misses draft and experimental XEP (i.e CSI, etc) type StreamFeatures struct { XMLName xml.Name `xml:"http://etherx.jabber.org/streams features"` diff --git a/stream_manager.go b/stream_manager.go index b353c2b..5ed780d 100644 --- a/stream_manager.go +++ b/stream_manager.go @@ -2,6 +2,7 @@ package xmpp // import "gosrc.io/xmpp" import ( "errors" + "sync" "time" "golang.org/x/xerrors" @@ -32,6 +33,8 @@ type StreamManager struct { // Store low level metrics Metrics *Metrics + + wg sync.WaitGroup } type PostConnect func(c StreamClient) @@ -47,8 +50,10 @@ func NewStreamManager(client StreamClient, pc PostConnect) *StreamManager { } } -// Start launch the connection loop -func (sm *StreamManager) Start() error { +// Run launchs the connection of the underlying client or component +// and wait until Disconnect is called, or for the manager to terminate due +// to an unrecoverable error. +func (sm *StreamManager) Run() error { if sm.client == nil { return errors.New("missing stream client") } @@ -72,7 +77,13 @@ func (sm *StreamManager) Start() error { } sm.client.SetHandler(handler) - return sm.connect() + sm.wg.Add(1) + if err := sm.connect(); err != nil { + sm.wg.Done() + return err + } + sm.wg.Wait() + return nil } // Stop cancels pending operations and terminates existing XMPP client. @@ -80,6 +91,7 @@ func (sm *StreamManager) Stop() { // Remove on disconnect handler to avoid triggering reconnect sm.client.SetHandler(nil) sm.client.Disconnect() + sm.wg.Done() } // connect manages the reconnection loop and apply the define backoff to avoid overloading the server.