diff --git a/backoff.go b/backoff.go
new file mode 100644
index 0000000..bbb399c
--- /dev/null
+++ b/backoff.go
@@ -0,0 +1,101 @@
+/*
+Interesting reference on backoff:
+- Exponential Backoff And Jitter (AWS Blog):
+ https://www.awsarchitectureblog.com/2015/03/backoff.html
+
+We use Jitter as a default for exponential backoff, as the goal of
+this module is not to provide precise 'ticks', but good behaviour to
+implement retries that are helping the server to recover faster in
+case of congestion.
+
+It can be used in several ways:
+- Using duration to get next sleep time.
+- Using ticker channel to trigger callback function on tick
+
+The functions for Backoff are not threadsafe, but you can:
+- Keep the attempt counter on your end and use DurationForAttempt(int)
+- Use lock in your own code to protect the Backoff structure.
+
+TODO: Implement Backoff Ticker channel
+TODO: Implement throttler interface. Throttler could be used to implement various reconnect strategies.
+*/
+
+package xmpp // import "gosrc.io/xmpp"
+
+import (
+ "math"
+ "math/rand"
+ "time"
+)
+
+const (
+ defaultBase int = 20 // Backoff base, in ms
+ defaultFactor int = 2
+ defaultCap int = 180000 // 3 minutes
+)
+
+// Backoff can provide increasing duration with the number of attempt
+// performed. The structure is used to support exponential backoff on
+// connection attempts to avoid hammering the server we are connecting
+// to.
+type Backoff struct {
+ NoJitter bool
+ Base int
+ Factor int
+ Cap int
+ lastDuration int
+ attempt int
+}
+
+// Duration returns the duration to apply to the current attempt.
+func (b *Backoff) Duration() time.Duration {
+ d := b.DurationForAttempt(b.attempt)
+ b.attempt++
+ return d
+}
+
+// Wait sleeps for backoff duration for current attempt.
+func (b *Backoff) Wait() {
+ time.Sleep(b.Duration())
+}
+
+// DurationForAttempt returns a duration for an attempt number, in a stateless way.
+func (b *Backoff) DurationForAttempt(attempt int) time.Duration {
+ b.setDefault()
+ expBackoff := math.Min(float64(b.Cap), float64(b.Base)*math.Pow(float64(b.Factor), float64(b.attempt)))
+ d := int(math.Trunc(expBackoff))
+ if !b.NoJitter {
+ d = rand.Intn(d)
+ }
+ return time.Duration(d) * time.Millisecond
+}
+
+// Reset sets back the number of attempts to 0. This is to be called after a successfull operation has been performed,
+// to reset the exponential backoff interval.
+func (b *Backoff) Reset() {
+ b.attempt = 0
+}
+
+func (b *Backoff) setDefault() {
+ if b.Base == 0 {
+ b.Base = defaultBase
+ }
+
+ if b.Cap == 0 {
+ b.Cap = defaultCap
+ }
+
+ if b.Factor == 0 {
+ b.Factor = defaultFactor
+ }
+}
+
+/*
+We use full jitter as default for now as it seems to provide good behaviour for reconnect.
+
+Base is the default interval between attempts (if backoff Factor was equal to 1)
+
+Attempt is the number of retry for operation. If we start attempt at 0, first sleep equals base.
+
+Cap is the maximum sleep time duration we tolerate between attempts
+*/
diff --git a/backoff_test.go b/backoff_test.go
new file mode 100644
index 0000000..9ef7ce0
--- /dev/null
+++ b/backoff_test.go
@@ -0,0 +1,24 @@
+package xmpp_test
+
+import (
+ "testing"
+ "time"
+
+ "gosrc.io/xmpp"
+)
+
+func TestDurationForAttempt_NoJitter(t *testing.T) {
+ b := xmpp.Backoff{Base: 25, NoJitter: true}
+ bInMS := time.Duration(b.Base) * time.Millisecond
+ if b.DurationForAttempt(0) != bInMS {
+ t.Errorf("incorrect default duration for attempt #0 (%d) = %d", b.DurationForAttempt(0)/time.Millisecond, bInMS/time.Millisecond)
+ }
+ var prevDuration, d time.Duration
+ for i := 0; i < 10; i++ {
+ d = b.DurationForAttempt(i)
+ if !(d >= prevDuration) {
+ t.Errorf("duration should be increasing between attempts. #%d (%d) > %d", i, d, prevDuration)
+ }
+ prevDuration = d
+ }
+}
diff --git a/client.go b/client.go
index f6f5be2..d269d7c 100644
--- a/client.go
+++ b/client.go
@@ -10,53 +10,61 @@ import (
"time"
)
-// Client Metrics
-// ============================================================================
+//=============================================================================
-type Metrics struct {
- startTime time.Time
- // ConnectTime returns the duration between client initiation of the TCP/IP
- // connection to the server and actual TCP/IP session establishment.
- // This time includes DNS resolution and can be slightly higher if the DNS
- // resolution result was not in cache.
- ConnectTime time.Duration
- // LoginTime returns the between client initiation of the TCP/IP
- // connection to the server and the return of the login result.
- // This includes ConnectTime, but also XMPP level protocol negociation
- // like starttls.
- LoginTime time.Duration
+// ConnState represents the current connection state.
+type ConnState = uint8
+
+// This is a the list of events happening on the connection that the
+// client can be notified about.
+const (
+ StateDisconnected ConnState = iota
+ StateConnected
+ StateSessionEstablished
+)
+
+// Event is a structure use to convey event changes related to client state. This
+// is for example used to notify the client when the client get disconnected.
+type Event struct {
+ State ConnState
+ Description string
}
-// initMetrics set metrics with default value and define the starting point
-// for duration calculation (connect time, login time, etc).
-func initMetrics() *Metrics {
- return &Metrics{
- startTime: time.Now(),
+// EventHandler is use to pass events about state of the connection to
+// client implementation.
+type EventHandler func(Event)
+
+type EventManager struct {
+ // Store current state
+ CurrentState ConnState
+
+ // Callback used to propagate connection state changes
+ Handler EventHandler
+}
+
+func (em EventManager) updateState(state ConnState) {
+ em.CurrentState = state
+ if em.Handler != nil {
+ em.Handler(Event{State: em.CurrentState})
}
}
-func (m *Metrics) setConnectTime() {
- m.ConnectTime = time.Since(m.startTime)
-}
-
-func (m *Metrics) setLoginTime() {
- m.LoginTime = time.Since(m.startTime)
-}
-
// Client
// ============================================================================
// Client is the main structure used to connect as a client on an XMPP
// server.
type Client struct {
- // Store user defined options
+ // Store user defined options and states
config Config
// Session gather data that can be accessed by users of this library
Session *Session
// TCP level connection / can be replaced by a TLS session after starttls
conn net.Conn
- // store low level metrics
- Metrics *Metrics
+ // Packet channel
+ RecvChannel chan interface{}
+ // Track and broadcast connection state
+ EventManager
}
/*
@@ -89,6 +97,10 @@ func NewClient(config Config) (c *Client, err error) {
if c.config.ConnectTimeout == 0 {
c.config.ConnectTimeout = 15 // 15 second as default
}
+
+ // Create a default channel that developer can override
+ c.RecvChannel = make(chan interface{})
+
return
}
@@ -112,59 +124,55 @@ func checkAddress(addr string) (string, error) {
// Connect triggers actual TCP connection, based on previously defined parameters.
func (c *Client) Connect() (*Session, error) {
- var tcpconn net.Conn
var err error
- // TODO: Refactor = abstract retry loop in capped exponential back-off function
- var try = 0
- var success bool
- c.Metrics = initMetrics()
- for try <= c.config.Retry && !success {
- if tcpconn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second); err == nil {
- c.Metrics.setConnectTime()
- success = true
- }
- try++
- }
+ c.conn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second)
if err != nil {
return nil, err
}
+ c.updateState(StateConnected)
// Connection is ok, we now open XMPP session
- c.conn = tcpconn
if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil {
return c.Session, err
}
+ c.updateState(StateSessionEstablished)
- c.Metrics.setLoginTime()
// We're connected and can now receive and send messages.
//fmt.Fprintf(client.conn, "%s%s", "chat", "Online")
// TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
fmt.Fprintf(c.Session.socketProxy, "")
+ // Start the receiver go routine
+ go c.recv()
+
return c.Session, err
}
-func (c *Client) recv(receiver chan<- interface{}) (err error) {
+func (c *Client) Disconnect() {
+ _ = c.SendRaw("")
+ // TODO: Add a way to wait for stream close acknowledgement from the server for clean disconnect
+ _ = c.conn.Close()
+}
+
+func (c *Client) recv() (err error) {
for {
val, err := next(c.Session.decoder)
if err != nil {
- close(receiver)
+ c.updateState(StateDisconnected)
return err
}
- receiver <- val
+ c.RecvChannel <- val
val = nil
}
- panic("unreachable")
}
// Recv abstracts receiving preparsed XMPP packets from a channel.
// Channel allow client to receive / dispatch packets in for range loop.
+// TODO: Deprecate this function in favor of reading directly from the RecvChannel
func (c *Client) Recv() <-chan interface{} {
- ch := make(chan interface{})
- go c.recv(ch)
- return ch
+ return c.RecvChannel
}
// Send marshalls XMPP stanza and sends it to the server.
@@ -185,8 +193,9 @@ func (c *Client) Send(packet Packet) error {
// disconnect the client. It is up to the user of this method to
// carefully craft the XML content to produce valid XMPP.
func (c *Client) SendRaw(packet string) error {
- fmt.Fprintf(c.Session.socketProxy, packet) // TODO handle errors
- return nil
+ var err error
+ _, err = fmt.Fprintf(c.Session.socketProxy, packet)
+ return err
}
func xmlEscape(s string) string {
diff --git a/client_manager.go b/client_manager.go
new file mode 100644
index 0000000..a61d8db
--- /dev/null
+++ b/client_manager.go
@@ -0,0 +1,106 @@
+package xmpp // import "gosrc.io/xmpp"
+
+import (
+ "log"
+ "time"
+)
+
+type PostConnect func(c *Client)
+
+// ClientManager supervises an XMPP client connection. Its role is to handle connection events and
+// apply reconnection strategy.
+type ClientManager struct {
+ Client *Client
+ Session *Session
+ PostConnect PostConnect
+
+ // Store low level metrics
+ Metrics *Metrics
+}
+
+// NewClientManager creates a new client manager structure, intended to support
+// handling XMPP client state event changes and auto-trigger reconnection
+// based on ClientManager configuration.
+func NewClientManager(client *Client, pc PostConnect) *ClientManager {
+ return &ClientManager{
+ Client: client,
+ PostConnect: pc,
+ }
+}
+
+// Start launch the connection loop
+func (cm *ClientManager) Start() {
+ cm.Client.Handler = func(e Event) {
+ switch e.State {
+ case StateConnected:
+ cm.Metrics.setConnectTime()
+ case StateSessionEstablished:
+ cm.Metrics.setLoginTime()
+ case StateDisconnected:
+ // Reconnect on disconnection
+ cm.connect()
+ }
+ }
+ cm.connect()
+}
+
+// Stop cancels pending operations and terminates existing XMPP client.
+func (cm *ClientManager) Stop() {
+ // Remove on disconnect handler to avoid triggering reconnect
+ cm.Client.Handler = nil
+ cm.Client.Disconnect()
+}
+
+// connect manages the reconnection loop and apply the define backoff to avoid overloading the server.
+func (cm *ClientManager) connect() {
+ var backoff Backoff // TODO: Group backoff calculation features with connection manager?
+
+ for {
+ var err error
+ cm.Metrics = initMetrics()
+
+ if cm.Client.Session, err = cm.Client.Connect(); err != nil {
+ log.Printf("Connection error: %v\n", err)
+ backoff.Wait()
+ } else {
+ break
+ }
+ }
+
+ if cm.PostConnect != nil {
+ cm.PostConnect(cm.Client)
+ }
+}
+
+// Client Metrics
+// ============================================================================
+
+type Metrics struct {
+ startTime time.Time
+ // ConnectTime returns the duration between client initiation of the TCP/IP
+ // connection to the server and actual TCP/IP session establishment.
+ // This time includes DNS resolution and can be slightly higher if the DNS
+ // resolution result was not in cache.
+ ConnectTime time.Duration
+ // LoginTime returns the between client initiation of the TCP/IP
+ // connection to the server and the return of the login result.
+ // This includes ConnectTime, but also XMPP level protocol negociation
+ // like starttls.
+ LoginTime time.Duration
+}
+
+// initMetrics set metrics with default value and define the starting point
+// for duration calculation (connect time, login time, etc).
+func initMetrics() *Metrics {
+ return &Metrics{
+ startTime: time.Now(),
+ }
+}
+
+func (m *Metrics) setConnectTime() {
+ m.ConnectTime = time.Since(m.startTime)
+}
+
+func (m *Metrics) setLoginTime() {
+ m.LoginTime = time.Since(m.startTime)
+}
diff --git a/cmd/xmpp_echo/xmpp_echo.go b/cmd/xmpp_echo/xmpp_echo.go
index ab0a367..c132ffa 100644
--- a/cmd/xmpp_echo/xmpp_echo.go
+++ b/cmd/xmpp_echo/xmpp_echo.go
@@ -26,17 +26,14 @@ func main() {
log.Fatal("Error: ", err)
}
- session, err := client.Connect()
- if err != nil {
- log.Fatal("Error: ", err)
- }
-
- fmt.Println("Stream opened, we have streamID = ", session.StreamId)
+ cm := xmpp.NewClientManager(client, nil)
+ cm.Start()
+ // connection can be stopped with cm.Stop().
// Iterator to receive packets coming from our XMPP connection
for packet := range client.Recv() {
switch packet := packet.(type) {
- case *xmpp.Message:
+ case xmpp.Message:
_, _ = fmt.Fprintf(os.Stdout, "Body = %s - from = %s\n", packet.Body, packet.From)
reply := xmpp.Message{PacketAttrs: xmpp.PacketAttrs{To: packet.From}, Body: packet.Body}
_ = client.Send(reply)
@@ -47,6 +44,4 @@ func main() {
}
// TODO create default command line client to send message or to send an arbitrary XMPP sequence from a file,
-// (using templates ?)
-
-// TODO: autoreconnect when connection is lost
+// (using templates ?)
diff --git a/cmd/xmpp_jukebox/xmpp_jukebox.go b/cmd/xmpp_jukebox/xmpp_jukebox.go
index a60e164..b020399 100644
--- a/cmd/xmpp_jukebox/xmpp_jukebox.go
+++ b/cmd/xmpp_jukebox/xmpp_jukebox.go
@@ -101,8 +101,7 @@ func playSCURL(p *mpg123.Player, rawURL string) {
func connectXmpp(jid string, password string, address string) (client *xmpp.Client, err error) {
xmppConfig := xmpp.Config{Address: address,
- Jid: jid, Password: password, PacketLogger: os.Stdout, Insecure: true,
- Retry: 10}
+ Jid: jid, Password: password, PacketLogger: os.Stdout, Insecure: true}
if client, err = xmpp.NewClient(xmppConfig); err != nil {
return
diff --git a/config.go b/config.go
index e49ab5b..79770b1 100644
--- a/config.go
+++ b/config.go
@@ -12,7 +12,6 @@ type Config struct {
Password string
PacketLogger *os.File // Used for debugging
Lang string // TODO: should default to 'en'
- Retry int // Number of retries for connect
ConnectTimeout int // Connection timeout in seconds. Default to 15
// Insecure can be set to true to allow to open a session without TLS. If TLS
// is supported on the server, we will still try to use it.
diff --git a/parser.go b/parser.go
index 0ad24fc..ddcd526 100644
--- a/parser.go
+++ b/parser.go
@@ -39,7 +39,6 @@ func initDecoder(p *xml.Decoder) (sessionID string, err error) {
return
}
}
- panic("unreachable")
}
// Scan XML token stream to find next StartElement.
@@ -47,7 +46,7 @@ func nextStart(p *xml.Decoder) (xml.StartElement, error) {
for {
t, err := p.Token()
if err == io.EOF {
- return xml.StartElement{}, nil
+ return xml.StartElement{}, errors.New("connection closed")
}
if err != nil {
return xml.StartElement{}, fmt.Errorf("nextStart %s", err)
@@ -57,7 +56,6 @@ func nextStart(p *xml.Decoder) (xml.StartElement, error) {
return t, nil
}
}
- panic("unreachable")
}
// next scans XML token stream for next element and then assign a structure to decode