1.3.4. Edge Data Broker enabler

1.3.4.1. Introduction

The Edge Data Broker enables the efficient management of data demand and data supply among edge nodes based on a publish/subscribe schema, taking account load balancing criteria. This enabler distributes data where it is needed for application, services and further analysis while considered essential only in those deployments that involve IoT architectures.

1.3.4.2. Features

The Edge Data Broker enabler has the following operational and intelligent functionalities:

  • Subscriptions and messages between Edge Devices through the Edge Data Broker enabler

  • Management and distribution of messages using scheduling, routing and delivery mechanisms

  • Common interfaces for searching and finding information

  • Integration with other data brokers if needed

1.3.4.3. Place in architecture

The Edge Data Broker enabler is part of the Data Management Plane of ASSIST-IoT. The Data Management plane encompasses any process, in which data is processed to deliver features concerning data interoperability, annotation, security, acquisition, provenance, aggregation, fusion, etc. This enabler provides a data communication channel to all IoT devices.

1.3.4.4. User guide

The Edge Data Broker is an distributed MQTT Broker and follows the MQTT specification. As such in theory any MQTT compliant library can be used to connect, subscribe and publish messages to the Edge Data Broker.Here we provide an example using python.

This is a subscriber python script that uses the paho-mqtt library to connect to the Edge Data Broker, subscribe to a topic and receive and print messages from it.

import paho.mqtt.client as mqtt

