From 099733b4722f05dcb69ece5f2d6e30da74c036ca Mon Sep 17 00:00:00 2001 From: Kendell R Date: Thu, 5 Nov 2020 11:00:42 -0800 Subject: [PATCH 01/13] Add GH Actions --- .github/workflows/flake8.yaml | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 .github/workflows/flake8.yaml diff --git a/.github/workflows/flake8.yaml b/.github/workflows/flake8.yaml new file mode 100644 index 00000000..2fd67945 --- /dev/null +++ b/.github/workflows/flake8.yaml @@ -0,0 +1,32 @@ +name: Python flake8 + +on: + push: + branches: [ main, master, dev, development ] + pull_request: + branches: [ main, master, dev, development ] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 wemake-python-styleguide + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. ignore magic numbers and use double quotes and ignore numbers with zeroes before them. + # and ignore lowercase hex numbers and ignore isort incorrect imports + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=90 --ignore=WPS432,WPS339,WPS341,I --inline-quotes double --statistics From 1bc015042a2ab93b5f9250b5191775b5be955c22 Mon Sep 17 00:00:00 2001 From: Kendell R Date: Thu, 5 Nov 2020 11:25:33 -0800 Subject: [PATCH 02/13] Run black --- broadlink/__init__.py | 147 +++++++++++++++++++--------------------- broadlink/alarm.py | 28 ++++---- broadlink/climate.py | 124 ++++++++++++++++++++++------------ broadlink/cover.py | 8 +-- broadlink/device.py | 154 +++++++++++++++++++++--------------------- broadlink/light.py | 54 ++++++++------- broadlink/remote.py | 36 +++++----- broadlink/sensor.py | 20 +++--- broadlink/switch.py | 114 +++++++++++++++++-------------- setup.py | 22 +++--- 10 files changed, 378 insertions(+), 329 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 604ed88a..74ae1fa6 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -20,88 +20,88 @@ 0x2716: (sp2, "NEO PRO", "Ankuoo"), 0x2717: (sp2, "NEO", "Ankuoo"), 0x2719: (sp2, "SP2-compatible", "Honeywell"), - 0x271a: (sp2, "SP2-compatible", "Honeywell"), + 0x271A: (sp2, "SP2-compatible", "Honeywell"), 0x2720: (sp2, "SP mini", "Broadlink"), 0x2728: (sp2, "SP2-compatible", "URANT"), 0x2733: (sp2, "SP3", "Broadlink"), 0x2736: (sp2, "SP mini+", "Broadlink"), - 0x273e: (sp2, "SP mini", "Broadlink"), + 0x273E: (sp2, "SP mini", "Broadlink"), 0x7530: (sp2, "SP2", "Broadlink (OEM)"), 0x7539: (sp2, "SP2-IL", "Broadlink (OEM)"), - 0x753e: (sp2, "SP mini 3", "Broadlink"), + 0x753E: (sp2, "SP mini 3", "Broadlink"), 0x7540: (sp2, "MP2", "Broadlink"), - 0X7544: (sp2, "SP2-CL", "Broadlink"), + 0x7544: (sp2, "SP2-CL", "Broadlink"), 0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"), 0x7547: (sp2, "SC1", "Broadlink"), 0x7918: (sp2, "SP2", "Broadlink (OEM)"), 0x7919: (sp2, "SP2-compatible", "Honeywell"), - 0x791a: (sp2, "SP2-compatible", "Honeywell"), - 0x7d00: (sp2, "SP3-EU", "Broadlink (OEM)"), - 0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"), + 0x791A: (sp2, "SP2-compatible", "Honeywell"), + 0x7D00: (sp2, "SP3-EU", "Broadlink (OEM)"), + 0x7D0D: (sp2, "SP mini 3", "Broadlink (OEM)"), 0x9479: (sp2, "SP3S-US", "Broadlink"), - 0x947a: (sp2, "SP3S-EU", "Broadlink"), - 0x756c: (sp4, "SP4M", "Broadlink"), + 0x947A: (sp2, "SP3S-EU", "Broadlink"), + 0x756C: (sp4, "SP4M", "Broadlink"), 0x7579: (sp4, "SP4L-EU", "Broadlink"), 0x7583: (sp4, "SP mini 3", "Broadlink"), - 0x7d11: (sp4, "SP mini 3", "Broadlink"), - 0x648b: (sp4b, "SP4M-US", "Broadlink"), + 0x7D11: (sp4, "SP mini 3", "Broadlink"), + 0x648B: (sp4b, "SP4M-US", "Broadlink"), 0x2712: (rm, "RM pro/pro+", "Broadlink"), - 0x272a: (rm, "RM pro", "Broadlink"), + 0x272A: (rm, "RM pro", "Broadlink"), 0x2737: (rm, "RM mini 3", "Broadlink"), - 0x273d: (rm, "RM pro", "Broadlink"), - 0x277c: (rm, "RM home", "Broadlink"), + 0x273D: (rm, "RM pro", "Broadlink"), + 0x277C: (rm, "RM home", "Broadlink"), 0x2783: (rm, "RM home", "Broadlink"), 0x2787: (rm, "RM pro", "Broadlink"), - 0x278b: (rm, "RM plus", "Broadlink"), - 0x278f: (rm, "RM mini", "Broadlink"), + 0x278B: (rm, "RM plus", "Broadlink"), + 0x278F: (rm, "RM mini", "Broadlink"), 0x2797: (rm, "RM pro+", "Broadlink"), - 0x279d: (rm, "RM pro+", "Broadlink"), - 0x27a1: (rm, "RM plus", "Broadlink"), - 0x27a6: (rm, "RM plus", "Broadlink"), - 0x27a9: (rm, "RM pro+", "Broadlink"), - 0x27c2: (rm, "RM mini 3", "Broadlink"), - 0x27c3: (rm, "RM pro+", "Broadlink"), - 0x27c7: (rm, "RM mini 3", "Broadlink"), - 0x27cc: (rm, "RM mini 3", "Broadlink"), - 0x27cd: (rm, "RM mini 3", "Broadlink"), - 0x27d0: (rm, "RM mini 3", "Broadlink"), - 0x27d1: (rm, "RM mini 3", "Broadlink"), - 0x27de: (rm, "RM mini 3", "Broadlink"), - 0x51da: (rm4, "RM4 mini", "Broadlink"), - 0x5f36: (rm4, "RM mini 3", "Broadlink"), + 0x279D: (rm, "RM pro+", "Broadlink"), + 0x27A1: (rm, "RM plus", "Broadlink"), + 0x27A6: (rm, "RM plus", "Broadlink"), + 0x27A9: (rm, "RM pro+", "Broadlink"), + 0x27C2: (rm, "RM mini 3", "Broadlink"), + 0x27C3: (rm, "RM pro+", "Broadlink"), + 0x27C7: (rm, "RM mini 3", "Broadlink"), + 0x27CC: (rm, "RM mini 3", "Broadlink"), + 0x27CD: (rm, "RM mini 3", "Broadlink"), + 0x27D0: (rm, "RM mini 3", "Broadlink"), + 0x27D1: (rm, "RM mini 3", "Broadlink"), + 0x27DE: (rm, "RM mini 3", "Broadlink"), + 0x51DA: (rm4, "RM4 mini", "Broadlink"), + 0x5F36: (rm4, "RM mini 3", "Broadlink"), 0x6026: (rm4, "RM4 pro", "Broadlink"), 0x6070: (rm4, "RM4C mini", "Broadlink"), - 0x610e: (rm4, "RM4 mini", "Broadlink"), - 0x610f: (rm4, "RM4C mini", "Broadlink"), - 0x61a2: (rm4, "RM4 pro", "Broadlink"), - 0x62bc: (rm4, "RM4 mini", "Broadlink"), - 0x62be: (rm4, "RM4C mini", "Broadlink"), - 0x648d: (rm4, "RM4 mini", "Broadlink"), - 0x649b: (rm4, "RM4 pro", "Broadlink"), - 0x653a: (rm4, "RM4 mini", "Broadlink"), + 0x610E: (rm4, "RM4 mini", "Broadlink"), + 0x610F: (rm4, "RM4C mini", "Broadlink"), + 0x61A2: (rm4, "RM4 pro", "Broadlink"), + 0x62BC: (rm4, "RM4 mini", "Broadlink"), + 0x62BE: (rm4, "RM4C mini", "Broadlink"), + 0x648D: (rm4, "RM4 mini", "Broadlink"), + 0x649B: (rm4, "RM4 pro", "Broadlink"), + 0x653A: (rm4, "RM4 mini", "Broadlink"), 0x2714: (a1, "e-Sensor", "Broadlink"), - 0x4eb5: (mp1, "MP1-1K4S", "Broadlink"), - 0x4ef7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), - 0x4f1b: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), - 0x4f65: (mp1, "MP1-1K3S2U", "Broadlink"), + 0x4EB5: (mp1, "MP1-1K4S", "Broadlink"), + 0x4EF7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), + 0x4F1B: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), + 0x4F65: (mp1, "MP1-1K3S2U", "Broadlink"), 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), - 0x504e: (lb1, "LB1", "Broadlink"), - 0x60c7: (lb1, "LB1", "Broadlink"), - 0x60c8: (lb1, "LB1", "Broadlink"), + 0x504E: (lb1, "LB1", "Broadlink"), + 0x60C7: (lb1, "LB1", "Broadlink"), + 0x60C8: (lb1, "LB1", "Broadlink"), 0x6112: (lb1, "LB1", "Broadlink"), 0x2722: (S1C, "S2KIT", "Broadlink"), - 0x4ead: (hysen, "HY02B05H", "Hysen"), - 0x4e4d: (dooya, "DT360E-45/20", "Dooya"), - 0x51e3: (bg1, "BG800/BG900", "BG Electrical"), + 0x4EAD: (hysen, "HY02B05H", "Hysen"), + 0x4E4D: (dooya, "DT360E-45/20", "Dooya"), + 0x51E3: (bg1, "BG800/BG900", "BG Electrical"), } def gendevice( - dev_type: int, - host: Tuple[str, int], - mac: Union[bytes, str], - name: str = None, - is_locked: bool = None, + dev_type: int, + host: Tuple[str, int], + mac: Union[bytes, str], + name: str = None, + is_locked: bool = None, ) -> device: """Generate a device.""" try: @@ -122,10 +122,10 @@ def gendevice( def hello( - host: str, - port: int = 80, - timeout: int = 10, - local_ip_address: str = None, + host: str, + port: int = 80, + timeout: int = 10, + local_ip_address: str = None, ) -> device: """Direct device discovery. @@ -138,31 +138,27 @@ def hello( def discover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> List[device]: """Discover devices connected to the local network.""" - responses = scan( - timeout, local_ip_address, discover_ip_address, discover_ip_port - ) + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) return [gendevice(*resp) for resp in responses] def xdiscover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> Generator[device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. """ - responses = scan( - timeout, local_ip_address, discover_ip_address, discover_ip_port - ) + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) for resp in responses: yield gendevice(*resp) @@ -191,13 +187,12 @@ def setup(ssid: str, password: str, security_mode: int) -> None: payload[0x85] = pass_length # Character length of password payload[0x86] = security_mode # Type of encryption - checksum = sum(payload, 0xbeaf) & 0xffff - payload[0x20] = checksum & 0xff # Checksum 1 position + checksum = sum(payload, 0xBEAF) & 0xFFFF + payload[0x20] = checksum & 0xFF # Checksum 1 position payload[0x21] = checksum >> 8 # Checksum 2 position - sock = socket.socket(socket.AF_INET, # Internet - socket.SOCK_DGRAM) # UDP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, ('255.255.255.255', 80)) + sock.sendto(payload, ("255.255.255.255", 80)) sock.close() diff --git a/broadlink/alarm.py b/broadlink/alarm.py index faded9d4..e73b8fad 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -7,21 +7,21 @@ class S1C(device): """Controls a Broadlink S1C.""" _SENSORS_TYPES = { - 0x31: 'Door Sensor', # 49 as hex - 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse - 0x21: 'Motion Sensor' # 33 as hex + 0x31: "Door Sensor", # 49 as hex + 0x91: "Key Fob", # 145 as hex, as serial on fob corpse + 0x21: "Motion Sensor", # 33 as hex } def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) - self.type = 'S1C' + self.type = "S1C" def get_sensors_status(self) -> dict: """Return the state of the sensors.""" packet = bytearray(16) packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if not payload: @@ -29,20 +29,20 @@ def get_sensors_status(self) -> dict: count = payload[0x4] sensor_data = payload[0x6:] sensors = [ - bytearray(sensor_data[i * 83:(i + 1) * 83]) + bytearray(sensor_data[i * 83 : (i + 1) * 83]) for i in range(len(sensor_data) // 83) ] return { - 'count': count, - 'sensors': [ + "count": count, + "sensors": [ { - 'status': sensor[0], - 'name': sensor[4:26].decode().strip('\x00'), - 'type': self._SENSORS_TYPES.get(sensor[3], 'Unknown'), - 'order': sensor[1], - 'serial': sensor[26:30].hex(), + "status": sensor[0], + "name": sensor[4:26].decode().strip("\x00"), + "type": self._SENSORS_TYPES.get(sensor[3], "Unknown"), + "order": sensor[1], + "serial": sensor[26:30].hex(), } for sensor in sensors if any(sensor[26:30]) - ] + ], } diff --git a/broadlink/climate.py b/broadlink/climate.py index f0c337e5..036cc9a8 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -33,19 +33,22 @@ def send_request(self, input_payload: bytes) -> bytes: request_payload.append((crc >> 8) & 0xFF) # send to device - response = self.send_packet(0x6a, request_payload) + response = self.send_packet(0x6A, request_payload) check_error(response[0x22:0x24]) response_payload = self.decrypt(response[0x38:]) # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) response_payload_len = response_payload[0] if response_payload_len + 2 > len(response_payload): - raise ValueError('hysen_response_error', 'first byte of response is not length') + raise ValueError( + "hysen_response_error", "first byte of response is not length" + ) crc = calculate_crc16(response_payload[2:response_payload_len]) if (response_payload[response_payload_len] == crc & 0xFF) and ( - response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): + response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF + ): return response_payload[2:response_payload_len] - raise ValueError('hysen_response_error', 'CRC check on response failed') + raise ValueError("hysen_response_error", "CRC check on response failed") def get_temp(self) -> int: """Return the room temperature in degrees celsius.""" @@ -64,43 +67,53 @@ def get_full_status(self) -> dict: """ payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) data = {} - data['remote_lock'] = payload[3] & 1 - data['power'] = payload[4] & 1 - data['active'] = (payload[4] >> 4) & 1 - data['temp_manual'] = (payload[4] >> 6) & 1 - data['room_temp'] = (payload[5] & 255) / 2.0 - data['thermostat_temp'] = (payload[6] & 255) / 2.0 - data['auto_mode'] = payload[7] & 15 - data['loop_mode'] = (payload[7] >> 4) & 15 - data['sensor'] = payload[8] - data['osv'] = payload[9] - data['dif'] = payload[10] - data['svh'] = payload[11] - data['svl'] = payload[12] - data['room_temp_adj'] = ((payload[13] << 8) + payload[14]) / 2.0 - if data['room_temp_adj'] > 32767: - data['room_temp_adj'] = 32767 - data['room_temp_adj'] - data['fre'] = payload[15] - data['poweron'] = payload[16] - data['unknown'] = payload[17] - data['external_temp'] = (payload[18] & 255) / 2.0 - data['hour'] = payload[19] - data['min'] = payload[20] - data['sec'] = payload[21] - data['dayofweek'] = payload[22] + data["remote_lock"] = payload[3] & 1 + data["power"] = payload[4] & 1 + data["active"] = (payload[4] >> 4) & 1 + data["temp_manual"] = (payload[4] >> 6) & 1 + data["room_temp"] = (payload[5] & 255) / 2.0 + data["thermostat_temp"] = (payload[6] & 255) / 2.0 + data["auto_mode"] = payload[7] & 15 + data["loop_mode"] = (payload[7] >> 4) & 15 + data["sensor"] = payload[8] + data["osv"] = payload[9] + data["dif"] = payload[10] + data["svh"] = payload[11] + data["svl"] = payload[12] + data["room_temp_adj"] = ((payload[13] << 8) + payload[14]) / 2.0 + if data["room_temp_adj"] > 32767: + data["room_temp_adj"] = 32767 - data["room_temp_adj"] + data["fre"] = payload[15] + data["poweron"] = payload[16] + data["unknown"] = payload[17] + data["external_temp"] = (payload[18] & 255) / 2.0 + data["hour"] = payload[19] + data["min"] = payload[20] + data["sec"] = payload[21] + data["dayofweek"] = payload[22] weekday = [] for i in range(0, 6): weekday.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekday'] = weekday + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekday"] = weekday weekend = [] for i in range(6, 8): weekend.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekend'] = weekend + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekend"] = weekend return data # Change controller mode @@ -140,8 +153,27 @@ def set_advanced( poweron: int, ) -> None: """Set advanced options.""" - input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, - (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) + input_payload = bytearray( + [ + 0x01, + 0x10, + 0x00, + 0x02, + 0x00, + 0x05, + 0x0A, + loop_mode, + sensor, + osv, + dif, + svh, + svl, + (int(adj * 2) >> 8 & 0xFF), + (int(adj * 2) & 0xFF), + fre, + poweron, + ] + ) self.send_request(input_payload) # For backwards compatibility only. Prefer calling set_mode directly. @@ -169,7 +201,11 @@ def set_power(self, power: int = 1, remote_lock: int = 0) -> None: # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" - self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) + self.send_request( + bytearray( + [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + ) + ) # Set timer schedule # Format is the same as you get from get_full_status. @@ -180,25 +216,25 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" # Begin with some magic values ... - input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) + input_payload = bytearray([0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]) # Now simply append times/temps # weekday times for i in range(0, 6): - input_payload.append(weekday[i]['start_hour']) - input_payload.append(weekday[i]['start_minute']) + input_payload.append(weekday[i]["start_hour"]) + input_payload.append(weekday[i]["start_minute"]) # weekend times for i in range(0, 2): - input_payload.append(weekend[i]['start_hour']) - input_payload.append(weekend[i]['start_minute']) + input_payload.append(weekend[i]["start_hour"]) + input_payload.append(weekend[i]["start_minute"]) # weekday temperatures for i in range(0, 6): - input_payload.append(int(weekday[i]['temp'] * 2)) + input_payload.append(int(weekday[i]["temp"] * 2)) # weekend temperatures for i in range(0, 2): - input_payload.append(int(weekend[i]['temp'] * 2)) + input_payload.append(int(weekend[i]["temp"] * 2)) self.send_request(input_payload) diff --git a/broadlink/cover.py b/broadlink/cover.py index 236e747c..2691fe97 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -17,12 +17,12 @@ def _send(self, magic1: int, magic2: int) -> int: """Send a packet to the device.""" packet = bytearray(16) packet[0] = 0x09 - packet[2] = 0xbb + packet[2] = 0xBB packet[3] = magic1 packet[4] = magic2 - packet[9] = 0xfa + packet[9] = 0xFA packet[10] = 0x44 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[4] @@ -41,7 +41,7 @@ def stop(self) -> int: def get_percentage(self) -> int: """Return the position of the curtain.""" - return self._send(0x06, 0x5d) + return self._send(0x06, 0x5D) def set_percentage_and_wait(self, new_percentage: int) -> None: """Set the position of the curtain.""" diff --git a/broadlink/device.py b/broadlink/device.py index 9392024f..e040de79 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -15,10 +15,10 @@ def scan( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -36,38 +36,38 @@ def scan( timezone = int(time.timezone / -3600) if timezone < 0: - packet[0x08] = 0xff + timezone - 1 - packet[0x09] = 0xff - packet[0x0a] = 0xff - packet[0x0b] = 0xff + packet[0x08] = 0xFF + timezone - 1 + packet[0x09] = 0xFF + packet[0x0A] = 0xFF + packet[0x0B] = 0xFF else: packet[0x08] = timezone packet[0x09] = 0 - packet[0x0a] = 0 - packet[0x0b] = 0 + packet[0x0A] = 0 + packet[0x0B] = 0 year = datetime.now().year - packet[0x0c] = year & 0xff - packet[0x0d] = year >> 8 - packet[0x0e] = datetime.now().minute - packet[0x0f] = datetime.now().hour + packet[0x0C] = year & 0xFF + packet[0x0D] = year >> 8 + packet[0x0E] = datetime.now().minute + packet[0x0F] = datetime.now().hour subyear = str(year)[2:] packet[0x10] = int(subyear) packet[0x11] = datetime.now().isoweekday() packet[0x12] = datetime.now().day packet[0x13] = datetime.now().month - address = local_ip_address.split('.') + address = local_ip_address.split(".") packet[0x18] = int(address[3]) packet[0x19] = int(address[2]) - packet[0x1a] = int(address[1]) - packet[0x1b] = int(address[0]) - packet[0x1c] = port & 0xff - packet[0x1d] = port >> 8 + packet[0x1A] = int(address[1]) + packet[0x1B] = int(address[0]) + packet[0x1C] = port & 0xFF + packet[0x1D] = port >> 8 packet[0x26] = 6 - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 starttime = time.time() @@ -85,12 +85,12 @@ def scan( break devtype = response[0x34] | response[0x35] << 8 - mac = bytes(reversed(response[0x3a:0x40])) + mac = bytes(reversed(response[0x3A:0x40])) if (host, mac, devtype) in discovered: continue discovered.append((host, mac, devtype)) - name = response[0x40:].split(b'\x00')[0].decode('utf-8') + name = response[0x40:].split(b"\x00")[0].decode("utf-8") is_locked = bool(response[-1]) yield devtype, host, mac, name, is_locked finally: @@ -101,33 +101,33 @@ class device: """Controls a Broadlink device.""" def __init__( - self, - host: Tuple[str, int], - mac: Union[bytes, str], - devtype: int, - timeout: int = 10, - name: str = None, - model: str = None, - manufacturer: str = None, - is_locked: bool = None, + self, + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = 10, + name: str = None, + model: str = None, + manufacturer: str = None, + is_locked: bool = None, ) -> None: """Initialize the controller.""" self.host = host self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac - self.devtype = devtype if devtype is not None else 0x272a + self.devtype = devtype if devtype is not None else 0x272A self.timeout = timeout self.name = name self.model = model self.manufacturer = manufacturer self.is_locked = is_locked - self.count = random.randrange(0xffff) - self.iv = bytes.fromhex('562e17996d093d28ddb3ba695a2e6f58') + self.count = random.randrange(0xFFFF) + self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58") self.id = bytes(4) self.type = "Unknown" self.lock = threading.Lock() self.aes = None - key = bytes.fromhex('097628343fe99e23765c1513accf8b02') + key = bytes.fromhex("097628343fe99e23765c1513accf8b02") self.update_aes(key) def __repr__(self): @@ -138,7 +138,7 @@ def __repr__(self): hex(self.devtype), self.host[0], self.host[1], - ':'.join(format(x, '02x') for x in self.mac), + ":".join(format(x, "02x") for x in self.mac), self.name, "Locked" if self.is_locked else "Unlocked", ) @@ -175,24 +175,24 @@ def auth(self) -> bool: payload[0x07] = 0x31 payload[0x08] = 0x31 payload[0x09] = 0x31 - payload[0x0a] = 0x31 - payload[0x0b] = 0x31 - payload[0x0c] = 0x31 - payload[0x0d] = 0x31 - payload[0x0e] = 0x31 - payload[0x0f] = 0x31 + payload[0x0A] = 0x31 + payload[0x0B] = 0x31 + payload[0x0C] = 0x31 + payload[0x0D] = 0x31 + payload[0x0E] = 0x31 + payload[0x0F] = 0x31 payload[0x10] = 0x31 payload[0x11] = 0x31 payload[0x12] = 0x31 - payload[0x1e] = 0x01 - payload[0x2d] = 0x01 - payload[0x30] = ord('T') - payload[0x31] = ord('e') - payload[0x32] = ord('s') - payload[0x33] = ord('t') - payload[0x34] = ord(' ') - payload[0x35] = ord(' ') - payload[0x36] = ord('1') + payload[0x1E] = 0x01 + payload[0x2D] = 0x01 + payload[0x30] = ord("T") + payload[0x31] = ord("e") + payload[0x32] = ord("s") + payload[0x33] = ord("t") + payload[0x34] = ord(" ") + payload[0x35] = ord(" ") + payload[0x36] = ord("1") response = self.send_packet(0x65, payload) check_error(response[0x22:0x24]) @@ -233,7 +233,7 @@ def hello(self, local_ip_address=None) -> bool: def get_fwversion(self) -> int: """Get firmware version.""" packet = bytearray([0x68]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[0x4] | payload[0x5] << 8 @@ -241,20 +241,20 @@ def get_fwversion(self) -> int: def set_name(self, name: str) -> None: """Set device name.""" packet = bytearray(4) - packet += name.encode('utf-8') + packet += name.encode("utf-8") packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(self.is_locked) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) self.name = name def set_lock(self, state: bool) -> None: """Lock/unlock the device.""" packet = bytearray(4) - packet += self.name.encode('utf-8') + packet += self.name.encode("utf-8") packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(state) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) self.is_locked = bool(state) @@ -264,27 +264,27 @@ def get_type(self) -> str: def send_packet(self, command: int, payload: bytes) -> bytes: """Send a packet to the device.""" - self.count = (self.count + 1) & 0xffff + self.count = (self.count + 1) & 0xFFFF packet = bytearray(0x38) - packet[0x00] = 0x5a - packet[0x01] = 0xa5 - packet[0x02] = 0xaa + packet[0x00] = 0x5A + packet[0x01] = 0xA5 + packet[0x02] = 0xAA packet[0x03] = 0x55 - packet[0x04] = 0x5a - packet[0x05] = 0xa5 - packet[0x06] = 0xaa + packet[0x04] = 0x5A + packet[0x05] = 0xA5 + packet[0x06] = 0xAA packet[0x07] = 0x55 - packet[0x24] = self.devtype & 0xff + packet[0x24] = self.devtype & 0xFF packet[0x25] = self.devtype >> 8 packet[0x26] = command - packet[0x28] = self.count & 0xff + packet[0x28] = self.count & 0xFF packet[0x29] = self.count >> 8 - packet[0x2a] = self.mac[5] - packet[0x2b] = self.mac[4] - packet[0x2c] = self.mac[3] - packet[0x2d] = self.mac[2] - packet[0x2e] = self.mac[1] - packet[0x2f] = self.mac[0] + packet[0x2A] = self.mac[5] + packet[0x2B] = self.mac[4] + packet[0x2C] = self.mac[3] + packet[0x2D] = self.mac[2] + packet[0x2E] = self.mac[1] + packet[0x2F] = self.mac[0] packet[0x30] = self.id[3] packet[0x31] = self.id[2] packet[0x32] = self.id[1] @@ -296,16 +296,16 @@ def send_packet(self, command: int, payload: bytes) -> bytes: payload = bytearray(payload) payload += bytearray(padding) - checksum = sum(payload, 0xbeaf) & 0xffff - packet[0x34] = checksum & 0xff + checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34] = checksum & 0xFF packet[0x35] = checksum >> 8 payload = self.encrypt(payload) for i in range(len(payload)): packet.append(payload[i]) - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 start_time = time.time() @@ -329,7 +329,7 @@ def send_packet(self, command: int, payload: bytes) -> bytes: raise exception(-4007) # Length error. checksum = resp[0x20] | (resp[0x21] << 8) - if sum(resp, 0xbeaf) - sum(resp[0x20:0x22]) & 0xffff != checksum: + if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum: raise exception(-4008) # Checksum error. return resp diff --git a/broadlink/light.py b/broadlink/light.py index cffc77d5..adadfab5 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -11,14 +11,14 @@ class lb1(device): state_dict = [] effect_map_dict = { - 'lovely color': 0, - 'flashlight': 1, - 'lightning': 2, - 'color fading': 3, - 'color breathing': 4, - 'multicolor breathing': 5, - 'color jumping': 6, - 'multicolor jumping': 7, + "lovely color": 0, + "flashlight": 1, + "lightning": 2, + "color fading": 3, + "color breathing": 4, + "multicolor breathing": 5, + "color jumping": 6, + "multicolor jumping": 7, } def __init__(self, *args, **kwargs) -> None: @@ -26,36 +26,38 @@ def __init__(self, *args, **kwargs) -> None: device.__init__(self, *args, **kwargs) self.type = "SmartBulb" - def send_command(self, command: str, type: str = 'set') -> None: + def send_command(self, command: str, type: str = "set") -> None: """Send a command to the device.""" - packet = bytearray(16+(int(len(command)/16) + 1)*16) - packet[0x00] = 0x0c + len(command) & 0xff - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set - packet[0x09] = 0x0b - packet[0x0a] = len(command) - packet[0x0e:] = map(ord, command) + packet = bytearray(16 + (int(len(command) / 16) + 1) * 16) + packet[0x00] = 0x0C + len(command) & 0xFF + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set + packet[0x09] = 0x0B + packet[0x0A] = len(command) + packet[0x0E:] = map(ord, command) - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x06] = checksum & 0xff # Checksum 1 position + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF # Checksum 1 position packet[0x07] = checksum >> 8 # Checksum 2 position - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x36:0x38]) payload = self.decrypt(response[0x38:]) - responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) + responseLength = int(payload[0x0A]) | (int(payload[0x0B]) << 8) if responseLength > 0: - self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) + self.state_dict = json.loads(payload[0x0E : 0x0E + responseLength]) def set_json(self, jsonstr: str) -> str: """Send a command to the device and return state.""" reconvert = json.loads(jsonstr) - if 'bulb_sceneidx' in reconvert.keys(): - reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) + if "bulb_sceneidx" in reconvert.keys(): + reconvert["bulb_sceneidx"] = self.effect_map_dict.get( + reconvert["bulb_sceneidx"], 255 + ) self.send_command(json.dumps(reconvert)) return json.dumps(self.state_dict) diff --git a/broadlink/remote.py b/broadlink/remote.py index 4a427f69..b6528d68 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -17,45 +17,45 @@ def check_data(self) -> bytes: """Return the last captured code.""" packet = bytearray(self._request_header) packet.append(0x04) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return payload[len(self._request_header) + 4:] + return payload[len(self._request_header) + 4 :] def send_data(self, data: bytes) -> None: """Send a code to the device.""" packet = bytearray(self._code_sending_header) packet += bytearray([0x02, 0x00, 0x00, 0x00]) packet += data - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def enter_learning(self) -> None: """Enter infrared learning mode.""" packet = bytearray(self._request_header) packet.append(0x03) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def sweep_frequency(self) -> None: """Sweep frequency.""" packet = bytearray(self._request_header) packet.append(0x19) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def cancel_sweep_frequency(self) -> None: """Cancel sweep frequency.""" packet = bytearray(self._request_header) - packet.append(0x1e) - response = self.send_packet(0x6a, packet) + packet.append(0x1E) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def check_frequency(self) -> bool: """Return True if the frequency was identified successfully.""" packet = bytearray(self._request_header) - packet.append(0x1a) - response = self.send_packet(0x6a, packet) + packet.append(0x1A) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: @@ -65,8 +65,8 @@ def check_frequency(self) -> bool: def find_rf_packet(self) -> bool: """Enter radiofrequency learning mode.""" packet = bytearray(self._request_header) - packet.append(0x1b) - response = self.send_packet(0x6a, packet) + packet.append(0x1B) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: @@ -77,10 +77,10 @@ def _check_sensors(self, command: int) -> bytes: """Return the state of the sensors in raw format.""" packet = bytearray(self._request_header) packet.append(command) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return bytearray(payload[len(self._request_header) + 4:]) + return bytearray(payload[len(self._request_header) + 4 :]) def check_temperature(self) -> int: """Return the temperature.""" @@ -90,7 +90,7 @@ def check_temperature(self) -> int: def check_sensors(self) -> dict: """Return the state of the sensors.""" data = self._check_sensors(0x1) - return {'temperature': data[0x0] + data[0x1] / 10.0} + return {"temperature": data[0x0] + data[0x1] / 10.0} class rm4(rm): @@ -100,8 +100,8 @@ def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM4" - self._request_header = b'\x04\x00' - self._code_sending_header = b'\xda\x00' + self._request_header = b"\x04\x00" + self._code_sending_header = b"\xda\x00" def check_temperature(self) -> int: """Return the temperature.""" @@ -117,6 +117,6 @@ def check_sensors(self) -> dict: """Return the state of the sensors.""" data = self._check_sensors(0x24) return { - 'temperature': data[0x0] + data[0x1] / 100.0, - 'humidity': data[0x2] + data[0x3] / 100.0 + "temperature": data[0x0] + data[0x1] / 100.0, + "humidity": data[0x2] + data[0x3] / 100.0, } diff --git a/broadlink/sensor.py b/broadlink/sensor.py index 63c23b4b..ef7f6f12 100644 --- a/broadlink/sensor.py +++ b/broadlink/sensor.py @@ -7,9 +7,9 @@ class a1(device): """Controls a Broadlink A1.""" _SENSORS_AND_LEVELS = ( - ('light', ('dark', 'dim', 'normal', 'bright')), - ('air_quality', ('excellent', 'good', 'normal', 'bad')), - ('noise', ('quiet', 'normal', 'noisy')), + ("light", ("dark", "dim", "normal", "bright")), + ("air_quality", ("excellent", "good", "normal", "bad")), + ("noise", ("quiet", "normal", "noisy")), ) def __init__(self, *args, **kwargs) -> None: @@ -24,20 +24,20 @@ def check_sensors(self) -> dict: try: data[sensor] = levels[data[sensor]] except IndexError: - data[sensor] = 'unknown' + data[sensor] = "unknown" return data def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) data = bytearray(payload[0x4:]) return { - 'temperature': data[0x0] + data[0x1] / 10.0, - 'humidity': data[0x2] + data[0x3] / 10.0, - 'light': data[0x4], - 'air_quality': data[0x6], - 'noise': data[0x8], + "temperature": data[0x0] + data[0x1] / 10.0, + "humidity": data[0x2] + data[0x3] / 10.0, + "light": data[0x4], + "air_quality": data[0x6], + "noise": data[0x8], } diff --git a/broadlink/switch.py b/broadlink/switch.py index efea03d7..96225451 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -17,19 +17,19 @@ def __init__(self, *args, **kwargs) -> None: def set_power_mask(self, sid_mask: int, state: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) - packet[0x00] = 0x0d - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xb2 + ((sid_mask << 1) if state else sid_mask) - packet[0x07] = 0xc0 + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if state else sid_mask) + packet[0x07] = 0xC0 packet[0x08] = 0x02 - packet[0x0a] = 0x03 - packet[0x0d] = sid_mask - packet[0x0e] = sid_mask if state else 0 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def set_power(self, sid: int, state: bool) -> None: @@ -40,30 +40,30 @@ def set_power(self, sid: int, state: bool) -> None: def check_power_raw(self) -> bool: """Return the power state of the device in raw format.""" packet = bytearray(16) - packet[0x00] = 0x0a - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xae - packet[0x07] = 0xc0 + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 packet[0x08] = 0x01 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return payload[0x0e] + return payload[0x0E] def check_power(self) -> dict: """Return the power state of the device.""" state = self.check_power_raw() if state is None: - return {'s1': None, 's2': None, 's3': None, 's4': None} + return {"s1": None, "s2": None, "s3": None, "s4": None} data = {} - data['s1'] = bool(state & 0x01) - data['s2'] = bool(state & 0x02) - data['s3'] = bool(state & 0x04) - data['s4'] = bool(state & 0x08) + data["s1"] = bool(state & 0x01) + data["s2"] = bool(state & 0x02) + data["s3"] = bool(state & 0x04) + data["s4"] = bool(state & 0x08) return data @@ -80,8 +80,8 @@ def get_state(self) -> dict: Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` """ - packet = self._encode(1, b'{}') - response = self.send_packet(0x6a, packet) + packet = self._encode(1, b"{}") + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) return self._decode(response) @@ -98,22 +98,22 @@ def set_state( """Set the power state of the device.""" data = {} if pwr is not None: - data['pwr'] = int(bool(pwr)) + data["pwr"] = int(bool(pwr)) if pwr1 is not None: - data['pwr1'] = int(bool(pwr1)) + data["pwr1"] = int(bool(pwr1)) if pwr2 is not None: - data['pwr2'] = int(bool(pwr2)) + data["pwr2"] = int(bool(pwr2)) if maxworktime is not None: - data['maxworktime'] = maxworktime + data["maxworktime"] = maxworktime if maxworktime1 is not None: - data['maxworktime1'] = maxworktime1 + data["maxworktime1"] = maxworktime1 if maxworktime2 is not None: - data['maxworktime2'] = maxworktime2 + data["maxworktime2"] = maxworktime2 if idcbrightness is not None: - data['idcbrightness'] = idcbrightness - js = json.dumps(data).encode('utf8') + data["idcbrightness"] = idcbrightness + js = json.dumps(data).encode("utf8") packet = self._encode(2, js) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) return self._decode(response) @@ -129,20 +129,22 @@ def _encode(self, flag: int, js: str) -> bytes: # 0x0e- json data packet = bytearray(14) length = 4 + 2 + 2 + 4 + len(js) - struct.pack_into('> 8 return packet def _decode(self, response: bytes) -> dict: """Decode a message.""" payload = self.decrypt(response[0x38:]) - js_len = struct.unpack_from(' None: packet[4] = 3 if state else 2 else: packet[4] = 1 if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def set_nightlight(self, state: bool) -> None: @@ -189,14 +191,14 @@ def set_nightlight(self, state: bool) -> None: packet[4] = 3 if state else 1 else: packet[4] = 2 if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def check_power(self) -> bool: """Return the power state of the device.""" packet = bytearray(16) packet[0] = 1 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) @@ -205,7 +207,7 @@ def check_nightlight(self) -> bool: """Return the state of the night light.""" packet = bytearray(16) packet[0] = 1 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) @@ -213,14 +215,17 @@ def check_nightlight(self) -> bool: def get_energy(self) -> int: """Return the energy state of the device.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 + return ( + int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + + int(hex(payload[0x05])[2:]) / 100.0 + ) class sp4(device): - """Controls a Broadlink SP4.""" + """Controls a Broadlink SP4.""" def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" @@ -314,7 +319,18 @@ def _encode(self, flag: int, state: dict) -> bytes: payload = json.dumps(state, separators=(",", ":")).encode() packet = bytearray(14) length = 4 + 2 + 2 + 4 + len(payload) - struct.pack_into('=2.1.1'], - description='Python API for controlling Broadlink IR controllers', + install_requires=["cryptography>=2.1.1"], + description="Python API for controlling Broadlink IR controllers", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", ], include_package_data=True, zip_safe=False, From 8c594f197463a527a8e815b354b99645bc3c4190 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Sat, 7 Nov 2020 05:02:53 -0300 Subject: [PATCH 03/13] Clean up get_energy() (#471) --- broadlink/switch.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/broadlink/switch.py b/broadlink/switch.py index 96225451..fe693abd 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -212,16 +212,14 @@ def check_nightlight(self) -> bool: payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) - def get_energy(self) -> int: - """Return the energy state of the device.""" + def get_energy(self) -> float: + """Return the power consumption in W.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return ( - int(hex(payload[0x07] * 256 + payload[0x06])[2:]) - + int(hex(payload[0x05])[2:]) / 100.0 - ) + energy = payload[0x7:0x4:-1].hex() + return int(energy) / 100 class sp4(device): From d5e241a09496d47ceeced068d3749f180c663e7b Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Mon, 16 Nov 2020 17:04:17 -0300 Subject: [PATCH 04/13] Add new product ids Add support for Broadlink RM4C mini (0x6539) Add support for Broadlink RM4 pro (0x653C) Add support for Broadlink RM4S (0x6364) Add support for Broadlink MCB1 (0x756F) Add support for Broadlink MCB1 (0xA56A) Add support for Broadlink RM mini 3 (0x6508) Add support for Efergy Ego (0x271D) Add support for Broadlink SCB1E (0x5115) Add support for Broadlink SCB1E (0x6113) Add support for Broadlink SP4L-EU (0x618B) Add support for Broadlink SP4L-UK (0xA589) Add support for Broadlink RM3 mini (0x27d3) Add support for Broadlink SP4L-AU (0x6489) Add support for BG Electrical AHC/U-01 (0x51E2) Add support for Broadlink MCB1 (0x6111) --- broadlink/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 74ae1fa6..11172514 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -21,6 +21,7 @@ 0x2717: (sp2, "NEO", "Ankuoo"), 0x2719: (sp2, "SP2-compatible", "Honeywell"), 0x271A: (sp2, "SP2-compatible", "Honeywell"), + 0x271D: (sp2, "Ego", "Efergy"), 0x2720: (sp2, "SP mini", "Broadlink"), 0x2728: (sp2, "SP2-compatible", "URANT"), 0x2733: (sp2, "SP3", "Broadlink"), @@ -41,9 +42,18 @@ 0x9479: (sp2, "SP3S-US", "Broadlink"), 0x947A: (sp2, "SP3S-EU", "Broadlink"), 0x756C: (sp4, "SP4M", "Broadlink"), + 0x756F: (sp4, "MCB1", "Broadlink"), 0x7579: (sp4, "SP4L-EU", "Broadlink"), 0x7583: (sp4, "SP mini 3", "Broadlink"), 0x7D11: (sp4, "SP mini 3", "Broadlink"), + 0xA56A: (sp4, "MCB1", "Broadlink"), + 0xA589: (sp4, "SP4L-UK", "Broadlink"), + 0x5115: (sp4b, "SCB1E", "Broadlink"), + 0x51E2: (sp4b, "AHC/U-01", "BG Electrical"), + 0x6111: (sp4b, "MCB1", "Broadlink"), + 0x6113: (sp4b, "SCB1E", "Broadlink"), + 0x618B: (sp4b, "SP4L-EU", "Broadlink"), + 0x6489: (sp4b, "SP4L-AU", "Broadlink"), 0x648B: (sp4b, "SP4M-US", "Broadlink"), 0x2712: (rm, "RM pro/pro+", "Broadlink"), 0x272A: (rm, "RM pro", "Broadlink"), @@ -66,6 +76,7 @@ 0x27CD: (rm, "RM mini 3", "Broadlink"), 0x27D0: (rm, "RM mini 3", "Broadlink"), 0x27D1: (rm, "RM mini 3", "Broadlink"), + 0x27D3: (rm, "RM mini 3", "Broadlink"), 0x27DE: (rm, "RM mini 3", "Broadlink"), 0x51DA: (rm4, "RM4 mini", "Broadlink"), 0x5F36: (rm4, "RM mini 3", "Broadlink"), @@ -76,9 +87,13 @@ 0x61A2: (rm4, "RM4 pro", "Broadlink"), 0x62BC: (rm4, "RM4 mini", "Broadlink"), 0x62BE: (rm4, "RM4C mini", "Broadlink"), + 0x6364: (rm4, "RM4S", "Broadlink"), 0x648D: (rm4, "RM4 mini", "Broadlink"), 0x649B: (rm4, "RM4 pro", "Broadlink"), + 0x6508: (rm4, "RM mini 3", "Broadlink"), + 0x6539: (rm4, "RM4C mini", "Broadlink"), 0x653A: (rm4, "RM4 mini", "Broadlink"), + 0x653C: (rm4, "RM4 pro", "Broadlink"), 0x2714: (a1, "e-Sensor", "Broadlink"), 0x4EB5: (mp1, "MP1-1K4S", "Broadlink"), 0x4EF7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), From 381d9b86011de248bec1278906fd5621753965e7 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Sun, 29 Nov 2020 13:51:09 -0300 Subject: [PATCH 05/13] Clamp the packet count between 0x8000 and 0xFFFF --- broadlink/device.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/broadlink/device.py b/broadlink/device.py index e040de79..5582209d 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -120,7 +120,7 @@ def __init__( self.model = model self.manufacturer = manufacturer self.is_locked = is_locked - self.count = random.randrange(0xFFFF) + self.count = random.randint(0x8000, 0xFFFF) self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58") self.id = bytes(4) self.type = "Unknown" @@ -202,7 +202,6 @@ def auth(self) -> bool: if len(key) % 16 != 0: return False - self.count = int.from_bytes(response[0x28:0x30], "little") self.id = payload[0x03::-1] self.update_aes(key) return True @@ -264,7 +263,7 @@ def get_type(self) -> str: def send_packet(self, command: int, payload: bytes) -> bytes: """Send a packet to the device.""" - self.count = (self.count + 1) & 0xFFFF + self.count = ((self.count + 1) | 0x8000) & 0xFFFF packet = bytearray(0x38) packet[0x00] = 0x5A packet[0x01] = 0xA5 From 5ef9200731e6f52e0c91505f3105bfd06d741550 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Fri, 27 Nov 2020 17:16:52 -0300 Subject: [PATCH 06/13] Fix MP1's check_power_raw() annotation This method returns an integer. --- broadlink/switch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadlink/switch.py b/broadlink/switch.py index fe693abd..41e2c5b8 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -37,7 +37,7 @@ def set_power(self, sid: int, state: bool) -> None: sid_mask = 0x01 << (sid - 1) self.set_power_mask(sid_mask, state) - def check_power_raw(self) -> bool: + def check_power_raw(self) -> int: """Return the power state of the device in raw format.""" packet = bytearray(16) packet[0x00] = 0x0A From 89892f17838d0b1cd4ea1ad0e41b848b6fb23c7d Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Sat, 9 Jan 2021 19:21:54 -0300 Subject: [PATCH 07/13] Improve RM4 communication --- broadlink/remote.py | 110 +++++++++++++++++--------------------------- 1 file changed, 42 insertions(+), 68 deletions(-) diff --git a/broadlink/remote.py b/broadlink/remote.py index b6528d68..88498b61 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,4 +1,6 @@ """Support for universal remotes.""" +import struct + from .device import device from .exceptions import check_error @@ -10,87 +12,53 @@ def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM2" - self._request_header = bytes() - self._code_sending_header = bytes() + + def _send(self, command: int, data: bytes = b'') -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" bytes: """Return the last captured code.""" - packet = bytearray(self._request_header) - packet.append(0x04) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[len(self._request_header) + 4 :] + return self._send(0x4) def send_data(self, data: bytes) -> None: """Send a code to the device.""" - packet = bytearray(self._code_sending_header) - packet += bytearray([0x02, 0x00, 0x00, 0x00]) - packet += data - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x2, data) def enter_learning(self) -> None: """Enter infrared learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x03) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x3) def sweep_frequency(self) -> None: """Sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x19) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x19) def cancel_sweep_frequency(self) -> None: """Cancel sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x1E) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x1E) def check_frequency(self) -> bool: """Return True if the frequency was identified successfully.""" - packet = bytearray(self._request_header) - packet.append(0x1A) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - if payload[len(self._request_header) + 4] == 1: - return True - return False + resp = self._send(0x1A) + return resp[0] == 1 def find_rf_packet(self) -> bool: """Enter radiofrequency learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x1B) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - if payload[len(self._request_header) + 4] == 1: - return True - return False - - def _check_sensors(self, command: int) -> bytes: - """Return the state of the sensors in raw format.""" - packet = bytearray(self._request_header) - packet.append(command) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return bytearray(payload[len(self._request_header) + 4 :]) - - def check_temperature(self) -> int: + resp = self._send(0x1B) + return resp[0] == 1 + + def check_temperature(self) -> float: """Return the temperature.""" - data = self._check_sensors(0x1) - return data[0x0] + data[0x1] / 10.0 + return self.check_sensors()["temperature"] def check_sensors(self) -> dict: """Return the state of the sensors.""" - data = self._check_sensors(0x1) - return {"temperature": data[0x0] + data[0x1] / 10.0} + resp = self._send(0x1) + return {"temperature": resp[0x0] + resp[0x1] / 10.0} class rm4(rm): @@ -100,23 +68,29 @@ def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM4" - self._request_header = b"\x04\x00" - self._code_sending_header = b"\xda\x00" - def check_temperature(self) -> int: - """Return the temperature.""" - data = self._check_sensors(0x24) - return data[0x0] + data[0x1] / 100.0 + def _send(self, command: int, data: bytes = b'') -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" bool: + """Enter radiofrequency learning mode.""" + self._send(0x1B) + return True - def check_humidity(self) -> int: + def check_humidity(self) -> float: """Return the humidity.""" - data = self._check_sensors(0x24) - return data[0x2] + data[0x3] / 100.0 + return self.check_sensors()["humidity"] def check_sensors(self) -> dict: """Return the state of the sensors.""" - data = self._check_sensors(0x24) + resp = self._send(0x24) return { - "temperature": data[0x0] + data[0x1] / 100.0, - "humidity": data[0x2] + data[0x3] / 100.0, + "temperature": resp[0x0] + resp[0x1] / 100.0, + "humidity": resp[0x2] + resp[0x3] / 100.0, } From 330127bdc6d715659a43087787137224477a49b2 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Sat, 9 Jan 2021 22:18:56 -0300 Subject: [PATCH 08/13] Allow negative temperatures --- broadlink/remote.py | 12 +++++++----- broadlink/sensor.py | 13 ++++++++++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/broadlink/remote.py b/broadlink/remote.py index 88498b61..6cf32f27 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -58,7 +58,9 @@ def check_temperature(self) -> float: def check_sensors(self) -> dict: """Return the state of the sensors.""" resp = self._send(0x1) - return {"temperature": resp[0x0] + resp[0x1] / 10.0} + temperature = struct.unpack(" float: def check_sensors(self) -> dict: """Return the state of the sensors.""" resp = self._send(0x24) - return { - "temperature": resp[0x0] + resp[0x1] / 100.0, - "humidity": resp[0x2] + resp[0x3] / 100.0, - } + temperature = struct.unpack(" dict: response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - data = bytearray(payload[0x4:]) + data = payload[0x4:] + + temperature = struct.unpack(" Date: Thu, 29 Oct 2020 03:25:47 -0300 Subject: [PATCH 09/13] Use context manager for connection --- broadlink/device.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/broadlink/device.py b/broadlink/device.py index 5582209d..c5eb8638 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -309,20 +309,16 @@ def send_packet(self, command: int, payload: bytes) -> bytes: start_time = time.time() with self.lock: - conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - while True: - try: - conn.sendto(packet, self.host) - conn.settimeout(1) - resp, _ = conn.recvfrom(2048) - break - except socket.timeout: - if (time.time() - start_time) > self.timeout: - conn.close() - raise exception(-4000) # Network timeout. - conn.close() + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + while True: + try: + conn.sendto(packet, self.host) + conn.settimeout(1) + resp, _ = conn.recvfrom(2048) + break + except socket.timeout: + if (time.time() - start_time) > self.timeout: + raise exception(-4000) # Network timeout. if len(resp) < 0x30: raise exception(-4007) # Length error. From 69afcd77a5040efa847f5a410eba7ec3ee2186b7 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Thu, 3 Dec 2020 02:43:58 -0300 Subject: [PATCH 10/13] Timeout improvements --- broadlink/device.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/broadlink/device.py b/broadlink/device.py index c5eb8638..6a0c2dee 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -70,13 +70,14 @@ def scan( packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 - starttime = time.time() + start_time = time.time() discovered = [] try: - while (time.time() - starttime) < timeout: + while (time.time() - start_time) < timeout: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(1, time_left)) conn.sendto(packet, (discover_ip_address, discover_ip_port)) - conn.settimeout(1) while True: try: @@ -307,17 +308,21 @@ def send_packet(self, command: int, payload: bytes) -> bytes: packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 - start_time = time.time() with self.lock: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + timeout = self.timeout + start_time = time.time() + while True: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(1, time_left)) + conn.sendto(packet, self.host) + try: - conn.sendto(packet, self.host) - conn.settimeout(1) - resp, _ = conn.recvfrom(2048) + resp = conn.recvfrom(2048)[0] break except socket.timeout: - if (time.time() - start_time) > self.timeout: + if (time.time() - start_time) > timeout: raise exception(-4000) # Network timeout. if len(resp) < 0x30: From 5ebfb1f2f300fbdd374b0aaa2a0db41f2420d5b4 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Mon, 11 Jan 2021 02:25:50 -0300 Subject: [PATCH 11/13] Filter unsupported features and convert sensor data to float --- broadlink/switch.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/broadlink/switch.py b/broadlink/switch.py index 41e2c5b8..f7f23084 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -312,6 +312,18 @@ def __init__(self, *args, **kwargs) -> None: device.__init__(self, *args, **kwargs) self.type = "SP4B" + def get_state(self) -> dict: + """Get full state of device.""" + state = super().get_state() + + # Convert sensor data to float. Remove keys if sensors are not supported. + sensor_attrs = ["current", "volt", "power", "totalconsum", "overload"] + for attr in sensor_attrs: + value = state.pop(attr, -1) + if value != -1: + state[attr] = value / 1000 + return state + def _encode(self, flag: int, state: dict) -> bytes: """Encode a message.""" payload = json.dumps(state, separators=(",", ":")).encode() From 4dfda479aa7c5feecae3bf4ade17c7b3dc5a5b29 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Mon, 11 Jan 2021 17:51:31 -0300 Subject: [PATCH 12/13] Fix encryption errors --- broadlink/device.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/broadlink/device.py b/broadlink/device.py index 6a0c2dee..e6753774 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -154,18 +154,18 @@ def __str__(self): def update_aes(self, key: bytes) -> None: """Update AES.""" self.aes = Cipher( - algorithms.AES(key), modes.CBC(self.iv), backend=default_backend() + algorithms.AES(bytes(key)), modes.CBC(self.iv), backend=default_backend() ) def encrypt(self, payload: bytes) -> bytes: """Encrypt the payload.""" encryptor = self.aes.encryptor() - return encryptor.update(payload) + encryptor.finalize() + return encryptor.update(bytes(payload)) + encryptor.finalize() def decrypt(self, payload: bytes) -> bytes: """Decrypt the payload.""" decryptor = self.aes.decryptor() - return decryptor.update(payload) + decryptor.finalize() + return decryptor.update(bytes(payload)) + decryptor.finalize() def auth(self) -> bool: """Authenticate to the device.""" From 49dcc54d1334d8667cab4a62dbb585188c614ed9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 23 Jan 2021 03:09:54 +0000 Subject: [PATCH 13/13] Bump cryptography from 2.6.1 to 3.2 Bumps [cryptography](https://github.com/pyca/cryptography) from 2.6.1 to 3.2. - [Release notes](https://github.com/pyca/cryptography/releases) - [Changelog](https://github.com/pyca/cryptography/blob/master/CHANGELOG.rst) - [Commits](https://github.com/pyca/cryptography/compare/2.6.1...3.2) Signed-off-by: dependabot[bot] --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 09f445bf..2c6c996c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -cryptography==2.6.1 +cryptography==3.2