2.1.3. Location Processing Enabler
Documentation for the Location Processing enabler of ASSIST-IoT.
2.1.3.1. Introduction
The Location Processing enabler aims to provide highly configurable and flexible geofencing capabilities based on location data. The enabler consists of a Scala application and a Postgres database.
The application is written with the Akka framework. It runs user-defined SQL queries against the database. Incoming data is collected from input streams or HTTP requests, and the query results can be streamed out. Data is transferred in JSON format. The behavior of the application is configurable through an HTTP interface, and its streaming capabilities are compatible with the MQTT protocol.
The database is shipped with the PostGIS extension. It stores the geolocation data and the application configuration.
2.1.3.2. Features
The enabler is still under development. Missing functionalities are in italics.
2.1.3.2.1. Input stream settings
Passing credentials
Setting input topics
Providing MQTT connection URL
2.1.3.2.2. Output stream settings
Passing credentials
Setting parametrized [1] output topics
Setting MQTT publish flags
Providing MQTT connection URL
Custom output JSON format
2.1.3.2.3. SQL queries
Access to a database with geolocation capabilities
Query parametrization [1]
2.1.3.2.4. HTTP interface
Creating queries
Updating queries
Deleting queries
Retrieving queries
Running queries manually
Authorization
[1] Parametrization refers to accessing input or output data in JSON (with JSONPath), string, or byte string format. A special syntax is provided for this purpose.
2.1.3.3. Place in architecture
In the ASSIST-IoT pilot, it will be used closely with the Location Tracking enabler.
2.1.3.4. User guide
The user-defined queries can be created via the HTTP interface. After successful creation, a query is stored in the database. Then, it is run inside the application, and it starts processing the data.
It is assumed that the spatial model is specified inside the database before the application starts, so that queries have access to the required tables and data.
2.1.3.4.1. User guide HTTP interface
2.1.3.4.1.1. Definitions
2.1.3.4.1.1.1. Parametrized string (parametrizedString)
String (optionally) containing parametrization syntax.
Examples:
"topic_{output['id']}"
"positions"
2.1.3.4.1.1.2. MQTT publish flag (publishFlag)
One of the following strings:
QoSAtLeastOnceDelivery
QoSAtMostOnceDelivery
QoSExactlyOnceDelivery
Retain
Examples:
"QoSAtLeastOnceDelivery"
2.1.3.4.1.1.3. When to publish (publishWhen)
Determines when to publish the produced output.
One of the following strings:
success
failure
always
failure refers to the situation in which an error has occurred while
processing the stream.
Examples:
"always"
2.1.3.4.1.1.4. Record format (recordFormat)
Determines how the records are formatted. Two styles are available: array
and object.
array formatting:
{
"fields": [
{"name": "field_1", "type": "type_1"},
...
],
"records": [
[value_1_1, value_1_2, ..., value_1_n],
[value_2_1, value_2_2, ..., value_2_n],
...
]
}
object formatting:
{
"fields": [
{"name": "field_1", "type": "type_1"},
...
],
"records": [
{"field_1": value_1_1, "field_2": value_1_2, ..., "field_n": value_1_n},
{"field_1": value_2_1, "field_2": value_2_2, ..., "field_n": value_2_n},
...
]
}
Examples:
"array"
2.1.3.4.1.1.5. Output JSON format (jsonFormat)
JSON format for the output data.
| Name | Description | Type |
|---|---|---|
| recordFormat | Determines the records formatting | recordFormat |
| showHeader | Whether to show the header | boolean |
| wrapSingleColumn | Whether a single column should be wrapped | boolean |
Examples:
{
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
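The two recordFormat styles can be illustrated with a short Python sketch. The function name format_records and its parameters are hypothetical and not part of the enabler; the sketch only reproduces the array and object layouts described in the record format definition, and it does not model showHeader or wrapSingleColumn.

```python
def format_records(field_names, field_types, rows, record_format="array"):
    """Build a result document in the 'array' or 'object' record style."""
    fields = [{"name": n, "type": t} for n, t in zip(field_names, field_types)]
    if record_format == "array":
        # Each record is a plain list of values, in field order.
        records = [list(row) for row in rows]
    elif record_format == "object":
        # Each record maps field names to values.
        records = [dict(zip(field_names, row)) for row in rows]
    else:
        raise ValueError(f"unknown recordFormat: {record_format!r}")
    return {"fields": fields, "records": records}


rows = [(1, 10.5, 20.0), (2, 11.0, 21.5)]
print(format_records(["id", "x", "y"], ["int", "float", "float"], rows, "object"))
```

The field type names used here (int, float) are placeholders; the type names actually reported by the enabler are not specified in this guide.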
2.1.3.4.1.1.6. Input topic (inputTopic)
Topic to subscribe to.
| Name | Description | Type |
|---|---|---|
| name | Topic name | string |
Examples:
{
"name": "vehicles/excavators"
}
2.1.3.4.1.1.7. Output topic (outputTopic)
Topic where the output is published.
| Name | Description | Type |
|---|---|---|
| name | Topic name | string |
| publishEmptyOutput optional | Whether to publish empty output | boolean |
| publishWhen optional | When to publish | publishWhen |
| publishFlags optional | Publish flags | array[publishFlag] |
Examples:
{
"name": "cats",
"publishFlags": ["QoSExactlyOnceDelivery", "Retain"],
"publishWhen": "success",
"publishEmptyOutput": false
}
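One possible reading of how publishWhen and publishEmptyOutput combine is sketched below in Python. The function should_publish is hypothetical, and the precedence between the two options (empty output is checked first) is an assumption; the guide does not state how the enabler resolves them.

```python
def should_publish(publish_when, failed, output_is_empty, publish_empty_output):
    """Decide whether an output message would be published (assumed semantics)."""
    if publish_when not in ("success", "failure", "always"):
        raise ValueError(f"unknown publishWhen: {publish_when!r}")
    # Assumption: an empty output is dropped unless publishEmptyOutput is true.
    if output_is_empty and not publish_empty_output:
        return False
    if publish_when == "always":
        return True
    if publish_when == "failure":
        return failed
    return not failed  # "success"


# Settings from the example above: publish only on success, skip empty output.
print(should_publish("success", failed=False, output_is_empty=False,
                     publish_empty_output=False))
```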
2.1.3.4.1.1.8. Input settings (inputSettings)
MQTT input settings.
| Name | Description | Type |
|---|---|---|
| host | MQTT host | string |
| port | MQTT port | number |
| username optional | Client credentials | string |
| password optional | Client credentials | string |
| topics optional | MQTT topics to subscribe | array[inputTopic] |
Examples:
{
"host": "pilot1",
"port": 1883,
"username": "jared",
"password": "dunn",
"topics": [
{"name": "vehicles/excavators"},
{"name": "cats"}
]
}
2.1.3.4.1.1.9. Output settings (outputSettings)
MQTT output settings.
| Name | Description | Type |
|---|---|---|
| host | MQTT host | string |
| port | MQTT port | number |
| username optional | Client credentials | string |
| password optional | Client credentials | string |
| topics optional | MQTT topics to publish | array[outputTopic] |
| format optional | Determines how to format the output | jsonFormat |
Examples:
{
"host": "pilot2",
"port": 1883,
"username": "bob",
"password": "builder",
"topics": [
{
"name": "danger/{output['id']}",
"publishFlags": [
"QoSExactlyOnceDelivery",
"Retain"
],
"publishWhen": "always",
"publishEmptyOutput": true
}
],
"format": {
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
}
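The publishFlags strings correspond to standard MQTT publish options: QoS 0 (at most once), QoS 1 (at least once), QoS 2 (exactly once), and the retain flag. The Python sketch below shows one way to translate a flag list into a (qos, retain) pair. The helper to_publish_options is hypothetical, and both the default QoS of 0 and the rule that the highest listed QoS wins are assumptions, not documented enabler behavior.

```python
# Standard MQTT QoS levels keyed by the publishFlag names used in this guide.
QOS_LEVELS = {
    "QoSAtMostOnceDelivery": 0,
    "QoSAtLeastOnceDelivery": 1,
    "QoSExactlyOnceDelivery": 2,
}


def to_publish_options(publish_flags):
    """Return (qos, retain) for a list of publishFlag strings."""
    qos = 0          # assumption: QoS 0 when no QoS flag is listed
    retain = False
    for flag in publish_flags:
        if flag == "Retain":
            retain = True
        elif flag in QOS_LEVELS:
            # assumption: if several QoS flags are listed, the highest wins
            qos = max(qos, QOS_LEVELS[flag])
        else:
            raise ValueError(f"unknown publishFlag: {flag!r}")
    return qos, retain


print(to_publish_options(["QoSExactlyOnceDelivery", "Retain"]))  # (2, True)
```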
2.1.3.4.1.1.10. Query (query)
Query configuration.
| Name | Description | Type |
|---|---|---|
| name required | Unique query name | string |
| inputSettings optional | Input settings | inputSettings |
| outputSettings optional | Output settings | outputSettings |
| sql required | SQL query | parametrizedString |
Examples:
{
"name": "dangerous",
"inputSettings": {
"host": "pilot1",
"port": 1883,
"username": "jared",
"password": "dunn",
"topics": [
{"name": "vehicles/excavators"},
{"name": "cats"}
]
},
"outputSettings": {
"host": "pilot2",
"port": 1883,
"username": "bob",
"password": "builder",
"topics": [
{
"name": "danger/{output['id']}",
"publishFlags": [
"QoSExactlyOnceDelivery",
"Retain"
],
"publishWhen": "always",
"publishEmptyOutput": true
}
],
"format": {
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
},
"sql": "select id, x, y from worker_positions where st_distance(st_makepoint(x, y), st_makepoint({input['x']}, {input['y']})) < 50;"
}
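Before sending a query definition to the enabler, a client may want to check it against the tables above. The Python sketch below is a hypothetical client-side check, not the validation the enabler itself performs. It treats name and sql as required and, as an assumption based on host and port not being marked optional, requires them inside any settings object that is present.

```python
def validate_query(query):
    """Return a list of problems; an empty list means the basic checks passed."""
    problems = []
    for key in ("name", "sql"):
        value = query.get(key)
        if not isinstance(value, str) or not value:
            problems.append(f"'{key}' is required and must be a non-empty string")
    for key in ("inputSettings", "outputSettings"):
        settings = query.get(key)
        if settings is None:
            continue  # both settings objects are optional
        if not isinstance(settings, dict):
            problems.append(f"'{key}' must be an object")
            continue
        if not isinstance(settings.get("host"), str):
            problems.append(f"'{key}.host' must be a string")
        if not isinstance(settings.get("port"), int):
            problems.append(f"'{key}.port' must be a number")
    return problems


print(validate_query({"name": "dangerous", "inputSettings": {"host": "pilot1"}}))
```

The example call reports two problems: the missing sql field and the missing inputSettings.port.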
2.1.3.4.1.2. Examples
2.1.3.4.1.2.1. Creating queries
To create a query, use the POST method on the v1/queries endpoint.
The request’s body is expected to be a complete definition of a query.
{
"name": "dangerous",
"inputSettings": {
"host": "pilot1",
"port": 1883,
"username": "jared",
"password": "dunn",
"topics": [
{"name": "vehicles/excavators"},
{"name": "cats"}
]
},
"outputSettings": {
"host": "pilot2",
"port": 1883,
"username": "bob",
"password": "builder",
"topics": [
{
"name": "danger/{output['id']}",
"publishFlags": [
"QoSExactlyOnceDelivery",
"Retain"
],
"publishWhen": "always",
"publishEmptyOutput": true
}
],
"format": {
"recordFormat": "object",
"showHeader": true,
"wrapSingleColumn": true
}
},
"sql": "select id, x, y from worker_positions where st_distance(st_makepoint(x, y), st_makepoint({input['x']}, {input['y']})) < 50;"
}
The query’s name is its unique identifier and is used later to
reference the query. The inputSettings and outputSettings hold the
MQTT input and output configuration. In both cases, host, port,
username, and password are used to connect to the broker. The client
subscribes to the topic names listed under topics in the input
settings. The topics in the output settings work slightly differently:
they are the topics to which the output messages are sent, and their
names can be parametrized (see the parametrization section for details).
In this example, part of the topic name is taken from a field (here, id)
of the output produced by running the SQL query.
"name": "danger/{output['id']}"
The publishFlags list contains the MQTT flags set when the message is
sent. The publishWhen option controls when publishing is triggered, for
instance only if there are no errors. This is practical because the
output of a successful run can differ from the output of a failed query.
The publishEmptyOutput option determines whether empty output is
published. The format option controls how the output is formatted (see
the definitions section for examples). Finally, sql is a parametrized
string in which the received input values can be used. The query is run
against the database included in the enabler. After the query is
created, the response confirms the operation and returns the created
query.
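The creation request can be sketched in Python with the standard library. The helper build_create_request is hypothetical. The base URL assumes the development address localhost:8080 from the installation section and that the API is served from the root path; the one-line sql value is a placeholder, not a query from this guide.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: development address, API at root


def build_create_request(query, base_url=BASE_URL):
    """Prepare (but do not send) a POST v1/queries request for a query definition."""
    return urllib.request.Request(
        url=f"{base_url}/v1/queries",
        data=json.dumps(query).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


query = {"name": "dangerous", "sql": "select 1;"}  # placeholder definition
request = build_create_request(query)
print(request.get_method(), request.full_url)

# To actually send it against a running enabler:
# with urllib.request.urlopen(request) as response:
#     print(response.status, json.loads(response.read()))
```

Preparing the request separately from sending it keeps the sketch runnable without a live enabler instance.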
2.1.3.4.1.2.2. Updating queries
A query can be updated by sending a PUT request to v1/queries/{name},
where name is the query name. The body describes the query in the same
way as when creating a new query.
2.1.3.4.1.2.3. Deleting queries
To delete a query, send a DELETE request to v1/queries/{name}, where
name is the query name.
2.1.3.4.1.2.4. Triggering queries
If the input and output MQTT settings are provided, the queries are
typically triggered by MQTT events. However, every query can also be run
from the HTTP interface by sending a POST request to
v1/queries/{name}/input. The request body must be valid JSON containing
the input data that the query would otherwise receive from the MQTT
broker. After the query runs, its output is returned in the response.
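A manual trigger can be prepared the same way as any other HTTP call. In the Python sketch below, build_trigger_request is a hypothetical helper, the base URL assumes the development address localhost:8080, and the input fields x and y are chosen to match the placeholders of the example sql query.

```python
import json
import urllib.parse
import urllib.request


def build_trigger_request(name, input_data, base_url="http://localhost:8080"):
    """Prepare (but do not send) a POST v1/queries/{name}/input request."""
    # Percent-encode the query name so it is safe as a single path segment.
    path = f"/v1/queries/{urllib.parse.quote(name, safe='')}/input"
    return urllib.request.Request(
        url=base_url + path,
        data=json.dumps(input_data).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


trigger = build_trigger_request("dangerous", {"x": 10, "y": 20})
print(trigger.get_method(), trigger.full_url)
```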
2.1.3.4.1.3. Endpoints
2.1.3.4.1.3.1. GET v1/queries
Retrieves all queries.
Parameters: none.
Body: none.
Returns:
On success - status code 200, body:
{
"queries": [
{...},
{...}
]
}
On failure - status code 500, body:
{
"description": "..."
}
2.1.3.4.1.3.2. GET v1/queries/{name}
Retrieves the query with name.
Parameters:
name: query name
Body: none.
Returns:
On success - status code 200, body:
{
"query": {
...
}
}
If query does not exist - status code 404, body:
{
"description": "..."
}
On failure - status code 500, body:
{
"description": "..."
}
2.1.3.4.1.3.3. POST v1/queries
Creates a query.
Parameters: none.
Body:
query to be created
Returns:
On success - status code 201, body:
{
"info": "...",
"query": {
...
}
}
On error - status code 400, body:
{
"description": "..."
}
2.1.3.4.1.3.4. PUT v1/queries/{name}
Updates the query with name.
Parameters:
name: query name
Body:
query to be updated
Returns:
On success - status code 200, body:
{
"info": "...",
"query": {
...
}
}
On error - status code 400, body:
{
"description": "..."
}
2.1.3.4.1.3.5. DELETE v1/queries/{name}
Deletes the query with name.
Parameters:
name: query name
Body: none.
Returns:
On success - status code 200, body:
{
"info": "...",
"deletedQueriesCount": ...
}
On failure - status code 400, body:
{
"description": "..."
}
2.1.3.4.1.3.5.1. User guide parametrization
Parametrization is a feature that gives access to the incoming data and the results of running queries. For defining SQL queries, input data is available. Output MQTT topics have access to input and output data.
2.1.3.4.1.3.6. Input data
| Name | Description |
|---|---|
| input | JSON data |
| strInput | JSON data in string format |
2.1.3.4.1.3.7. Output data
| Name | Description |
|---|---|
| output | JSON data |
| strOutput | JSON data in string format |
To access JSON data (input or output) one may use JSONPath syntax.
A valid expression must be inside curly braces {...}.
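To make the substitution idea concrete, the Python sketch below resolves a small subset of the syntax: {input['key']}, {output['key']}, {strInput}, and {strOutput}. It is an illustration only; the function render is hypothetical, full JSONPath expressions are not supported (they are left untouched), and how the enabler quotes or binds substituted values in SQL is not covered by this guide.

```python
import json
import re

# Matches {input['key']}, {output['key']}, {strInput} and {strOutput} only.
_PLACEHOLDER = re.compile(
    r"\{(?:(input|output)\['([^']+)'\]|(strInput|strOutput))\}"
)


def render(template, input_data=None, output_data=None):
    """Replace the supported placeholders in a parametrized string."""
    data = {"input": input_data, "output": output_data}

    def substitute(match):
        source, key, as_string = match.groups()
        if as_string:
            # strInput -> "input", strOutput -> "output": whole document as text
            return json.dumps(data[as_string[3:].lower()])
        return str(data[source][key])

    return _PLACEHOLDER.sub(substitute, template)


print(render("danger/{output['id']}", output_data={"id": 7}))  # danger/7
```

Plain text substitution like this is not safe against SQL injection, so it should be read as a description of the syntax rather than a recommended way to build queries.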
Examples:
"select {strInput}::json->>2;"
"topic/{output..temperature.max()}"
2.1.3.5. Prerequisites
2.1.3.6. Installation
2.1.3.6.1. Development environment
For development, run the following scripts:
# first terminal
./scripts/dev-env.sh
The dev-env.sh script starts the Postgres database (with a pgAdmin
instance) and the MQTT broker (with an MQTT Explorer instance). The
database is accessible at localhost:5432. The pgAdmin instance is
accessible at localhost:5433. The MQTT broker is accessible at
localhost:1883. The MQTT Explorer instance is accessible at
localhost:4000. Additionally, one can run the qgis.sh script to
start a QGIS instance to visualize the geolocation data.
# second terminal
./scripts/dev-app.sh
The dev-app.sh script starts the application. The application is
accessible at localhost:8080.
2.1.3.6.1.1. Production environment
To simulate the production environment, run the following scripts:
# first terminal
./scripts/prod-env.sh
The prod-env.sh script starts the Postgres database.
# second terminal
./scripts/prod-app.sh
The prod-app.sh script starts the application. The application is
accessible at localhost:8080.
2.1.3.7. Configuration
2.1.3.7.1. Application
The app can be configured via environment variables:
- HTTP_PORT: port at which the API is accessible, e.g. 8080
- DB_QUERIES_SERVER_NAME: e.g. postgres
- DB_QUERIES_PORT: e.g. 5432
- DB_QUERIES_NAME: e.g. queries
- DB_QUERIES_USER: e.g. queries_user
- DB_QUERIES_PASSWORD: e.g. postgres123
- DB_GEOLOCATION_SERVER_NAME: e.g. postgres
- DB_GEOLOCATION_PORT: e.g. 5432
- DB_GEOLOCATION_NAME: e.g. geolocation
- DB_GEOLOCATION_USER: e.g. geolocation_user
- DB_GEOLOCATION_PASSWORD: e.g. postgres123
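A deployment script may want to confirm that the database variables are set before starting the application. The Python sketch below is such a pre-flight check. DbConfig and load_db_config are hypothetical names, the application itself reads these variables on its own, and treating every variable as required is an assumption.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class DbConfig:
    server_name: str
    port: int
    name: str
    user: str
    password: str


def load_db_config(prefix, env=os.environ):
    """Read one database block (DB_QUERIES or DB_GEOLOCATION) from a mapping."""
    def required(suffix):
        key = f"{prefix}_{suffix}"
        if key not in env:
            raise KeyError(f"missing environment variable {key}")
        return env[key]

    return DbConfig(
        server_name=required("SERVER_NAME"),
        port=int(required("PORT")),
        name=required("NAME"),
        user=required("USER"),
        password=required("PASSWORD"),
    )


# Example values taken from the list above, passed explicitly instead of os.environ.
example_env = {
    "DB_QUERIES_SERVER_NAME": "postgres",
    "DB_QUERIES_PORT": "5432",
    "DB_QUERIES_NAME": "queries",
    "DB_QUERIES_USER": "queries_user",
    "DB_QUERIES_PASSWORD": "postgres123",
}
print(load_db_config("DB_QUERIES", example_env))
```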
2.1.3.8. Developer guide
2.1.3.8.1. Environment
Refer to the installation guide to set up the environment.
2.1.3.8.2. Scripts
The development scripts are located in the scripts directory:
- check.sh runs the linter (in check mode) and the tests
- clean.sh cleans Docker data
- fix.sh runs the linter
- sql-formatter.sh formats SQL files
2.1.3.8.2.1. Dependencies
Refer to the build.sbt file.
2.1.3.8.2.2. Code style
Configs for scalafmt, scalafix, and scalastyle can be found
in the configs directory.
2.1.3.9. Version control and release
The enabler is under development.
2.1.3.10. License
The Location Processing enabler is licensed under the Apache License, Version 2.0 (the “License”).
One may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
2.1.3.11. Notice (dependencies)
Dependency list and licensing information will be provided before the first major release.