1.3.4. Edge Data Broker enabler
1.3.4.1. Introduction
The Edge Data Broker enables efficient management of data demand and data supply among edge nodes based on a publish/subscribe scheme, taking load-balancing criteria into account. This enabler distributes data to where it is needed for applications, services and further analysis. It is considered essential only in deployments that involve IoT architectures.
1.3.4.2. Features
The Edge Data Broker enabler has the following operational and intelligent functionalities:
Subscriptions and messages between Edge Devices through the Edge Data Broker enabler
Management and distribution of messages using scheduling, routing and delivery mechanisms
Common interfaces for searching and finding information
Integration with other data brokers if needed
1.3.4.3. Place in architecture
The Edge Data Broker enabler is part of the Data Management Plane of ASSIST-IoT. The Data Management Plane encompasses any process in which data is processed to deliver features concerning data interoperability, annotation, security, acquisition, provenance, aggregation, fusion, etc. This enabler provides a data communication channel to all IoT devices.
1.3.4.4. User guide
The Edge Data Broker is a distributed MQTT broker and follows the MQTT specification. As such, in theory any MQTT-compliant library can be used to connect, subscribe and publish messages to the Edge Data Broker. Here we provide an example using Python.
This is a subscriber Python script that uses the paho-mqtt library to connect to the Edge Data Broker, subscribe to a topic, and receive and print messages from it.
import paho.mqtt.client as mqtt

broker = "127.0.0.1"
port = 31883
topic = "assist.test"

# The callback for when the client receives a CONNACK response from the server.
# The signature follows the paho-mqtt 1.x callback API: (client, userdata, flags, rc).
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(topic)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, port, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
This is a publisher Python script that uses the paho-mqtt library to connect to the Edge Data Broker and publish messages to a topic.
import json
import time

import paho.mqtt.client as paho

broker = "127.0.0.1"
port = 31883
topic = "assist.test"

# The callback for when the client receives a CONNACK response from the server.
# The signature follows the paho-mqtt 1.x callback API: (client, userdata, flags, rc).
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

# The callback for when a published message has been sent to the broker.
def on_publish(client, userdata, mid):
    print("data published!")

client1 = paho.Client("control1")  # create client object
client1.on_publish = on_publish    # assign function to callback
client1.on_connect = on_connect
client1.connect(broker, port)      # establish connection
client1.loop_start()               # process network traffic in the background so callbacks fire

body = {"name": "DeviceName", "raw-data": 1.000}
for i in range(1000):
    body["raw-data"] = float(i)
    bodyS = json.dumps(body)
    print("Publishing data: " + bodyS)
    client1.publish(topic, bodyS)  # publish
    time.sleep(0.1)

client1.loop_stop()
client1.disconnect()
Running these two scripts publishes JSON messages to the Edge Data Broker and consumes them from it.
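The subscriber script prints the raw payload bytes. A minimal sketch of decoding the publisher's JSON payload inside the `on_message` callback is given here, assuming the payload is a UTF-8 encoded JSON object with the `name` and `raw-data` keys used by the publisher script. The helper name `decode_payload` is illustrative and not part of the enabler.

```python
import json


def decode_payload(payload: bytes) -> dict:
    """Decode an MQTT payload that carries a UTF-8 encoded JSON object."""
    body = json.loads(payload.decode("utf-8"))
    if not isinstance(body, dict):
        raise ValueError("expected a JSON object")
    return body


def on_message(client, userdata, msg):
    # Same callback signature as in the subscriber script.
    try:
        body = decode_payload(msg.payload)
    except ValueError:
        # Covers invalid UTF-8, invalid JSON and non-object JSON values.
        print(msg.topic + " (not a JSON object) " + str(msg.payload))
        return
    print(msg.topic + " " + str(body.get("name")) + " " + str(body.get("raw-data")))
```

Assigning this function to `client.on_message` in the subscriber script would print the decoded fields instead of the raw bytes.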
1.3.4.5. Prerequisites
The Edge Data Broker enabler is designed to run on a cluster of devices with ARM64 architecture. It can also run on x86 architecture by changing the Docker image.
Kubernetes 1.16+
Helm 3+
1.3.4.6. Installation
1.3.4.6.1. Edge Data Broker (EDB) Enabler Installation
To install the chart with the release name edbe:
Clone the repository to your machine.
Install Edge Data Broker Enabler.
helm install edbe ./edge-data-broker
The command deploys EDB on the Kubernetes cluster in the default configuration.
Note: The kostasiccs/vernemq image is suitable for ARM architectures (ASSIST-IoT’s GWEN, Raspberry Pi, etc.) and is the default image used in the EDBE Helm chart. For other architectures, use the official vernemq/vernemq image and accept the VerneMQ EULA by appending the following to additionalEnv:
- name: DOCKER_VERNEMQ_ACCEPT_EULA
  value: "yes"
To check if the installation was successful run:
kubectl get pods
The result should show something like:
NAME                         READY   STATUS    RESTARTS   AGE
edbe-0                       1/1     Running   0          174m
edbe-1                       1/1     Running   0          172m
fr-script-66f6f8688d-7x6ts   1/1     Running   0          174m
To make the two VerneMQ nodes (edbe-0, edbe-1) run as a single cluster, you need to join one node to the other as follows:
Open a shell in a running container of one of the Kubernetes pods (edbe-0 or edbe-1).
kubectl exec -it edbe-0 -- /bin/bash
Check the cluster state (you should see a 1 node cluster):
vmq-admin cluster show
The result should show something like:
+--------------------------------------------------------+---------+
| Node | Running |
+--------------------------------------------------------+---------+
| VerneMQ@edbe-0.edbe-headless.default.svc.cluster.local | true |
+--------------------------------------------------------+---------+
Join one node to the other with:
vmq-admin cluster join discovery-node=<OtherClusterNode>
Check the cluster state (you should see a 2 node cluster):
vmq-admin cluster show
The result should show something like:
+--------------------------------------------------------+---------+
| Node | Running |
+--------------------------------------------------------+---------+
| VerneMQ@edbe-0.edbe-headless.default.svc.cluster.local | true |
+--------------------------------------------------------+---------+
| VerneMQ@edbe-1.edbe-headless.default.svc.cluster.local | true |
+--------------------------------------------------------+---------+
To set up an MQTT bridge connection between two different VerneMQ clusters, append the following entries to additionalEnv in the values.yaml file of one of the clusters before the installation:
- name: DOCKER_VERNEMQ_PLUGINS__VMQ_BRIDGE
  value: "on"
- name: DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__BR0
  value: "10.43.0.1:31883"
- name: DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__BR0__TOPIC__1
  value: "* in"
- name: DOCKER_VERNEMQ_VMQ_BRIDGE__TCP__BR0__MAX_OUTGOING_BUFFERED_MESSAGES
  value: "100"
Note: The configuration above only imports messages (all of them; the "*" pattern corresponds to the MQTT "#" wildcard) from the remote broker at address 10.43.0.1 and port 31883, and stores up to 100 messages in our buffer.
For more information, refer to the official VerneMQ documentation.
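The variable names in the entries above follow a visible pattern: a `DOCKER_VERNEMQ_` prefix, followed by the setting key in upper case with each dot replaced by a double underscore. The sketch below generates additionalEnv entries from that pattern. The helper names `vernemq_env_name` and `additional_env` are illustrative, and the lower-case setting keys are inferred from the variable names above rather than taken from the chart.

```python
def vernemq_env_name(conf_key: str) -> str:
    """Map a dotted setting key to the DOCKER_VERNEMQ_* variable name."""
    return "DOCKER_VERNEMQ_" + conf_key.upper().replace(".", "__")


def additional_env(settings: dict) -> list:
    """Build additionalEnv entries (name/value pairs) from dotted setting keys."""
    return [{"name": vernemq_env_name(key), "value": str(value)}
            for key, value in settings.items()]


# Keys inferred from the bridge entries shown above (assumption).
bridge_settings = {
    "plugins.vmq_bridge": "on",
    "vmq_bridge.tcp.br0": "10.43.0.1:31883",
    "vmq_bridge.tcp.br0.topic.1": "* in",
    "vmq_bridge.tcp.br0.max_outgoing_buffered_messages": 100,
}

for entry in additional_env(bridge_settings):
    print("- name: " + entry["name"])
    print('  value: "' + entry["value"] + '"')
```

Running the sketch prints the same four name/value pairs that are listed above, which makes it easier to add further bridge settings without mistyping the double underscores.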
Connect to a shell of a running container within Kubernetes pod.
kubectl exec -it edbe-0 -- /bin/bash
Check the bridges state:
vmq-admin bridge show
The result should show something like:
+------+-----------------+-------------+------------+---------------------+--------------------------+
| name | endpoint | buffer size | buffer max | buffer dropped msgs | MQTT process mailbox len |
+------+-----------------+-------------+------------+---------------------+--------------------------+
| br0 | 10.43.0.1:31883 | 0 | 100 | 0 | 0 |
+------+-----------------+-------------+------------+---------------------+--------------------------+
To monitor the Edge Data Broker Enabler, open the following URLs in your browser:
http://<IP>:30888/status to get EDBE’s status page.
http://<IP>:30888/metrics to get EDBE’s metrics page made for Performance and Usage Diagnosis Enabler’s consumption.
To access the Filtering and Ruling JSON file:
Port-forward the fr-script pod to port 8000:
kubectl port-forward fr-script-66f6f8688d-7x6ts 8000
GET or POST the Filtering and Ruling JSON file with Postman, cURL, etc., using http://<ip>:8000/.
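As an alternative to Postman or cURL, the same requests can be sketched with the Python standard library. This is a minimal sketch under assumptions: the POST body is taken to be the complete filters/rules JSON object sent with a JSON content type, and the function names `build_post_request` and `fetch_config` are illustrative.

```python
import json
import urllib.request


def build_post_request(base_url: str, fr_config: dict) -> urllib.request.Request:
    """Prepare a POST request that carries the Filtering and Ruling JSON."""
    data = json.dumps(fr_config).encode("utf-8")
    return urllib.request.Request(
        base_url,
        data=data,
        headers={"Content-Type": "application/json"},  # assumption: JSON body expected
        method="POST",
    )


def post_config(base_url: str, fr_config: dict) -> int:
    """Send the JSON and return the HTTP status code."""
    request = build_post_request(base_url, fr_config)
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status


def fetch_config(base_url: str):
    """GET the current Filtering and Ruling JSON and decode it."""
    with urllib.request.urlopen(base_url, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the port-forward from the previous step running, `post_config("http://127.0.0.1:8000/", {"filters": [], "rules": []})` would upload an empty configuration and `fetch_config("http://127.0.0.1:8000/")` would read it back.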
To see fr-script’s logs:
kubectl logs fr-script-66f6f8688d-7x6ts
1.3.4.7. Configuration options
The following table lists the configurable parameters of the chart and their default values.
| Parameter | Description | Default |
|---|---|---|
| additionalEnv | additional environment variables | see values.yaml |
| envFrom | additional envFrom configmaps or secrets | see values.yaml |
| image.pullPolicy | container image pull policy | |
| image.repository | container image repository | |
| image.tag | container image tag | the current versions (e.g. 1.12.3) |
| ingress.enabled | whether to enable an ingress object to route to the WebSocket service. Requires an ingress controller and the WebSocket service to be enabled. | |
| ingress.labels | additional ingress labels | |
| ingress.annotations | additional ingress annotations | |
| ingress.hosts | a list of routable hostnames for host-based routing of traffic to the WebSocket service | |
| ingress.paths | a list of paths for path-based routing of traffic to the WebSocket service | |
| ingress.tls | a list of TLS ingress configurations for securing the WebSocket ingress | |
| nodeSelector | node labels for pod assignment | |
| persistentVolume.accessModes | data Persistent Volume access modes | |
| persistentVolume.annotations | annotations for Persistent Volume Claim | |
| persistentVolume.enabled | if true, create a Persistent Volume Claim | |
| persistentVolume.size | data Persistent Volume size | |
| persistentVolume.storageClass | data Persistent Volume Storage Class | |
| extraVolumeMounts | additional volumeMounts to the pod | |
| extraVolumes | additional volumes to the pod | |
| secretMounts | mounts a secret as a file inside the statefulset. Useful for mounting certificates and other secrets | |
| podAntiAffinity | pod anti-affinity: soft to try not to run pods on the same nodes, hard to force Kubernetes not to run 2 pods on the same node | |
| rbac.create | if true, create and use RBAC resources | |
| rbac.serviceAccount.create | if true, create a serviceAccount | |
| rbac.serviceAccount.name | name of the service account to use or create | |
| replicaCount | desired number of nodes | |
| resources | resource requests and limits (YAML) | |
| securityContext | securityContext for containers in pod | |
| service.annotations | service annotations | |
| service.clusterIP | custom cluster IP when service.type is ClusterIP | |
| service.externalIPs | optional service external IPs | |
| service.labels | additional service labels | |
| service.loadBalancerIP | optional load balancer IP when service.type is LoadBalancer | |
| service.loadBalancerSourceRanges | optional load balancer source ranges when service.type is LoadBalancer | |
| service.externalTrafficPolicy | set this to Local to preserve client source IPs and prevent additional hops between K8s nodes if the service type is LoadBalancer or NodePort | |
| service.sessionAffinity | service session affinity | |
| service.sessionAffinityConfig | service session affinity config | |
| service.mqtt.enabled | whether to expose MQTT port | |
| service.mqtt.nodePort | the MQTT port exposed by the node when service.type is NodePort | |
| service.mqtt.port | the MQTT port exposed by the service | |
| service.mqtts.enabled | whether to expose MQTTS port | |
| service.mqtts.nodePort | the MQTTS port exposed by the node when service.type is NodePort | |
| service.mqtts.port | the MQTTS port exposed by the service | |
| service.type | type of service to create | |
| service.ws.enabled | whether to expose WebSocket port | |
| service.ws.nodePort | the WebSocket port exposed by the node when service.type is NodePort | |
| service.ws.port | the WebSocket port exposed by the service | |
| service.wss.enabled | whether to expose secure WebSocket port | |
| service.wss.nodePort | the secure WebSocket port exposed by the node when service.type is NodePort | |
| service.wss.port | the secure WebSocket port exposed by the service | |
| statefulset.annotations | additional annotations to the StatefulSet | |
| statefulset.labels | additional labels on the StatefulSet | |
| statefulset.podAnnotations | additional pod annotations | |
| statefulset.podManagementPolicy | start and stop pods in Parallel or OrderedReady (one by one). Note: cannot be changed after the first release. | |
| statefulset.terminationGracePeriodSeconds | configure how much time VerneMQ takes to move offline queues to other nodes | |
| statefulset.updateStrategy | StatefulSet updateStrategy | |
| statefulset.lifecycle | StatefulSet lifecycle hooks | |
| serviceMonitor.create | whether to create a ServiceMonitor for Prometheus Operator | |
| serviceMonitor.labels | whether to add more labels to ServiceMonitor for Prometheus Operator | |
| pdb.enabled | whether to create a Pod Disruption Budget | |
| pdb.minAvailable | PDB (min available) for the cluster | |
| pdb.maxUnavailable | PDB (max unavailable) for the cluster | |
| certificates.cert | string (not base64 encoded) containing the listener certificate in PEM format | |
| certificates.key | string (not base64 encoded) containing the listener private key in PEM format | |
| certificates.ca | string (not base64 encoded) containing the CA certificate for validating client certs | |
| certificates.secret.labels | additional labels for the created secret containing certificates and keys | |
| certificates.secret.annotations | additional annotations for the created secret containing certificates and keys | |
| acl.enabled | whether ACLs should be applied | |
| acl.content | content of the ACL file | |
| acl.labels | additional labels on the ACL configmap | |
| acl.annotations | additional annotations on the ACL configmap | |
1.3.4.8. Developer guide
1.3.4.8.1. FR-Script Documentation
In order for the fr_script to operate, the user should provide relevant filters and rules corresponding to different use cases (scenarios).
The filters and rules should be provided in JSON format. The GET, POST, PATCH and DELETE HTTP methods can be used to fetch, post, update and delete JSON objects via an API, respectively. The API can be accessed on port 30008. Use the endpoint /docs#/ to access the Swagger UI.
The JSON consists of two parts:
{
  "filters": [],
  "rules": []
}
Both parts contain an array of objects.
1.3.4.8.2. Filters
For filtering, the MQTT topic that the user wants to filter is required. It consists of one or more topic levels and can also contain the “#” and “+” wildcards.
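To illustrate how the “#” and “+” wildcards select topics, here is a minimal pure-Python sketch of MQTT topic matching. It is a simplified illustration, not the matching code used by the broker or by fr_script; the function name `topic_matches` is hypothetical, and special handling of topics that start with “$” is left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a topic matches a filter containing '+' or '#' wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            # '+' matches exactly one level; anything else must be equal.
            return False
    return len(filter_levels) == len(topic_levels)


print(topic_matches("house/#", "house/1"))                # True
print(topic_matches("house/+/alert/", "house/1/alert/"))  # True
print(topic_matches("house/+", "house/1/alert"))          # False
```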
A subtopic is also required. It is appended to the topic that is being filtered and creates the new topic to which the filtered messages are published. It can also consist of one or more topic levels.
After setting the topic and subtopic of the filter, statements also need to be defined. statements is an array of objects. Every statement consists of two components, a condition and a new_payload. A condition takes the same kind of value as the expression of an if statement: variables, values, comparison operators, logical operators and parentheses to set the priority of the operations. NOTE: Use spaces between all elements of the condition.
The variables should exist as key values in the json message sent to the topic that is being filtered. In the json file with the filters and rules that the user provides, those same variables should start with the $ sign, followed by their name. If the filtered json message has nested objects, the parent variable comes after the $ sign, followed by a dot . and then the child variable. Example: $parent.child
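The `$parent.child` notation can be read as a path into the decoded JSON message. The sketch below shows one way to resolve such a variable; it is an illustration of the notation, not fr_script's actual code, and the function name `resolve_variable` is hypothetical. The sample message has the same shape as the sensor data used in the example that follows.

```python
import json


def resolve_variable(token: str, message: dict):
    """Look up a '$parent.child' variable in a decoded JSON message."""
    if not token.startswith("$"):
        raise ValueError("variables must start with '$': " + token)
    value = message
    for key in token[1:].split("."):
        value = value[key]  # raises KeyError if the field is missing
    return value


message = json.loads(
    '{"h_id":1,"inside":{"temperature":35,"humidity":60},"temperature":43,"wind_speed":34}'
)
print(resolve_variable("$inside.temperature", message))  # 35
print(resolve_variable("$temperature", message))         # 43
```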
The new_payload takes a string value or "". Its value is the new message that will be published to the newly created filtered topic. If the new_payload’s value is "" and the statement’s condition is met, the original message of the filtered topic is sent.
Example
Let’s say we have a number of houses in a smart city. Sensors installed inside and outside of those houses generate data like the JSON below.
{"h_id":1,"inside":{"temperature":35,"humidity":60},"temperature":43,"wind_speed":34}
The sensors of every house publish their data in a topic like house/1, house/2, etc.
The team that inspects and monitors the smart city wants to receive the sensors’ data only when they exceed some threshold, not all of them. So they subscribe to the topic house/+/alert/ (“+” is a single-level wildcard that matches any name at a specific topic level) and use the JSON below to set the rules for filtering the data published on house/#.
{
  "filters": [
    {
      "topic": "house/#",
      "subtopic": "alert/",
      "statements": [
        {
          "condition": "( $inside.temperature < 20 and $inside.humidity >= 60 ) or $temperature < 5",
          "new_payload": ""
        },
        {
          "condition": "$inside.temperature >= 45 and $inside.humidity <= 15",
          "new_payload": "fire_danger"
        }
      ]
    }
  ],
  "rules": []
}
The messages below are published by the sensors of houses 1, 2 and 3 to the topics house/1, house/2 and house/3 respectively.
{"h_id":1,"inside":{"temperature":50,"humidity":6},"temperature":8,"wind_speed":34}
{"h_id":2,"inside":{"temperature":15,"humidity":60},"temperature":8,"wind_speed":34}
{"h_id":3,"inside":{"temperature":22,"humidity":55},"temperature":8,"wind_speed":35}
The monitoring team’s client, subscribed to the topic house/+/alert/, receives the messages:
house/1/alert/--> b'fire_danger'
house/2/alert/--> b'{"h_id":2,"inside":{"temperature":15,"humidity":60},"temperature":8,"wind_speed":34}'
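The filtering example above can be reproduced with a short sketch. This is not fr_script's implementation: it assumes that conditions are space-separated as required by the NOTE, that the first matching statement decides the outcome, and that the new topic is the original topic followed by "/" and the subtopic. All function names are illustrative, and topic matching against house/# is left out for brevity.

```python
import json

ALLOWED = {"(", ")", "and", "or", "not", "<", "<=", ">", ">=", "==", "!="}


def resolve(token, message):
    """Resolve '$parent.child' against the decoded message."""
    value = message
    for key in token[1:].split("."):
        value = value[key]
    return value


def is_number(token):
    try:
        float(token)
        return True
    except ValueError:
        return False


def condition_holds(condition, message):
    """Evaluate a space-separated condition; only whitelisted tokens are accepted."""
    parts = []
    for token in condition.split():
        if token.startswith("$"):
            parts.append(repr(resolve(token, message)))
        elif token in ALLOWED or is_number(token):
            parts.append(token)
        else:
            raise ValueError("unsupported token: " + token)
    return bool(eval(" ".join(parts), {"__builtins__": {}}, {}))


def apply_filter(flt, topic, payload):
    """Return (new_topic, new_payload) for the first matching statement, else None."""
    message = json.loads(payload)
    for statement in flt["statements"]:
        if condition_holds(statement["condition"], message):
            new_topic = topic.rstrip("/") + "/" + flt["subtopic"]
            # An empty new_payload means the original message is forwarded.
            return new_topic, statement["new_payload"] or payload
    return None


house_filter = {
    "topic": "house/#",
    "subtopic": "alert/",
    "statements": [
        {"condition": "( $inside.temperature < 20 and $inside.humidity >= 60 ) or $temperature < 5",
         "new_payload": ""},
        {"condition": "$inside.temperature >= 45 and $inside.humidity <= 15",
         "new_payload": "fire_danger"},
    ],
}
published = {
    "house/1": '{"h_id":1,"inside":{"temperature":50,"humidity":6},"temperature":8,"wind_speed":34}',
    "house/2": '{"h_id":2,"inside":{"temperature":15,"humidity":60},"temperature":8,"wind_speed":34}',
    "house/3": '{"h_id":3,"inside":{"temperature":22,"humidity":55},"temperature":8,"wind_speed":35}',
}
for topic, payload in published.items():
    result = apply_filter(house_filter, topic, payload)
    if result:
        print(result[0] + "--> " + result[1])
```

Under these assumptions the sketch emits an alert for houses 1 and 2 and nothing for house 3, in line with the messages listed above.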
1.3.4.8.3. Rules
In the rules part of fr_script, every rule consists of two parts, statements and logic, both of which contain an array of objects.
{
  "filters": [],
  "rules": [
    {
      "statements": [],
      "logic": []
    }
  ]
}
The statements are structured very similarly to the filters.
Every statement consists of the MQTT topic that the user wants to apply rules against, the condition, which works exactly like the conditions in filtering, an id that is unique for every statement, and the payload_type of the message fields that are sent to the topic defined above and used as variables in the condition. The payload type can be int, float, str or bool.
Every entry in the logic array consists of the logical operations, which constitute the essence of the ruling part of the script, the newly created topic new_topic, and the new_payload that is published to it only if the logical operations return true.
Example
Let’s say we are managers in a mine. We have sensors inside the mine monitoring its environment as well as biometric sensors on every miner. The sensors monitoring the mine’s environment produce messages like the JSON below:
{"temperature": 25, "humidity": 90}
and publish them to the mine/environment topic.
The miners’ biometric sensors produce messages like:
{"m_id":1, "body-temperature": 36.6, "heart-rate": 80}
and publish their data to a topic like miner/1, miner/2, etc.
So as managers we want to apply the following rules to monitor the miners’ wellbeing.
If a miner’s heart rate is between 100 and 120 and the mine’s temperature is above 35 or the humidity is above 85, the miner should rest.
If a miner’s body temperature is above 38 degrees and the mine’s temperature is above 30, the miner should leave.
If a miner’s heart rate is 0, the miner is dead.
The fr_script should be as follows:
{
  "filters": [],
  "rules": [
    {
      "statements": [
        {
          "id": 1,
          "topic": "miner/#",
          "payload_type": "float",
          "condition": "$heart-rate >= 100 and $heart-rate <= 120"
        },
        {
          "id": 2,
          "topic": "mine/environment",
          "payload_type": "int",
          "condition": "$temperature > 35 or $humidity > 85"
        }
      ],
      "logic": [
        {
          "operations": "( $1 ) and ( $2 )",
          "new_topic": "action/rest",
          "new_payload": ""
        }
      ]
    },
    {
      "statements": [
        {
          "id": 3,
          "topic": "miner/#",
          "payload_type": "float",
          "condition": "$body-temperature > 38"
        },
        {
          "id": 4,
          "topic": "mine/environment",
          "payload_type": "int",
          "condition": "$temperature > 30"
        }
      ],
      "logic": [
        {
          "operations": "$3 and $4",
          "new_topic": "action/leave",
          "new_payload": ""
        }
      ]
    },
    {
      "statements": [
        {
          "id": 5,
          "topic": "miner/#",
          "payload_type": "float",
          "condition": "$heart-rate == 0"
        }
      ],
      "logic": [
        {
          "operations": "$5",
          "new_topic": "action/dead",
          "new_payload": ""
        }
      ]
    }
  ]
}
The messages below are published by the sensors on the equipment of workers 1 and 2 as well as by the sensors in the mine itself. The topics are miner/1, miner/2 and mine/environment respectively, and the messages are published in the order shown below.
{"m_id":1, "body-temperature": 36.6, "heart-rate": 105}
to topic miner/1
{"m_id":2, "body-temperature": 38.6, "heart-rate": 75}
to topic miner/2
{"temperature": 35, "humidity": 90}
to topic mine/environment
{"m_id":1, "body-temperature": 16.6, "heart-rate": 0}
to topic miner/1
The monitoring team’s client that was subscribed to the topic !action will get the messages:
!action/rest--> "{'miner/1': {'m_id': 1, 'body-temperature': 36.6, 'heart-rate': 105}, 'mine/environment': {'temperature': 35, 'humidity': 90}}"
!action/leave--> "{'miner/2': {'m_id': 2, 'body-temperature': 38.6, 'heart-rate': 75}, 'mine/environment': {'temperature': 35, 'humidity': 90}}"
(Just after the message sent to topic mine/environment)
!action/dead--> "{'miner/1': {'m_id': 1, 'body-temperature': 16.6, 'heart-rate': 0}}"
NOTE: If the messages were sent in a different order, like below:
{"m_id":1, "body-temperature": 36.6, "heart-rate": 105}
to topic miner/1
{"temperature": 35, "humidity": 90}
to topic mine/environment
{"m_id":2, "body-temperature": 38.6, "heart-rate": 75}
to topic miner/2
{"m_id":1, "body-temperature": 16.6, "heart-rate": 0}
to topic miner/1
then the monitoring team’s client that was subscribed to the topic !action would get only the messages:
!action/rest--> "{'miner/1': {'m_id': 1, 'body-temperature': 36.6, 'heart-rate': 105}, 'mine/environment': {'temperature': 35, 'humidity': 90}}"
!action/dead--> "{'miner/1': {'m_id': 1, 'body-temperature': 16.6, 'heart-rate': 0}}"
This happens because, when a logical operation evaluates to true in fr_script’s rules and a new message is sent, the array holding the messages previously sent to fr_script is emptied.
Lastly, as we can see, when "new_payload": "" the new payload generated by fr_script is a JSON with the topic(s) used in the logic’s operations and their payload(s). Topics created by fr_script always start with “!”, as shown above.
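The order-dependent behaviour described above can be modelled with a small sketch. It is a simplified model built on assumptions, not fr_script's code: it keeps the latest message per topic in one shared buffer, evaluates every rule on each incoming message, and empties the buffer once at least one rule has fired. The names `RuleEngine`, `rule` and `stmt` are hypothetical, and nested `$parent.child` variables are not handled.

```python
def topic_matches(topic_filter, topic):
    f, t = topic_filter.split("/"), topic.split("/")
    for i, level in enumerate(f):
        if level == "#":
            return True
        if i >= len(t) or (level != "+" and level != t[i]):
            return False
    return len(f) == len(t)


def evaluate(expression, lookup):
    """Evaluate a space-separated expression; each $token is replaced via lookup."""
    allowed = {"(", ")", "and", "or", "not", "<", "<=", ">", ">=", "==", "!="}
    parts = []
    for token in expression.split():
        if token.startswith("$"):
            parts.append(repr(lookup(token[1:])))
        elif token in allowed or token.replace(".", "", 1).isdigit():
            parts.append(token)
        else:
            raise ValueError("unsupported token: " + token)
    return bool(eval(" ".join(parts), {"__builtins__": {}}, {}))


CASTS = {"int": int, "float": float, "str": str, "bool": bool}


class RuleEngine:
    def __init__(self, rules):
        self.rules = rules
        self.buffer = {}  # latest payload per topic since a rule last fired

    def matches(self, statement):
        """Return {topic: payload} for buffered messages that satisfy a statement."""
        cast = CASTS[statement["payload_type"]]
        hits = {}
        for topic, payload in self.buffer.items():
            if not topic_matches(statement["topic"], topic):
                continue
            try:
                ok = evaluate(statement["condition"], lambda name: cast(payload[name]))
            except KeyError:
                ok = False  # the message lacks a field used by the condition
            if ok:
                hits[topic] = payload
        return hits

    def on_message(self, topic, payload):
        """Buffer a message, evaluate every rule, return the messages to publish."""
        self.buffer[topic] = payload
        out = []
        for rule_def in self.rules:
            hits = {s["id"]: self.matches(s) for s in rule_def["statements"]}
            for logic in rule_def["logic"]:
                if evaluate(logic["operations"], lambda sid: bool(hits[int(sid)])):
                    used = {t: p for h in hits.values() for t, p in h.items()}
                    out.append(("!" + logic["new_topic"], logic["new_payload"] or str(used)))
        if out:
            self.buffer.clear()
        return out


def stmt(sid, topic, ptype, condition):
    return {"id": sid, "topic": topic, "payload_type": ptype, "condition": condition}


def rule(statements, operations, new_topic):
    return {"statements": statements,
            "logic": [{"operations": operations, "new_topic": new_topic, "new_payload": ""}]}


RULES = [
    rule([stmt(1, "miner/#", "float", "$heart-rate >= 100 and $heart-rate <= 120"),
          stmt(2, "mine/environment", "int", "$temperature > 35 or $humidity > 85")],
         "( $1 ) and ( $2 )", "action/rest"),
    rule([stmt(3, "miner/#", "float", "$body-temperature > 38"),
          stmt(4, "mine/environment", "int", "$temperature > 30")],
         "$3 and $4", "action/leave"),
    rule([stmt(5, "miner/#", "float", "$heart-rate == 0")], "$5", "action/dead"),
]
MESSAGES = [
    ("miner/1", {"m_id": 1, "body-temperature": 36.6, "heart-rate": 105}),
    ("miner/2", {"m_id": 2, "body-temperature": 38.6, "heart-rate": 75}),
    ("mine/environment", {"temperature": 35, "humidity": 90}),
    ("miner/1", {"m_id": 1, "body-temperature": 16.6, "heart-rate": 0}),
]
engine = RuleEngine(RULES)
for topic, payload in MESSAGES:
    for new_topic, new_payload in engine.on_message(topic, payload):
        print(new_topic + "--> " + new_payload)
```

With the messages in the first order, this model fires the rest, leave and dead actions; swapping the second and third messages makes the leave action disappear, because the buffer is emptied when the rest action fires.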
1.3.4.9. Version control and release
VerneMQ v1.12.3
FR_Script v1.0
1.3.4.10. License
Will be determined after the release of the enabler.