184 lines
8.2 KiB
Python
184 lines
8.2 KiB
Python
## Copyright (C) 2008-2012 Kjell Braden <afflux@pentabarf.de>
|
|
## Copyright (C) 2019 Pavel R. <pd at narayana dot im>
|
|
## Copyright (C) 2022 Bohdan Horbeshko <bodqhrohro@gmail.com>
|
|
#
|
|
|
|
# This file is part of Gajim OTR Plugin.
|
|
#
|
|
# Gajim OTR Plugin is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published
|
|
# by the Free Software Foundation; version 3 only.
|
|
#
|
|
# This software is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You can always obtain full license text at <http://www.gnu.org/licenses/>.
|
|
import os
|
|
import string
|
|
import random
|
|
import itertools
|
|
import logging
|
|
from gajim.common import const, app, helpers, configpaths
|
|
from gajim.common.const import Trust
|
|
from gajim.common.structs import OutgoingMessage
|
|
from nbxmpp.protocol import Message, JID
|
|
from nbxmpp.simplexml import Node
|
|
from nbxmpp.structs import EncryptionData
|
|
|
|
import pathlib
|
|
import sys
|
|
# Make the plugin directory importable. sys.path entries must be plain
# strings: other objects (such as pathlib.Path) are ignored by the import
# system, so the resolved path is converted explicitly.
sys.path.insert(0, str(pathlib.Path(__file__).parent.resolve()))
# Alias the already-registered 'gajim-otrplugin' module under a hyphen-free
# name so that it can be referenced in regular import statements.
sys.modules['gajim_otrplugin'] = sys.modules['gajim-otrplugin']
|
|
|
|
from gajim_otrplugin.potr import context, crypt, proto
|
|
from .keystore import Keystore
|
|
|
|
|
|
# Prototype of an OTR channel: a secure conversation between the Gajim user (Alice) and a Gajim peer (Bob)
class OTRChannel(context.Context):
    """A single OTR conversation between the local account and one peer.

    The attributes and helpers used below (``self.user``, ``self.peer``,
    ``self.state``, ``getCurrentTrust``, ``setCurrentTrust``,
    ``getCurrentKey``) are not defined in this class; they are expected to be
    provided by ``context.Context``.
    """

    def inject(self, msg, appdata=None):
        """Send protocol data to the peer as an XMPP chat message.

        :param msg: payload to send; it is decoded with ``msg.decode()`` and
            used as the message body.
        :param appdata: thread id to put on the stanza; when falsy a random
            thread id is generated instead.
        """
        stanza = Message(to=self.peer, body=msg.decode(), typ='chat')
        thread_id = appdata or self.generateThreadId()
        stanza.setThread(thread_id)
        self.user.stream.send_stanza(stanza)

    def setState(self, state=0):
        """Record a channel state change and report it in the chat window.

        Nothing is printed when ``state`` is falsy or equal to the current
        state; the new state is stored in every case.

        NOTE(review): the original indentation of this file was lost; the
        final assignment is assumed to be unconditional -- confirm.
        """
        is_transition = bool(state) and state != self.state
        if is_transition:
            if self.getCurrentTrust() is None:
                # Fingerprint has no trust entry yet: register it with trust 0
                # and announce it.
                if self.setCurrentTrust(0) != 0:
                    self.printl(OTR.TRUSTED[None].format(fprint=self.getCurrentKey()))
            template = OTR.STATUS[state]
            summary = template.format(
                peer=self.peer,
                trust=OTR.TRUSTED[self.getCurrentTrust()],
                fprint=self.getCurrentKey(),
            )
            self.printl(summary)
        self.state = state

    def printl(self, line):
        """Show ``line`` (prefixed with ``"OTR: "``) as an info message.

        The message goes to the control returned by
        ``app.window.get_control()``; if that is falsy nothing is shown.
        """
        chat = app.window.get_control()
        if chat:
            chat.add_info_message("OTR: " + line)

    @staticmethod
    def getPolicy(policy):
        """Return the ``OTR.DEFAULT_POLICY`` value for ``policy`` (``None`` if absent)."""
        return OTR.DEFAULT_POLICY.get(policy)

    @staticmethod
    def generateThreadId():
        """Return a thread id made of 32 random ASCII letters."""
        letters = string.ascii_letters
        return ''.join([random.choice(letters) for _ in range(32)])
