// Package xmpp wires the gateway into an XMPP server as an external
// component: it creates the component, routes incoming stanzas to the
// package handlers, keeps the per-JID Telegram sessions and sends presences.
package xmpp

import (
	"encoding/xml"
	"sync"
	"time"

	"dev.narayana.im/narayana/telegabber/config"
	"dev.narayana.im/narayana/telegabber/persistence"
	"dev.narayana.im/narayana/telegabber/telegram"
	"dev.narayana.im/narayana/telegabber/xmpp/gateway"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"github.com/soheilhy/args"
	"gosrc.io/xmpp"
	"gosrc.io/xmpp/stanza"
)

// pollingInterval is the pause between retries of the initial probe
// presences sent by heartbeat.
const pollingInterval = 10 * time.Millisecond

var (
	// tgConf is the Telegram configuration handed to every new client.
	tgConf config.TelegramConfig
	// sessions maps an XMPP JID to its Telegram client.
	sessions map[string]*telegram.Client
	// queue holds deferred presences keyed by from+to; it is initialised
	// eagerly so that queueing never writes to a nil map.
	queue = make(map[string]*stanza.Presence)
	// queueLock guards queue, which is touched both by the heartbeat
	// goroutine and by sendPresence callers.
	queueLock sync.Mutex
	// db is the persistent session storage.
	db persistence.SessionsYamlDB
)

// NewComponent starts a new component and wraps it in
// a stream manager that you should start yourself.
//
// It also stores the Telegram configuration, loads the saved sessions from
// conf.Db and launches the heartbeat goroutine. On any failure all returned
// values except the error are nil.
func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) {
	var err error

	gateway.Jid, err = xmpp.NewJid(conf.Jid)
	if err != nil {
		return nil, nil, err
	}

	tgConf = tc

	options := xmpp.ComponentOptions{
		Address: conf.Host + ":" + conf.Port,
		Domain:  conf.Jid,
		Secret:  conf.Password,
		Name:    "telegabber",
	}

	router := xmpp.NewRouter()
	router.HandleFunc("iq", HandleIq)
	router.HandleFunc("presence", HandlePresence)
	router.HandleFunc("message", HandleMessage)

	component, err := xmpp.NewComponent(options, router)
	if err != nil {
		return nil, nil, err
	}

	if err = loadSessions(conf.Db, component); err != nil {
		return nil, nil, err
	}

	sm := xmpp.NewStreamManager(component, nil)

	// NOTE(review): this goroutine has no stop signal and runs until the
	// process exits.
	go heartbeat(component)

	return sm, component, nil
}

// logPresence logs a presence that could not be sent together with the
// send error.
func logPresence(err error, presence *stanza.Presence) {
	log.WithFields(log.Fields{
		"presence": *presence,
	}).Error(errors.Wrap(err, "Couldn't send presence"))
}

// heartbeat first probes every known session JID, retrying each probe every
// pollingInterval until it is sent, and then flushes the deferred presence
// queue once a minute, forever.
func heartbeat(component *xmpp.Component) {
	probeType := SPType("probe")

	// NOTE(review): sessions is read here without synchronisation; it is
	// assumed not to be modified concurrently during start-up — confirm.
	for jid := range sessions {
		for {
			if err := sendPresence(component, jid, probeType); err == nil {
				break
			}
			time.Sleep(pollingInterval)
		}
	}

	log.Info("Starting heartbeat queue")

	for {
		flushQueue(component)
		time.Sleep(time.Minute)
	}
}

// flushQueue tries to send every queued presence once. The queue is swapped
// out under the lock so the (possibly slow) network sends happen without
// blocking sendPresence callers; presences that fail to send are put back
// unless a newer presence for the same key was queued in the meantime.
func flushQueue(component *xmpp.Component) {
	queueLock.Lock()
	pending := queue
	queue = make(map[string]*stanza.Presence, len(pending))
	queueLock.Unlock()

	for key, presence := range pending {
		err := component.Send(presence)
		if err == nil {
			continue
		}
		logPresence(err, presence)

		queueLock.Lock()
		if _, superseded := queue[key]; !superseded {
			queue[key] = presence
		}
		queueLock.Unlock()
	}
}

// loadSessions reads the sessions database at dbPath and creates a Telegram
// client for every stored session. Only a database loading failure is
// returned; client initialization failures are logged by
// getTelegramInstance and skipped.
func loadSessions(dbPath string, component *xmpp.Component) error {
	var err error

	sessions = make(map[string]*telegram.Client)

	db, err = persistence.LoadSessions(dbPath)
	if err != nil {
		return err
	}

	db.Transaction(func() bool {
		for jid, session := range db.Data.Sessions {
			// Copy the range variable: its address is handed to the
			// client, and before Go 1.22 the variable is shared by all
			// iterations, so every client would alias the same session.
			session := session
			getTelegramInstance(jid, &session, component)
		}
		// The transaction callback reports false here; nothing in the
		// database is modified while loading.
		return false
	}, persistence.SessionMarshaller)

	return nil
}

