Skip to content

Commit

Permalink
Added detection between session and main meter energy values (#919)
Browse files Browse the repository at this point in the history
* Added better support for chargers not using errata 3.9

* Added requirement jsonschema==4.19.0

* Increase timeout to 60sec for test_cms_responses

* Restore session energy on HA restart
  • Loading branch information
nlindn authored Oct 6, 2023
1 parent b2a7f75 commit 6616437
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 17 deletions.
87 changes: 73 additions & 14 deletions custom_components/ocpp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from homeassistant.components.persistent_notification import DOMAIN as PN_DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_OK, STATE_UNAVAILABLE, TIME_MINUTES
from homeassistant.const import STATE_OK, STATE_UNAVAILABLE, STATE_UNKNOWN, TIME_MINUTES
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry, entity_component, entity_registry
import homeassistant.helpers.config_validation as cv
Expand Down Expand Up @@ -245,6 +245,12 @@ def get_metric(self, cp_id: str, measurand: str):
return self.charge_points[cp_id]._metrics[measurand].value
return None

def del_metric(self, cp_id: str, measurand: str):
"""Set given measurand to None."""
if cp_id in self.charge_points:
self.charge_points[cp_id]._metrics[measurand].value = None
return None

def get_unit(self, cp_id: str, measurand: str):
"""Return unit of given measurand."""
if cp_id in self.charge_points:
Expand Down Expand Up @@ -353,6 +359,7 @@ def __init__(
self.received_boot_notification = False
self.post_connect_success = False
self.tasks = None
self._charger_reports_session_energy = False
self._metrics = defaultdict(lambda: Metric(None, None))
self._metrics[cdet.identifier.value].value = id
self._metrics[csess.session_time.value].unit = TIME_MINUTES
Expand Down Expand Up @@ -1073,6 +1080,30 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs):

transaction_id: int = kwargs.get(om.transaction_id.name, 0)

# If missing meter_start or active_transaction_id try to restore from HA states. If HA
# does not have values either, generate new ones.
if self._metrics[csess.meter_start.value].value is None:
value = self.get_ha_metric(csess.meter_start.value)
if value is None:
value = self._metrics[DEFAULT_MEASURAND].value
else:
value = float(value)
_LOGGER.debug(
f"{csess.meter_start.value} was None, restored value={value} from HA."
)
self._metrics[csess.meter_start.value].value = value
if self._metrics[csess.transaction_id.value].value is None:
value = self.get_ha_metric(csess.transaction_id.value)
if value is None:
value = kwargs.get(om.transaction_id.name)
else:
value = int(value)
_LOGGER.debug(
f"{csess.transaction_id.value} was None, restored value={value} from HA."
)
self._metrics[csess.transaction_id.value].value = value
self.active_transaction_id = value

transaction_matches: bool = False
# match is also false if no transaction is in progress ie active_transaction_id==transaction_id==0
if transaction_id == self.active_transaction_id and transaction_id != 0:
Expand All @@ -1099,8 +1130,25 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs):
if unit == DEFAULT_POWER_UNIT:
self._metrics[measurand].value = float(value) / 1000
self._metrics[measurand].unit = HA_POWER_UNIT
elif unit == DEFAULT_ENERGY_UNIT:
if transaction_matches:
elif unit == DEFAULT_ENERGY_UNIT or "Energy" in str(measurand):
if self._metrics[csess.meter_start.value].value == 0:
# Charger reports Energy.Active.Import.Register directly as Session energy for transactions
self._charger_reports_session_energy = True
if (
transaction_matches
and self._charger_reports_session_energy
and measurand == DEFAULT_MEASURAND
and connector_id
):
self._metrics[csess.session_energy.value].value = (
float(value) / 1000
)
self._metrics[csess.session_energy.value].extra_attr[
cstat.id_tag.name
] = self._metrics[cstat.id_tag.value].value
elif (
transaction_matches or self._charger_reports_session_energy
):
self._metrics[measurand].value = float(value) / 1000
self._metrics[measurand].unit = HA_ENERGY_UNIT
else:
Expand All @@ -1118,15 +1166,6 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs):
# _LOGGER.debug("Meter data not yet processed: %s", unprocessed)
if unprocessed is not None:
self.process_phases(unprocessed)
if csess.meter_start.value not in self._metrics:
self._metrics[csess.meter_start.value].value = self._metrics[
DEFAULT_MEASURAND
]
if csess.transaction_id.value not in self._metrics:
self._metrics[csess.transaction_id.value].value = kwargs.get(
om.transaction_id.name
)
self.active_transaction_id = kwargs.get(om.transaction_id.name)
if transaction_matches:
self._metrics[csess.session_time.value].value = round(
(
Expand All @@ -1136,7 +1175,10 @@ def on_meter_values(self, connector_id: int, meter_value: dict, **kwargs):
/ 60
)
self._metrics[csess.session_time.value].unit = "min"
if self._metrics[csess.meter_start.value].value is not None:
if (
self._metrics[csess.meter_start.value].value is not None
and not self._charger_reports_session_energy
):
self._metrics[csess.session_energy.value].value = float(
self._metrics[DEFAULT_MEASURAND].value or 0
) - float(self._metrics[csess.meter_start.value].value)
Expand Down Expand Up @@ -1313,7 +1355,10 @@ def on_stop_transaction(self, meter_stop, timestamp, transaction_id, **kwargs):
)
self.active_transaction_id = 0
self._metrics[cstat.stop_reason.value].value = kwargs.get(om.reason.name, None)
if self._metrics[csess.meter_start.value].value is not None:
if (
self._metrics[csess.meter_start.value].value is not None
and not self._charger_reports_session_energy
):
self._metrics[csess.session_energy.value].value = int(
meter_stop
) / 1000 - float(self._metrics[csess.meter_start.value].value)
Expand Down Expand Up @@ -1361,6 +1406,20 @@ def get_metric(self, measurand: str):
"""Return last known value for given measurand."""
return self._metrics[measurand].value

