152 lines
3.3 KiB
Go
152 lines
3.3 KiB
Go
package xmpp
|
|
|
|
import (
|
|
"github.com/pkg/errors"
|
|
"time"
|
|
|
|
"dev.narayana.im/narayana/telegabber/config"
|
|
"dev.narayana.im/narayana/telegabber/persistence"
|
|
"dev.narayana.im/narayana/telegabber/telegram"
|
|
"dev.narayana.im/narayana/telegabber/xmpp/gateway"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
"gosrc.io/xmpp"
|
|
"gosrc.io/xmpp/stanza"
|
|
)
|
|
|
|
var tgConf config.TelegramConfig
|
|
var sessions map[string]*telegram.Client
|
|
var db *persistence.SessionsYamlDB
|
|
|
|
// NewComponent starts a new component and wraps it in
|
|
// a stream manager that you should start yourself
|
|
func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) {
|
|
var err error
|
|
|
|
gateway.Jid, err = stanza.NewJid(conf.Jid)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
tgConf = tc
|
|
|
|
options := xmpp.ComponentOptions{
|
|
TransportConfiguration: xmpp.TransportConfiguration{
|
|
Address: conf.Host + ":" + conf.Port,
|
|
Domain: conf.Jid,
|
|
},
|
|
Domain: conf.Jid,
|
|
Secret: conf.Password,
|
|
Name: "telegabber",
|
|
}
|
|
|
|
router := xmpp.NewRouter()
|
|
router.HandleFunc("iq", HandleIq)
|
|
router.HandleFunc("presence", HandlePresence)
|
|
router.HandleFunc("message", HandleMessage)
|
|
|
|
component, err := xmpp.NewComponent(options, router, func(err error) {
|
|
log.Error(err)
|
|
})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// probe all known sessions
|
|
err = loadSessions(conf.Db, component)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
sm := xmpp.NewStreamManager(component, func(s xmpp.Sender) {
|
|
go heartbeat(component)
|
|
})
|
|
|
|
return sm, component, nil
|
|
}
|
|
|
|
func heartbeat(component *xmpp.Component) {
|
|
var err error
|
|
probeType := gateway.SPType("probe")
|
|
|
|
for jid := range sessions {
|
|
err = gateway.SendPresence(component, jid, probeType)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
|
|
log.Info("Starting heartbeat queue")
|
|
|
|
// status updater thread
|
|
for {
|
|
time.Sleep(60e9)
|
|
for key, presence := range gateway.Queue {
|
|
err = gateway.ResumableSend(component, presence)
|
|
if err != nil {
|
|
gateway.LogBadPresence(presence)
|
|
} else {
|
|
delete(gateway.Queue, key)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func loadSessions(dbPath string, component *xmpp.Component) error {
|
|
var err error
|
|
|
|
sessions = make(map[string]*telegram.Client)
|
|
|
|
db, err = persistence.LoadSessions(dbPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
db.Transaction(func() bool {
|
|
for jid, session := range db.Data.Sessions {
|
|
getTelegramInstance(jid, &session, component)
|
|
}
|
|
|
|
return false
|
|
}, persistence.SessionMarshaller)
|
|
|
|
return nil
|
|
}
|
|
|
|
func getTelegramInstance(jid string, savedSession *persistence.Session, component *xmpp.Component) (*telegram.Client, bool) {
|
|
var err error
|
|
session, ok := sessions[jid]
|
|
if !ok {
|
|
session, err = telegram.NewClient(tgConf, jid, component, savedSession)
|
|
if err != nil {
|
|
log.Error(errors.Wrap(err, "TDlib initialization failure"))
|
|
return session, false
|
|
}
|
|
sessions[jid] = session
|
|
}
|
|
|
|
return session, true
|
|
}
|
|
|
|
// Close gracefully terminates the component and saves active sessions
|
|
func Close(component *xmpp.Component) {
|
|
log.Error("Disconnecting...")
|
|
|
|
// close all sessions
|
|
for _, session := range sessions {
|
|
session.Disconnect("", true)
|
|
}
|
|
|
|
// save sessions
|
|
db.Transaction(func() bool {
|
|
for jid, session := range sessions {
|
|
db.Data.Sessions[jid] = *session.Session
|
|
}
|
|
|
|
return true
|
|
}, persistence.SessionMarshaller)
|
|
|
|
// close stream
|
|
component.Disconnect()
|
|
}
|