Source code for muacrypt.states

"""
All muacrypt states are managed through this module.
We follow the Kappa architecture style
(http://milinda.pathirage.org/kappa-architecture.com/)
i.e. all state changes are added to append-only chains and they contain
immutable entries that may cross-reference other entries (even
from other chains). The linking between entries is done using
crytographic hashes.
"""
from __future__ import unicode_literals, print_function

import os
import logging
from .chainstore import HeadTracker, BlockService, Chain
from .myattr import (
    v, attr, attrs, attrib, attrib_text, attrib_bytes,
    attrib_bytes_or_none, attrib_text_or_none, attrib_float,
)

# ==================================================
# States
# =================================================


[docs]class States: """ Persisting Muacrypt and per-account settings.""" _account_pat = "." _own_pat = "own:{id}" _oob_pat = "oob:{id}" _peer_pat = "peer:{id}:{addr}"
[docs] def __init__(self, dirpath): self.dirpath = dirpath blockdir = os.path.join(dirpath, "blocks") if not os.path.exists(blockdir): os.makedirs(blockdir) self._heads = HeadTracker(os.path.join(dirpath, "heads")) self._blocks = BlockService(blockdir)
def _makechain(self, headname): return Chain(self._blocks, self._heads, headname) def get_accountmanager_state(self): chain = self._makechain(self._account_pat) return AccountManagerState(chain) def get_account_names(self): return sorted(self._heads._getheads(prefix=self._own_pat.format(id=""))) def get_num_peers(self, account): return len(self.get_peername_list()) def get_peername_list(self, account_name): prefix = self._peer_pat.format(id=account_name, addr="") return sorted(self._heads._getheads(prefix=prefix)) def get_peerstate(self, account_name, addr): head_name = self._peer_pat.format(id=account_name, addr=addr) chain = self._makechain(head_name) return PeerState(chain) def get_ownstate(self, account_name): head_name = self._own_pat.format(id=account_name) chain = self._makechain(head_name) return OwnState(chain) def get_own_gpghome(self, account_name): return os.path.join(self.dirpath, "gpg", account_name) def get_oobstate(self, account_name): head_name = self._oob_pat.format(id=account_name) chain = self._makechain(head_name) return OOBState(chain) def remove_account(self, account_name): def match_account(key, value): l = key.split(":", 2) if l[0] in ("own", "peer") and l[1] == account_name: return True self._heads.remove_if(match_account)
# =========================================================== # PeerState for keeping track of incoming messages per peer # =========================================================== @attr.s class MsgEntry(object): TAG = "msg" msg_id = attrib_text() msg_date = attrib_float() prefer_encrypt = attrib(validator=v.in_(['nopreference', 'mutual'])) keydata = attrib_bytes() keyhandle = attrib_text() @attr.s class MsgGossipEntry(object): TAG = "mge" msg_id = attrib_text() msg_date = attrib_float() keydata = attrib_bytes() keyhandle = attrib_text()
[docs]@attrs class PeerState(object): """Synthesized Autocrypt state from parsing messages from a peer. """ _chain = attrib() def __str__(self): return "PeerState addr={addr} key={keyhandle}".format( addr=self.addr, keyhandle=self.public_keyhandle ) @property def addr(self): return self._chain.name.split(":", 2)[-1] @property def last_seen(self): return getattr(self._latest_msg_entry(), "msg_date", 0.0) @property def autocrypt_timestamp(self): return getattr(self._latest_ac_entry(), "msg_date", 0.0) @property def public_keyhandle(self): return getattr(self.entry_for_encryption(), "keyhandle", '') @property def public_keydata(self): return getattr(self.entry_for_encryption(), "keydata", b'') def has_direct_key(self): return bool(getattr(self._latest_ac_entry(), "keyhandle", '')) def entry_for_encryption(self): direct = self._latest_ac_entry() # TODO: perform propper checks on usability of ac entry here if getattr(direct, "keyhandle", None): return direct else: return self.latest_gossip_entry() @property def prefer_encrypt(self): return getattr(self.entry_for_encryption(), "prefer_encrypt", '') def _latest_ac_entry(self): """ Return latest message with Autocrypt header. """ for entry in self._chain.iter_entries(MsgEntry): if entry.keydata: return entry
[docs] def latest_gossip_entry(self): """ Return latest gossip entry. """ return self._chain.latest_entry_of(MsgGossipEntry)
def _latest_msg_entry(self): """ Return latest message with or without Autocrypt header. """ return self._chain.latest_entry_of(MsgEntry) def has_message(self, msg_id): return self.get_message_entry(msg_id) is not None def get_message_entry(self, msg_id, class_=MsgEntry): # XXX make this less expensive for entry in self._chain.iter_entries(class_): if entry.msg_id == msg_id: return entry # methods which modify/add state def update_from_msg(self, msg_id, effective_date, prefer_encrypt, keydata, keyhandle): if effective_date < self.autocrypt_timestamp: return entry = self.get_message_entry(msg_id) if entry is not None: if (entry.msg_date == effective_date and entry.keydata == keydata and entry.keyhandle == keyhandle and entry.prefer_encrypt == prefer_encrypt): return if not keydata: if effective_date > self.last_seen: self._append_noac_entry( msg_id=msg_id, msg_date=effective_date, ) logging.debug("append noac %s", msg_id) return self._append_ac_entry( msg_id=msg_id, msg_date=effective_date, prefer_encrypt=prefer_encrypt, keydata=keydata or b'', keyhandle=keyhandle or '', ) def update_from_msg_gossip(self, msg_id, effective_date, keydata, keyhandle): if effective_date < self.autocrypt_timestamp: return assert keydata entry = self.get_message_entry(msg_id, class_=MsgGossipEntry) if entry is not None: if (entry.msg_date == effective_date and entry.keydata == keydata and entry.keyhandle == keyhandle): return self._append_ac_gossip_entry( msg_id=msg_id, msg_date=effective_date, keydata=keydata, keyhandle=keyhandle, ) def _append_ac_entry(self, msg_id, msg_date, prefer_encrypt, keydata, keyhandle): """append an Autocrypt message entry. """ self._chain.append_entry(MsgEntry( msg_id=msg_id, msg_date=msg_date, prefer_encrypt=prefer_encrypt, keydata=keydata, keyhandle=keyhandle)) def _append_ac_gossip_entry(self, msg_id, msg_date, keydata, keyhandle): """append an Autocrypt gossip entry. """ self._chain.append_entry(MsgGossipEntry( msg_id=msg_id, msg_date=msg_date, keydata=keydata, keyhandle=keyhandle)) def _append_noac_entry(self, msg_id, msg_date): """append a non-Autocrypt message entry. """ self._chain.append_entry(MsgEntry( msg_id=msg_id, msg_date=msg_date, prefer_encrypt="nopreference", keyhandle="", keydata=b"" ))
# =========================================================== # OwnState keeps track of own crypto settings # =========================================================== def config_property(name): def get(self): return getattr(self._latest_config(), name) return property(get) @attr.s class KeygenEntry(object): TAG = "keygen" keydata = attrib_bytes_or_none() keyhandle = attrib_text_or_none() def convert_bytes(x): if hasattr(x, "decode"): return x.decode("ascii") return x @attr.s class OwnConfigEntry(object): TAG = "cfg" prefer_encrypt = attrib(validator=v.in_(['nopreference', 'mutual']), converter=convert_bytes) name = attrib_text() email_regex = attrib_text() gpgmode = attrib(validator=v.in_(['system', 'own'])) gpgbin = attrib_text()
[docs]@attrs class OwnState(object): """Synthesized own state for an account. """ _chain = attrib() def __str__(self): return "OwnState key={}".format(self.keyhandle) name = config_property("name") email_regex = config_property("email_regex") gpgmode = config_property("gpgmode") gpgbin = config_property("gpgbin") prefer_encrypt = config_property("prefer_encrypt") @property def keyhandle(self): return self._latest_keygen().keyhandle def exists(self): return self.name def _latest_keygen(self): return self._chain.latest_entry_of(KeygenEntry) def _latest_config(self): return self._chain.latest_entry_of(OwnConfigEntry) # methods which modify/add state def new_config(self, name, prefer_encrypt, email_regex, gpgmode, gpgbin): self._chain.append_entry(OwnConfigEntry( name=name, prefer_encrypt=prefer_encrypt, email_regex=email_regex, gpgmode=gpgmode, gpgbin=gpgbin, )) def change_config(self, **kwargs): entry = self._latest_config() new_entry = attr.evolve(entry, **kwargs) if new_entry != entry: self._chain.append_entry(new_entry) return True def append_keygen(self, keydata, keyhandle): self._chain.append_entry(KeygenEntry( keydata=keydata, keyhandle=keyhandle )) def is_configured(self): return self._latest_config() and self._latest_keygen()
# =========================================================== # OOBChain keeps track of out-of-band verifications # =========================================================== @attr.s class VerificationEntry(object): TAG = "oobverify" addr = attrib_text() public_keydata = attrib_bytes() origin = attrib(validator=v.in_(["self", "peer"]))
[docs]@attrs class OOBState(object): """Synthesized Out of Band verification state for an account. """ _chain = attrib() def get_verification(self, addr): for entry in self._chain.iter_entries(VerificationEntry): if addr == entry.addr: return entry def append_self_verification(self, addr, public_keydata): self._chain.append_entry(VerificationEntry( addr=addr, public_keydata=public_keydata, origin="self", )) def append_peer_verification(self, addr, public_keydata): self._chain.append_entry(VerificationEntry( addr=addr, public_keydata=public_keydata, origin="peer", ))
# =========================================================== # AccountManagerState keeps track of account modifications # =========================================================== @attr.s class AConfigEntry(object): TAG = "acfg" version = attrib_text()
[docs]@attrs class AccountManagerState(object): """Synthesized AccountManagerState. """ _chain = attrib() def _latest_config(self): return self._chain.latest_entry_of(AConfigEntry) @property def version(self): return getattr(self._latest_config(), "version", None) def __str__(self): return "AccountManagerState version={version}".format(version=self.version) def set_version(self, version): assert not self._latest_config() self._chain.append_entry(AConfigEntry(version=version))