broker= "127.0.0.1"
port  = 31883
topic = "assist.test"

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc, test):
    print("Connected with result code "+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(topic)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(broker, port, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

This is a publisher python script that uses the paho-mqtt library to connect to the Edge Data Broker, subscribe to a topic and publishes messages to it.

import paho.mqtt.client as paho
import json, time

#broker= "10.0.2.15"
broker= "127.0.0.1"
port  = 31883
topic = "assist.test"

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc, test):
    print("Connected with result code "+str(rc))

def on_publish(client,userdata,result):                    # create function for callback
    print("data published!")
    pass

client1= paho.Client("control1")                           # create client object
client1.on_publish = on_publish                            # assign function to callback
client1.on_connect = on_connect
client1.connect(broker,port)                               # establish connection
print("Connected to MQTT")
body = {}
body["name"] = "DeviceName"
body["raw-data"] = 1.000

i = 0
while i < 1000 :
    body["raw-data"] = float(i)
    bodyS = json.dumps(body)
    print("Publishig data: " + bodyS)
    ret= client1.publish(topic, bodyS)               # publish
    i += 1
    time.sleep(0.1)

Executing those two scripts will produce and consume json messages to the Edge Data Broker.

1.3.4.5. Prerequisites

The Edge Data Broker enabler is designed to be executed on a cluster of devices on ARM64 architecture. It can be executed of course on a x86 architecture as well by changing the docker image. It also requires Docker for building a new image and kubernetes/helm3 to deploy the enabler on a cluster.

1.3.4.6. Installation

1.3.4.6.1. Building the Docker image

On the Docker folder execute:

$ docker build . -t edb:latest

This will create the image. It should be visible with:

$ docker images

To push the image to a registry (using a local registry for this example):

$ docker tag edb:latest localhost:32000/edb:latest

$ docker tag edb:latest localhost:32000/edb:latest

Now we can use this image in kubernetes and helm.

1.3.4.6.2. Deploying with Kubernetes and Helm3

On the Helm folder execute:

$ cp values.yaml values.yaml.bu
$ helm3 install edb -f values.yaml .

This will install the enabler. To uninstall:

$ helm3 uninstall edb

1.3.4.6.3. Verification

Using the kubectl command:

$ kubectl get all -o wide
NAME        READY   STATUS    RESTARTS   AGE   IP             NODE     NOMINATED NODE   READINESS GATES
pod/edb-0   1/1     Running   0          9d    10.1.196.152   node01   <none>           <none>
pod/edb-1   1/1     Running   0          9d    10.1.140.84    node02   <none>           <none>

NAME                   TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                         AGE   SELECTOR
service/kubernetes     ClusterIP   10.152.183.1     <none>        443/TCP                         15d   <none>
service/edb-headless   ClusterIP   None             <none>        4369/TCP,8888/TCP               9d
app.kubernetes.io/instance=edb,app.kubernetes.io/name=edb
service/edb            NodePort    10.152.183.168   <none>        1883:31883/TCP,8888:30888/TCP   9d
app.kubernetes.io/instance=edb,app.kubernetes.io/name=edb

NAME                   READY   AGE   CONTAINERS   IMAGES
statefulset.apps/edb   2/2     9d    edb          localhost:32000/edb:latest

Also, the python scripts (provided in the User Guide section above) with the correct IP and PORT values can be used for testing.

1.3.4.7. Configuration options

The following table lists the configurable parameters of the chart and their default values.

Parameter

Description

Default

additionalEnv

additional environment variables

see values.y aml

envFrom

additional envFrom configmaps or secrets

see values.y aml

image.pullPolicy

container image pull policy

IfNotPresent

image.repository

container image repository

vernemq/vernemq

image.tag

container image tag

the current versions (e.g. 1.12.3)

ingress.enabled

whether to enable an ingress object to route to the WebSocket service. Requires an ingress controller and the WebSocket service to be enabled.

false

ingress.labels

additional ingress labels

{}

i ngress.annotations

additional service annotations

{}

ingress.hosts

a list of routable hostnames for host-based routing of traffic to the WebSocket service

[]

ingress.paths

a list of paths for path-based routing of traffic to the WebSocket service

/

ingress.tls

a list of TLS ingress configurations for securing the WebSocket ingress

[]

nodeSelector

node labels for pod assignment

{}

persistent Volume.accessModes

data Persistent Volume access modes

[ReadWriteOnce]

persistent Volume.annotations

annotations for Persistent Volume Claim

{}

persis tentVolume.enabled

if true, create a Persistent Volume Claim

true

per sistentVolume.size

data Persistent Volume size

5Gi

persistentV olume.storageClass

data Persistent Volume Storage Class

unset

` extraVolumeMounts`

Additional volumeMounts to the pod

[]

extraVolumes

Additional volumes to the pod

[]

secretMounts

mounts a secret as a file inside the statefulset. Useful for mounting certificates and other secrets.

[]

podAntiAffinity

pod anti affinity, soft for trying not to run pods on the same nodes, hard to force kubernetes not to run 2 pods on the same node

soft

rbac.create

if true, create & use RBAC resources

true

rbac.ser viceAccount.create

if true, create a serviceAccount

true

rbac.s erviceAccount.name

name of the service account to use or create

{{ include "vern emq.fullname" . }}

replicaCount

desired number of nodes

1

resources

resource requests and limits (YAML)

{}

securityContext

securityContext for containers in pod

{}

s ervice.annotations

service annotations

{}

` service.clusterIP`

custom cluster IP when service.type is ClusterIP

none

s ervice.externalIPs

optional service external IPs

none

service.labels

additional service labels

{}

serv ice.loadBalancerIP

optional load balancer IP when service.type is LoadBalancer

none

service.loadBa lancerSourceRanges

optional load balancer source ranges when service.type is LoadBalancer

none

service.ext ernalTrafficPolicy

set this to Local to preserve client source IPs and prevent additional hops between K8s nodes if the service type is LoadBalancer or NodePort

unset

servi ce.sessionAffinity

service session affinity

none

service.ses sionAffinityConfig

service session affinity config

none

se rvice.mqtt.enabled

whether to expose MQTT port

true

ser vice.mqtt.nodePort

the MQTT port exposed by the node when service.type is NodePort

1883

` service.mqtt.port`

the MQTT port exposed by the service

1883

ser vice.mqtts.enabled

whether to expose MQTTS port

false

serv ice.mqtts.nodePort

the MQTTS port exposed by the node when service.type is NodePort

8883

`` service.mqtts.port``

the MQTTS port exposed by the service

8883

service.type

type of service to create

ClusterIP

`` service.ws.enabled``

whether to expose WebSocket port

false

s ervice.ws.nodePort

the WebSocket port exposed by the node when service.type is NodePort

8080

service.ws.port

the WebSocket port exposed by the service

8080

s ervice.wss.enabled

whether to expose secure WebSocket port

false

se rvice.wss.nodePort

the secure WebSocket port exposed by the node when service.type is NodePort

8443

service.wss.port

the secure WebSocket port exposed by the service

8443

state fulset.annotations

additional annotations to the StatefulSet

{}

`` statefulset.labels``

additional labels on the StatefulSet

{}

stateful set.podAnnotations

additional pod annotations

{}

statefulset.p odManagementPolicy

start and stop pods in Parallel or OrderedReady (one-by-one.) Note - Cannot change after first release.

OrderedReady

sta tefulset.termination GracePeriodSeconds

configure how much time VerneMQ takes to move offline queues to other nodes

60

stateful set.updateStrategy

Statefulset updateStrategy

RollingUpdate

sta tefulset.lifecycle

Statefulset lifecycle hooks

{}

ser viceMonitor.create

whether to create a ServiceMonitor for Prometheus Operator

false

ser viceMonitor.labels

whether to add more labels to ServiceMonitor for Prometheus Operator

{}

pdb.enabled

whether to create a Pod Disruption Budget

false

pdb.minAvailable

PDB (min available) for the cluster

1

pdb.maxUnavailable

PDB (max unavailable) for the cluster

nil

1.3.4.8. Developer guide

Will be determined after the release of the enabler.

1.3.4.9. Version control and release

Will be determined after the release of the enabler.

1.3.4.10. License

Will be determined after the release of the enabler.

1.3.4.11. Notice(dependencies)

Will be determined after the release of the enabler.