// getTelegramInstance returns the Telegram client registered for jid,
// creating and registering one from savedSession when none exists yet.
// The boolean is false when a new client could not be created; the error is
// logged and the client is not registered.
func getTelegramInstance(jid string, savedSession *persistence.Session, component *xmpp.Component) (*telegram.Client, bool) {
	if session, ok := sessions[jid]; ok {
		return session, true
	}

	session, err := telegram.NewClient(tgConf, jid, component, savedSession)
	if err != nil {
		log.Error(errors.Wrap(err, "TDlib initialization failure"))
		return session, false
	}

	sessions[jid] = session
	return session, true
}

var (
	// SPFrom is a Telegram user id
	SPFrom = args.NewString()
	// SPType is a presence type
	SPType = args.NewString()
	// SPShow is an availability status
	SPShow = args.NewString()
	// SPStatus is a verbose status
	SPStatus = args.NewString()
	// SPNickname is a XEP-0172 nickname
	SPNickname = args.NewString()
	// SPPhoto is a XEP-0153 hash of avatar in vCard
	SPPhoto = args.NewString()
	// SPImmed skips queueing (enabled by default)
	SPImmed = args.NewBool(args.Default(true))
)

// newPresence builds a presence stanza addressed to "to". The sender is
// bareJid, or "<SPFrom>@<bareJid>" when SPFrom is set; the remaining SP*
// options fill in the type, show, status, nickname and vCard photo hash
// when they are set.
func newPresence(bareJid string, to string, args ...args.V) stanza.Presence {
	presenceFrom := bareJid
	if SPFrom.IsSet(args) {
		presenceFrom = SPFrom.Get(args) + "@" + bareJid
	}

	presence := stanza.Presence{Attrs: stanza.Attrs{
		From: presenceFrom,
		To:   to,
	}}

	if SPType.IsSet(args) {
		presence.Attrs.Type = stanza.StanzaType(SPType.Get(args))
	}
	if SPShow.IsSet(args) {
		presence.Show = stanza.PresenceShow(SPShow.Get(args))
	}
	if SPStatus.IsSet(args) {
		presence.Status = SPStatus.Get(args)
	}
	if SPNickname.IsSet(args) {
		presence.Extensions = append(presence.Extensions, PresenceNickExtension{
			Text: SPNickname.Get(args),
		})
	}
	if SPPhoto.IsSet(args) {
		presence.Extensions = append(presence.Extensions, PresenceXVCardUpdateExtension{
			Photo: PresenceXVCardUpdatePhoto{
				Text: SPPhoto.Get(args),
			},
		})
	}

	return presence
}

// sendPresence builds a presence from the gateway to "to" and either sends
// it right away (SPImmed, the default) or stores it in the queue flushed by
// heartbeat, replacing any presence queued earlier for the same from/to
// pair. Only an immediate send can return an error, which is also logged.
func sendPresence(component *xmpp.Component, to string, args ...args.V) error {
	bareJid := gateway.Jid.Bare()

	logFrom := bareJid
	if SPFrom.IsSet(args) {
		logFrom = SPFrom.Get(args)
	}

	log.WithFields(log.Fields{
		"type": SPType.Get(args),
		"from": logFrom,
		"to":   to,
	}).Info("Got presence")

	presence := newPresence(bareJid, to, args...)

	// explicit check, as marshalling is expensive
	if log.GetLevel() == log.DebugLevel {
		xmlPresence, err := xml.Marshal(presence)
		if err == nil {
			log.Debug(string(xmlPresence))
		} else {
			log.Debugf("%#v", presence)
		}
	}

	if !SPImmed.Get(args) {
		queueLock.Lock()
		queue[presence.From+presence.To] = &presence
		queueLock.Unlock()
		return nil
	}

	if err := component.Send(presence); err != nil {
		logPresence(err, &presence)
		return err
	}
	return nil
}

// Close gracefully terminates the component and saves active sessions:
// every Telegram client is disconnected, the sessions are written back to
// the database and finally the XMPP component is disconnected.
func Close(component *xmpp.Component) {
	log.Error("Disconnecting...")

	for _, session := range sessions {
		session.Disconnect()
	}

	db.Transaction(func() bool {
		for jid, session := range sessions {
			db.Data.Sessions[jid] = *session.Session
		}
		return true
	}, persistence.SessionMarshaller)

	component.Disconnect()
}