|
|
|
|
# OTR instance for Gajim user (Alice)
class OTR(context.Account):
    """OTR state of one local Gajim account.

    Owns the key store for the account, hands out ``OTRChannel`` objects per
    peer and performs the decrypt/encrypt steps on stanzas.

    ``self.ctxs``, ``self.trusts``, ``setTrust``, ``getPrivkey`` and
    ``getDefaultQueryMessage`` are not defined here; they are expected to be
    provided by ``context.Account``.
    """

    # Extra positional arguments forwarded to context.Account.__init__
    # (presumably protocol name and maximum message size -- confirm against potr).
    PROTO = ('XMPP', 1024)
    ENCRYPTION_NAME = 'OTR'
    # Policy flags looked up by OTRChannel.getPolicy().
    DEFAULT_POLICY = {
        'REQUIRE_ENCRYPTION': True,
        'ALLOW_V1': False,
        'ALLOW_V2': True,
        'SEND_TAG': True,
        'WHITESPACE_START_AKE': True,
        'ERROR_START_AKE': True,
    }
    # Human-readable labels per trust value; None is the "unknown fingerprint" notice.
    TRUSTED = {None:"new fingerprint received: *{fprint}*", 0:"untrusted", 1:"trusted", 2:"authenticated"}
    # Message templates keyed by channel state or by exception class.
    STATUS = {
        context.STATE_PLAINTEXT: "(re-)starting encrypted conversation with {peer}..",
        context.STATE_ENCRYPTED: "{trust} encrypted conversation started (fingerprint: {fprint})",
        context.STATE_FINISHED: "encrypted conversation with {peer} closed (fingerprint: {fprint})",
        context.UnencryptedMessage: "this message is *not encrypted*: {msg}",
        context.NotEncryptedError: "unable to process message (channel lost)",
        context.ErrorReceived: "received error message: {err}",
        crypt.InvalidParameterError: "unable to decrypt message (key/signature mismatch)",
    }

    def __init__(self, account):
        """Set up OTR for ``account`` and load the stored fingerprints.

        :param account: key into ``app.connections`` identifying the account.
        """
        super().__init__(account, *OTR.PROTO)
        self.log = logging.getLogger('gajim.p.otr.otr')
        self.account = account
        self.stream = app.connections[account]
        self.jid = self.stream.get_own_jid()
        # One database file per bare JID inside Gajim's MY_DATA directory.
        db_path = os.path.join(configpaths.get('MY_DATA'), 'otr_' + self.jid.bare + '.db')
        self.keystore = Keystore(db_path)
        self.loadTrusts()

    # get chat control
    def getControl(self, peer):
        """Return ``app.window.get_control()``.

        ``peer`` is accepted for interface compatibility but is not used.
        """
        return app.window.get_control()

    # get OTR context (encrypted dialog between Alice and Bob)
    def getContext(self, peer):
        """Return the channel for ``peer``, creating a new one when needed.

        A cached channel in ``STATE_FINISHED`` is disconnected and dropped
        first, so a fresh channel is created in its place.
        """
        if peer in self.ctxs and self.ctxs[peer].state == context.STATE_FINISHED:
            self.ctxs.pop(peer).disconnect()  # close dead channels
        self.ctxs[peer] = self.ctxs.get(peer) or OTRChannel(self, peer)
        return self.ctxs[peer]

    # factory for Gajim 1.4+
    def makeOutgoingMessage(self, message, control, peer):
        """Build an ``OutgoingMessage`` with text ``message`` addressed to ``peer``.

        The contact is looked up through ``control.client``'s 'Contacts'
        module with ``groupchat=False``.
        """
        contact = control.client.get_module('Contacts').get_contact(peer, groupchat=False)
        return OutgoingMessage(account=self.account,
                               contact=contact,
                               text=message)

    # load my private key
    def loadPrivkey(self):
        """Return the private key stored for our own JID, or ``None``.

        ``None`` is returned when there is no key store entry, when the entry
        has no private key, or when parsing yields a falsy key.
        """
        my = self.keystore.load(jid=str(self.jid))
        if not (my and my.privatekey):
            return None
        # The key is stored hex-encoded; parsePrivateKey()[0] is the key object.
        return crypt.PK.parsePrivateKey(bytes.fromhex(my.privatekey))[0] or None

    # save my privatekey
    def savePrivkey(self):
        """Store our private key, hex-encoded, under our own JID."""
        serialized = self.getPrivkey().serializePrivateKey().hex()
        self.keystore.save(jid=str(self.jid), privatekey=serialized)

    # load known fingerprints
    def loadTrusts(self):
        """Feed every entry returned by the key store into ``setTrust``."""
        for peer in self.keystore.load():
            self.setTrust(peer.jid, peer.fingerprint, peer.trust)

    # save known fingerprints
    def saveTrusts(self):
        """Write every (peer, fingerprint, trust) triple from ``self.trusts`` to the key store."""
        for peer, fingerprints in self.trusts.items():
            for fingerprint, trust in fingerprints.items():
                self.keystore.save(jid=str(peer), fingerprint=fingerprint, trust=trust)

    # decrypt message
    def decrypt(self, stanza, properties):
        """Run an incoming stanza through the OTR channel of its sender.

        On success the stanza body is replaced with the decoded text (or an
        empty string) and ``properties.encrypted`` is set. On a known OTR
        error the problem is logged and reported in the chat window, and the
        stanza is left untouched.

        :param stanza: incoming message stanza; its body must be a string.
        :param properties: object receiving the ``encrypted`` attribute.
        """
        sFrom = stanza.getFrom()
        peer = sFrom.new_as_bare()
        msgtxt = stanza.getBody()
        channel, ctl = self.getContext(peer), self.getControl(peer)
        try:
            # Fall back to a pair so the unpacking also works for a falsy result.
            text, _tlvs = channel.receiveMessage(msgtxt.encode(), appdata=stanza.getThread()) or (b'', [])
        except (context.UnencryptedMessage, context.NotEncryptedError, context.ErrorReceived, crypt.InvalidParameterError) as e:
            self.log.error("** got exception while decrypting message: %s", e)
            detail = e.args[0] if e.args else ''
            # Always fill the template so that no raw "{msg}" / "{err}"
            # placeholder reaches the chat window; templates without
            # placeholders are returned unchanged by format().
            channel.printl(OTR.STATUS[e.__class__].format(msg=msgtxt, err=getattr(detail, 'error', detail)))
        else:
            # Remember which resource of the peer this channel talks to.
            channel.resource = sFrom.resource
            stanza.setBody(text.decode() if text else "")
            properties.encrypted = EncryptionData(protocol=OTR.ENCRYPTION_NAME, key='Unknown', trust=Trust.UNDECIDED)
        finally:
            # Resend the last message when the channel asks for it.
            if channel.mayRetransmit and channel.state and ctl:
                channel.mayRetransmit = ctl.client.send_message(self.makeOutgoingMessage(channel.lastMessage.decode(), ctl, peer))

    # encrypt message
    def encrypt(self, event, callback):
        """Encrypt an outgoing message and pass it on via ``callback(event)``.

        When the channel raises ``NotEncryptedError`` the problem is logged
        and reported in the chat window, and ``callback`` is not invoked.

        NOTE(review): the original indentation of this file was lost; the
        ``callback(event)`` call and both hint children are assumed to belong
        to the success branch / the ``if channel.resource`` body -- confirm.

        :param event: outgoing message event (``contact``, ``get_stanza()``,
            ``get_text()``, ``set_encryption()``).
        :param callback: callable invoked with ``event`` after encryption.
        """
        peer = event.contact.jid
        channel = self.getContext(peer)
        stanza = event.get_stanza()
        if not hasattr(channel, 'resource'):
            channel.resource = ""
        if channel.resource:
            # NOTE(review): the resource-qualified JID is computed but never
            # applied to the stanza -- confirm whether the recipient should be updated.
            peer = peer.new_with(resource=channel.resource)
        try:
            plain = event.get_text()
            encrypted = channel.sendMessage(context.FRAGMENT_SEND_ALL_BUT_LAST, plain.encode(), appdata=stanza.getThread()) or b''
            # Show nothing locally when the output is just the default OTR query message.
            is_query = encrypted == self.getDefaultQueryMessage(OTR.DEFAULT_POLICY.get)
            message = "" if is_query else (plain or "")
        except context.NotEncryptedError as e:
            self.log.error("** got exception while encrypting message: %s", e)
            # printl() takes the text only.
            channel.printl(OTR.STATUS[e.__class__])
        else:
            stanza.setBody(encrypted.decode())  # encrypted data goes here
            event._text = message  # message that will be displayed in our chat goes here
            event.set_encryption(
                EncryptionData(
                    protocol=OTR.ENCRYPTION_NAME,
                    key='Unknown',
                    trust=Trust.UNDECIDED,
                )
            )  # some mandatory encryption flags
            if channel.resource:
                stanza.addChild('no-copy', namespace='urn:xmpp:hints')  # don't send carbons
                stanza.addChild('no-store', namespace='urn:xmpp:hints')  # don't store in MAM
            callback(event)
|