diff --git a/.github/workflows/flake8.yaml b/.github/workflows/flake8.yaml new file mode 100644 index 00000000..2fd67945 --- /dev/null +++ b/.github/workflows/flake8.yaml @@ -0,0 +1,32 @@ +name: Python flake8 + +on: + push: + branches: [ main, master, dev, development ] + pull_request: + branches: [ main, master, dev, development ] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 wemake-python-styleguide + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. ignore magic numbers and use double quotes and ignore numbers with zeroes before them. + # and ignore lowercase hex numbers and ignore isort incorrect imports + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=90 --ignore=WPS432,WPS339,WPS341,I --inline-quotes double --statistics diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 604ed88a..11172514 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -20,88 +20,103 @@ 0x2716: (sp2, "NEO PRO", "Ankuoo"), 0x2717: (sp2, "NEO", "Ankuoo"), 0x2719: (sp2, "SP2-compatible", "Honeywell"), - 0x271a: (sp2, "SP2-compatible", "Honeywell"), + 0x271A: (sp2, "SP2-compatible", "Honeywell"), + 0x271D: (sp2, "Ego", "Efergy"), 0x2720: (sp2, "SP mini", "Broadlink"), 0x2728: (sp2, "SP2-compatible", "URANT"), 0x2733: (sp2, "SP3", "Broadlink"), 0x2736: (sp2, "SP mini+", "Broadlink"), - 0x273e: (sp2, "SP mini", "Broadlink"), + 0x273E: (sp2, "SP mini", "Broadlink"), 0x7530: (sp2, "SP2", "Broadlink (OEM)"), 0x7539: (sp2, "SP2-IL", "Broadlink (OEM)"), - 0x753e: (sp2, "SP mini 3", "Broadlink"), + 0x753E: (sp2, "SP mini 3", "Broadlink"), 0x7540: (sp2, "MP2", "Broadlink"), - 0X7544: (sp2, "SP2-CL", "Broadlink"), + 0x7544: (sp2, "SP2-CL", "Broadlink"), 0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"), 0x7547: (sp2, "SC1", "Broadlink"), 0x7918: (sp2, "SP2", "Broadlink (OEM)"), 0x7919: (sp2, "SP2-compatible", "Honeywell"), - 0x791a: (sp2, "SP2-compatible", "Honeywell"), - 0x7d00: (sp2, "SP3-EU", "Broadlink (OEM)"), - 0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"), + 0x791A: (sp2, "SP2-compatible", "Honeywell"), + 0x7D00: (sp2, "SP3-EU", "Broadlink (OEM)"), + 0x7D0D: (sp2, "SP mini 3", "Broadlink (OEM)"), 0x9479: (sp2, "SP3S-US", "Broadlink"), - 0x947a: (sp2, "SP3S-EU", "Broadlink"), - 0x756c: (sp4, "SP4M", "Broadlink"), + 0x947A: (sp2, "SP3S-EU", "Broadlink"), + 0x756C: (sp4, "SP4M", "Broadlink"), + 0x756F: (sp4, "MCB1", "Broadlink"), 0x7579: (sp4, "SP4L-EU", "Broadlink"), 0x7583: (sp4, "SP mini 3", "Broadlink"), - 0x7d11: (sp4, "SP mini 3", "Broadlink"), - 0x648b: (sp4b, "SP4M-US", "Broadlink"), + 0x7D11: (sp4, "SP mini 3", "Broadlink"), + 0xA56A: (sp4, "MCB1", "Broadlink"), + 0xA589: (sp4, "SP4L-UK", "Broadlink"), + 0x5115: (sp4b, "SCB1E", "Broadlink"), + 0x51E2: (sp4b, "AHC/U-01", "BG Electrical"), + 0x6111: (sp4b, "MCB1", "Broadlink"), + 0x6113: (sp4b, "SCB1E", "Broadlink"), + 0x618B: (sp4b, "SP4L-EU", "Broadlink"), + 0x6489: (sp4b, "SP4L-AU", "Broadlink"), + 0x648B: (sp4b, "SP4M-US", "Broadlink"), 0x2712: (rm, "RM pro/pro+", "Broadlink"), - 0x272a: (rm, "RM pro", "Broadlink"), + 0x272A: (rm, "RM pro", "Broadlink"), 0x2737: (rm, "RM mini 3", "Broadlink"), - 0x273d: (rm, "RM pro", "Broadlink"), - 0x277c: (rm, "RM home", "Broadlink"), + 0x273D: (rm, "RM pro", "Broadlink"), + 0x277C: (rm, "RM home", "Broadlink"), 0x2783: (rm, "RM home", "Broadlink"), 0x2787: (rm, "RM pro", "Broadlink"), - 0x278b: (rm, "RM plus", "Broadlink"), - 0x278f: (rm, "RM mini", "Broadlink"), + 0x278B: (rm, "RM plus", "Broadlink"), + 0x278F: (rm, "RM mini", "Broadlink"), 0x2797: (rm, "RM pro+", "Broadlink"), - 0x279d: (rm, "RM pro+", "Broadlink"), - 0x27a1: (rm, "RM plus", "Broadlink"), - 0x27a6: (rm, "RM plus", "Broadlink"), - 0x27a9: (rm, "RM pro+", "Broadlink"), - 0x27c2: (rm, "RM mini 3", "Broadlink"), - 0x27c3: (rm, "RM pro+", "Broadlink"), - 0x27c7: (rm, "RM mini 3", "Broadlink"), - 0x27cc: (rm, "RM mini 3", "Broadlink"), - 0x27cd: (rm, "RM mini 3", "Broadlink"), - 0x27d0: (rm, "RM mini 3", "Broadlink"), - 0x27d1: (rm, "RM mini 3", "Broadlink"), - 0x27de: (rm, "RM mini 3", "Broadlink"), - 0x51da: (rm4, "RM4 mini", "Broadlink"), - 0x5f36: (rm4, "RM mini 3", "Broadlink"), + 0x279D: (rm, "RM pro+", "Broadlink"), + 0x27A1: (rm, "RM plus", "Broadlink"), + 0x27A6: (rm, "RM plus", "Broadlink"), + 0x27A9: (rm, "RM pro+", "Broadlink"), + 0x27C2: (rm, "RM mini 3", "Broadlink"), + 0x27C3: (rm, "RM pro+", "Broadlink"), + 0x27C7: (rm, "RM mini 3", "Broadlink"), + 0x27CC: (rm, "RM mini 3", "Broadlink"), + 0x27CD: (rm, "RM mini 3", "Broadlink"), + 0x27D0: (rm, "RM mini 3", "Broadlink"), + 0x27D1: (rm, "RM mini 3", "Broadlink"), + 0x27D3: (rm, "RM mini 3", "Broadlink"), + 0x27DE: (rm, "RM mini 3", "Broadlink"), + 0x51DA: (rm4, "RM4 mini", "Broadlink"), + 0x5F36: (rm4, "RM mini 3", "Broadlink"), 0x6026: (rm4, "RM4 pro", "Broadlink"), 0x6070: (rm4, "RM4C mini", "Broadlink"), - 0x610e: (rm4, "RM4 mini", "Broadlink"), - 0x610f: (rm4, "RM4C mini", "Broadlink"), - 0x61a2: (rm4, "RM4 pro", "Broadlink"), - 0x62bc: (rm4, "RM4 mini", "Broadlink"), - 0x62be: (rm4, "RM4C mini", "Broadlink"), - 0x648d: (rm4, "RM4 mini", "Broadlink"), - 0x649b: (rm4, "RM4 pro", "Broadlink"), - 0x653a: (rm4, "RM4 mini", "Broadlink"), + 0x610E: (rm4, "RM4 mini", "Broadlink"), + 0x610F: (rm4, "RM4C mini", "Broadlink"), + 0x61A2: (rm4, "RM4 pro", "Broadlink"), + 0x62BC: (rm4, "RM4 mini", "Broadlink"), + 0x62BE: (rm4, "RM4C mini", "Broadlink"), + 0x6364: (rm4, "RM4S", "Broadlink"), + 0x648D: (rm4, "RM4 mini", "Broadlink"), + 0x649B: (rm4, "RM4 pro", "Broadlink"), + 0x6508: (rm4, "RM mini 3", "Broadlink"), + 0x6539: (rm4, "RM4C mini", "Broadlink"), + 0x653A: (rm4, "RM4 mini", "Broadlink"), + 0x653C: (rm4, "RM4 pro", "Broadlink"), 0x2714: (a1, "e-Sensor", "Broadlink"), - 0x4eb5: (mp1, "MP1-1K4S", "Broadlink"), - 0x4ef7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), - 0x4f1b: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), - 0x4f65: (mp1, "MP1-1K3S2U", "Broadlink"), + 0x4EB5: (mp1, "MP1-1K4S", "Broadlink"), + 0x4EF7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), + 0x4F1B: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), + 0x4F65: (mp1, "MP1-1K3S2U", "Broadlink"), 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), - 0x504e: (lb1, "LB1", "Broadlink"), - 0x60c7: (lb1, "LB1", "Broadlink"), - 0x60c8: (lb1, "LB1", "Broadlink"), + 0x504E: (lb1, "LB1", "Broadlink"), + 0x60C7: (lb1, "LB1", "Broadlink"), + 0x60C8: (lb1, "LB1", "Broadlink"), 0x6112: (lb1, "LB1", "Broadlink"), 0x2722: (S1C, "S2KIT", "Broadlink"), - 0x4ead: (hysen, "HY02B05H", "Hysen"), - 0x4e4d: (dooya, "DT360E-45/20", "Dooya"), - 0x51e3: (bg1, "BG800/BG900", "BG Electrical"), + 0x4EAD: (hysen, "HY02B05H", "Hysen"), + 0x4E4D: (dooya, "DT360E-45/20", "Dooya"), + 0x51E3: (bg1, "BG800/BG900", "BG Electrical"), } def gendevice( - dev_type: int, - host: Tuple[str, int], - mac: Union[bytes, str], - name: str = None, - is_locked: bool = None, + dev_type: int, + host: Tuple[str, int], + mac: Union[bytes, str], + name: str = None, + is_locked: bool = None, ) -> device: """Generate a device.""" try: @@ -122,10 +137,10 @@ def gendevice( def hello( - host: str, - port: int = 80, - timeout: int = 10, - local_ip_address: str = None, + host: str, + port: int = 80, + timeout: int = 10, + local_ip_address: str = None, ) -> device: """Direct device discovery. @@ -138,31 +153,27 @@ def hello( def discover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> List[device]: """Discover devices connected to the local network.""" - responses = scan( - timeout, local_ip_address, discover_ip_address, discover_ip_port - ) + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) return [gendevice(*resp) for resp in responses] def xdiscover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> Generator[device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. """ - responses = scan( - timeout, local_ip_address, discover_ip_address, discover_ip_port - ) + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) for resp in responses: yield gendevice(*resp) @@ -191,13 +202,12 @@ def setup(ssid: str, password: str, security_mode: int) -> None: payload[0x85] = pass_length # Character length of password payload[0x86] = security_mode # Type of encryption - checksum = sum(payload, 0xbeaf) & 0xffff - payload[0x20] = checksum & 0xff # Checksum 1 position + checksum = sum(payload, 0xBEAF) & 0xFFFF + payload[0x20] = checksum & 0xFF # Checksum 1 position payload[0x21] = checksum >> 8 # Checksum 2 position - sock = socket.socket(socket.AF_INET, # Internet - socket.SOCK_DGRAM) # UDP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, ('255.255.255.255', 80)) + sock.sendto(payload, ("255.255.255.255", 80)) sock.close() diff --git a/broadlink/alarm.py b/broadlink/alarm.py index faded9d4..e73b8fad 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -7,21 +7,21 @@ class S1C(device): """Controls a Broadlink S1C.""" _SENSORS_TYPES = { - 0x31: 'Door Sensor', # 49 as hex - 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse - 0x21: 'Motion Sensor' # 33 as hex + 0x31: "Door Sensor", # 49 as hex + 0x91: "Key Fob", # 145 as hex, as serial on fob corpse + 0x21: "Motion Sensor", # 33 as hex } def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) - self.type = 'S1C' + self.type = "S1C" def get_sensors_status(self) -> dict: """Return the state of the sensors.""" packet = bytearray(16) packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if not payload: @@ -29,20 +29,20 @@ def get_sensors_status(self) -> dict: count = payload[0x4] sensor_data = payload[0x6:] sensors = [ - bytearray(sensor_data[i * 83:(i + 1) * 83]) + bytearray(sensor_data[i * 83 : (i + 1) * 83]) for i in range(len(sensor_data) // 83) ] return { - 'count': count, - 'sensors': [ + "count": count, + "sensors": [ { - 'status': sensor[0], - 'name': sensor[4:26].decode().strip('\x00'), - 'type': self._SENSORS_TYPES.get(sensor[3], 'Unknown'), - 'order': sensor[1], - 'serial': sensor[26:30].hex(), + "status": sensor[0], + "name": sensor[4:26].decode().strip("\x00"), + "type": self._SENSORS_TYPES.get(sensor[3], "Unknown"), + "order": sensor[1], + "serial": sensor[26:30].hex(), } for sensor in sensors if any(sensor[26:30]) - ] + ], } diff --git a/broadlink/climate.py b/broadlink/climate.py index f0c337e5..036cc9a8 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -33,19 +33,22 @@ def send_request(self, input_payload: bytes) -> bytes: request_payload.append((crc >> 8) & 0xFF) # send to device - response = self.send_packet(0x6a, request_payload) + response = self.send_packet(0x6A, request_payload) check_error(response[0x22:0x24]) response_payload = self.decrypt(response[0x38:]) # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) response_payload_len = response_payload[0] if response_payload_len + 2 > len(response_payload): - raise ValueError('hysen_response_error', 'first byte of response is not length') + raise ValueError( + "hysen_response_error", "first byte of response is not length" + ) crc = calculate_crc16(response_payload[2:response_payload_len]) if (response_payload[response_payload_len] == crc & 0xFF) and ( - response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): + response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF + ): return response_payload[2:response_payload_len] - raise ValueError('hysen_response_error', 'CRC check on response failed') + raise ValueError("hysen_response_error", "CRC check on response failed") def get_temp(self) -> int: """Return the room temperature in degrees celsius.""" @@ -64,43 +67,53 @@ def get_full_status(self) -> dict: """ payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) data = {} - data['remote_lock'] = payload[3] & 1 - data['power'] = payload[4] & 1 - data['active'] = (payload[4] >> 4) & 1 - data['temp_manual'] = (payload[4] >> 6) & 1 - data['room_temp'] = (payload[5] & 255) / 2.0 - data['thermostat_temp'] = (payload[6] & 255) / 2.0 - data['auto_mode'] = payload[7] & 15 - data['loop_mode'] = (payload[7] >> 4) & 15 - data['sensor'] = payload[8] - data['osv'] = payload[9] - data['dif'] = payload[10] - data['svh'] = payload[11] - data['svl'] = payload[12] - data['room_temp_adj'] = ((payload[13] << 8) + payload[14]) / 2.0 - if data['room_temp_adj'] > 32767: - data['room_temp_adj'] = 32767 - data['room_temp_adj'] - data['fre'] = payload[15] - data['poweron'] = payload[16] - data['unknown'] = payload[17] - data['external_temp'] = (payload[18] & 255) / 2.0 - data['hour'] = payload[19] - data['min'] = payload[20] - data['sec'] = payload[21] - data['dayofweek'] = payload[22] + data["remote_lock"] = payload[3] & 1 + data["power"] = payload[4] & 1 + data["active"] = (payload[4] >> 4) & 1 + data["temp_manual"] = (payload[4] >> 6) & 1 + data["room_temp"] = (payload[5] & 255) / 2.0 + data["thermostat_temp"] = (payload[6] & 255) / 2.0 + data["auto_mode"] = payload[7] & 15 + data["loop_mode"] = (payload[7] >> 4) & 15 + data["sensor"] = payload[8] + data["osv"] = payload[9] + data["dif"] = payload[10] + data["svh"] = payload[11] + data["svl"] = payload[12] + data["room_temp_adj"] = ((payload[13] << 8) + payload[14]) / 2.0 + if data["room_temp_adj"] > 32767: + data["room_temp_adj"] = 32767 - data["room_temp_adj"] + data["fre"] = payload[15] + data["poweron"] = payload[16] + data["unknown"] = payload[17] + data["external_temp"] = (payload[18] & 255) / 2.0 + data["hour"] = payload[19] + data["min"] = payload[20] + data["sec"] = payload[21] + data["dayofweek"] = payload[22] weekday = [] for i in range(0, 6): weekday.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekday'] = weekday + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekday"] = weekday weekend = [] for i in range(6, 8): weekend.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekend'] = weekend + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekend"] = weekend return data # Change controller mode @@ -140,8 +153,27 @@ def set_advanced( poweron: int, ) -> None: """Set advanced options.""" - input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, - (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) + input_payload = bytearray( + [ + 0x01, + 0x10, + 0x00, + 0x02, + 0x00, + 0x05, + 0x0A, + loop_mode, + sensor, + osv, + dif, + svh, + svl, + (int(adj * 2) >> 8 & 0xFF), + (int(adj * 2) & 0xFF), + fre, + poweron, + ] + ) self.send_request(input_payload) # For backwards compatibility only. Prefer calling set_mode directly. @@ -169,7 +201,11 @@ def set_power(self, power: int = 1, remote_lock: int = 0) -> None: # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" - self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) + self.send_request( + bytearray( + [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + ) + ) # Set timer schedule # Format is the same as you get from get_full_status. @@ -180,25 +216,25 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" # Begin with some magic values ... - input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) + input_payload = bytearray([0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]) # Now simply append times/temps # weekday times for i in range(0, 6): - input_payload.append(weekday[i]['start_hour']) - input_payload.append(weekday[i]['start_minute']) + input_payload.append(weekday[i]["start_hour"]) + input_payload.append(weekday[i]["start_minute"]) # weekend times for i in range(0, 2): - input_payload.append(weekend[i]['start_hour']) - input_payload.append(weekend[i]['start_minute']) + input_payload.append(weekend[i]["start_hour"]) + input_payload.append(weekend[i]["start_minute"]) # weekday temperatures for i in range(0, 6): - input_payload.append(int(weekday[i]['temp'] * 2)) + input_payload.append(int(weekday[i]["temp"] * 2)) # weekend temperatures for i in range(0, 2): - input_payload.append(int(weekend[i]['temp'] * 2)) + input_payload.append(int(weekend[i]["temp"] * 2)) self.send_request(input_payload) diff --git a/broadlink/cover.py b/broadlink/cover.py index 236e747c..2691fe97 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -17,12 +17,12 @@ def _send(self, magic1: int, magic2: int) -> int: """Send a packet to the device.""" packet = bytearray(16) packet[0] = 0x09 - packet[2] = 0xbb + packet[2] = 0xBB packet[3] = magic1 packet[4] = magic2 - packet[9] = 0xfa + packet[9] = 0xFA packet[10] = 0x44 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[4] @@ -41,7 +41,7 @@ def stop(self) -> int: def get_percentage(self) -> int: """Return the position of the curtain.""" - return self._send(0x06, 0x5d) + return self._send(0x06, 0x5D) def set_percentage_and_wait(self, new_percentage: int) -> None: """Set the position of the curtain.""" diff --git a/broadlink/device.py b/broadlink/device.py index 9392024f..e6753774 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -15,10 +15,10 @@ def scan( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -36,47 +36,48 @@ def scan( timezone = int(time.timezone / -3600) if timezone < 0: - packet[0x08] = 0xff + timezone - 1 - packet[0x09] = 0xff - packet[0x0a] = 0xff - packet[0x0b] = 0xff + packet[0x08] = 0xFF + timezone - 1 + packet[0x09] = 0xFF + packet[0x0A] = 0xFF + packet[0x0B] = 0xFF else: packet[0x08] = timezone packet[0x09] = 0 - packet[0x0a] = 0 - packet[0x0b] = 0 + packet[0x0A] = 0 + packet[0x0B] = 0 year = datetime.now().year - packet[0x0c] = year & 0xff - packet[0x0d] = year >> 8 - packet[0x0e] = datetime.now().minute - packet[0x0f] = datetime.now().hour + packet[0x0C] = year & 0xFF + packet[0x0D] = year >> 8 + packet[0x0E] = datetime.now().minute + packet[0x0F] = datetime.now().hour subyear = str(year)[2:] packet[0x10] = int(subyear) packet[0x11] = datetime.now().isoweekday() packet[0x12] = datetime.now().day packet[0x13] = datetime.now().month - address = local_ip_address.split('.') + address = local_ip_address.split(".") packet[0x18] = int(address[3]) packet[0x19] = int(address[2]) - packet[0x1a] = int(address[1]) - packet[0x1b] = int(address[0]) - packet[0x1c] = port & 0xff - packet[0x1d] = port >> 8 + packet[0x1A] = int(address[1]) + packet[0x1B] = int(address[0]) + packet[0x1C] = port & 0xFF + packet[0x1D] = port >> 8 packet[0x26] = 6 - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 - starttime = time.time() + start_time = time.time() discovered = [] try: - while (time.time() - starttime) < timeout: + while (time.time() - start_time) < timeout: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(1, time_left)) conn.sendto(packet, (discover_ip_address, discover_ip_port)) - conn.settimeout(1) while True: try: @@ -85,12 +86,12 @@ def scan( break devtype = response[0x34] | response[0x35] << 8 - mac = bytes(reversed(response[0x3a:0x40])) + mac = bytes(reversed(response[0x3A:0x40])) if (host, mac, devtype) in discovered: continue discovered.append((host, mac, devtype)) - name = response[0x40:].split(b'\x00')[0].decode('utf-8') + name = response[0x40:].split(b"\x00")[0].decode("utf-8") is_locked = bool(response[-1]) yield devtype, host, mac, name, is_locked finally: @@ -101,33 +102,33 @@ class device: """Controls a Broadlink device.""" def __init__( - self, - host: Tuple[str, int], - mac: Union[bytes, str], - devtype: int, - timeout: int = 10, - name: str = None, - model: str = None, - manufacturer: str = None, - is_locked: bool = None, + self, + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = 10, + name: str = None, + model: str = None, + manufacturer: str = None, + is_locked: bool = None, ) -> None: """Initialize the controller.""" self.host = host self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac - self.devtype = devtype if devtype is not None else 0x272a + self.devtype = devtype if devtype is not None else 0x272A self.timeout = timeout self.name = name self.model = model self.manufacturer = manufacturer self.is_locked = is_locked - self.count = random.randrange(0xffff) - self.iv = bytes.fromhex('562e17996d093d28ddb3ba695a2e6f58') + self.count = random.randint(0x8000, 0xFFFF) + self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58") self.id = bytes(4) self.type = "Unknown" self.lock = threading.Lock() self.aes = None - key = bytes.fromhex('097628343fe99e23765c1513accf8b02') + key = bytes.fromhex("097628343fe99e23765c1513accf8b02") self.update_aes(key) def __repr__(self): @@ -138,7 +139,7 @@ def __repr__(self): hex(self.devtype), self.host[0], self.host[1], - ':'.join(format(x, '02x') for x in self.mac), + ":".join(format(x, "02x") for x in self.mac), self.name, "Locked" if self.is_locked else "Unlocked", ) @@ -153,18 +154,18 @@ def __str__(self): def update_aes(self, key: bytes) -> None: """Update AES.""" self.aes = Cipher( - algorithms.AES(key), modes.CBC(self.iv), backend=default_backend() + algorithms.AES(bytes(key)), modes.CBC(self.iv), backend=default_backend() ) def encrypt(self, payload: bytes) -> bytes: """Encrypt the payload.""" encryptor = self.aes.encryptor() - return encryptor.update(payload) + encryptor.finalize() + return encryptor.update(bytes(payload)) + encryptor.finalize() def decrypt(self, payload: bytes) -> bytes: """Decrypt the payload.""" decryptor = self.aes.decryptor() - return decryptor.update(payload) + decryptor.finalize() + return decryptor.update(bytes(payload)) + decryptor.finalize() def auth(self) -> bool: """Authenticate to the device.""" @@ -175,24 +176,24 @@ def auth(self) -> bool: payload[0x07] = 0x31 payload[0x08] = 0x31 payload[0x09] = 0x31 - payload[0x0a] = 0x31 - payload[0x0b] = 0x31 - payload[0x0c] = 0x31 - payload[0x0d] = 0x31 - payload[0x0e] = 0x31 - payload[0x0f] = 0x31 + payload[0x0A] = 0x31 + payload[0x0B] = 0x31 + payload[0x0C] = 0x31 + payload[0x0D] = 0x31 + payload[0x0E] = 0x31 + payload[0x0F] = 0x31 payload[0x10] = 0x31 payload[0x11] = 0x31 payload[0x12] = 0x31 - payload[0x1e] = 0x01 - payload[0x2d] = 0x01 - payload[0x30] = ord('T') - payload[0x31] = ord('e') - payload[0x32] = ord('s') - payload[0x33] = ord('t') - payload[0x34] = ord(' ') - payload[0x35] = ord(' ') - payload[0x36] = ord('1') + payload[0x1E] = 0x01 + payload[0x2D] = 0x01 + payload[0x30] = ord("T") + payload[0x31] = ord("e") + payload[0x32] = ord("s") + payload[0x33] = ord("t") + payload[0x34] = ord(" ") + payload[0x35] = ord(" ") + payload[0x36] = ord("1") response = self.send_packet(0x65, payload) check_error(response[0x22:0x24]) @@ -202,7 +203,6 @@ def auth(self) -> bool: if len(key) % 16 != 0: return False - self.count = int.from_bytes(response[0x28:0x30], "little") self.id = payload[0x03::-1] self.update_aes(key) return True @@ -233,7 +233,7 @@ def hello(self, local_ip_address=None) -> bool: def get_fwversion(self) -> int: """Get firmware version.""" packet = bytearray([0x68]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[0x4] | payload[0x5] << 8 @@ -241,20 +241,20 @@ def get_fwversion(self) -> int: def set_name(self, name: str) -> None: """Set device name.""" packet = bytearray(4) - packet += name.encode('utf-8') + packet += name.encode("utf-8") packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(self.is_locked) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) self.name = name def set_lock(self, state: bool) -> None: """Lock/unlock the device.""" packet = bytearray(4) - packet += self.name.encode('utf-8') + packet += self.name.encode("utf-8") packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(state) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) self.is_locked = bool(state) @@ -264,27 +264,27 @@ def get_type(self) -> str: def send_packet(self, command: int, payload: bytes) -> bytes: """Send a packet to the device.""" - self.count = (self.count + 1) & 0xffff + self.count = ((self.count + 1) | 0x8000) & 0xFFFF packet = bytearray(0x38) - packet[0x00] = 0x5a - packet[0x01] = 0xa5 - packet[0x02] = 0xaa + packet[0x00] = 0x5A + packet[0x01] = 0xA5 + packet[0x02] = 0xAA packet[0x03] = 0x55 - packet[0x04] = 0x5a - packet[0x05] = 0xa5 - packet[0x06] = 0xaa + packet[0x04] = 0x5A + packet[0x05] = 0xA5 + packet[0x06] = 0xAA packet[0x07] = 0x55 - packet[0x24] = self.devtype & 0xff + packet[0x24] = self.devtype & 0xFF packet[0x25] = self.devtype >> 8 packet[0x26] = command - packet[0x28] = self.count & 0xff + packet[0x28] = self.count & 0xFF packet[0x29] = self.count >> 8 - packet[0x2a] = self.mac[5] - packet[0x2b] = self.mac[4] - packet[0x2c] = self.mac[3] - packet[0x2d] = self.mac[2] - packet[0x2e] = self.mac[1] - packet[0x2f] = self.mac[0] + packet[0x2A] = self.mac[5] + packet[0x2B] = self.mac[4] + packet[0x2C] = self.mac[3] + packet[0x2D] = self.mac[2] + packet[0x2E] = self.mac[1] + packet[0x2F] = self.mac[0] packet[0x30] = self.id[3] packet[0x31] = self.id[2] packet[0x32] = self.id[1] @@ -296,40 +296,40 @@ def send_packet(self, command: int, payload: bytes) -> bytes: payload = bytearray(payload) payload += bytearray(padding) - checksum = sum(payload, 0xbeaf) & 0xffff - packet[0x34] = checksum & 0xff + checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34] = checksum & 0xFF packet[0x35] = checksum >> 8 payload = self.encrypt(payload) for i in range(len(payload)): packet.append(payload[i]) - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 - start_time = time.time() with self.lock: - conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + timeout = self.timeout + start_time = time.time() - while True: - try: + while True: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(1, time_left)) conn.sendto(packet, self.host) - conn.settimeout(1) - resp, _ = conn.recvfrom(2048) - break - except socket.timeout: - if (time.time() - start_time) > self.timeout: - conn.close() - raise exception(-4000) # Network timeout. - conn.close() + + try: + resp = conn.recvfrom(2048)[0] + break + except socket.timeout: + if (time.time() - start_time) > timeout: + raise exception(-4000) # Network timeout. if len(resp) < 0x30: raise exception(-4007) # Length error. checksum = resp[0x20] | (resp[0x21] << 8) - if sum(resp, 0xbeaf) - sum(resp[0x20:0x22]) & 0xffff != checksum: + if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum: raise exception(-4008) # Checksum error. return resp diff --git a/broadlink/light.py b/broadlink/light.py index cffc77d5..adadfab5 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -11,14 +11,14 @@ class lb1(device): state_dict = [] effect_map_dict = { - 'lovely color': 0, - 'flashlight': 1, - 'lightning': 2, - 'color fading': 3, - 'color breathing': 4, - 'multicolor breathing': 5, - 'color jumping': 6, - 'multicolor jumping': 7, + "lovely color": 0, + "flashlight": 1, + "lightning": 2, + "color fading": 3, + "color breathing": 4, + "multicolor breathing": 5, + "color jumping": 6, + "multicolor jumping": 7, } def __init__(self, *args, **kwargs) -> None: @@ -26,36 +26,38 @@ def __init__(self, *args, **kwargs) -> None: device.__init__(self, *args, **kwargs) self.type = "SmartBulb" - def send_command(self, command: str, type: str = 'set') -> None: + def send_command(self, command: str, type: str = "set") -> None: """Send a command to the device.""" - packet = bytearray(16+(int(len(command)/16) + 1)*16) - packet[0x00] = 0x0c + len(command) & 0xff - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set - packet[0x09] = 0x0b - packet[0x0a] = len(command) - packet[0x0e:] = map(ord, command) + packet = bytearray(16 + (int(len(command) / 16) + 1) * 16) + packet[0x00] = 0x0C + len(command) & 0xFF + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set + packet[0x09] = 0x0B + packet[0x0A] = len(command) + packet[0x0E:] = map(ord, command) - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x06] = checksum & 0xff # Checksum 1 position + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF # Checksum 1 position packet[0x07] = checksum >> 8 # Checksum 2 position - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x36:0x38]) payload = self.decrypt(response[0x38:]) - responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) + responseLength = int(payload[0x0A]) | (int(payload[0x0B]) << 8) if responseLength > 0: - self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) + self.state_dict = json.loads(payload[0x0E : 0x0E + responseLength]) def set_json(self, jsonstr: str) -> str: """Send a command to the device and return state.""" reconvert = json.loads(jsonstr) - if 'bulb_sceneidx' in reconvert.keys(): - reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) + if "bulb_sceneidx" in reconvert.keys(): + reconvert["bulb_sceneidx"] = self.effect_map_dict.get( + reconvert["bulb_sceneidx"], 255 + ) self.send_command(json.dumps(reconvert)) return json.dumps(self.state_dict) diff --git a/broadlink/remote.py b/broadlink/remote.py index 4a427f69..03b59ebd 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,4 +1,6 @@ """Support for universal remotes.""" +import struct + from .device import device from .exceptions import check_error @@ -10,87 +12,55 @@ def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM2" - self._request_header = bytes() - self._code_sending_header = bytes() + + def _send(self, command: int, data: bytes = b'') -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" bytes: """Return the last captured code.""" - packet = bytearray(self._request_header) - packet.append(0x04) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[len(self._request_header) + 4:] + return self._send(0x4) def send_data(self, data: bytes) -> None: """Send a code to the device.""" - packet = bytearray(self._code_sending_header) - packet += bytearray([0x02, 0x00, 0x00, 0x00]) - packet += data - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) + self._send(0x2, data) def enter_learning(self) -> None: """Enter infrared learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x03) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) + self._send(0x3) def sweep_frequency(self) -> None: """Sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x19) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) + self._send(0x19) def cancel_sweep_frequency(self) -> None: """Cancel sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x1e) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) + self._send(0x1E) def check_frequency(self) -> bool: """Return True if the frequency was identified successfully.""" - packet = bytearray(self._request_header) - packet.append(0x1a) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - if payload[len(self._request_header) + 4] == 1: - return True - return False + resp = self._send(0x1A) + return resp[0] == 1 def find_rf_packet(self) -> bool: """Enter radiofrequency learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x1b) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - if payload[len(self._request_header) + 4] == 1: - return True - return False - - def _check_sensors(self, command: int) -> bytes: - """Return the state of the sensors in raw format.""" - packet = bytearray(self._request_header) - packet.append(command) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return bytearray(payload[len(self._request_header) + 4:]) - - def check_temperature(self) -> int: + resp = self._send(0x1B) + return resp[0] == 1 + + def check_temperature(self) -> float: """Return the temperature.""" - data = self._check_sensors(0x1) - return data[0x0] + data[0x1] / 10.0 + return self.check_sensors()["temperature"] def check_sensors(self) -> dict: """Return the state of the sensors.""" - data = self._check_sensors(0x1) - return {'temperature': data[0x0] + data[0x1] / 10.0} + resp = self._send(0x1) + temperature = struct.unpack(" None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM4" - self._request_header = b'\x04\x00' - self._code_sending_header = b'\xda\x00' - def check_temperature(self) -> int: - """Return the temperature.""" - data = self._check_sensors(0x24) - return data[0x0] + data[0x1] / 100.0 + def _send(self, command: int, data: bytes = b'') -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" int: + def check_humidity(self) -> float: """Return the humidity.""" - data = self._check_sensors(0x24) - return data[0x2] + data[0x3] / 100.0 + return self.check_sensors()["humidity"] def check_sensors(self) -> dict: """Return the state of the sensors.""" - data = self._check_sensors(0x24) - return { - 'temperature': data[0x0] + data[0x1] / 100.0, - 'humidity': data[0x2] + data[0x3] / 100.0 - } + resp = self._send(0x24) + temperature = struct.unpack(" None: @@ -24,20 +26,25 @@ def check_sensors(self) -> dict: try: data[sensor] = levels[data[sensor]] except IndexError: - data[sensor] = 'unknown' + data[sensor] = "unknown" return data def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - data = bytearray(payload[0x4:]) + data = payload[0x4:] + + temperature = struct.unpack(" None: def set_power_mask(self, sid_mask: int, state: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) - packet[0x00] = 0x0d - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xb2 + ((sid_mask << 1) if state else sid_mask) - packet[0x07] = 0xc0 + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if state else sid_mask) + packet[0x07] = 0xC0 packet[0x08] = 0x02 - packet[0x0a] = 0x03 - packet[0x0d] = sid_mask - packet[0x0e] = sid_mask if state else 0 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def set_power(self, sid: int, state: bool) -> None: @@ -37,33 +37,33 @@ def set_power(self, sid: int, state: bool) -> None: sid_mask = 0x01 << (sid - 1) self.set_power_mask(sid_mask, state) - def check_power_raw(self) -> bool: + def check_power_raw(self) -> int: """Return the power state of the device in raw format.""" packet = bytearray(16) - packet[0x00] = 0x0a - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xae - packet[0x07] = 0xc0 + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 packet[0x08] = 0x01 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return payload[0x0e] + return payload[0x0E] def check_power(self) -> dict: """Return the power state of the device.""" state = self.check_power_raw() if state is None: - return {'s1': None, 's2': None, 's3': None, 's4': None} + return {"s1": None, "s2": None, "s3": None, "s4": None} data = {} - data['s1'] = bool(state & 0x01) - data['s2'] = bool(state & 0x02) - data['s3'] = bool(state & 0x04) - data['s4'] = bool(state & 0x08) + data["s1"] = bool(state & 0x01) + data["s2"] = bool(state & 0x02) + data["s3"] = bool(state & 0x04) + data["s4"] = bool(state & 0x08) return data @@ -80,8 +80,8 @@ def get_state(self) -> dict: Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` """ - packet = self._encode(1, b'{}') - response = self.send_packet(0x6a, packet) + packet = self._encode(1, b"{}") + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) return self._decode(response) @@ -98,22 +98,22 @@ def set_state( """Set the power state of the device.""" data = {} if pwr is not None: - data['pwr'] = int(bool(pwr)) + data["pwr"] = int(bool(pwr)) if pwr1 is not None: - data['pwr1'] = int(bool(pwr1)) + data["pwr1"] = int(bool(pwr1)) if pwr2 is not None: - data['pwr2'] = int(bool(pwr2)) + data["pwr2"] = int(bool(pwr2)) if maxworktime is not None: - data['maxworktime'] = maxworktime + data["maxworktime"] = maxworktime if maxworktime1 is not None: - data['maxworktime1'] = maxworktime1 + data["maxworktime1"] = maxworktime1 if maxworktime2 is not None: - data['maxworktime2'] = maxworktime2 + data["maxworktime2"] = maxworktime2 if idcbrightness is not None: - data['idcbrightness'] = idcbrightness - js = json.dumps(data).encode('utf8') + data["idcbrightness"] = idcbrightness + js = json.dumps(data).encode("utf8") packet = self._encode(2, js) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) return self._decode(response) @@ -129,20 +129,22 @@ def _encode(self, flag: int, js: str) -> bytes: # 0x0e- json data packet = bytearray(14) length = 4 + 2 + 2 + 4 + len(js) - struct.pack_into('> 8 return packet def _decode(self, response: bytes) -> dict: """Decode a message.""" payload = self.decrypt(response[0x38:]) - js_len = struct.unpack_from(' None: packet[4] = 3 if state else 2 else: packet[4] = 1 if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def set_nightlight(self, state: bool) -> None: @@ -189,14 +191,14 @@ def set_nightlight(self, state: bool) -> None: packet[4] = 3 if state else 1 else: packet[4] = 2 if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def check_power(self) -> bool: """Return the power state of the device.""" packet = bytearray(16) packet[0] = 1 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) @@ -205,22 +207,23 @@ def check_nightlight(self) -> bool: """Return the state of the night light.""" packet = bytearray(16) packet[0] = 1 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) - def get_energy(self) -> int: - """Return the energy state of the device.""" + def get_energy(self) -> float: + """Return the power consumption in W.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 + energy = payload[0x7:0x4:-1].hex() + return int(energy) / 100 class sp4(device): - """Controls a Broadlink SP4.""" + """Controls a Broadlink SP4.""" def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" @@ -309,12 +312,35 @@ def __init__(self, *args, **kwargs) -> None: device.__init__(self, *args, **kwargs) self.type = "SP4B" + def get_state(self) -> dict: + """Get full state of device.""" + state = super().get_state() + + # Convert sensor data to float. Remove keys if sensors are not supported. + sensor_attrs = ["current", "volt", "power", "totalconsum", "overload"] + for attr in sensor_attrs: + value = state.pop(attr, -1) + if value != -1: + state[attr] = value / 1000 + return state + def _encode(self, flag: int, state: dict) -> bytes: """Encode a message.""" payload = json.dumps(state, separators=(",", ":")).encode() packet = bytearray(14) length = 4 + 2 + 2 + 4 + len(payload) - struct.pack_into('=2.1.1'], - description='Python API for controlling Broadlink IR controllers', + install_requires=["cryptography>=2.1.1"], + description="Python API for controlling Broadlink IR controllers", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", ], include_package_data=True, zip_safe=False,