Add router to make it easier to set up routing info
- Using the router, the dispatch is not done anymore by receiving from receive channel, but by registering callback functions in routers, with matchers. - Make IQPayload a real interface to make it easier to match namespaces. - The StreamManager Run command is now blocking, waiting for StreamManager to terminate.
This commit is contained in:
parent
f7b7482d2e
commit
b05e68c844
5
auth.go
5
auth.go
|
@ -102,12 +102,15 @@ type auth struct {
|
|||
}
|
||||
|
||||
type BindBind struct {
|
||||
IQPayload
|
||||
XMLName xml.Name `xml:"urn:ietf:params:xml:ns:xmpp-bind bind"`
|
||||
Resource string `xml:"resource,omitempty"`
|
||||
Jid string `xml:"jid,omitempty"`
|
||||
}
|
||||
|
||||
func (b *BindBind) Namespace() string {
|
||||
return b.XMLName.Space
|
||||
}
|
||||
|
||||
// Session is obsolete in RFC 6121.
|
||||
// Added for compliance with RFC 3121.
|
||||
// Remove when ejabberd purely conforms to RFC 6121.
|
||||
|
|
13
component.go
13
component.go
|
@ -48,6 +48,7 @@ type ComponentOptions struct {
|
|||
// in XEP-0114, XEP-0355 and XEP-0356.
|
||||
type Component struct {
|
||||
ComponentOptions
|
||||
router *Router
|
||||
|
||||
// TCP level connection
|
||||
conn net.Conn
|
||||
|
@ -57,8 +58,8 @@ type Component struct {
|
|||
decoder *xml.Decoder
|
||||
}
|
||||
|
||||
func NewComponent(opts ComponentOptions) (*Component, error) {
|
||||
c := Component{ComponentOptions: opts}
|
||||
func NewComponent(opts ComponentOptions, r *Router) (*Component, error) {
|
||||
c := Component{ComponentOptions: opts, router: r}
|
||||
// Create a default channel that developers can override
|
||||
c.RecvChannel = make(chan Packet)
|
||||
return &c, nil
|
||||
|
@ -122,10 +123,13 @@ func (c *Component) SetHandler(handler EventHandler) {
|
|||
// Recv abstracts receiving preparsed XMPP packets from a channel.
|
||||
// Channel allow client to receive / dispatch packets in for range loop.
|
||||
// TODO: Deprecate this function in favor of reading directly from the RecvChannel ?
|
||||
/*
|
||||
func (c *Component) Recv() <-chan Packet {
|
||||
return c.RecvChannel
|
||||
}
|
||||
*/
|
||||
|
||||
// Receiver Go routine receiver
|
||||
func (c *Component) recv() (err error) {
|
||||
for {
|
||||
val, err := next(c.decoder)
|
||||
|
@ -137,12 +141,11 @@ func (c *Component) recv() (err error) {
|
|||
// Handle stream errors
|
||||
switch p := val.(type) {
|
||||
case StreamError:
|
||||
c.RecvChannel <- val
|
||||
close(c.RecvChannel)
|
||||
c.router.Route(c.conn, val)
|
||||
c.streamError(p.Error.Local, p.Text)
|
||||
return errors.New("stream error: " + p.Error.Local)
|
||||
}
|
||||
c.RecvChannel <- val
|
||||
c.router.Route(c.conn, val)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,11 +5,14 @@ import (
|
|||
)
|
||||
|
||||
type ControlSet struct {
|
||||
IQPayload
|
||||
XMLName xml.Name `xml:"urn:xmpp:iot:control set"`
|
||||
Fields []ControlField `xml:",any"`
|
||||
}
|
||||
|
||||
func (c *ControlSet) Namespace() string {
|
||||
return c.XMLName.Space
|
||||
}
|
||||
|
||||
type ControlGetForm struct {
|
||||
XMLName xml.Name `xml:"urn:xmpp:iot:control getForm"`
|
||||
}
|
||||
|
@ -24,3 +27,7 @@ type ControlSetResponse struct {
|
|||
IQPayload
|
||||
XMLName xml.Name `xml:"urn:xmpp:iot:control setResponse"`
|
||||
}
|
||||
|
||||
func (c *ControlSetResponse) Namespace() string {
|
||||
return c.XMLName.Space
|
||||
}
|
||||
|
|
33
iq.go
33
iq.go
|
@ -16,7 +16,6 @@ TODO support ability to put Raw payload inside IQ
|
|||
// presence or iq stanza.
|
||||
// It is intended to be added in the payload of the erroneous stanza.
|
||||
type Err struct {
|
||||
IQPayload
|
||||
XMLName xml.Name `xml:"error"`
|
||||
Code int `xml:"code,attr,omitempty"`
|
||||
Type string `xml:"type,attr,omitempty"`
|
||||
|
@ -24,6 +23,10 @@ type Err struct {
|
|||
Text string `xml:"urn:ietf:params:xml:ns:xmpp-stanzas text,omitempty"`
|
||||
}
|
||||
|
||||
func (x *Err) Namespace() string {
|
||||
return x.XMLName.Space
|
||||
}
|
||||
|
||||
// UnmarshalXML implements custom parsing for IQs
|
||||
func (x *Err) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
|
||||
x.XMLName = start.Name
|
||||
|
@ -120,6 +123,10 @@ func (x Err) MarshalXML(e *xml.Encoder, start xml.StartElement) (err error) {
|
|||
type IQ struct { // Info/Query
|
||||
XMLName xml.Name `xml:"iq"`
|
||||
PacketAttrs
|
||||
// FIXME: We can only have one payload:
|
||||
// "An IQ stanza of type "get" or "set" MUST contain exactly one
|
||||
// child element, which specifies the semantics of the particular
|
||||
// request."
|
||||
Payload []IQPayload `xml:",omitempty"`
|
||||
RawXML string `xml:",innerxml"`
|
||||
Error Err `xml:"error,omitempty"`
|
||||
|
@ -229,18 +236,23 @@ func (iq *IQ) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
|
|||
// ============================================================================
|
||||
// Generic IQ Payload
|
||||
|
||||
type IQPayload interface{}
|
||||
type IQPayload interface {
|
||||
Namespace() string
|
||||
}
|
||||
|
||||
// Node is a generic structure to represent XML data. It is used to parse
|
||||
// unreferenced or custom stanza payload.
|
||||
type Node struct {
|
||||
IQPayload
|
||||
XMLName xml.Name
|
||||
Attrs []xml.Attr `xml:"-"`
|
||||
Content string `xml:",innerxml"`
|
||||
Nodes []Node `xml:",any"`
|
||||
}
|
||||
|
||||
func (n *Node) Namespace() string {
|
||||
return n.XMLName.Space
|
||||
}
|
||||
|
||||
// Attr represents generic XML attributes, as used on the generic XML Node
|
||||
// representation.
|
||||
type Attr struct {
|
||||
|
@ -283,13 +295,16 @@ const (
|
|||
|
||||
// Disco Info
|
||||
type DiscoInfo struct {
|
||||
IQPayload
|
||||
XMLName xml.Name `xml:"http://jabber.org/protocol/disco#info query"`
|
||||
Node string `xml:"node,attr,omitempty"`
|
||||
Identity Identity `xml:"identity"`
|
||||
Features []Feature `xml:"feature"`
|
||||
}
|
||||
|
||||
func (d *DiscoInfo) Namespace() string {
|
||||
return d.XMLName.Space
|
||||
}
|
||||
|
||||
type Identity struct {
|
||||
XMLName xml.Name `xml:"identity,omitempty"`
|
||||
Name string `xml:"name,attr,omitempty"`
|
||||
|
@ -304,12 +319,15 @@ type Feature struct {
|
|||
|
||||
// Disco Items
|
||||
type DiscoItems struct {
|
||||
IQPayload
|
||||
XMLName xml.Name `xml:"http://jabber.org/protocol/disco#items query"`
|
||||
Node string `xml:"node,attr,omitempty"`
|
||||
Items []DiscoItem `xml:"item"`
|
||||
}
|
||||
|
||||
func (d *DiscoItems) Namespace() string {
|
||||
return d.XMLName.Space
|
||||
}
|
||||
|
||||
type DiscoItem struct {
|
||||
XMLName xml.Name `xml:"item"`
|
||||
Name string `xml:"name,attr,omitempty"`
|
||||
|
@ -322,13 +340,16 @@ type DiscoItem struct {
|
|||
|
||||
// Version
|
||||
type Version struct {
|
||||
IQPayload
|
||||
XMLName xml.Name `xml:"jabber:iq:version query"`
|
||||
Name string `xml:"name,omitempty"`
|
||||
Version string `xml:"version,omitempty"`
|
||||
OS string `xml:"os,omitempty"`
|
||||
}
|
||||
|
||||
func (v *Version) Namespace() string {
|
||||
return v.XMLName.Space
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Registry init
|
||||
|
||||
|
|
193
router.go
Normal file
193
router.go
Normal file
|
@ -0,0 +1,193 @@
|
|||
package xmpp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
/*
|
||||
The XMPP router helps client and component developer select which XMPP they would like to process,
|
||||
and associate processing code depending on the router configuration.
|
||||
|
||||
TODO: Automatically reply to IQ that do not match any route, to comply to XMPP standard.
|
||||
*/
|
||||
|
||||
type Router struct {
|
||||
// Routes to be matched, in order.
|
||||
routes []*Route
|
||||
}
|
||||
|
||||
// NewRouter returns a new router instance.
|
||||
func NewRouter() *Router {
|
||||
return &Router{}
|
||||
}
|
||||
|
||||
func (r *Router) Route(w io.Writer, p Packet) {
|
||||
var match RouteMatch
|
||||
if r.Match(p, &match) {
|
||||
match.Handler.HandlePacket(w, p)
|
||||
}
|
||||
}
|
||||
|
||||
// NewRoute registers an empty routes
|
||||
func (r *Router) NewRoute() *Route {
|
||||
route := &Route{}
|
||||
r.routes = append(r.routes, route)
|
||||
return route
|
||||
}
|
||||
|
||||
func (r *Router) Match(p Packet, match *RouteMatch) bool {
|
||||
for _, route := range r.routes {
|
||||
if route.Match(p, match) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Handle registers a new route with a matcher for a given packet name (iq, message, presence)
|
||||
// See Route.Packet() and Route.Handler().
|
||||
func (r *Router) Handle(name string, handler Handler) *Route {
|
||||
return r.NewRoute().Packet(name).Handler(handler)
|
||||
}
|
||||
|
||||
// HandleFunc registers a new route with a matcher for for a given packet name (iq, message, presence)
|
||||
// See Route.Path() and Route.HandlerFunc().
|
||||
func (r *Router) HandleFunc(name string, f func(io.Writer, Packet)) *Route {
|
||||
return r.NewRoute().Packet(name).HandlerFunc(f)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Route
|
||||
type Handler interface {
|
||||
HandlePacket(w io.Writer, p Packet)
|
||||
}
|
||||
|
||||
type Route struct {
|
||||
handler Handler
|
||||
// Matchers are used to "specialize" routes and focus on specific packet features
|
||||
matchers []matcher
|
||||
}
|
||||
|
||||
func (r *Route) Handler(handler Handler) *Route {
|
||||
r.handler = handler
|
||||
return r
|
||||
}
|
||||
|
||||
// The HandlerFunc type is an adapter to allow the use of
|
||||
// ordinary functions as XMPP handlers. If f is a function
|
||||
// with the appropriate signature, HandlerFunc(f) is a
|
||||
// Handler that calls f.
|
||||
type HandlerFunc func(io.Writer, Packet)
|
||||
|
||||
// HandlePacket calls f(w, p)
|
||||
func (f HandlerFunc) HandlePacket(w io.Writer, p Packet) {
|
||||
f(w, p)
|
||||
}
|
||||
|
||||
// HandlerFunc sets a handler function for the route
|
||||
func (r *Route) HandlerFunc(f HandlerFunc) *Route {
|
||||
return r.Handler(f)
|
||||
}
|
||||
|
||||
// addMatcher adds a matcher to the route
|
||||
func (r *Route) addMatcher(m matcher) *Route {
|
||||
r.matchers = append(r.matchers, m)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Route) Match(p Packet, match *RouteMatch) bool {
|
||||
for _, m := range r.matchers {
|
||||
if matched := m.Match(p, match); !matched {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// We have a match, let's pass info route match info
|
||||
match.Route = r
|
||||
match.Handler = r.handler
|
||||
return true
|
||||
}
|
||||
|
||||
// --------------------
|
||||
// Match on packet name
|
||||
|
||||
type nameMatcher string
|
||||
|
||||
func (n nameMatcher) Match(p Packet, match *RouteMatch) bool {
|
||||
var name string
|
||||
// TODO: To avoid type switch everywhere in matching, I think we will need to have
|
||||
// to move to a concrete type for packets, to make matching and comparison more natural.
|
||||
// Current code structure is probably too rigid.
|
||||
// Maybe packet types should even be from an enum.
|
||||
switch p.(type) {
|
||||
case Message:
|
||||
name = "message"
|
||||
case IQ:
|
||||
name = "iq"
|
||||
case Presence:
|
||||
name = "presence"
|
||||
}
|
||||
if name == string(n) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Packet matches on a packet name (iq, message, presence, ...)
|
||||
// It matches on the Local part of the xml.Name
|
||||
func (r *Route) Packet(name string) *Route {
|
||||
name = strings.ToLower(name)
|
||||
return r.addMatcher(nameMatcher(name))
|
||||
}
|
||||
|
||||
// -------------------------
|
||||
// Match on IQ and namespace
|
||||
|
||||
// nsIqMather matches on a list of IQ payload namespaces
|
||||
type nsIQMatcher []string
|
||||
|
||||
func (m nsIQMatcher) Match(p Packet, match *RouteMatch) bool {
|
||||
// TODO
|
||||
iq, ok := p.(IQ)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if len(iq.Payload) < 1 {
|
||||
return false
|
||||
}
|
||||
return matchInArray(m, iq.Payload[0].Namespace())
|
||||
}
|
||||
|
||||
// IQNamespaces adds an IQ matcher, expecting both an IQ and a
|
||||
func (r *Route) IQNamespaces(namespaces ...string) *Route {
|
||||
for k, v := range namespaces {
|
||||
namespaces[k] = strings.ToLower(v)
|
||||
}
|
||||
return r.addMatcher(nsIQMatcher(namespaces))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Matchers
|
||||
|
||||
// Matchers are used to "specialize" routes and focus on specific packet features
|
||||
type matcher interface {
|
||||
Match(Packet, *RouteMatch) bool
|
||||
}
|
||||
|
||||
// RouteMatch extracts and gather match information
|
||||
type RouteMatch struct {
|
||||
Route *Route
|
||||
Handler Handler
|
||||
}
|
||||
|
||||
// matchInArray is a generic matching function to check if a string is a list
|
||||
// of specific function
|
||||
func matchInArray(arr []string, value string) bool {
|
||||
for _, str := range arr {
|
||||
if str == value {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
75
router_test.go
Normal file
75
router_test.go
Normal file
|
@ -0,0 +1,75 @@
|
|||
package xmpp_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"gosrc.io/xmpp"
|
||||
)
|
||||
|
||||
var successFlag = []byte("matched")
|
||||
|
||||
func TestNameMatcher(t *testing.T) {
|
||||
router := xmpp.NewRouter()
|
||||
router.HandleFunc("message", func(w io.Writer, p xmpp.Packet) {
|
||||
_, _ = w.Write(successFlag)
|
||||
})
|
||||
|
||||
// Check that a message packet is properly matched
|
||||
var buf bytes.Buffer
|
||||
// TODO: We want packet creation code to use struct to use default values
|
||||
msg := xmpp.NewMessage("chat", "", "test@localhost", "1", "")
|
||||
msg.Body = "Hello"
|
||||
router.Route(&buf, msg)
|
||||
if !bytes.Equal(buf.Bytes(), successFlag) {
|
||||
t.Error("Message was not matched and routed properly")
|
||||
}
|
||||
|
||||
// Check that an IQ packet is not matched
|
||||
buf = bytes.Buffer{}
|
||||
iq := xmpp.NewIQ("get", "", "localhost", "1", "")
|
||||
iq.Payload = append(iq.Payload, &xmpp.DiscoInfo{})
|
||||
router.Route(&buf, iq)
|
||||
if bytes.Equal(buf.Bytes(), successFlag) {
|
||||
t.Error("IQ should not have been matched and routed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIQNSMatcher(t *testing.T) {
|
||||
router := xmpp.NewRouter()
|
||||
router.NewRoute().
|
||||
IQNamespaces(xmpp.NSDiscoInfo, xmpp.NSDiscoItems).
|
||||
HandlerFunc(func(w io.Writer, p xmpp.Packet) {
|
||||
_, _ = w.Write(successFlag)
|
||||
})
|
||||
|
||||
// Check that an IQ with proper namespace does match
|
||||
var buf bytes.Buffer
|
||||
iqDisco := xmpp.NewIQ("get", "", "localhost", "1", "")
|
||||
// TODO: Add a function to generate payload with proper namespace initialisation
|
||||
iqDisco.Payload = append(iqDisco.Payload, &xmpp.DiscoInfo{
|
||||
XMLName: xml.Name{
|
||||
Space: xmpp.NSDiscoInfo,
|
||||
Local: "query",
|
||||
}})
|
||||
router.Route(&buf, iqDisco)
|
||||
if !bytes.Equal(buf.Bytes(), successFlag) {
|
||||
t.Errorf("IQ should have been matched and routed: %v", iqDisco)
|
||||
}
|
||||
|
||||
// Check that another namespace is not matched
|
||||
buf = bytes.Buffer{}
|
||||
iqVersion := xmpp.NewIQ("get", "", "localhost", "1", "")
|
||||
// TODO: Add a function to generate payload with proper namespace initialisation
|
||||
iqVersion.Payload = append(iqVersion.Payload, &xmpp.DiscoInfo{
|
||||
XMLName: xml.Name{
|
||||
Space: "jabber:iq:version",
|
||||
Local: "query",
|
||||
}})
|
||||
router.Route(&buf, iqVersion)
|
||||
if bytes.Equal(buf.Bytes(), successFlag) {
|
||||
t.Errorf("IQ should not have been matched and routed: %v", iqVersion)
|
||||
}
|
||||
}
|
|
@ -6,7 +6,9 @@ import (
|
|||
|
||||
// ============================================================================
|
||||
// StreamFeatures Packet
|
||||
// Reference: https://xmpp.org/registrar/stream-features.html
|
||||
// Reference: The active stream features are published on
|
||||
// https://xmpp.org/registrar/stream-features.html
|
||||
// Note: That page misses draft and experimental XEP (i.e CSI, etc)
|
||||
|
||||
type StreamFeatures struct {
|
||||
XMLName xml.Name `xml:"http://etherx.jabber.org/streams features"`
|
||||
|
|
|
@ -2,6 +2,7 @@ package xmpp // import "gosrc.io/xmpp"
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
@ -32,6 +33,8 @@ type StreamManager struct {
|
|||
|
||||
// Store low level metrics
|
||||
Metrics *Metrics
|
||||
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type PostConnect func(c StreamClient)
|
||||
|
@ -47,8 +50,10 @@ func NewStreamManager(client StreamClient, pc PostConnect) *StreamManager {
|
|||
}
|
||||
}
|
||||
|
||||
// Start launch the connection loop
|
||||
func (sm *StreamManager) Start() error {
|
||||
// Run launchs the connection of the underlying client or component
|
||||
// and wait until Disconnect is called, or for the manager to terminate due
|
||||
// to an unrecoverable error.
|
||||
func (sm *StreamManager) Run() error {
|
||||
if sm.client == nil {
|
||||
return errors.New("missing stream client")
|
||||
}
|
||||
|
@ -72,7 +77,13 @@ func (sm *StreamManager) Start() error {
|
|||
}
|
||||
sm.client.SetHandler(handler)
|
||||
|
||||
return sm.connect()
|
||||
sm.wg.Add(1)
|
||||
if err := sm.connect(); err != nil {
|
||||
sm.wg.Done()
|
||||
return err
|
||||
}
|
||||
sm.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop cancels pending operations and terminates existing XMPP client.
|
||||
|
@ -80,6 +91,7 @@ func (sm *StreamManager) Stop() {
|
|||
// Remove on disconnect handler to avoid triggering reconnect
|
||||
sm.client.SetHandler(nil)
|
||||
sm.client.Disconnect()
|
||||
sm.wg.Done()
|
||||
}
|
||||
|
||||
// connect manages the reconnection loop and apply the define backoff to avoid overloading the server.
|
||||
|
|
Loading…
Reference in a new issue