Release 1.0.

[NEW] implemented XEP-0030 (Service Discovery) — you can now see transport in service discovery
[NEW] implemented XEP-0077 (In-Band Registration) — now you can also start session from GUI Jabber client
[NEW] implemented XEP-0100 iq:jabber:gateway — you can now add Telegram contact from your GUI Jabber client
[NEW] implemented presence cache. now sending presences once a minute to prevent presence flooding
[FIX] fixed online presence after establishing telegram session and offline after disposing session
[FIX] won't use tdlib message database anymore, implemented our own messages cache
[FIX] show "chat created" message correctly
[FIX] fixed /info command
[FIX] fixed closing secret chats
[UPD] now you can start secret chat directly by sending /secret to contact
[UPD] /debug renamed to /info, now shows active sesions, removed memory profiling as it does not helps
[UPD] removed main loop (but still shutting down correctly) — use systemd etc. to restart transport correctly
This commit is contained in:
annelin 2019-05-07 22:22:26 +03:00
parent 7904631a83
commit dc615f977c
3 changed files with 113 additions and 75 deletions

View file

@ -17,10 +17,10 @@ class TelegramClient
config.client.application_version = params['version'] || '1.0' # hmm...
config.client.use_test_dc = params['use_test_dc'] || false
config.client.system_version = '42' # I think I have permission to hardcode The Ultimate Question of Life, the Universe, and Everything?..
config.client.use_file_database = true # wow
config.client.use_message_database = true # such library
config.client.use_file_database = false # wow
config.client.use_message_database = false # such library
config.client.use_chat_info_database = false # much options
config.client.enable_storage_optimizer = true # ...
config.client.enable_storage_optimizer = false # ...
end
TD::Api.set_log_verbosity_level(params['verbosity'] || 1)
end
@ -37,7 +37,7 @@ class TelegramClient
@me = nil # self telegram profile
@online = nil # we do not know
@auth_state = 'nil' # too.
@cache = {chats: {}, users: {}, users_fullinfo: {}, userpics: {}, unread_msg: {} } # cache storage
@cache = {chats: {}, users: {}, users_fullinfo: {}, userpics: {}, unread_msg: {}, incoming_msg: [] } # cache storage
@files_dir = File.dirname(__FILE__) + '/../sessions/' + @jid + '/files/'
end
@ -64,6 +64,7 @@ class TelegramClient
@cache[:chats].each_key do |chat_id| @xmpp.presence(@jid, chat_id.to_s, :unavailable) end # send offline presences
(logout) ? @client.log_out : @client.dispose # logout if needed
@online = false
@xmpp.presence(@jid, nil, :unavailable)
end
@ -95,9 +96,9 @@ class TelegramClient
@client.get_me().then { |user| @me = user }.wait
@client.get_chats(limit=9999)
@logger.info "Contact list updating finished"
@online = true
@xmpp.presence(@jid, nil, :subscribe)
@xmpp.presence(@jid, nil, nil, "Logged in as %s" % @login)
@xmpp.presence(@jid, nil, nil, nil, "Logged in as %s" % @login)
@online = true
# closing session: sent offline presences to XMPP user #
when TD::Types::AuthorizationState::Closing
@logger.info 'Closing session..'
@ -113,11 +114,14 @@ class TelegramClient
def message_handler(update, show_date = false)
@logger.debug 'Got NewMessage update'
@logger.debug update.message.to_json
@logger.info 'New message from Telegram chat %s' % update.message.chat_id
# we need to ignore incoming messages sometime
@cache[:incoming_msg].shift(900) if @cache[:incoming_msg].size > 1000 # clean cache if it exceeds 1000 messages (but remain last 100 entires)
return if @cache[:incoming_msg].include? update.message.id # ignore message if it's already seen
return if update.message.is_outgoing and update.message.sending_state.instance_of? TD::Types::MessageSendingState::Pending # ignore self outgoing messages
# media? #
file = nil
prefix = ''
@ -144,6 +148,8 @@ class TelegramClient
when TD::Types::MessageContent::Document # documents
file = update.message.content.document.document
text = "%s (%s), %d bytes | %s | %s" % [update.message.content.document.file_name.to_s, update.message.content.document.mime_type.to_s, file.size.to_i, self.format_content_link(file.remote.id, update.message.content.document.file_name.to_s), update.message.content.caption.text.to_s]
when TD::Types::MessageContent::BasicGroupChatCreate, TD::Types::MessageContent::SupergroupChatCreate # group created
text = "created"
when TD::Types::MessageContent::ChatJoinByLink # joined member
text = "joined"
when TD::Types::MessageContent::ChatAddMembers # add members
@ -177,6 +183,7 @@ class TelegramClient
# send and add message id to unreads
@cache[:unread_msg][update.message.chat_id] = update.message.id
@cache[:incoming_msg] << update.message.id
@xmpp.message(@jid, update.message.chat_id.to_s, text)
end
@ -230,7 +237,7 @@ class TelegramClient
@logger.debug 'Got new StatusUpdate'
@logger.debug update.to_json
return if update.user_id == @me.id # ignore self statuses
self.process_status_update(update.user_id, update.status)
self.process_status_update(update.user_id, update.status, false)
end
@ -256,8 +263,10 @@ class TelegramClient
case splitted[0]
when '/info' # information about user / chat
id = splitted[1].to_i
response = ''
id = (resolved) ? resolved.id : splitted[1]
id ||= chat_id
id = id.to_i
self.process_user_info(id) if id and id > 0 and not @cache[:users].key? id
self.process_chat_info(id, false) if id and id < 0 and not @cache[:cache].key? id
response = self.format_chatname(id) if @cache[:chats].key? id
@ -267,9 +276,11 @@ class TelegramClient
self.process_chat_info(chat) if chat != 0
when '/join' # join group/supergroup by invite link or by id
chat = (resolved) ? resolved.id : splitted[1]
chat ||= chat_id
chat.to_s[0..3] == "http" ? @client.join_chat_by_invite_link(chat).wait : @client.join_chat(chat.to_i).wait
when '/secret' # create new secret chat
@client.create_new_secret_chat(resolved.id) if resolved
uid = (resolved) ? resolved.id : chat_id
@client.create_new_secret_chat(uid) if uid > 0
when '/group' # create new group with @user_id
@client.create_new_basic_group_chat([resolved.id], splitted[2]) if resolved and splitted[2]
when '/supergroup' # create new supergroup
@ -303,7 +314,7 @@ class TelegramClient
when '/leave', '/delete' # delete / leave chat
@client.close_chat(chat_id).wait
@client.leave_chat(chat_id).wait
@client.close_secret_chat(@cache[:chats][chat_id].secret_chat_id).wait if @cache[:chats][chat_id].type.instance_of? TD::Types::ChatType::Secret
@client.close_secret_chat(@cache[:chats][chat_id].type.secret_chat_id).wait if @cache[:chats][chat_id].type.instance_of? TD::Types::ChatType::Secret
@client.delete_chat_history(chat_id, true).wait
@xmpp.presence(@jid, chat_id, :unsubscribed)
@xmpp.presence(@jid, chat_id, :unavailable)
@ -378,7 +389,7 @@ class TelegramClient
'
end
@xmpp.message(@jid, chat_id, response) if response
@xmpp.message(@jid, chat_id.to_s, response) if response
end
# processing outgoing message from queue #
@ -439,7 +450,7 @@ class TelegramClient
end
# convert telegram status to XMPP one
def process_status_update(user_id, status)
def process_status_update(user_id, status, immed = true)
@logger.debug "Processing status update for user id %s.." % user_id.to_s
xmpp_show, xmpp_status, xmpp_photo = nil
case status
@ -460,7 +471,7 @@ class TelegramClient
xmpp_status = "Last seen last month"
end
xmpp_photo = @cache[:userpics][user_id] if @cache[:userpics].key? user_id
@xmpp.presence(@jid, user_id.to_s, nil, xmpp_show, xmpp_status, nil, xmpp_photo)
@xmpp.presence(@jid, user_id.to_s, nil, xmpp_show, xmpp_status, nil, xmpp_photo, immed)
end
# get contact information (for vcard).
@ -489,6 +500,21 @@ class TelegramClient
return title, username, firstname, lastname, phone, bio, userpic
end
# resolve id by @username (or just return id)
def resolve_username(username)
resolved = nil
if username[0] == '@' then # @username
@client.search_public_chat(username[1..-1]).then {|chat| resolved = chat.id}.wait
elsif username[0..3] == 'http' or username[0..3] == 't.me' then # chat link
@client.join_chat_by_invite_link(username)
elsif username.to_i != 0 then # user id
resolved = username
end
return '' if not resolved
return '@' + resolved.to_s
end
###########################################
## Format functions #######################
###########################################