def get_ha_metric(self, measurand: str):
"""Return last known value in HA for given measurand."""
entity_id = "sensor." + "_".join(
[self.central.cpid.lower(), measurand.lower().replace(".", "_")]
)
try:
value = self.hass.states.get(entity_id).state
except Exception as e:
_LOGGER.debug(f"An error occurred when getting entity state from HA: {e}")
return None
if value == STATE_UNAVAILABLE or value == STATE_UNKNOWN:
return None
return value

def get_extra_attr(self, measurand: str):
"""Return last known extra attributes for given measurand."""
return self._metrics[measurand].extra_attr
Expand Down
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
homeassistant>=2023.1.0b1
ocpp==0.19.0
websockets==11.0.3
jsonschema==4.19.0
130 changes: 127 additions & 3 deletions tests/test_charge_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
SERVICE_TURN_ON,
)
from homeassistant.const import ATTR_ENTITY_ID
import pytest
from pytest_homeassistant_custom_component.common import MockConfigEntry
import websockets

Expand Down Expand Up @@ -44,6 +45,7 @@
from .const import MOCK_CONFIG_DATA, MOCK_CONFIG_DATA_2


@pytest.mark.timeout(60) # Set timeout to 60 seconds for this test
async def test_cms_responses(hass, socket_enabled):
"""Test central system responses to a charger."""

Expand Down Expand Up @@ -227,6 +229,63 @@ async def test_services(hass, socket_enabled):

await asyncio.sleep(1)

# test restore feature of meter_start and active_tranasction_id.
async with websockets.connect(
"ws://127.0.0.1:9000/CP_1_res_vals",
subprotocols=["ocpp1.6"],
) as ws:
# use a different id for debugging
cp = ChargePoint("CP_1_restore_values", ws)
cp.active_transactionId = None
# send None values
try:
await asyncio.wait_for(
asyncio.gather(
cp.start(),
cp.send_meter_periodic_data(),
),
timeout=5,
)
except asyncio.TimeoutError:
pass
# check if None
assert cs.get_metric("test_cpid", "Energy.Meter.Start") is None
assert cs.get_metric("test_cpid", "Transaction.Id") is None
# send new data
try:
await asyncio.wait_for(
asyncio.gather(
cp.send_start_transaction(12344),
cp.send_meter_periodic_data(),
),
timeout=5,
)
except asyncio.TimeoutError:
pass
# save for reference the values for meter_start and transaction_id
saved_meter_start = int(cs.get_metric("test_cpid", "Energy.Meter.Start"))
saved_transactionId = int(cs.get_metric("test_cpid", "Transaction.Id"))
# delete current values from api memory
cs.del_metric("test_cpid", "Energy.Meter.Start")
cs.del_metric("test_cpid", "Transaction.Id")
# send new data
try:
await asyncio.wait_for(
asyncio.gather(
cp.send_meter_periodic_data(),
),
timeout=5,
)
except asyncio.TimeoutError:
pass
await ws.close()

# check if restored old values from HA when api have lost the values, i.e. simulated reboot of HA
assert int(cs.get_metric("test_cpid", "Energy.Meter.Start")) == saved_meter_start
assert int(cs.get_metric("test_cpid", "Transaction.Id")) == saved_transactionId

await asyncio.sleep(1)

# test ocpp messages sent from charger to cms
async with websockets.connect(
"ws://127.0.0.1:9000/CP_1_norm",
Expand All @@ -245,10 +304,11 @@ async def test_services(hass, socket_enabled):
cp.send_security_event(),
cp.send_firmware_status(),
cp.send_data_transfer(),
cp.send_start_transaction(),
cp.send_start_transaction(12345),
cp.send_meter_err_phases(),
cp.send_meter_line_voltage(),
cp.send_meter_periodic_data(),
cp.send_main_meter_clock_data(),
# add delay to allow meter data to be processed
cp.send_stop_transaction(2),
),
Expand All @@ -260,6 +320,9 @@ async def test_services(hass, socket_enabled):
assert int(cs.get_metric("test_cpid", "Energy.Active.Import.Register")) == int(
1305570 / 1000
)
assert int(cs.get_metric("test_cpid", "Energy.Session")) == int(
(54321 - 12345) / 1000
)
assert int(cs.get_metric("test_cpid", "Current.Import")) == int(0)
assert int(cs.get_metric("test_cpid", "Voltage")) == int(228)
assert cs.get_unit("test_cpid", "Energy.Active.Import.Register") == "kWh"
Expand Down Expand Up @@ -310,6 +373,43 @@ async def test_services(hass, socket_enabled):

await asyncio.sleep(1)

# test ocpp messages sent from charger that don't support errata 3.9
# i.e. "Energy.Meter.Start" starts from 0 for each session and "Energy.Active.Import.Register"
# reports starting from 0 Wh for every new transaction id. Total main meter values are without transaction id.
async with websockets.connect(
"ws://127.0.0.1:9000/CP_1_non_er_3.9",
subprotocols=["ocpp1.6"],
) as ws:
# use a different id for debugging
cp = ChargePoint("CP_1_non_errata_3.9", ws)
try:
await asyncio.wait_for(
asyncio.gather(
cp.start(),
cp.send_start_transaction(0),
cp.send_meter_periodic_data(),
cp.send_main_meter_clock_data(),
# add delay to allow meter data to be processed
cp.send_stop_transaction(2),
),
timeout=5,
)
except asyncio.TimeoutError:
pass
await ws.close()

# Last sent "Energy.Active.Import.Register" value without transaction id should be here.
assert int(cs.get_metric("test_cpid", "Energy.Active.Import.Register")) == int(
67230012 / 1000
)
assert cs.get_unit("test_cpid", "Energy.Active.Import.Register") == "kWh"

# Last sent "Energy.Active.Import.Register" value with transaction id should be here.
assert int(cs.get_metric("test_cpid", "Energy.Session")) == int(1305570 / 1000)
assert cs.get_unit("test_cpid", "Energy.Session") == "kWh"

await asyncio.sleep(1)

# test ocpp rejection messages sent from charger to cms
cs.charge_points["test_cpid"].received_boot_notification = False
cs.charge_points["test_cpid"].post_connect_success = False
Expand Down Expand Up @@ -604,12 +704,12 @@ async def send_data_transfer(self):
resp = await self.call(request)
assert resp.status == DataTransferStatus.accepted

async def send_start_transaction(self):
async def send_start_transaction(self, meter_start: int = 12345):
"""Send a start transaction notification."""
request = call.StartTransactionPayload(
connector_id=1,
id_tag="test_cp",
meter_start=12345,
meter_start=meter_start,
timestamp=datetime.now(tz=timezone.utc).isoformat(),
)
resp = await self.call(request)
Expand Down Expand Up @@ -871,6 +971,30 @@ async def send_meter_err_phases(self):
resp = await self.call(request)
assert resp is not None

async def send_main_meter_clock_data(self):
"""Send periodic main meter value. Main meter values dont have transaction_id."""
while self.active_transactionId == 0:
await asyncio.sleep(1)
request = call.MeterValuesPayload(
connector_id=1,
meter_value=[
{
"timestamp": "2021-06-21T16:15:09Z",
"sampledValue": [
{
"value": "67230012",
"context": "Sample.Clock",
"format": "Raw",
"measurand": "Energy.Active.Import.Register",
"location": "Inlet",
},
],
}
],
)
resp = await self.call(request)
assert resp is not None

async def send_meter_clock_data(self):
"""Send periodic meter data notification."""
self.active_transactionId = 0
Expand Down

0 comments on commit 6616437

Please sign in to comment.