gajim-otrplugin/modules/otr.py

186 lines
7.8 KiB
Python
Raw Normal View History

## Copyright (C) 2008-2012 Kjell Braden <afflux@pentabarf.de>
## Copyright (C) 2019 Pavel R. <pd at narayana dot im>
#
# This file is part of Gajim OTR Plugin.
#
# Gajim OTR Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You can always obtain full license text at <http://www.gnu.org/licenses/>.
name = 'OTR'
zeroconf = None
ENCRYPTION_NAME = 'OTR'
ERROR = None
OTR = {
'PROTOCOL': 'XMPP',
'MMS': 1000,
'POLICY': {
'REQUIRE_ENCRYPTION': True,
'ALLOW_V1': False,
'ALLOW_V2': True,
'SEND_TAG': True,
'WHITESPACE_START_AKE': True,
'ERROR_START_AKE': True,
}
}
import os
from collections import namedtuple
from nbxmpp.structs import StanzaHandler
from nbxmpp.protocol import Message, JID
from nbxmpp.const import MessageType
from gajim.common import app
from gajim.common import configpaths
from gajim.common.nec import NetworkEvent
from gajim.common.const import EncryptionData
from gajim.common.modules.base import BaseModule
from gajim.plugins.plugins_i18n import _
from otrplugin.keystore import Keystore
try:
import potr as otr
except:
ERROR = 'Unable to import python-otr (python3-potr)'
# otr channel prototype #
class OTRChannel(otr.context.Context):
global OTR
# init OTR channel here
def __init__(self, account, peer):
super(OTRChannel, self).__init__(account, peer)
self.peer = peer
self.resend = []
self.defaultQuery = account.getDefaultQueryMessage(self.getPolicy)
self.trustName = peer.getBare()
self.println = lambda line: account.otrmodule.get_control(peer).conv_textview.print_conversation_line(line, 'status', '', None)
# send message to jabber network
def inject(self, msg, appdata=None):
stanza = Message(to=self.peer, body=msg.decode(), typ='chat')
stanza.setThread(appdata.get('thread')) if appdata and appdata.get('thread') else None
self.user.otrmodule.stream.send_stanza(stanza)
# we can catch state change here
def setState(self, newstate):
state, self.state = self.state, newstate
if self.getCurrentTrust() is None: # new fingerprint
self.setCurrentTrust(0)
self.println("OTR: new fingerprint received [%s]" % self.getCurrentKey())
if newstate == otr.context.STATE_ENCRYPTED and state != newstate: # channel established
self.println("OTR: %s encrypted conversation started [%s]" % (self.getCurrentTrust() and 'trusted' or '**untrusted**', self.getCurrentKey()) )
elif newstate == otr.context.STATE_FINISHED and state != newstate: # channel closed
self.println("OTR: encrypted conversation closed")
def getPolicy(self, key):
return OTR['POLICY'][key] if key in OTR['POLICY'] else None
# otr account prototype
class OTRAccount(otr.context.Account):
global OTR
contextclass = OTRChannel
# init otrfor gajim acct
def __init__(self, otrmodule):
super(OTRAccount, self).__init__(otrmodule.jid, OTR['PROTOCOL'], OTR['MMS'])
self.jid = otrmodule.jid
self.keystore = otrmodule.keystore
self.otrmodule = otrmodule
self.loadTrusts()
def loadPrivkey(self):
my = self.keystore.load({'jid': str(self.jid)})
return otr.crypt.PK.parsePrivateKey(bytes.fromhex(my.privatekey))[0] if my and my.privatekey else None
def savePrivkey(self):
return self.keystore.save({'jid': self.jid, 'privatekey': self.getPrivkey().serializePrivateKey().hex()})
def loadTrusts(self):
for c in self.keystore.load(): self.setTrust(c.jid, c.fingerprint, c.trust)
def saveTrusts(self):
for jid, keys in self.trusts.items():
for fingerprint, trust in keys.items(): self.keystore.save({'jid': jid, 'fingerprint': fingerprint, 'trust': trust})
# Module name
class OTRModule(BaseModule):
def __init__(self, con):
BaseModule.__init__(self, con)
self.handlers = [
StanzaHandler(name='message', callback=self.receive_message, priority=9),
]
self.jid = self._con.get_own_jid() # JID
self.stream = self._con # XMPP stream
self.keystore = Keystore(os.path.join(configpaths.get('MY_DATA'), 'otr_' + self.jid.getBare())) # Key storage
self.otr = OTRAccount(self) # OTR object
self.channels = {}
self.controls = {}
# get chat control for <peer>
def get_control(self, peer):
return self.controls.setdefault(peer, app.interface.msg_win_mgr.get_control(peer.getBare(), self._account))
# get otr channel for <peer>
def get_channel(self, peer):
self.channels.get(peer) and self.channels[peer].state == otr.context.STATE_FINISHED and self.channels.pop(peer).disconnect()
return self.channels.setdefault(peer, self.otr.getContext(peer))
# receive and decrypt message
def receive_message(self, con, stanza, message):
if message.type != MessageType.CHAT or not stanza.getBody() or stanza.getBody().encode().find(otr.proto.OTRTAG) == -1: return # it is not OTR message
if message.is_mam_message: return stanza.setBody('') # it is OTR message from archive, we can not decrypt it
self._log.debug('got otr message: %s' % stanza) # everything is fine, we can try to decrypt it
try:
channel = self.get_channel(stanza.getFrom())
text, tlvs = channel.receiveMessage(stanza.getBody().encode(), {'thread': stanza.getThread()})
stanza.setBody(text and text.decode() or '')
message.encrypted = EncryptionData({'name':'OTR'})
except otr.context.UnencryptedMessage:
channel.println("OTR: received plain message [%s]" % stanza.getBody())
self._log.error('** got plain text over encrypted channel ** %s' % stanza.getBody())
except otr.context.ErrorReceived as e:
channel.println("OTR: received error [%s]" % e)
self._log.error('** otr error ** %s' % e)
except otr.crypt.InvalidParameterError:
channel.println("OTR: received unreadable message (session expired?)")
self._log.error('** unreadable message **')
except otr.context.NotEncryptedError:
channel.println("OTR: session lost")
self._log.error('** otr channel lost **')
channel.resend and channel.state == otr.context.STATE_ENCRYPTED and channel.sendMessage(otr.context.FRAGMENT_SEND_ALL, **(channel.resend.pop())) # resend any spooled messages
return
# encrypt and send message
def encrypt_message(self, obj, callback):
self._log.warning('sending otr message: %s ' % obj.msg_iq)
try:
peer = obj.msg_iq.getTo()
channel = self.get_channel(peer)
session = obj.session or self.stream.make_new_session(peer)
message = obj.message.encode()
encrypted = channel.sendMessage(otr.context.FRAGMENT_SEND_ALL_BUT_LAST, message, appdata = {'thread': session.thread_id}) or b''
encrypted == channel.defaultQuery and channel.resend.append({'msg': message, 'appdata': {'thread': session.thread_id}})
except otr.context.NotEncryptedError:
self._log.warning('message was not sent.')
return
obj.encrypted = 'OTR'
obj.additional_data['encrypted'] = {'name': 'OTR'}
obj.msg_iq.setBody(encrypted.decode())
callback(obj)
def get_instance(*args, **kwargs):
return OTRModule(*args, **kwargs), name