View file

@ -9,9 +9,8 @@
/disconnect ­— Disconnect from Telegram network
/logout — Disconnect from Telegram network and forget session
/sessions — Shows current active sessions (available for admins)
/debug — Shows some debug information (available for admins)
/restart — Reset Zhabogram (available for admins)
/info — Show information and usage statistics of this instance (only for JIDs specified as administrators)
/restart — Restart this instance (only for JIDs specified as administrators)
'
#############################
@ -19,21 +18,27 @@
#############################
## XMPP Transport Class #####
#############################
include Jabber::Discovery
include Jabber::Dataforms
class XMPPComponent
# init class and set logger #
def initialize(params)
@@loglevel = params['loglevel'] || Logger::DEBUG
@logger = Logger.new(STDOUT); @logger.level = @@loglevel; @logger.progname = '[XMPPComponent]'
@config = { host: params["host"] || 'localhost', port: params["port"] || 8899, jid: params["jid"] || 'tlgrm.localhost', secret: params['password'] || '', admins: params['admins'] || [], debug: params['debug'] || false } # default config
@config = { host: params["host"] || 'localhost', port: params["port"] || 8899, jid: params["jid"] || 'tlgrm.localhost', secret: params['password'] || '', admins: params['admins'] || [], debug: params['debug'] } # default config
@sessions = {}
@presence_que = {}
@db = SQLite3::Database.new(params['db_path'] || 'users.db')
@db.execute("CREATE TABLE IF NOT EXISTS users(jid varchar(256), login varchar(256), PRIMARY KEY(jid) );")
@db.results_as_hash = true
self.load_db()
end
# load sessions from db #
def load_db(jid = nil) # load
def load_db(jid = nil)
@logger.info "Initializing database.."
query = (jid.nil?) ? "SELECT * FROM users" : "SELECT * FROM users where jid = '%s';" % jid
@logger.debug(query)
@ -41,34 +46,52 @@ class XMPPComponent
end
# store session to db #
def update_db(jid, delete = false) # write
return if not @sessions.key? jid
def update_db(jid, delete = false, register = false)
login = (not register and @sessions.key? jid) ? @sessions[jid].login.to_s : register
return if not login
@logger.info "Writing database [%s].." % jid.to_s
query = (delete) ? "DELETE FROM users where jid = '%s';" % jid.to_s : "INSERT OR REPLACE INTO users(jid, login) VALUES('%s', '%s');" % [jid.to_s, @sessions[jid].login.to_s]
query = (delete) ? "DELETE FROM users where jid = '%s';" % jid.to_s : "INSERT OR REPLACE INTO users(jid, login) VALUES('%s', '%s');" % [jid.to_s, login]
@logger.debug query
@db.execute(query)
end
# connecting to XMPP server #
def connect() # :jid => transport_jid, :host => xmpp_server, :port => xmpp_component_port, :secret => xmpp_component_secret
@logger.info "Connecting.."
begin
Jabber::debug = @config[:debug]
@@transport = Jabber::Component.new( @config[:jid] )
@@transport.connect( @config[:host], @config[:port] )
@@transport.auth( @config[:secret] )
@@transport.add_message_callback do |msg| msg.first_element_text('body') ? self.message_handler(msg) : nil end
@@transport.add_presence_callback do |presence| self.presence_handler(presence) end
@@transport.add_iq_callback do |iq| self.iq_handler(iq) end
@@transport.on_exception do |exception, stream, state| self.survive(exception, stream, state) end
@logger.info "Connection established"
self.load_db()
@logger.info 'Found %s sessions in database.' % @sessions.count
# component
@component = Jabber::Component.new( @config[:jid] )
@component.connect( @config[:host], @config[:port] )
@component.auth( @config[:secret] )
@component.add_message_callback do |msg| msg.first_element_text('body') ? self.message_handler(msg) : nil end
@component.add_presence_callback do |presence| self.presence_handler(presence) end
@component.add_iq_callback do |iq| self.iq_handler(iq) end
@component.on_exception do |exception, stream, state| self.survive(exception, stream, state) end
@logger.info "Connection to XMPP server established!"
# disco
@disco = Jabber::Discovery::Responder.new(@component)
@disco.identities = [ Identity.new('gateway', 'Telegram Gateway', 'telegram') ]
@disco.add_features(['http://jabber.org/protocol/disco','jabber:iq:register'])
# janbber::iq::register
@iq_register = Jabber::Register::Responder.new(@component)
@iq_register.instructions = 'Please enter your Telegram login'
@iq_register.add_field(:login, true) do |jid, login| self.process_command(jid, '/login %s' % login) end
# jabber::iq::gateway
@iq_gateway = Jabber::Gateway::Responder.new(@component) do |iq, query| (@sessions.key? iq.from.bare.to_s and @sessions[iq.from.bare.to_s].online?) ? @sessions[iq.from.bare.to_s].resolve_username(query).to_s + '@' + @component.jid.to_s : '' end
@iq_gateway.description = "Specify @username / ID / https://t.me/link"
@iq_gateway.prompt = "Telegram contact"
@logger.info 'Loaded %s sessions from database.' % @sessions.count
@sessions.each do |jid, session| self.presence(jid, nil, :subscribe) end
Thread.stop()
Thread.new { while @component.is_connected? do @presence_que.each_value { |p| @component.send(p) }; @presence_que.clear; sleep(60); end } # presence updater thread
Thread.stop()
rescue Interrupt
@logger.error 'Interrupted!'
@@transport.on_exception do |exception,| end
@component.on_exception do |exception,| end
self.disconnect()
return -11
rescue Exception => e
@ -80,35 +103,35 @@ class XMPPComponent
# transport shutdown #
def disconnect()
@logger.info "Closing all connections..."
@logger.info "Closing connections..."
@sessions.each do |jid, session| @sessions[jid].disconnect() end
@@transport.close()
@component.close()
end
# vse umrut a ya ostanus'... #
def survive(exception, stream, state)
@logger.error "Stream error on :%s (%s)" % [state.to_s, exception.to_s]
@logger.info "Trying to ressurect XMPP stream.."
@logger.info "Trying to revive stream.."
self.connect()
end
# message to users #
def message(to, from = nil, body = '')
@logger.info "Sending message from <%s> to <%s>" % [from || @@transport.jid, to]
@logger.info "Sending message from <%s> to <%s>" % [from || @component.jid, to]
msg = Jabber::Message.new
msg.from = (from) ? "%s@%s" % [from, @@transport.jid.to_s] : @@transport.jid
msg.from = (from) ? "%s@%s" % [from, @component.jid.to_s] : @component.jid
msg.to = to
msg.body = body
msg.type = :chat
@logger.debug msg.to_s
@@transport.send(msg)
@component.send(msg)
end
# presence update #
def presence(to, from = nil, type = nil, show = nil, status = nil, nickname = nil, photo = nil)
@logger.debug "Presence update request from %s.." % from.to_s
def presence(to, from = nil, type = nil, show = nil, status = nil, nickname = nil, photo = nil, immediately = true)
@logger.debug "Presence update request from %s (immed = %s).." % [from.to_s, immediately]
req = Jabber::Presence.new()
req.from = from.nil? ? @@transport.jid : "%s@%s" % [from, @@transport.jid] # presence <from>
req.from = from.nil? ? @component.jid : "%s@%s" % [from, @component.jid] # presence <from>
req.to = to # presence <to>
req.type = type unless type.nil? # pres. type
req.show = show unless show.nil? # presence <show>
@ -116,7 +139,7 @@ class XMPPComponent
req.add_element('nick', {'xmlns' => 'http://jabber.org/protocol/nick'} ).add_text(nickname) unless nickname.nil? # nickname
req.add_element('x', {'xmlns' => 'vcard-temp:x:update'} ).add_element("photo").add_text(photo) unless photo.nil? # nickname
@logger.debug req.to_s
@@transport.send(req)
(immediately) ? @component.send(req) : @presence_que.store(to, req)
end
# request timezone information #
@ -125,11 +148,11 @@ class XMPPComponent
iq = Jabber::Iq.new
iq.type = :get
iq.to = jid
iq.from = @@transport.jid
iq.from = @component.jid
iq.id = 'time_req_1'
iq.add_element("time", {"xmlns" => "urn:xmpp:time"})
@logger.debug iq.to_s
@@transport.send(iq)
@component.send(iq)
end
#############################
@ -141,7 +164,7 @@ class XMPPComponent
return if msg.type == :error
@logger.info 'Received message from <%s> to <%s>' % [msg.from.to_s, msg.to.to_s]
@logger.debug msg.to_s
if msg.to == @@transport.jid then self.process_command(msg.from, msg.first_element_text('body') ); return; end # treat message as internal command if received as transport jid
if msg.to == @component.jid then self.process_command(msg.from, msg.first_element_text('body') ); return; end # treat message as internal command if received as transport jid
if @sessions.key? msg.from.bare.to_s then self.request_tz(msg.from) if not @sessions[msg.from.bare.to_s].tz_set?; @sessions[msg.from.bare.to_s].process_outgoing_msg(msg.to.to_s.split('@')[0].to_i, msg.first_element_text('body')); return; end #if @sessions.key? msg.from.bare.to_s and @sessions[msg.from.bare.to_s].online? # queue message for processing session is active for jid from
end
@ -149,9 +172,9 @@ class XMPPComponent
def presence_handler(prsnc)
@logger.debug "Received presence :%s from <%s> to <%s>" % [prsnc.type.to_s, prsnc.from.to_s, prsnc.to.to_s]
@logger.debug(prsnc.to_s)
if prsnc.type == :subscribe then reply = prsnc.answer(false); reply.type = :subscribed; @@transport.send(reply); end # send "subscribed" reply to "subscribe" presence
if prsnc.to == @@transport.jid and @sessions.key? prsnc.from.bare.to_s and prsnc.type == :unavailable then @sessions[prsnc.from.bare.to_s].disconnect(); return; end # go offline when received offline presence from jabber user
if prsnc.to == @@transport.jid and @sessions.key? prsnc.from.bare.to_s then self.request_tz(prsnc.from); @sessions[prsnc.from.bare.to_s].connect(); return; end # connect if we have session
if prsnc.type == :subscribe then reply = prsnc.answer(false); reply.type = :subscribed; @component.send(reply); end # send "subscribed" reply to "subscribe" presence
if prsnc.to == @component.jid and @sessions.key? prsnc.from.bare.to_s and prsnc.type == :unavailable then @sessions[prsnc.from.bare.to_s].disconnect(); return; end # go offline when received offline presence from jabber user
if prsnc.to == @component.jid and @sessions.key? prsnc.from.bare.to_s then self.request_tz(prsnc.from); @sessions[prsnc.from.bare.to_s].connect(); return; end # connect if we have session
end
# new iq (vcard/tz) request to XMPP component #
@ -177,7 +200,7 @@ class XMPPComponent
reply.type = :result
reply.elements["vCard"] = vcard
@logger.debug reply.to_s
@@transport.send(reply)
@component.send(reply)
# time response #
elsif iq.type == :result and iq.elements["time"] and @sessions.key? iq.from.bare.to_s then
@logger.debug "Timezone response from <%s>" % iq.from.to_s
@ -188,7 +211,7 @@ class XMPPComponent
reply = iq.answer
reply.type = :error
end
@@transport.send(reply)
@component.send(reply)
end
#############################
@ -213,34 +236,24 @@ class XMPPComponent
@sessions[from.bare.to_s].disconnect(true) if @sessions.key? from.bare.to_s
self.update_db(from.bare.to_s, true)
@sessions.delete(from.bare.to_s)
when '/debug' # show some debug information
when '/info' # show some debug information
return if not @config[:admins].include? from.bare.to_s
GC.start
dump = (defined? Memprof2) ? "/tmp/zhabogram.%s.dump" % Time.now.to_i : nil
Memprof2.report(out: dump) if dump
response = "Debug information: \n\n"
response = "Information about this instance: \n\n"
response += "Running from: %s\n" % `ps -p #{$$} -o lstart`.lines.last.strip
response += "Sessions: %d online | %d total \n" % [ @sessions.inject(0){ |cnt, (jid, sess)| cnt = (sess.online?) ? cnt + 1 : cnt }, @sessions.count]
response += "System memory used: %d KB\n" % `ps -o rss -p #{$$}`.lines.last.strip.to_i
response += "Objects memory allocated: %d bytes \n" % `cut -d' ' -f1 #{dump}`.lines.map(&:to_i).reduce(0, :+) if dump
response += "\nDetailed memory info saved to %s\n" % dump if dump
response += "\nRun this transport with --profiler (depends on gem memprof2) to get detailed memory infnormation.\n" if not dump
self.message(from.bare, nil, response)
when '/sessions' # show active sessions
return if not @config[:admins].include? from.bare.to_s
response = "Active sessions list: \n\n"
@sessions.each do |jid, session| response += "JID: %s | Login: %s | Status: %s (%s) | Telegram profile: %s\n" % [jid, session.login, (session.online == true) ? 'Online' : 'Offline', session.auth_state, (session.me) ? session.format_username(session.me.id) : 'Unknown' ] end
response += "\n\nSessions: %d online | %d total \n" % [ @sessions.inject(0){ |cnt, (jid, sess)| cnt = (sess.online?) ? cnt + 1 : cnt }, @sessions.count]
@sessions.each do |jid, session| response += "JID: %s | Login: %s | Status: %s (%s) | %s\n" % [jid, session.login, (session.online == true) ? 'Online' : 'Offline', session.auth_state, (session.me) ? session.format_username(session.me.id) : 'Unknown' ] end
self.message(from.bare, nil, response)
when '/restart' # reset transport
return if not @config[:admins].include? from.bare.to_s
self.message(from.bare, nil, 'Trying to restart all active sessions and reconnect to XMPP server..')
sleep(0.5)
sleep(1)
Process.kill("INT", Process.pid)
else # unknown command -- display help #
self.message(from.bare, nil, ::HELP_MESSAGE)
end
return
return true
end
end

View file

@ -2,19 +2,18 @@
require 'yaml'
require 'logger'
require 'xmpp4r'
require 'xmpp4r/discovery'
require 'digest'
require 'base64'
require 'sqlite3'
require 'tdlib-ruby'
require 'memprof2' if ARGV.include? '--profiler'
require_relative 'inc/telegramclient'
require_relative 'inc/xmppregister'
require_relative 'inc/xmppgateway'
require_relative 'inc/xmppcomponent'
# profiler #
Memprof2.start if defined? Memprof2
# configuration
Config = YAML.load_file(File.dirname(__FILE__) + '/config.yml')
TelegramClient.configure(Config['telegram']) # configure tdlib
Zhabogram = XMPPComponent.new(Config['xmpp']) # spawn zhabogram
loop do Zhabogram.connect(); sleep(1); end # forever loop jk till double ctrl+c
Zhabogram.connect()