1.3.4. Edge Data Broker enabler
1.3.4.1. Introduction
The Edge Data Broker enables efficient management of data demand and data supply among edge nodes based on a publish/subscribe scheme, taking load-balancing criteria into account. The enabler distributes data to wherever it is needed for applications, services and further analysis. It is considered essential only in deployments that involve IoT architectures.
1.3.4.2. Features
The Edge Data Broker enabler has the following operational and intelligent functionalities:
Subscriptions and messages between Edge Devices through the Edge Data Broker enabler
Management and distribution of messages using scheduling, routing and delivery mechanisms
Common interfaces for searching and finding information
Integration with other data brokers if needed
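Routing in an MQTT broker is driven by topic filters. As a rough illustration only (this is not the enabler's implementation, and `topic_matches` is a hypothetical helper name), the sketch below shows how the standard MQTT wildcards `+` (one level) and `#` (all remaining levels) decide which subscriptions receive a message. It deliberately ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level of the filter,
            # and it also matches the parent level itself.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level that does not match
    # No '#' was seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    # "assist.test" contains no "/" and is therefore a single topic level.
    for flt in ("assist.test", "+", "#", "assist/+"):
        print(flt, topic_matches(flt, "assist.test"))
```

Note that the dot in `assist.test` is not a level separator in MQTT, so a filter such as `assist/+` does not match it.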
1.3.4.3. Place in architecture
The Edge Data Broker enabler is part of the Data Management Plane of ASSIST-IoT. The Data Management Plane encompasses any process in which data is handled to deliver features such as data interoperability, annotation, security, acquisition, provenance, aggregation and fusion. This enabler provides a data communication channel to all IoT devices.
1.3.4.4. User guide
The Edge Data Broker is a distributed MQTT broker and follows the MQTT specification. As such, in theory any MQTT-compliant library can be used to connect, subscribe and publish messages to the Edge Data Broker. Here we provide an example using Python.
This is a subscriber Python script that uses the paho-mqtt library to connect to the Edge Data Broker, subscribe to a topic, and receive and print messages from it.
import paho.mqtt.client as mqtt

broker = "127.0.0.1"
port = 31883
topic = "assist.test"

# The callback for when the client receives a CONNACK response from the server.
# (paho-mqtt 1.x callback signature: client, userdata, flags, rc)
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(topic)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

# This example uses the paho-mqtt 1.x API; paho-mqtt 2.x expects a callback
# API version as the first argument of Client().
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, port, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
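The subscriber prints the raw payload bytes, while the publisher in this guide sends JSON. As a minimal sketch (the helper name `decode_payload` is an assumption, not part of the enabler or of paho-mqtt), the `on_message` callback could decode the payload defensively, so that a malformed message does not raise inside the network loop:

```python
import json


def decode_payload(payload: bytes):
    """Return the decoded JSON value, or None if the payload is not UTF-8 JSON."""
    try:
        return json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


# Drop-in variant of the subscriber's callback; `msg` only needs the
# `topic` and `payload` attributes that paho-mqtt provides.
def on_message(client, userdata, msg):
    data = decode_payload(msg.payload)
    if data is None:
        print(msg.topic + " (non-JSON payload) " + repr(msg.payload))
    else:
        print(msg.topic + " " + str(data))
```

Returning `None` instead of raising keeps the subscriber running when other clients publish non-JSON data on the same topic.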
This is a publisher Python script that uses the paho-mqtt library to connect to the Edge Data Broker and publish messages to a topic.
import json
import time

import paho.mqtt.client as paho

broker = "127.0.0.1"
port = 31883
topic = "assist.test"

# The callback for when the client receives a CONNACK response from the server.
# (paho-mqtt 1.x callback signature: client, userdata, flags, rc)
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

# The callback for when a published message has been sent (mid = message id).
def on_publish(client, userdata, mid):
    print("data published!")

client1 = paho.Client("control1")  # create client object (paho-mqtt 1.x API)
client1.on_publish = on_publish    # assign function to callback
client1.on_connect = on_connect
client1.connect(broker, port)      # establish connection
client1.loop_start()               # background network loop, so callbacks fire

body = {"name": "DeviceName", "raw-data": 1.000}
for i in range(1000):
    body["raw-data"] = float(i)
    bodyS = json.dumps(body)
    print("Publishing data: " + bodyS)
    client1.publish(topic, bodyS)  # publish
    time.sleep(0.1)

client1.loop_stop()
client1.disconnect()
Running the two scripts together publishes JSON messages to the Edge Data Broker and consumes them from it.
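The message format used by the publisher can be isolated in a small helper, which makes it easier to test without a running broker. This is only a sketch: `build_message` and `message_stream` are hypothetical names, and the field names `name` and `raw-data` are taken from the publisher script above.

```python
import json


def build_message(name: str, value: float) -> str:
    """Serialize one reading in the JSON layout used by the publisher script."""
    return json.dumps({"name": name, "raw-data": float(value)})


def message_stream(name: str, count: int):
    """Yield `count` serialized readings with values 0.0, 1.0, 2.0, ..."""
    for i in range(count):
        yield build_message(name, i)


if __name__ == "__main__":
    for message in message_stream("DeviceName", 3):
        print(message)
```

Each yielded string can be passed directly as the payload argument of the client's `publish()` call.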
1.3.4.5. Prerequisites
The Edge Data Broker enabler is designed to run on a cluster of ARM64 devices. It can also run on an x86 architecture by changing the Docker image. It requires Docker to build a new image, and Kubernetes with Helm 3 to deploy the enabler on a cluster.
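Before building or deploying, it can help to confirm the host architecture and the presence of the required command-line tools. The following is a minimal sketch using only the Python standard library; the helper names are hypothetical, and the tool names (`docker`, `kubectl`, `helm3`) are assumed from the commands used in this guide and may differ on your system.

```python
import platform
import shutil

# Common spellings reported by platform.machine() on different systems.
ARCH_ALIASES = {
    "aarch64": "arm64",
    "arm64": "arm64",
    "x86_64": "amd64",
    "amd64": "amd64",
}


def normalize_arch(machine: str) -> str:
    """Map a platform.machine() value to 'arm64', 'amd64' or 'unknown'."""
    return ARCH_ALIASES.get(machine.lower(), "unknown")


def missing_tools(tools):
    """Return the tools from `tools` that are not found on the PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    print("architecture:", normalize_arch(platform.machine()))
    print("missing tools:", missing_tools(["docker", "kubectl", "helm3"]))
```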
1.3.4.6. Installation
1.3.4.6.1. Building the Docker image
In the Docker folder, execute:
$ docker build . -t edb:latest
This will create the image. It should be visible with:
$ docker images
To push the image to a registry (using a local registry for this example):
$ docker tag edb:latest localhost:32000/edb:latest
$ docker push localhost:32000/edb:latest
The image can now be used with Kubernetes and Helm.
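The build, tag and push steps can also be scripted. The sketch below is one possible way to do so, assuming the image name `edb`, the tag `latest` and the local registry `localhost:32000` used in this example; `image_commands` and `run_all` are hypothetical helper names. By default it only prints the commands (dry run).

```python
import subprocess


def image_commands(name="edb", tag="latest", registry="localhost:32000"):
    """Return the docker commands that build, tag and push the image."""
    local = f"{name}:{tag}"
    remote = f"{registry}/{local}"
    return [
        ["docker", "build", ".", "-t", local],
        ["docker", "tag", local, remote],
        ["docker", "push", remote],
    ]


def run_all(commands, dry_run=True):
    """Print each command and, unless dry_run is set, execute it."""
    for cmd in commands:
        print("$ " + " ".join(cmd))
        if not dry_run:
            # check=True stops at the first failing step.
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    run_all(image_commands())  # pass dry_run=False to actually run docker
```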
1.3.4.6.2. Deploying with Kubernetes and Helm3
In the Helm folder, back up the default values file and install the chart:
$ cp values.yaml values.yaml.bu
$ helm3 install edb -f values.yaml .
This will install the enabler. To uninstall:
$ helm3 uninstall edb
1.3.4.6.3. Verification
Using the kubectl command:
$ kubectl get all -o wide
NAME        READY   STATUS    RESTARTS   AGE   IP             NODE     NOMINATED NODE   READINESS GATES
pod/edb-0   1/1     Running   0          9d    10.1.196.152   node01   <none>           <none>
pod/edb-1   1/1     Running   0          9d    10.1.140.84    node02   <none>           <none>

NAME                   TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                         AGE   SELECTOR
service/kubernetes     ClusterIP   10.152.183.1     <none>        443/TCP                         15d   <none>
service/edb-headless   ClusterIP   None             <none>        4369/TCP,8888/TCP               9d    app.kubernetes.io/instance=edb,app.kubernetes.io/name=edb
service/edb            NodePort    10.152.183.168   <none>        1883:31883/TCP,8888:30888/TCP   9d    app.kubernetes.io/instance=edb,app.kubernetes.io/name=edb

NAME                   READY   AGE   CONTAINERS   IMAGES
statefulset.apps/edb   2/2     9d    edb          localhost:32000/edb:latest
The Python scripts provided in the User guide section above can also be used for testing, once they are set to the correct IP and port values.
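The port to use in the scripts can be read from the PORT(S) column of the kubectl output, where a NodePort service lists entries as `servicePort:nodePort/protocol`. The sketch below (helper names are hypothetical) extracts that mapping and performs a plain TCP reachability check. A successful TCP connection only shows that the port is open, not that the MQTT handshake succeeds.

```python
import socket


def parse_node_ports(ports_column: str) -> dict:
    """Map service port -> node port from a kubectl PORT(S) value."""
    mapping = {}
    for entry in ports_column.split(","):
        spec = entry.split("/")[0]  # drop the "/TCP" protocol suffix
        if ":" in spec:             # only NodePort entries look like "port:nodePort"
            port, node_port = spec.split(":")
            mapping[int(port)] = int(node_port)
    return mapping


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    node_ports = parse_node_ports("1883:31883/TCP,8888:30888/TCP")
    mqtt_node_port = node_ports[1883]
    # 127.0.0.1 is the address used in the example scripts; replace it with
    # the IP of one of your cluster nodes.
    print(mqtt_node_port, port_is_open("127.0.0.1", mqtt_node_port))
```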
1.3.4.7. Configuration options
The following table lists the configurable parameters of the chart and their default values.
| Parameter | Description | Default |
|---|---|---|
| | additional environment variables | see values.yaml |
| | additional envFrom configmaps or secrets | see values.yaml |
| | container image pull policy | |
| | container image repository | |
| | container image tag | the current versions (e.g. |
| | whether to enable an ingress object to route to the WebSocket service. Requires an ingress controller and the WebSocket service to be enabled. | |
| | additional ingress labels | |
| | additional service annotations | |
| | a list of routable hostnames for host-based routing of traffic to the WebSocket service | |
| | a list of paths for path-based routing of traffic to the WebSocket service | |
| | a list of TLS ingress configurations for securing the WebSocket ingress | |
| | node labels for pod assignment | |
| | data Persistent Volume access modes | |
| | annotations for Persistent Volume Claim | |
| | if true, create a Persistent Volume Claim | |
| | data Persistent Volume size | |
| | data Persistent Volume Storage Class | |
| `extraVolumeMounts` | Additional volumeMounts to the pod | |
| | Additional volumes to the pod | |
| | mounts a secret as a file inside the statefulset. Useful for mounting certificates and other secrets. | |
| | pod anti affinity, | |
| | if true, create & use RBAC resources | |
| | if true, create a serviceAccount | |
| | name of the service account to use or create | |
| | desired number of nodes | |
| | resource requests and limits (YAML) | |
| | securityContext for containers in pod | |
| | service annotations | |
| `service.clusterIP` | custom cluster IP when | |
| | optional service external IPs | |
| | additional service labels | |
| | optional load balancer IP when | |
| | optional load balancer source ranges when | |
| | set this to | |
| | service session affinity | |
| | service session affinity config | |
| | whether to expose MQTT port | |
| | the MQTT port exposed by the node when | |
| `service.mqtt.port` | the MQTT port exposed by the service | |
| | whether to expose MQTTS port | |
| | the MQTTS port exposed by the node when | |
| `service.mqtts.port` | the MQTTS port exposed by the service | |
| | type of service to create | |
| `service.ws.enabled` | whether to expose WebSocket port | |
| | the WebSocket port exposed by the node when | |
| | the WebSocket port exposed by the service | |
| | whether to expose secure WebSocket port | |
| | the secure WebSocket port exposed by the node when | |
| | the secure WebSocket port exposed by the service | |
| | additional annotations to the StatefulSet | |
| `statefulset.labels` | additional labels on the StatefulSet | |
| | additional pod annotations | |
| | start and stop pods in Parallel or OrderedReady (one-by-one.) Note - Cannot change after first release. | |
| | configure how much time VerneMQ takes to move offline queues to other nodes | |
| | Statefulset updateStrategy | |
| | Statefulset lifecycle hooks | |
| | whether to create a ServiceMonitor for Prometheus Operator | |
| | whether to add more labels to ServiceMonitor for Prometheus Operator | |
| | whether to create a Pod Disruption Budget | |
| | PDB (min available) for the cluster | |
| | PDB (max unavailable) for the cluster | |
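Chart parameters can be overridden either in values.yaml or on the command line with Helm's `--set key=value` flag. The sketch below builds such an install command from a dictionary of overrides. The helper names are hypothetical, the `helm3` binary name and the `edb` release name are taken from the installation commands in this guide, and the override values shown are illustrative examples rather than documented defaults.

```python
def to_set_flags(overrides: dict) -> list:
    """Turn {"a.b": value} into ["--set", "a.b=value", ...] for Helm."""
    flags = []
    for key, value in overrides.items():
        if isinstance(value, bool):
            # Render booleans the way they are written in YAML.
            value = "true" if value else "false"
        flags += ["--set", f"{key}={value}"]
    return flags


def helm_install_command(release="edb", chart=".", values_file="values.yaml",
                         overrides=None):
    """Return the install command as an argument list (it is not executed)."""
    base = ["helm3", "install", release, "-f", values_file, chart]
    return base + to_set_flags(overrides or {})


if __name__ == "__main__":
    cmd = helm_install_command(overrides={
        "service.mqtt.port": 1883,    # example value only
        "service.ws.enabled": True,   # example value only
    })
    print(" ".join(cmd))
```

Values passed with `--set` take precedence over those in the file given with `-f`, so the backed-up values.yaml can stay unchanged for quick experiments.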
1.3.4.8. Developer guide
Will be determined after the release of the enabler.
1.3.4.9. Version control and release
Will be determined after the release of the enabler.
1.3.4.10. License
Will be determined after the release of the enabler.
1.3.4.11. Notice (dependencies)
Will be determined after the release of the enabler.