Save sessions on exit
This commit is contained in:
parent
a09817976e
commit
f0c0d0ba94
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"dev.narayana.im/narayana/telegabber/yamldb"
|
"dev.narayana.im/narayana/telegabber/yamldb"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -44,6 +45,10 @@ func LoadSessions(path string) (SessionsYamlDB, error) {
|
||||||
return sessionDB, nil
|
return sessionDB, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func emptySessionsMap(dataPtr *SessionsMap) {
|
||||||
|
dataPtr.Sessions = make(map[string]Session)
|
||||||
|
}
|
||||||
|
|
||||||
func initYamlDB(path string, dataPtr *SessionsMap) (SessionsYamlDB, error) {
|
func initYamlDB(path string, dataPtr *SessionsMap) (SessionsYamlDB, error) {
|
||||||
file, err := ioutil.ReadFile(path)
|
file, err := ioutil.ReadFile(path)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -51,9 +56,14 @@ func initYamlDB(path string, dataPtr *SessionsMap) (SessionsYamlDB, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return SessionsYamlDB{}, errors.Wrap(err, "YamlDB is corrupted")
|
return SessionsYamlDB{}, errors.Wrap(err, "YamlDB is corrupted")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if dataPtr.Sessions == nil {
|
||||||
|
emptySessionsMap(dataPtr)
|
||||||
|
}
|
||||||
|
log.Debugf("Unmarshalled YAML: %#v", *dataPtr)
|
||||||
} else {
|
} else {
|
||||||
// DB file does not exist, create an empty DB
|
// DB file does not exist, create an empty DB
|
||||||
dataPtr.Sessions = make(map[string]Session)
|
emptySessionsMap(dataPtr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return SessionsYamlDB{
|
return SessionsYamlDB{
|
||||||
|
|
|
@ -1,10 +1,14 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
|
||||||
"dev.narayana.im/narayana/telegabber/config"
|
"dev.narayana.im/narayana/telegabber/config"
|
||||||
"dev.narayana.im/narayana/telegabber/xmpp"
|
"dev.narayana.im/narayana/telegabber/xmpp"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
goxmpp "gosrc.io/xmpp"
|
||||||
)
|
)
|
||||||
|
|
||||||
// YAML config, compatible with the format of Zhabogram 2.0.0
|
// YAML config, compatible with the format of Zhabogram 2.0.0
|
||||||
|
@ -13,7 +17,18 @@ const configPath string = "config.yml"
|
||||||
// JSON schema (not for editing by a user)
|
// JSON schema (not for editing by a user)
|
||||||
const schemaPath string = "./config_schema.json"
|
const schemaPath string = "./config_schema.json"
|
||||||
|
|
||||||
|
var sm *goxmpp.StreamManager
|
||||||
|
var component *goxmpp.Component
|
||||||
|
var err error
|
||||||
|
|
||||||
|
var cleanupDone chan struct{}
|
||||||
|
var sigintChannel chan os.Signal
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
cleanupDone = make(chan struct{})
|
||||||
|
sigintChannel = make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigintChannel, os.Interrupt)
|
||||||
|
|
||||||
config, err := config.ReadConfig(configPath, schemaPath)
|
config, err := config.ReadConfig(configPath, schemaPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -21,11 +36,30 @@ func main() {
|
||||||
|
|
||||||
SetLogrusLevel(config.XMPP.Loglevel)
|
SetLogrusLevel(config.XMPP.Loglevel)
|
||||||
|
|
||||||
cm, err := xmpp.NewComponent(config.XMPP, config.Telegram)
|
sm, component, err = xmpp.NewComponent(config.XMPP, config.Telegram)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-sigintChannel
|
||||||
|
log.Error("Interrupting...")
|
||||||
|
exit()
|
||||||
|
|
||||||
|
os.Exit(0)
|
||||||
|
}()
|
||||||
|
|
||||||
// reconnect automatically
|
// reconnect automatically
|
||||||
log.Fatal(cm.Run())
|
err = sm.Run()
|
||||||
|
exit()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
<-cleanupDone
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func exit() {
|
||||||
|
xmpp.Close(component)
|
||||||
|
close(cleanupDone)
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ type Client struct {
|
||||||
client *client.Client
|
client *client.Client
|
||||||
jid string
|
jid string
|
||||||
parameters *client.TdlibParameters
|
parameters *client.TdlibParameters
|
||||||
session *persistence.Session
|
Session *persistence.Session
|
||||||
online bool
|
online bool
|
||||||
logVerbosity client.Option
|
logVerbosity client.Option
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ func NewClient(conf config.TelegramConfig, jid string, session *persistence.Sess
|
||||||
return Client{
|
return Client{
|
||||||
parameters: ¶meters,
|
parameters: ¶meters,
|
||||||
jid: jid,
|
jid: jid,
|
||||||
session: session,
|
Session: session,
|
||||||
logVerbosity: logVerbosity,
|
logVerbosity: logVerbosity,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,19 +25,19 @@ var db persistence.SessionsYamlDB
|
||||||
|
|
||||||
// NewComponent starts a new component and wraps it in
|
// NewComponent starts a new component and wraps it in
|
||||||
// a stream manager that you should start yourself
|
// a stream manager that you should start yourself
|
||||||
func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, error) {
|
func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
jid, err = xmpp.NewJid(conf.Jid)
|
jid, err = xmpp.NewJid(conf.Jid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tgConf = tc
|
tgConf = tc
|
||||||
|
|
||||||
err = loadSessions(conf.Db)
|
err = loadSessions(conf.Db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
options := xmpp.ComponentOptions{
|
options := xmpp.ComponentOptions{
|
||||||
|
@ -54,14 +54,14 @@ func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.Strea
|
||||||
|
|
||||||
component, err := xmpp.NewComponent(options, router)
|
component, err := xmpp.NewComponent(options, router)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cm := xmpp.NewStreamManager(component, nil)
|
sm := xmpp.NewStreamManager(component, nil)
|
||||||
|
|
||||||
go heartbeat(component)
|
go heartbeat(component)
|
||||||
|
|
||||||
return cm, nil
|
return sm, component, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func logPresence(err error, presence *stanza.Presence) {
|
func logPresence(err error, presence *stanza.Presence) {
|
||||||
|
@ -228,3 +228,22 @@ func sendPresence(component *xmpp.Component, to string, args ...args.V) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close gracefully terminates the component and saves active sessions
|
||||||
|
func Close(component *xmpp.Component) {
|
||||||
|
log.Error("Disconnecting...")
|
||||||
|
|
||||||
|
for _, session := range sessions {
|
||||||
|
session.Disconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
db.Transaction(func() bool {
|
||||||
|
for jid, session := range sessions {
|
||||||
|
db.Data.Sessions[jid] = *session.Session
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}, persistence.SessionMarshaller)
|
||||||
|
|
||||||
|
component.Disconnect()
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,8 @@ package xmpp
|
||||||
import (
|
import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"dev.narayana.im/narayana/telegabber/persistence"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"gosrc.io/xmpp"
|
"gosrc.io/xmpp"
|
||||||
"gosrc.io/xmpp/stanza"
|
"gosrc.io/xmpp/stanza"
|
||||||
|
@ -88,7 +90,7 @@ func handlePresence(s xmpp.Sender, p stanza.Presence) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
bareFromJid := fromJid.Bare()
|
bareFromJid := fromJid.Bare()
|
||||||
session, ok := getTelegramInstance(bareFromJid, nil)
|
session, ok := getTelegramInstance(bareFromJid, &persistence.Session{})
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,13 +30,16 @@ func (db *YamlDB) Transaction(callback func() bool, marshaller func() ([]byte, e
|
||||||
|
|
||||||
if isDataChanged {
|
if isDataChanged {
|
||||||
yamlData, err := marshaller()
|
yamlData, err := marshaller()
|
||||||
|
log.Debugf("Marshalled YAML: %#v", string(yamlData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Data marshalling error")
|
return errors.Wrap(err, "Data marshalling error")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ioutil.WriteFile(db.PathNew, yamlData, 0644)
|
err = ioutil.WriteFile(db.PathNew, yamlData, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "YamlDB write failure")
|
return errors.Wrap(err, "YamlDB write failure")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = os.Rename(db.PathNew, db.Path)
|
err = os.Rename(db.PathNew, db.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Couldn't rewrite an old YamlDB file")
|
return errors.Wrap(err, "Couldn't rewrite an old YamlDB file")
|
||||||
|
|
Loading…
Reference in a new issue