From 5ef3563c9d57e7c5bec21869c0689bdbab150662 Mon Sep 17 00:00:00 2001 From: JarbasAI <33701864+JarbasAl@users.noreply.github.com> Date: Wed, 5 Jun 2024 02:49:33 +0100 Subject: [PATCH] feat/intercom (#86) * feat/intercom companion to https://github.com/JarbasHiveMind/hivemind-websocket-client/pull/27 * feat/intercom * requirements.txt --- .github/workflows/publish_alpha.yml | 2 +- hivemind_core/protocol.py | 87 ++++++++++++++++++++++++++++- hivemind_core/service.py | 2 +- requirements.txt | 13 +++-- 4 files changed, 94 insertions(+), 10 deletions(-) diff --git a/.github/workflows/publish_alpha.yml b/.github/workflows/publish_alpha.yml index a59006b..a60cbf8 100644 --- a/.github/workflows/publish_alpha.yml +++ b/.github/workflows/publish_alpha.yml @@ -14,7 +14,7 @@ on: - 'LICENSE' - 'CHANGELOG.md' - 'MANIFEST.in' - - 'readme.md' + - 'README.md' - 'scripts/**' workflow_dispatch: diff --git a/hivemind_core/protocol.py b/hivemind_core/protocol.py index a4e6823..834177e 100644 --- a/hivemind_core/protocol.py +++ b/hivemind_core/protocol.py @@ -2,6 +2,7 @@ from dataclasses import dataclass, field from enum import Enum, IntEnum from typing import List, Dict, Optional +import pgpy from ovos_bus_client import MessageBusClient from ovos_bus_client.message import Message @@ -13,6 +14,7 @@ from hivemind_bus_client.message import HiveMessage, HiveMessageType from hivemind_bus_client.serialization import decode_bitstring, get_bitstring +from hivemind_bus_client.identity import NodeIdentity from hivemind_bus_client.util import ( decrypt_bin, encrypt_bin, @@ -236,7 +238,7 @@ class HiveMindListenerProtocol: require_crypto: bool = True # throw error if crypto key not available handshake_enabled: bool = True # generate a key per session if not pre-shared - + identity: Optional[NodeIdentity] = None # below are optional callbacks to handle payloads # receives the payload + HiveMindClient that sent it escalate_callback = None # slave asked to escalate payload @@ -246,7 +248,8 @@ class HiveMindListenerProtocol: mycroft_bus_callback = None # slave asked to inject payload into mycroft bus shared_bus_callback = None # passive sharing of slave device bus (info) - def bind(self, websocket, bus): + def bind(self, websocket, bus, identity): + self.identity = identity websocket.protocol = self self.internal_protocol = HiveMindListenerInternalProtocol(bus) self.internal_protocol.register_bus_handlers() @@ -367,6 +370,8 @@ def handle_message(self, message: HiveMessage, client: HiveMindClientConnection) self.handle_broadcast_message(message, client) elif message.msg_type == HiveMessageType.ESCALATE: self.handle_escalate_message(message, client) + elif message.msg_type == HiveMessageType.INTERCOM: + self.handle_intercom_message(message, client) elif message.msg_type == HiveMessageType.BINARY: self.handle_binary_message(message, client) else: @@ -476,6 +481,16 @@ def handle_broadcast_message( if self.broadcast_callback: self.broadcast_callback(payload) + if message.payload.msg_type == HiveMessageType.INTERCOM: + if self.handle_intercom_message(message.payload, client): + return + + if message.payload.msg_type == HiveMessageType.BUS: + # if the message targets our site_id, send it to internal bus + site = message.target_site_id + if site and site == self.identity.site_id: + self.handle_bus_message(message.payload, client) + # broadcast message to other peers payload = self._unpack_message(message, client) for peer in self.clients: @@ -514,6 +529,16 @@ def handle_propagate_message( if self.propagate_callback: self.propagate_callback(payload) + if message.payload.msg_type == HiveMessageType.INTERCOM: + if self.handle_intercom_message(message.payload, client): + return + + if message.payload.msg_type == HiveMessageType.BUS: + # if the message targets our site_id, send it to internal bus + site = message.target_site_id + if site and site == self.identity.site_id: + self.handle_bus_message(message.payload, client) + # propagate message to other peers for peer in self.clients: if peer == client.peer: @@ -557,6 +582,16 @@ def handle_escalate_message( if self.escalate_callback: self.escalate_callback(payload) + if message.payload.msg_type == HiveMessageType.INTERCOM: + if self.handle_intercom_message(message.payload, client): + return + + if message.payload.msg_type == HiveMessageType.BUS: + # if the message targets our site_id, send it to internal bus + site = message.target_site_id + if site and site == self.identity.site_id: + self.handle_bus_message(message.payload, client) + # send to other masters message = Message( "hive.send.upstream", @@ -570,6 +605,54 @@ def handle_escalate_message( bus = self.get_bus(client) bus.emit(message) + def handle_intercom_message( + self, message: HiveMessage, client: HiveMindClientConnection + ) -> bool: + + # if the message targets us, send it to internal bus + k = message.target_public_key + if k and k != self.identity.public_key: + # not for us + return False + + pload = message.payload + if isinstance(pload, dict) and "ciphertext" in pload: + try: + message_from_blob = pgpy.PGPMessage.from_blob(pload["ciphertext"]) + + with open(self.identity.private_key, "r") as f: + private_key = pgpy.PGPKey.from_blob(f.read()) + + decrypted: str = private_key.decrypt(message_from_blob) + message._payload = HiveMessage.deserialize(decrypted) + except: + if k: + LOG.error("failed to decrypt message!") + else: + LOG.debug("failed to decrypt message, not for us") + return False + + if message.msg_type == HiveMessageType.BUS: + self.handle_bus_message(message, client) + return True + elif message.msg_type == HiveMessageType.PROPAGATE: + self.handle_propagate_message(message, client) + return True + elif message.msg_type == HiveMessageType.BROADCAST: + self.handle_broadcast_message(message, client) + return True + elif message.msg_type == HiveMessageType.ESCALATE: + self.handle_escalate_message(message, client) + return True + elif message.msg_type == HiveMessageType.BINARY: + self.handle_binary_message(message, client) + return True + elif message.msg_type == HiveMessageType.SHARED_BUS: + self.handle_client_bus(message.payload, client) + return True + + return False + # HiveMind mycroft bus messages - from slave -> master def handle_inject_mycroft_msg( self, message: Message, client: HiveMindClientConnection diff --git a/hivemind_core/service.py b/hivemind_core/service.py index bd50f97..0f857d6 100644 --- a/hivemind_core/service.py +++ b/hivemind_core/service.py @@ -254,7 +254,7 @@ def run(self): loop = ioloop.IOLoop.current() self.protocol = self._proto(loop=loop) - self.protocol.bind(self._ws_handler, self.bus) + self.protocol.bind(self._ws_handler, self.bus, self.identity) self.status.bind(self.bus) self.status.set_started() diff --git a/requirements.txt b/requirements.txt index fb116e4..2014088 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,12 @@ tornado -ovos_utils>=0.0.33 -pycryptodomex -HiveMind_presence>=0.0.2a3 -ovos-bus-client>=0.0.6a5 -poorman_handshake>=0.1.0 click click_default_group rich pyOpenSSL -hivemind-ggwave \ No newline at end of file +pycryptodomex +poorman_handshake>=0.1.0 +hivemind-ggwave +hivemind_bus_client>=0.0.4a25 +HiveMind_presence>=0.0.2a3 +ovos_utils>=0.0.33 +ovos-bus-client>=0.0.6a5 \ No newline at end of file