From 247eb22fba878a746dc0c478ec0ad4c01fc0e502 Mon Sep 17 00:00:00 2001 From: "Jamie C. Driver" Date: Tue, 3 Sep 2024 16:09:50 +0100 Subject: [PATCH] jade: update Jade api to 1.0.31 This extends the serial api to recognise recently supported hardware. --- hwilib/devices/jade.py | 4 +- hwilib/devices/jadepy/README.md | 2 +- hwilib/devices/jadepy/__init__.py | 2 +- hwilib/devices/jadepy/jade.py | 187 +++++++++++++++++++++++++-- hwilib/devices/jadepy/jade_serial.py | 10 +- 5 files changed, 191 insertions(+), 14 deletions(-) diff --git a/hwilib/devices/jade.py b/hwilib/devices/jade.py index 6d471f6fc..bff69cd7c 100644 --- a/hwilib/devices/jade.py +++ b/hwilib/devices/jade.py @@ -5,6 +5,7 @@ from .jadepy import jade from .jadepy.jade import JadeAPI, JadeError +from .jadepy.jade_serial import JadeSerialImpl from serial.tools import list_ports @@ -58,7 +59,6 @@ # The test emulator port SIMULATOR_PATH = 'tcp:127.0.0.1:30121' -JADE_DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4), (0x0403, 0x6001), (0x1a86, 0x7523)] HAS_NETWORKING = hasattr(jade, '_http_request') py_enumerate = enumerate # To use the enumerate built-in, since the name is overridden below @@ -533,7 +533,7 @@ def _get_device_entry(device_model: str, device_path: str) -> Dict[str, Any]: # Scan com ports looking for the relevant vid and pid, and use 'path' to # hold the path to the serial port device, eg. /dev/ttyUSB0 for devinfo in list_ports.comports(): - if (devinfo.vid, devinfo.pid) in JADE_DEVICE_IDS: + if (devinfo.vid, devinfo.pid) in JadeSerialImpl.JADE_DEVICE_IDS: results.append(_get_device_entry('jade', devinfo.device)) # If we can connect to the simulator, add it too diff --git a/hwilib/devices/jadepy/README.md b/hwilib/devices/jadepy/README.md index 6ce232e0c..38a9fcf19 100644 --- a/hwilib/devices/jadepy/README.md +++ b/hwilib/devices/jadepy/README.md @@ -2,7 +2,7 @@ This is a slightly stripped down version of the official [Jade](https://github.com/Blockstream/Jade) python library. -This stripped down version was made from tag [0.1.38](https://github.com/Blockstream/Jade/releases/tag/0.1.38) +This stripped down version was made from tag [1.0.31](https://github.com/Blockstream/Jade/releases/tag/1.0.31) ## Changes diff --git a/hwilib/devices/jadepy/__init__.py b/hwilib/devices/jadepy/__init__.py index 64e2ceb7e..ce732cbcd 100644 --- a/hwilib/devices/jadepy/__init__.py +++ b/hwilib/devices/jadepy/__init__.py @@ -1,4 +1,4 @@ from .jade import JadeAPI from .jade_error import JadeError -__version__ = "0.2.0" +__version__ = "1.0.31" diff --git a/hwilib/devices/jadepy/jade.py b/hwilib/devices/jadepy/jade.py index b21ee4c1e..72d98927d 100644 --- a/hwilib/devices/jadepy/jade.py +++ b/hwilib/devices/jadepy/jade.py @@ -7,6 +7,7 @@ import collections.abc import traceback import random +import socket import sys # JadeError @@ -65,7 +66,7 @@ def _http_request(params): The default implementation used in JadeAPI._jadeRpc() below. NOTE: Only available if the 'requests' dependency is available. - Callers can supply their own implmentation of this call where it is required. + Callers can supply their own implementation of this call where it is required. Parameters ---------- @@ -113,6 +114,32 @@ def http_call_fn(): return requests.post(url, data) logger.info('Default _http_requests() function will not be available') +def generate_dump(): + while True: + try: + with socket.create_connection(("localhost", 4444)) as s: + output = b"" + while b"Open On-Chip Debugger" not in output: + data = s.recv(1024) + if not data: + continue + output += data + + s.sendall(b"esp gcov dump\n") + + output = b"" + while b"Targets disconnected." not in output: + data = s.recv(1024) + if not data: + continue + output += data + s.sendall(b"resume\n") + time.sleep(1) + return + except ConnectionRefusedError: + pass + + class JadeAPI: """ High-Level Jade Client API @@ -421,7 +448,8 @@ def logout(self): """ return self._jadeRpc('logout') - def ota_update(self, fwcmp, fwlen, chunksize, fwhash=None, patchlen=None, cb=None): + def ota_update(self, fwcmp, fwlen, chunksize, fwhash=None, patchlen=None, cb=None, + gcov_dump=False): """ RPC call to attempt to update the unit's firmware. @@ -497,6 +525,9 @@ def ota_update(self, fwcmp, fwlen, chunksize, fwhash=None, patchlen=None, cb=Non if (cb): cb(written, cmplen) + if gcov_dump: + self.run_remote_gcov_dump() + # All binary data uploaded return self._jadeRpc('ota_complete') @@ -513,6 +544,22 @@ def run_remote_selfcheck(self): """ return self._jadeRpc('debug_selfcheck', long_timeout=True) + def run_remote_gcov_dump(self): + """ + RPC call to run in-built gcov-dump. + NOTE: Only available in a DEBUG build of the firmware. + + Returns + ------- + bool + Always True. + """ + result = self._jadeRpc('debug_gcov_dump', long_timeout=True) + time.sleep(0.5) + generate_dump() + time.sleep(2) + return result + def capture_image_data(self, check_qr=False): """ RPC call to capture raw image data from the camera. @@ -815,6 +862,55 @@ def get_registered_multisigs(self): """ return self._jadeRpc('get_registered_multisigs') + def get_registered_multisig(self, multisig_name, as_file=False): + """ + RPC call to fetch details of a named multisig wallet registered to this signer. + NOTE: the multisig wallet must have been registered with firmware v1.0.23 or later + for the full signer details to be persisted and available. + + Parameters + ---------- + multisig_name : string + Name of multsig registration record to return. + + as_file : string, optional + If true the flat file format is returned, otherwise structured json is returned. + Defaults to false. + + Returns + ------- + dict + Description of registered multisig wallet identified by registration name. + Contains keys: + is_file is true: + multisig_file - str, the multisig file as produced by several wallet apps. + eg: + Name: MainWallet + Policy: 2 of 3 + Format: P2WSH + Derivation: m/48'/0'/0'/2' + + B237FE9D: xpub6E8C7BX4c7qfTsX7urnXggcAyFuhDmYLQhwRwZGLD9maUGWPinuc9k96ej... + 249192D2: xpub6EbXynW6xjYR3crcztum6KzSWqDJoAJQoovwamwVnLaCSHA6syXKPnJo6U... + 67F90FFC: xpub6EHuWWrYd8bp5FS1XAZsMPkmCqLSjpULmygWqAqWRCCjSWQwz6ntq5KnuQ... + + is_file is false: + multisig_name - str, name of multisig registration + variant - str, script type, eg. 'sh(wsh(multi(k)))' + sorted - boolean, whether bip67 key sorting is applied + threshold - int, number of signers required,N + master_blinding_key - 32-bytes, any liquid master blinding key for this wallet + signers - dict containing keys: + fingerprint - 4 bytes, origin fingerprint + derivation - [int], bip32 path from origin to signer xpub provided + xpub - str, base58 xpub of signer + path - [int], any fixed path to always apply after the xpub - usually empty. + + """ + params = {'multisig_name': multisig_name, + 'as_file': as_file} + return self._jadeRpc('get_registered_multisig', params) + def register_multisig(self, network, multisig_name, variant, sorted_keys, threshold, signers, master_blinding_key=None): """ @@ -892,6 +988,42 @@ def register_multisig_file(self, multisig_file): params = {'multisig_file': multisig_file} return self._jadeRpc('register_multisig', params) + def get_registered_descriptors(self): + """ + RPC call to fetch brief summaries of any descriptor wallets registered to this signer. + + Returns + ------- + dict + Brief description of registered descriptor, keyed by registration name. + Each entry contains keys: + descriptor_len - int, length of descriptor output script + num_datavalues - int, total number of substitution placeholders passed with script + master_blinding_key - 32-bytes, any liquid master blinding key for this wallet + """ + return self._jadeRpc('get_registered_descriptors') + + def get_registered_descriptor(self, descriptor_name): + """ + RPC call to fetch details of a named descriptor wallet registered to this signer. + + Parameters + ---------- + descriptor_name : string + Name of descriptor registration record to return. + + Returns + ------- + dict + Description of registered descriptor wallet identified by registration name. + Contains keys: + descriptor_name - str, name of descritpor registration + descriptor - str, descriptor output script, may contain substitution placeholders + datavalues - dict containing placeholders for substitution into script + """ + params = {'descriptor_name': descriptor_name} + return self._jadeRpc('get_registered_descriptor', params) + def register_descriptor(self, network, descriptor_name, descriptor_script, datavalues=None): """ RPC call to register a new descriptor wallet, which must contain the hw signer. @@ -900,7 +1032,7 @@ def register_descriptor(self, network, descriptor_name, descriptor_script, datav Parameters ---------- network : string - Network to which the multisig should apply - eg. 'mainnet', 'liquid', 'testnet', etc. + Network to which the descriptor should apply - eg. 'mainnet', 'liquid', 'testnet', etc. descriptor_name : string Name to use to identify this descriptor wallet registration record. @@ -1162,18 +1294,57 @@ def sign_identity(self, identity, curve, challenge, index=0): params = {'identity': identity, 'curve': curve, 'index': index, 'challenge': challenge} return self._jadeRpc('sign_identity', params) - def get_master_blinding_key(self): + def sign_attestation(self, challenge): + """ + RPC call to sign passed challenge with embedded hw RSA-4096 key, such that the caller + can check the authenticity of the hardware unit. eg. whether it is a genuine + Blockstream production Jade unit. + Caller must have the public key of the external verifying authority they wish to validate + against (eg. Blockstream's Jade verification public key). + NOTE: only supported by ESP32S3-based hardware units. + + Parameters + ---------- + challenge : bytes + Challenge bytes to sign + + Returns + ------- + dict + Contains keys: + signature - 512-bytes, hardware RSA signature of the SHA256 hash of the passed + challenge bytes. + pubkey_pem - str, PEM export of RSA pubkey of the hardware unit, to verify the returned + RSA signature. + ext_signature - bytes, RSA signature of the verifying authority over the returned + pubkey_pem data. + (Caller can verify this signature with the public key of the verifying authority.) + """ + params = {'challenge': challenge} + return self._jadeRpc('sign_attestation', params) + + def get_master_blinding_key(self, only_if_silent=False): """ RPC call to fetch the master (SLIP-077) blinding key for the hw signer. + May block temporarily to request the user's permission to export. Passing 'only_if_silent' + causes the call to return the 'denied' error if it would normally ask the user. NOTE: the master blinding key of any registered multisig wallets can be obtained from the result of `get_registered_multisigs()`. + Parameters + ---------- + only_if_silent : boolean, optional + If True Jade will return the denied error if it would normally ask the user's permission + to export the master blinding key. Passing False (or letting default) may block while + asking the user to confirm the export on Jade. + Returns ------- 32-bytes SLIP-077 master blinding key """ - return self._jadeRpc('get_master_blinding_key') + params = {'only_if_silent': only_if_silent} + return self._jadeRpc('get_master_blinding_key', params) def get_blinding_key(self, script, multisig_name=None): """ @@ -1699,7 +1870,7 @@ def create_serial(device=None, baud=None, timeout=None): Returns ------- JadeInterface - Inerface object configured to use given serial parameters. + Interface object configured to use given serial parameters. NOTE: the instance has not yet tried to contact the hw - caller must call 'connect()' before trying to use the Jade. """ @@ -1741,7 +1912,7 @@ def create_ble(device_name=None, serial_number=None, Returns ------- JadeInterface - Inerface object configured to use given BLE parameters. + Interface object configured to use given BLE parameters. NOTE: the instance has not yet tried to contact the hw - caller must call 'connect()' before trying to use the Jade. @@ -1995,7 +2166,7 @@ def validate_reply(request, reply): def make_rpc_call(self, request, long_timeout=False): """ Method to send a request over the underlying interface, and await a response. - The request is minimally validated before it is sent, and the response is simialrly + The request is minimally validated before it is sent, and the response is similarly validated before being returned. Any read-timeout is respected unless 'long_timeout' is passed, in which case the call blocks indefinitely awaiting a response. diff --git a/hwilib/devices/jadepy/jade_serial.py b/hwilib/devices/jadepy/jade_serial.py index ac08119d0..4b17c59dc 100644 --- a/hwilib/devices/jadepy/jade_serial.py +++ b/hwilib/devices/jadepy/jade_serial.py @@ -2,6 +2,7 @@ import logging from serial.tools import list_ports +from .jade_error import JadeError logger = logging.getLogger(__name__) @@ -21,7 +22,9 @@ # class JadeSerialImpl: # Used when searching for devices that might be a Jade/compatible hw - JADE_DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4), (0x0403, 0x6001), (0x1a86, 0x7523)] + JADE_DEVICE_IDS = [ + (0x10c4, 0xea60), (0x1a86, 0x55d4), (0x0403, 0x6001), + (0x1a86, 0x7523), (0x303a, 0x4001), (0x303a, 0x1001)] @classmethod def _get_first_compatible_device(cls): @@ -51,7 +54,10 @@ def connect(self): assert self.ser is not None if not self.ser.is_open: - self.ser.open() + try: + self.ser.open() + except serial.serialutil.SerialException: + raise JadeError(1, "Unable to open port", self.device) # Ensure RTS and DTR are not set (as this can cause the hw to reboot) self.ser.setRTS(False)