Add basic support for keep-alive (#48)
Fix #35 This should also help with #8
This commit is contained in:
parent
2af9521036
commit
4d4710463d
79
client.go
79
client.go
|
@ -1,7 +1,6 @@
|
||||||
package xmpp // import "gosrc.io/xmpp"
|
package xmpp // import "gosrc.io/xmpp"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -155,8 +154,11 @@ func (c *Client) Connect() error {
|
||||||
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
|
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
|
||||||
fmt.Fprintf(c.Session.socketProxy, "<presence/>")
|
fmt.Fprintf(c.Session.socketProxy, "<presence/>")
|
||||||
|
|
||||||
|
// Start the keepalive go routine
|
||||||
|
keepaliveQuit := make(chan struct{})
|
||||||
|
go keepalive(c.conn, keepaliveQuit)
|
||||||
// Start the receiver go routine
|
// Start the receiver go routine
|
||||||
go c.recv()
|
go c.recv(keepaliveQuit)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -178,28 +180,7 @@ func (c *Client) Recv() <-chan Packet {
|
||||||
return c.RecvChannel
|
return c.RecvChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) recv() (err error) {
|
// Send marshals XMPP stanza and sends it to the server.
|
||||||
for {
|
|
||||||
val, err := next(c.Session.decoder)
|
|
||||||
if err != nil {
|
|
||||||
c.updateState(StateDisconnected)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle stream errors
|
|
||||||
switch packet := val.(type) {
|
|
||||||
case StreamError:
|
|
||||||
c.RecvChannel <- val
|
|
||||||
close(c.RecvChannel)
|
|
||||||
c.streamError(packet.Error.Local, packet.Text)
|
|
||||||
return errors.New("stream error: " + packet.Error.Local)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.RecvChannel <- val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send marshalls XMPP stanza and sends it to the server.
|
|
||||||
func (c *Client) Send(packet Packet) error {
|
func (c *Client) Send(packet Packet) error {
|
||||||
data, err := xml.Marshal(packet)
|
data, err := xml.Marshal(packet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -222,8 +203,50 @@ func (c *Client) SendRaw(packet string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func xmlEscape(s string) string {
|
// ============================================================================
|
||||||
var b bytes.Buffer
|
// Go routines
|
||||||
xml.Escape(&b, []byte(s))
|
|
||||||
return b.String()
|
// Loop: Receive data from server
|
||||||
|
func (c *Client) recv(keepaliveQuit chan<- struct{}) (err error) {
|
||||||
|
for {
|
||||||
|
val, err := next(c.Session.decoder)
|
||||||
|
if err != nil {
|
||||||
|
close(keepaliveQuit)
|
||||||
|
c.updateState(StateDisconnected)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle stream errors
|
||||||
|
switch packet := val.(type) {
|
||||||
|
case StreamError:
|
||||||
|
c.RecvChannel <- val
|
||||||
|
close(c.RecvChannel)
|
||||||
|
c.streamError(packet.Error.Local, packet.Text)
|
||||||
|
return errors.New("stream error: " + packet.Error.Local)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.RecvChannel <- val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loop: send whitespace keepalive to server
|
||||||
|
// This is use to keep the connection open, but also to detect connection loss
|
||||||
|
// and trigger proper client connection shutdown.
|
||||||
|
func keepalive(conn net.Conn, quit <-chan struct{}) {
|
||||||
|
// TODO: Make keepalive interval configurable
|
||||||
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
if n, err := fmt.Fprintf(conn, "\n"); err != nil || n != 1 {
|
||||||
|
// When keep alive fails, we force close the connection. In all cases, the recv will also fail.
|
||||||
|
ticker.Stop()
|
||||||
|
_ = conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-quit:
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue