Skip to content

Commit

Permalink
[#13] Kuksa example application (#14)
Browse files Browse the repository at this point in the history
[#13] Kuksa example application
---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitar-dimitrow authored May 17, 2024
1 parent f254b19 commit 71e6e49
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 0 deletions.
59 changes: 59 additions & 0 deletions .github/workflows/docker-build-kuksa.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
name: kuksa

on:
workflow_dispatch:
push:
paths:
- ".github/workflows/docker-build-kuksa.yml"
- "kuksa/**"
branches:
- "main"
pull_request:
paths:
- ".github/workflows/docker-build-kuksa.yml"
- "kuksa/**"
branches:
- "main"

env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/kuksa

jobs:
build-kuksa:
name: "Build multi-arch image"
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Docker meta
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

- name: Build and push
uses: docker/build-push-action@v5
with:
push: ${{ github.event_name != 'pull_request' }}
context: ./kuksa
file: ./kuksa/Dockerfile
platforms: linux/amd64,linux/arm64,linux/arm/v7
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
2 changes: 2 additions & 0 deletions kuksa/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
deployment.json
__pycache__
1 change: 1 addition & 0 deletions kuksa/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
20 changes: 20 additions & 0 deletions kuksa/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use an official Python runtime as a parent image
FROM python:3.8-slim

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Install any needed packages specified in requirements.txt
RUN apt-get -y update \
&& apt-get -y install git \
&& pip install --no-cache-dir -r /app/requirements.txt \
&& apt-get remove -y git \
&& apt-get autoremove -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

# Run edge_client.py when the container launches
CMD ["python", "-u" "/app/edge_client.py"]
125 changes: 125 additions & 0 deletions kuksa/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
![Kanto logo](https://github.com/eclipse-kanto/kanto/raw/main/logo/kanto.svg)

# Eclipse Kanto - Eclipse Kuksa Integration

# Introduction

This is an example application that connects to [Eclipse Kuksa Databroker](https://github.com/eclipse-kuksa/kuksa-databroker) and demonstrates how the COVESA Vehicle Signal Specification(VSS) data could be transformed to a digital twin using Eclipse Kanto. For the application it is transparent which Kanto cloud connectivity options is used(AWS, Azure or Suite). Nevertheless, for the completeness of the next guide steps, an AWS connector is chosen, so the VSS data from a Kuksa Databroker will be presented as an AWS IoT Shadow.

# Installation

## Prerequisites
You must have an installed and working instance of:
* Eclipse Kanto Container Management
* Eclipse Kanto AWS Connector that is connected to a Thing in AWS IoT Core

## Steps
Create container and start the Kuksa Databroker
```shell
kanto-cm create --name=databroker ghcr.io/eclipse-kuksa/kuksa-databroker:0.4.4 --insecure
kanto-cm start --name=databroker
```

Create container and start the Kuksa Databroker CLI in a dedicated terminal, where VSS data will be fed at later point
```shell
kanto-cm create --name=cli --i --t --hosts=databroker:container_databroker-host --rp=no ghcr.io/eclipse-kuksa/kuksa-databroker-cli:0.4.4 --server=databroker:55555
kanto-cm start --i --a --name=cli
```

Create container and start the Kuksa Example Application
```shell
kanto-cm create -f ./deployment.json
kanto-cm start --name=vss
```

There should be a new device shadow named 'VSS' in your AWS IoT Thing. With the VSS data from the Kuksa Databroker displayed as a shadow state
```json
{
"state": {
"reported": {
"Vehicle": {
"Length": {
"timestamp": "2024-05-06T12:31:33.732487+00:00",
"value": 0
},
"CurbWeight": {
"timestamp": "2024-05-06T12:31:33.732469+00:00",
"value": 0
},
"GrossWeight": {
"timestamp": "2024-05-06T12:31:33.732484+00:00",
"value": 0
},
"StartTime": {
"timestamp": "2024-05-06T12:31:33.732633+00:00",
"value": "0000-01-01T00:00Z"
},
"MaxTowWeight": {
"timestamp": "2024-05-06T12:31:33.732493+00:00",
"value": 0
},
"MaxTowBallWeight": {
"timestamp": "2024-05-06T12:31:33.732492+00:00",
"value": 0
},
"Speed": {
"timestamp": "2024-05-06T12:31:33.732486+00:00",
"value": 0
},
"Height": {
"timestamp": "2024-05-06T12:31:33.732485+00:00",
"value": 0
},
"Width": {
"timestamp": "2024-05-06T12:31:33.732869+00:00",
"value": 0
}
}
}
}
}
```

You can go back to the Kuksa Databroker CLI terminal and feed new data to the Kuksa Databroker
```shell
feed Vehicle.Speed 120
feed Vehicle.CurrentLocation.Altitude 640
feed Vehicle.CurrentLocation.Latitude 43
feed Vehicle.CurrentLocation.Longitude 25
```

The Kuksa Example Application is subscribed for changes of this VSS data paths and the values are updated in the VSS shadow as well
```json
{
"state": {
"reported": {
"Vehicle": {
...
"Speed": {
"timestamp": "2024-05-06T15:11:12.911755+00:00",
"value": 120
},
...
"CurrentLocation": {
"Altitude": 640,
"Latitude": 43,
"Longitude": 25
}
}
}
}
}
```

# Control
The Kuksa Example Application is based on python scripts that allows configuring of connection settings for the local MQTT broker and the Kuksa Databroker and also which VSS data paths to be followed. Allowed arguments and their default values:
| Argument | Type | Default | Description |
| -------- | ---- | ------- | ----------- |
|mqtt_host |string|localhost|MQTT broker host |
|mqtt_port |int |1883 |MQTT broker port |
|mqtt_username |string| |MQTT username |
|mqtt_password |string| |MQTT password |
|kuksa_host |string|localhost|Kuksa Databroker host|
|kuksa_port |int |55555 |Kuksa Databroker port|
|vss_paths |string|Vehicle.CurrentLocation.Altitude,Vehicle.CurrentLocation.Latitude,Vehicle.CurrentLocation.Longitude,Vehicle.Speed| Comma separated VSS data paths to subscribe to |
|log_level |string| info |Logging level, possible values are critical,fatal,error,warn,warning,info,debug,notset
22 changes: 22 additions & 0 deletions kuksa/deployment.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"container_name": "vss",
"image": {
"name": "ghcr.io/eclipse-kanto/example-applications/kuksa:main"
},
"config": {
"cmd": [
"python",
"-u",
"/app/edge_client.py",
"--mqtt_host=ctrhost",
"--kuksa_host=databroker"
]
},
"host_config": {
"extra_hosts": [
"ctrhost:host_ip",
"databroker:container_databroker-host"
]
}
}

149 changes: 149 additions & 0 deletions kuksa/edge_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-IDentifier: EPL-2.0 OR Apache-2.0

import argparse
import logging
import signal
import sys

import paho.mqtt.client as mqtt
from ditto.client import Client
from ditto.model.feature import Feature
from ditto.protocol.things.commands import Command
from kuksa_client import KuksaClientThread

from edge_device_info import EdgeDeviceInfo
from utils import process_tree, process_signal

FEATURE_ID_VSS = "VSS"

EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO = "edge/thing/response"
EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO_REQUEST = "edge/thing/request"

VSS_PATHS = "Vehicle.CurrentLocation.Altitude,Vehicle.CurrentLocation.Latitude,Vehicle.CurrentLocation.Longitude,Vehicle.Speed"

class EdgeClient:
def __init__(self, host, port, paths):
self.mqtt_client = None
self.kuksa_client = KuksaClientThread(config={'ip':host,'protocol': 'grpc', 'port': port, 'insecure': True})
self.device_info = None
self.ditto_client = None
self.vss_paths = paths
self.log = logging.getLogger('EDGE_CLIENT')

def on_connect(self, client:mqtt.Client, obj, flags, rc):
self.log.info("Connected with result code - " + str(rc))
self.mqtt_client = client
# init ditto client
self.ditto_client = Client(paho_client=self.mqtt_client)
self.ditto_client.connect()
# init kuksa client
self.kuksa_client.start()
# trigger initialization
self.mqtt_client.subscribe(EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO)
self.mqtt_client.publish(EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO_REQUEST, None, 1)

def on_message(self, client, userdata, msg):
try:
if msg.topic == EDGE_CLOUD_CONNECTOR_TOPIC_DEV_INFO:
if self.device_info is None:
self.device_info = EdgeDeviceInfo()
self.device_info.unmarshal_json(msg.payload)
self.add_vss_feature()
self.subscribe()
else:
self.log.info('Device info already available - discarding message')
return
except Exception as ex:
self.log.error(ex)

def subscribe(self):
self.log.info(f'Subscribing to VSS data paths - {self.vss_paths}')
self.kuksa_client.subscribeMultiple(self.vss_paths, self.on_kuksa_signal)

def add_vss_feature(self):
# add the vss feature
feature = Feature
cmd = Command(self.device_info.deviceId).feature(FEATURE_ID_VSS).modify(Feature().to_ditto_dict())
cmd_envelope = cmd.envelope(response_required=False, content_type="application/json")
self.ditto_client.send(cmd_envelope)

# add the vss tree as properties
vss_tree = self.kuksa_client.getValue('Vehicle.*')
processed = process_tree(vss_tree)
for key, val in processed.items():
cmd = Command(self.device_info.deviceId).feature_property(FEATURE_ID_VSS, key.replace('.','/')).modify(val)
cmd_envelope = cmd.envelope(response_required=False, content_type="application/json")
self.ditto_client.send(cmd_envelope)

def on_kuksa_signal(self, message):
self.log.info(f'Received signal - {message}')
if self.device_info is None:
self.log.info("No device info is initialized to process VSS data")
return
processed = process_signal(message)
# update property
self.log.info(f'Updating VSS properties - {processed}')
for key, val in processed.items():
cmd = Command(self.device_info.deviceId).feature_property(FEATURE_ID_VSS, key.replace('.','/')).modify(val)
cmd_envelope = cmd.envelope(response_required=False, content_type="application/json")
self.ditto_client.send(cmd_envelope)

def shutdown(self):
self.kuksa_client.stop()
self.ditto_client.disconnect()

def parse_args():
parser = argparse.ArgumentParser(description="Edge Client with configurable MQTT and Kuksa settings", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--mqtt_host", type=str, default="localhost", help="MQTT broker host")
parser.add_argument("--mqtt_port", type=int, default=1883, help="MQTT broker port")
parser.add_argument("--mqtt_username", type=str, default=None, help="MQTT username")
parser.add_argument("--mqtt_password", type=str, default=None, help="MQTT password")
parser.add_argument("--kuksa_host", type=str, default="localhost", help="Kuksa Databroker host")
parser.add_argument("--kuksa_port", type=int, default=55555, help="Kuksa Databroker port")
parser.add_argument("--vss_paths", type=str, default=VSS_PATHS, help="Comma separated VSS data paths to subscribe to")
parser.add_argument("--log_level", type=str, default='info', help="Logging level", choices=list(map(str.lower,logging._nameToLevel.keys())))
return parser.parse_args()

if __name__ == "__main__":
args = parse_args()

# Set logging
logging.basicConfig(level=args.log_level.upper())
log = logging.getLogger(__name__)

# Set VSS data paths to subscribe to
args.vss_paths = [s.strip() for s in args.vss_paths.split(",")]

paho_client = mqtt.Client()
edge_client = EdgeClient(args.kuksa_host, args.kuksa_port, args.vss_paths)

# Set MQTT username and password if provided
if args.mqtt_username and args.mqtt_password:
self.mqtt_client.username_pw_set(mqtt_username, mqtt_password)

paho_client.on_connect = edge_client.on_connect
paho_client.on_message = edge_client.on_message
paho_client.connect(args.mqtt_host, args.mqtt_port)


def termination_signal_received(signal_number, frame):
log.info("Received termination signal. Shutting down")
edge_client.shutdown()
paho_client.disconnect()


signal.signal(signal.SIGINT, termination_signal_received)
signal.signal(signal.SIGQUIT, termination_signal_received)
signal.signal(signal.SIGTERM, termination_signal_received)
log.info('before loop forever')
paho_client.loop_forever()
Loading

0 comments on commit 71e6e49

Please sign in to comment.