2.1.3. Location processing
Documentation for the Location Processing enabler of ASSIST-IoT.
2.1.3.1. Introduction
The Location Processing enabler aims to provide highly configurable and flexible geofencing capabilities based on location data. The enabler consists of a Scala application and a Postgres database.
The application is written with the Akka framework and runs user-defined SQL queries against the database. Incoming data is collected from input streams or HTTP requests, and the query results can be streamed out. All transferred data is in JSON format. The behavior of the application is configurable through an HTTP interface, and its streaming capabilities are compatible with the MQTT protocol.
The database is shipped with the PostGIS extension. It stores the geolocation data and the application configuration.
This enabler has reached a TRL of 6 during the execution of the ASSIST-IoT project.
2.1.3.2. Features
2.1.3.2.1. Input stream settings
Passing credentials
Setting input topics
Providing MQTT connection URL
2.1.3.2.2. Output stream settings
Passing credentials
Setting parametrized¹ output topics
Setting MQTT publish flags
Providing MQTT connection URL
Custom output JSON format
2.1.3.2.3. SQL queries
Access to a database with geolocation capabilities
Query parametrization¹
2.1.3.2.4. HTTP interface
Creating queries
Updating queries
Deleting queries
Retrieving queries
Running queries manually
¹ Parametrization refers to accessing input or output data in JSON (with JSONPath), string, or byte string format through a dedicated syntax.
2.1.3.3. Place in architecture
The data management plane.
2.1.3.4. User guide
The user-defined queries can be created via the HTTP interface. After successful creation, a query is stored in the database. Then, it is run inside the application, and it starts processing the data.
It is assumed that the spatial model is defined inside the database before the application starts, so that queries have access to the required tables and data.
2.1.3.5. User guide HTTP interface
parametrizedString: a string that optionally contains parametrization syntax.
Examples:
"topic_{output['id']}"
"positions"
publishFlag: one of the following strings:
QoSAtLeastOnceDelivery
QoSAtMostOnceDelivery
QoSExactlyOnceDelivery
Retain
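A publishFlags array can be checked on the client side before a query is submitted. The following is a minimal sketch: the flag names come from the list above, while the function name and the rule that at most one QoS flag should be present are assumptions (an MQTT message carries a single QoS level), not documented behavior of the enabler.

```python
# Flag names come from the list above; the "one QoS level" rule is an assumption.
QOS_FLAGS = {
    "QoSAtLeastOnceDelivery",
    "QoSAtMostOnceDelivery",
    "QoSExactlyOnceDelivery",
}
ALLOWED_FLAGS = QOS_FLAGS | {"Retain"}


def check_publish_flags(flags):
    """Return a list of problems found in a publishFlags array (empty if none)."""
    problems = []
    for flag in flags:
        if flag not in ALLOWED_FLAGS:
            problems.append(f"unknown flag: {flag}")
    # Assumption: an MQTT message carries a single QoS level,
    # so more than one QoS flag is reported as suspicious.
    qos = [flag for flag in flags if flag in QOS_FLAGS]
    if len(qos) > 1:
        problems.append("more than one QoS flag: " + ", ".join(qos))
    return problems
```

An empty result means the array only uses the documented flag names.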
Examples:
"QoSAtLeastOnceDelivery"
publishWhen: determines when to publish the produced output.
One of the following strings:
- success
- failure
- always
failure refers to the situation in which an error has occurred while
processing the stream.
Examples:
"always"
recordFormat: determines the record formatting. Two styles are
available: array and object.
array formatting:
{
"fields": [
{"name": "field_1", "type": "type_1"},
...
],
"records": [
[value_1_1, value_1_2, ..., value_1_n],
[value_2_1, value_2_2, ..., value_2_n],
...
]
}
object formatting:
{
"fields": [
{"name": "field_1", "type": "type_1"},
...
],
"records": [
{"field_1": value_1_1, "field_2": value_1_2, ..., "field_n": value_1_n},
{"field_1": value_2_1, "field_2": value_2_2, ..., "field_n": value_2_n},
...
]
}
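The difference between the two styles can be illustrated with a short sketch that shapes the same rows both ways. The function name and the type names used in the usage note are hypothetical; only the "fields"/"records" layout is taken from the text above.

```python
def format_records(fields, rows, record_format):
    """Shape query results in the "array" or "object" style.

    fields: list of (name, type) pairs; rows: list of value lists.
    """
    header = [{"name": name, "type": type_} for name, type_ in fields]
    if record_format == "array":
        # Each record is a plain list of values, in column order.
        records = [list(row) for row in rows]
    elif record_format == "object":
        # Each record maps column names to values.
        names = [name for name, _ in fields]
        records = [dict(zip(names, row)) for row in rows]
    else:
        raise ValueError(f"unsupported recordFormat: {record_format}")
    return {"fields": header, "records": records}
```

For instance, with the hypothetical columns `[("id", "int"), ("x", "float")]` and the single row `[1, 2.5]`, the array style yields the record `[1, 2.5]` and the object style yields `{"id": 1, "x": 2.5}`.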
Examples:
"array"
format: JSON format for the output data.
| Name | Description | Type |
|---|---|---|
| recordFormat | Determines the records formatting | recordFormat |
| showHeader | Whether to show the header | boolean |
| wrapSingleColumn | Whether a single column should be wrapped | boolean |
Examples:
{
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
inputTopic: topic to subscribe to.
| Name | Description | Type |
|---|---|---|
| name | Topic name | string |
Examples:
{
"name": "vehicles/excavators"
}
outputTopic: topic where the output is published.
| Name | Description | Type |
|---|---|---|
| name | Topic name | string |
| publishEmptyOutput (optional) | Whether to publish empty output | boolean |
| publishWhen (optional) | When to publish | publishWhen |
| publishFlags (optional) | Publish flags | array[publishFlag] |

Examples:
{
"name": "cats",
"publishFlags": ["QoSExactlyOnceDelivery", "Retain"],
"publishWhen": "success",
"publishEmptyOutput": false
}
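How publishWhen and publishEmptyOutput might combine can be sketched as a small decision function. This is an illustration only: the function name is hypothetical, and the precedence (an empty output is skipped first, then publishWhen is evaluated) is an assumption that the text does not confirm.

```python
def should_publish(publish_when, publish_empty_output, failed, output_is_empty):
    """Decide whether an output message would be published to a topic.

    Assumption: an empty output is skipped when publishEmptyOutput is false,
    regardless of publishWhen. The enabler's actual precedence may differ.
    """
    if publish_when not in ("success", "failure", "always"):
        raise ValueError(f"unsupported publishWhen: {publish_when}")
    if output_is_empty and not publish_empty_output:
        return False
    if publish_when == "success":
        return not failed
    if publish_when == "failure":
        return failed
    return True  # "always"
```

With the example topic above ("publishWhen": "success", "publishEmptyOutput": false), this sketch publishes only non-empty results of queries that ran without errors.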
2.1.3.6. MQTT Input Settings
| Name | Description | Type |
|---|---|---|
| host | MQTT host | string |
| port | MQTT port | number |
| username | Client credentials (optional) | string |
| password | Client credentials (optional) | string |
| topics | MQTT topics to subscribe to (optional) | array[inputTopic] |
Examples:
{
"host": "pilot1",
"port": 1883,
"username": "jared",
"password": "dunn",
"topics": [
{"name": "vehicles/excavators"},
{"name": "cats"}
]
}
2.1.3.7. MQTT Output Settings
| Name | Description | Type |
|---|---|---|
| host | MQTT host | string |
| port | MQTT port | number |
| username | Client credentials (optional) | string |
| password | Client credentials (optional) | string |
| topics | MQTT topics to publish to (optional) | array[outputTopic] |
| format | Output data format | format |
Examples:
{
"host": "pilot2",
"port": 1883,
"username": "bob",
"password": "builder",
"topics": [
{
"name": "danger/{output['id']}",
"publishFlags": [
"QoSExactlyOnceDelivery",
"Retain"
],
"publishWhen": "always",
"publishEmptyOutput": true
}
],
"format": {
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
}
2.1.3.8. Query Configuration
| Name | Description | Type |
|---|---|---|
| name | Unique query name (required) | string |
| inputSettings | Input settings (optional) | inputSettings |
| outputSettings | Output settings (optional) | outputSettings |
| sql | SQL query (required) | parametrizedString |

Examples:
{
"name": "dangerous",
"inputSettings": {
"host": "pilot1",
"port": 1883,
"username": "jared",
"password": "dunn",
"topics": [
{"name": "vehicles/excavators"},
{"name": "cats"}
]
},
"outputSettings": {
"host": "pilot2",
"port": 1883,
"username": "bob",
"password": "builder",
"topics": [
{
"name": "danger/{output['id']}",
"publishFlags": [
"QoSExactlyOnceDelivery",
"Retain"
],
"publishWhen": "always",
"publishEmptyOutput": true
}
],
"format": {
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
},
"sql": "select id, x, y from worker_positions where st_distance(st_makepoint(x, y), st_makepoint({input['x']}, {input['y']})) < 50;"
}
To create a query, use the POST method on the v1/queries endpoint.
The request’s body is expected to be a complete definition of a query.
{
"name": "dangerous",
"inputSettings": {
"host": "pilot1",
"port": 1883,
"username": "jared",
"password": "dunn",
"topics": [
{"name": "vehicles/excavators"},
{"name": "cats"}
]
},
"outputSettings": {
"host": "pilot2",
"port": 1883,
"username": "bob",
"password": "builder",
"topics": [
{
"name": "danger/{output['id']}",
"publishFlags": [
"QoSExactlyOnceDelivery",
"Retain"
],
"publishWhen": "always",
"publishEmptyOutput": true
}
],
"format": {
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
},
"sql": "select id, x, y from worker_positions where st_distance(st_makepoint(x, y), st_makepoint({input['x']}, {input['y']})) < 50;"
}
The query’s name is its unique identifier and is used later to
reference the query. The inputSettings and outputSettings refer to the
MQTT input and output configuration. In both cases, host, port,
username, and password are used to connect to the brokers. The client
subscribes to the topic names listed in topics inside the input
settings. The topics in the output settings are slightly different:
their names can be parametrized (see the dedicated section for more
explanation on that matter), and they represent the topics to which the
output messages will be sent. In this example, part of the topic name is
taken from a specific field (here, id) of the output generated by
running the SQL query:
"name": "danger/{output['id']}"
publishFlags is the list of MQTT flags set when the message is sent.
publishWhen controls when the publish action is triggered, for
instance only if there are no errors. This modifier is practical because
the output of a run without errors can differ from the output of a
failed query. publishEmptyOutput determines whether a message is
published when the output is empty. format controls how the output is
formatted (see the definitions section for examples). Finally, sql is
a parametrized string in which the received input values can be used.
The query is run against the database included in the enabler. After the
query is created, the response confirms the operation and returns the
created query.
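The creation step can be sketched with the Python standard library. This is a minimal example under stated assumptions: the base URL is hypothetical (localhost:8080 is the address used by the development scripts), the Content-Type header is assumed to be expected by the API, and the query contains only the two required fields. The request is built but not sent.

```python
import json
import urllib.request

# Hypothetical base URL; localhost:8080 is the address used by the dev scripts.
BASE_URL = "http://localhost:8080"


def build_create_request(query, base_url=BASE_URL):
    """Build (but do not send) a POST request for the v1/queries endpoint."""
    body = json.dumps(query).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/queries",
        data=body,
        method="POST",
        # Assumption: the API expects a JSON content type.
        headers={"Content-Type": "application/json"},
    )


# Only name and sql are required; MQTT settings are omitted here.
query = {
    "name": "dangerous",
    "sql": "select id, x, y from worker_positions;",
}
request = build_create_request(query)
# To send it: urllib.request.urlopen(request) (requires a running enabler).
```

Keeping request construction separate from sending makes the payload easy to inspect before it reaches the enabler.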
Queries can be updated by sending a PUT request (v1/queries) with a
body that describes the query in the same way as when creating a new
query.
To delete a query, send a DELETE request to v1/queries/{name}, where
name is the query name.
If the input and output MQTT settings are provided, queries are
typically triggered by MQTT events. However, every query can also be run
from the HTTP interface by calling its name and passing the input it
would otherwise receive from the MQTT broker; the request body must be
valid JSON containing the input data. The endpoint format is
v1/queries/{name}/input (POST request). After the query runs, the output
is sent as the response.
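A manual run can be sketched in the same spirit. The helper name, the base URL, and the sample payload are hypothetical; the payload keys x and y simply mirror the input fields referenced by the example SQL query. The request is only constructed here, not sent.

```python
import json
import urllib.parse
import urllib.request


def build_run_request(name, payload, base_url="http://localhost:8080"):
    """Build (but do not send) a POST request for v1/queries/{name}/input."""
    # Percent-encode the name so that characters such as "/" or spaces
    # cannot alter the URL path.
    encoded_name = urllib.parse.quote(name, safe="")
    return urllib.request.Request(
        url=f"{base_url}/v1/queries/{encoded_name}/input",
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        # Assumption: the API expects a JSON content type.
        headers={"Content-Type": "application/json"},
    )


# The body plays the role of the message that would arrive from the broker.
request = build_run_request("dangerous", {"x": 10, "y": 20})
```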
Retrieves all queries.
Parameters: none.
Body: none.
Returns:
On success - status code 200, body:
{
"queries": [
{...},
{...}
]
}
On failure - status code 500, body:
{
"description": "..."
}
Retrieves the query with name.
Parameters:
name: query name
Body: none.
Returns:
On success - status code 200, body:
{
"query": {
...
}
}
If query does not exist - status code 404, body:
{
"description": "..."
}
On failure - status code 500, body:
{
"description": "..."
}
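A client can map the three documented outcomes of this call to a value, a missing result, or an error. The sketch below is an assumption about how a caller might do that; the function name is hypothetical, and only the status codes and the "query"/"description" keys come from the text above.

```python
import json


def read_query_response(status, body):
    """Map a "retrieve query by name" response to a query dict or None.

    status: HTTP status code; body: response body as a JSON string.
    """
    payload = json.loads(body)
    if status == 200:
        return payload["query"]
    if status == 404:
        return None  # the query does not exist
    # Any other status (500 is the documented failure) carries a description.
    raise RuntimeError(payload.get("description", "unknown error"))
```

Returning None for 404 lets callers distinguish a missing query from a server-side failure without parsing the description text.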
Creates a query.
Parameters: none.
Body:
query to be created
Returns:
On success - status code 201, body:
{
"info": "...",
"query": {
...
}
}
On error - status code 400, body:
{
"description": "..."
}
Updates the query with name.
Parameters:
name: query name
Body:
query to be updated
Returns:
On success - status code 200, body:
{
"info": "...",
"query": {
...
}
}
On error - status code 400, body:
{
"description": "..."
}
Deletes the query with name.
Parameters:
name: query name
Body: none.
Returns:
On success - status code 200, body:
{
"info": "...",
"deletedQueriesCount": ...
}
On failure - status code 400, body:
{
"description": "..."
}
Runs the query with name. The topic parameter is optional; the query
can refer to it as the input topic. If the input topic is not
specified, the default one, “http”, is used.
Parameters:
name: query name
topic: input topic name (optional)
Body: JSON data to be inserted.
Returns:
On success - status code 200, body:
{
"..."
}
The returned body can be any output specified in the query settings.
On failure - status code 500, body:
{
"description": "..."
}
Returns the metrics of the enabler in the Prometheus format.
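The Prometheus text format lists one sample per line (a series name, optionally with labels in braces, followed by a value and an optional timestamp), with comment lines starting with #. A minimal parsing sketch follows; the function name is hypothetical, the metric names the enabler exposes are not listed here, and a real deployment would normally let Prometheus scrape the endpoint instead.

```python
def parse_metrics(text):
    """Parse Prometheus text-format samples into a {series: value} dict."""
    samples = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        if "}" in line:
            # Labelled series: everything up to the last "}" is the series key.
            end = line.rindex("}") + 1
            series, rest = line[:end], line[end:].split()
        else:
            series, *rest = line.split()
        # rest[0] is the value; an optional timestamp may follow and is ignored.
        samples[series] = float(rest[0])
    return samples
```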
2.1.3.9. User guide parametrization
Parametrization is a feature that gives access to the incoming data and the results of running queries. For defining SQL queries, input data is available. Output MQTT topics have access to input and output data.
- input: input payload as parsed JSON
- inputTopic: MQTT topic on which the data arrived, as a string
- strInput: input payload as a string
- output: output payload as parsed JSON
- strOutput: output payload as a string
All expressions must be inside curly braces {...}.
To access JSON data (input or output), use JSONPath syntax:
- JSON strings will yield SQL string literals.
- JSON integers will yield SQL integer literals.
- JSON nulls will yield SQL NULL.
- Unmatched paths will yield SQL NULL.
Matching other types of JSON values will result in an error.
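The conversion rules above can be illustrated with a simplified sketch. It is not the enabler's implementation: it supports only top-level lookups of the form {input['key']} instead of full JSONPath, the function names are hypothetical, and escaping single quotes by doubling them is an assumption about how string literals are produced.

```python
import re

# Matches placeholders of the simplified form {input['key']}.
PLACEHOLDER = re.compile(r"\{input\['([^']+)'\]\}")


def to_sql_literal(value, found):
    """Convert a matched JSON value to a SQL literal, following the rules above."""
    if not found or value is None:
        return "NULL"  # unmatched paths and JSON nulls
    if isinstance(value, bool):
        # bool is a subclass of int in Python, so reject it explicitly.
        raise TypeError("unsupported JSON type: boolean")
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Assumption: single quotes are escaped by doubling them.
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"unsupported JSON type: {type(value).__name__}")


def render_sql(template, input_data):
    """Replace every {input['key']} placeholder in template with a SQL literal."""
    def substitute(match):
        key = match.group(1)
        return to_sql_literal(input_data.get(key), key in input_data)

    return PLACEHOLDER.sub(substitute, template)
```

Under these assumptions, a template such as `select {input['id']};` rendered with the input `{"id": 7}` becomes `select 7;`, and a missing key becomes `NULL`.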
Example – accessing the input value in PostgreSQL
"select {strInput}::json->>2;"
Example – JSON Path queries
"topic/{output..temperature.max()}"
2.1.3.10. Prerequisites
2.1.3.11. Installation
For development, run the following scripts:
# first terminal
./scripts/dev-env.sh
The dev-env.sh script starts the Postgres database (with a pgAdmin
instance) and the MQTT broker (with an MQTT explorer instance). The
database is accessible at localhost:5432. The pgAdmin instance is
accessible at localhost:5433. The MQTT broker is accessible at
localhost:1883. The MQTT explorer instance is accessible at
localhost:4000. Additionally, the qgis.sh script can be run to
start a QGIS instance for visualizing the geolocation data.
# second terminal
./scripts/dev-app.sh
The dev-app.sh script starts the application. The application is
accessible at localhost:8080.
To simulate the production environment, run the following scripts:
# first terminal
./scripts/prod-env.sh
The prod-env.sh script starts the Postgres database.
# second terminal
./scripts/prod-app.sh
The prod-app.sh script starts the application. The application is
accessible at localhost:8080.
For a Kubernetes deployment, use the provided Helm charts in the helm directory.
helm install lp helm
2.1.3.12. Configuration
The app can be configured via environment variables:
- HTTP_PORT: port at which the API is accessible, e.g. 8080
- DB_ADMIN_PORT: e.g. 5432
- DB_ADMIN_NAME: e.g. postgres
- DB_ADMIN_USER: e.g. postgres
- DB_ADMIN_PASSWORD: e.g. postgres
- DB_QUERIES_SERVER_NAME: e.g. postgres
- DB_QUERIES_PORT: e.g. 5432
- DB_QUERIES_NAME: e.g. queries
- DB_QUERIES_USER: e.g. queries_user
- DB_QUERIES_PASSWORD: e.g. postgres123
- DB_GEOLOCATION_SERVER_NAME: e.g. postgres
- DB_GEOLOCATION_PORT: e.g. 5432
- DB_GEOLOCATION_NAME: e.g. geolocation
- DB_GEOLOCATION_USER: e.g. geolocation_user
- DB_GEOLOCATION_PASSWORD: e.g. postgres123
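To show how the geolocation variables relate to one another, the sketch below assembles them into a PostgreSQL connection URL, for example for a helper script that inspects the database. The function name is hypothetical, the fallback values only mirror the examples listed above (they are not documented defaults), and the way the application itself consumes these variables is not described here.

```python
import os


def geolocation_db_url(env=None):
    """Assemble a PostgreSQL connection URL from DB_GEOLOCATION_* variables."""
    env = os.environ if env is None else env
    # Fallback values mirror the examples listed above; they are not
    # documented defaults of the application.
    host = env.get("DB_GEOLOCATION_SERVER_NAME", "postgres")
    port = env.get("DB_GEOLOCATION_PORT", "5432")
    name = env.get("DB_GEOLOCATION_NAME", "geolocation")
    user = env.get("DB_GEOLOCATION_USER", "geolocation_user")
    password = env.get("DB_GEOLOCATION_PASSWORD", "postgres123")
    return f"postgresql://{user}:{password}@{host}:{port}/{name}"
```

Passwords containing characters such as "@" or "/" would need percent-encoding before being placed in a URL; the sketch leaves that out for brevity.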
The database can be configured via environment variables:
- POSTGRES_DB: e.g. postgres
- POSTGRES_USER: e.g. postgres
- POSTGRES_PASSWORD: e.g. postgres
2.1.3.13. Developer guide
Refer to the installation guide to set up the environment.
The development scripts are located in the scripts directory:
- check.sh runs the linter (in check mode) and the tests
- clean.sh cleans Docker data
- fix.sh runs the linter
- sql-formatter.sh formats SQL files
Refer to the build.sbt file.
Configs for scalafmt, scalafix, and scalastyle can be found
in the configs directory.
2.1.3.14. Version control and releases
The latest version is 1.0.0.
2.1.3.15. License
The Location Processing enabler is licensed under the Apache License, Version 2.0 (the “License”).
One may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
2.1.3.16. Notice (dependencies)
The enabler can run as a standalone application.