Pipelines¶
Pipelines process data through a sequence of filtering and transforming blocks, known as Filtras. Data flows into and out of pipelines through Connectors. A single data unit traveling through the pipeline is called a message.
Each pipeline has a connector-in (input connector) and a connector-out (output connector), along with an optional array of filtras.
Here is an example JSON configuration for a minimal pipeline that receives MQTT messages and forwards them unchanged to GCP Pub/Sub:
"pipeline_1": {
    "connector_in": {
        "type": "mqtt",
        "topic": "/topic/+/event"
    },
    "connector_out": {
        "type": "gcp_pubsub",
        "authbundle_id": "my_authbundle_id",
        "project_id": "my_project_id",
        "topic_id": "my_gcp_topic_id"
    }
}
This configuration extends the first example. In addition to transferring messages from MQTT to Pub/Sub, it inspects each message and lets through only those whose temp value is greater than 5.4 and that do not contain the substring LOG:
"pipeline_1": {
    "connector_in": {
        "type": "mqtt",
        "topic": "/topic/+/event"
    },
    "connector_out": {
        "type": "gcp_pubsub",
        "authbundle_id": "my_authbundle_id",
        "project_id": "my_project_id",
        "topic_id": "my_gcp_topic_id"
    },
    "filtras": [
        {
            "type": "comparator",
            "operator": "gt",
            "msg_format": "json",
            "value_key": "temp",
            "comparand": 5.4
        },
        {
            "type": "finder",
            "operator": "contain",
            "logical_negation": true,
            "text": "LOG"
        }
    ]
}
By default, each pipeline processes data sequentially. Messages move through each filtra until they reach the connector-out or are rejected by one of the filtras. Complex flow control, including if/else logic and loops, can be implemented using filtra goto properties, which allow messages to move forward or backward in the sequence of filtras.
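The routing rules described above can be illustrated with a small toy model. This is only a sketch, not the product's engine: the Filtra class and run_pipeline function are hypothetical names, messages are plain dictionaries, and the sketch assumes that the reserved name out sends the message straight to the connector-out and that a rejected message with no goto_rejected is dropped.

```python
class Filtra:
    """Hypothetical stand-in for a configured filtra (illustration only)."""

    def __init__(self, name, accepts, goto_accepted=None, goto_rejected=None):
        self.name = name
        self.accepts = accepts              # callable: message -> bool
        self.goto_accepted = goto_accepted  # name of the next filtra on accept
        self.goto_rejected = goto_rejected  # name of the next filtra on reject


def run_pipeline(filtras, message, max_steps=100):
    """Walk a message through the filtras.

    Returns (visited_names, delivered), where delivered is True when the
    message reaches the connector-out.
    """
    index_by_name = {f.name: i for i, f in enumerate(filtras)}
    visited = []
    i = 0
    for _ in range(max_steps):          # guard against endless goto loops
        if i >= len(filtras):
            return visited, True        # past the last filtra: connector-out
        current = filtras[i]
        visited.append(current.name)
        if current.accepts(message):
            target = current.goto_accepted
        elif current.goto_rejected is None:
            return visited, False       # rejected with nowhere to go: dropped
        else:
            target = current.goto_rejected
        if target is None:
            i += 1                      # default: next filtra in sequence
        elif target == "out":
            return visited, True        # assumed meaning of the reserved name
        else:
            i = index_by_name[target]   # jump forward or backward
    raise RuntimeError("goto loop did not terminate")


# An if/else branch: hot readings take one path, cold readings another.
filtras = [
    Filtra("is_hot", lambda m: m["temp"] > 5.4, goto_rejected="cold_path"),
    Filtra("hot_path", lambda m: True, goto_accepted="out"),
    Filtra("cold_path", lambda m: True),
]
```

In this model, a message with temp 7.0 visits is_hot and hot_path, while a message with temp 3.0 is redirected from is_hot to cold_path instead of being dropped.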
Connectors¶
Connectors handle the input and output of messages in a pipeline. Most connectors support in and out modes. A connector-in is defined under the connector_in key in the pipeline config, while the connector_out key defines the connector-out.
Authentication bundles¶
Authentication bundles, or authbundles, store authentication data for connecting to external services. The required data varies based on the service type. Currently, supported types include:
gcp - GCP service key;
aws - AWS access key;
mqtt311, mqtt50 - MQTT authentication based on the CONNECT message. Supported options are:
Username/Password - plain string literals for Username and Password;
JWT - Password serves as a JWT token.
Note: In MQTT v3.1.1, Username cannot be empty, unlike in v5.0.
Connector Properties¶
Each connector must define the type property. While most other properties are specific to each connector type, a few are common across many connectors:
- authbundle_id (string)
Specifies the identifier of an authentication bundle to be used in the authentication process with the service the connector connects to.
- type (string)
Specifies the connector type. Valid options include:
mqtt
gcp_pubsub
gcp_storage
aws_s3
email
queue
A connector may require different properties depending on whether it is used as a connector-in or a connector-out.
MQTT Connector¶
Connects to an MQTT Broker:
{
    "type": "mqtt",
    "topic": "/topic/+/event",
    "server": "mqtt://mqtt.iotplan.io:1883"
}
Properties¶
Common properties: authbundle_id, type.
- topic (string)
The MQTT topic to subscribe to for connector-in or to publish to for connector-out.
- server (string)
URL of the MQTT Broker.
- qos (number)
MQTT Quality of Service (QoS) level. Possible values are:
0 - At Most Once
1 - At Least Once
2 - Exactly Once
GCP Pub/Sub Connector¶
Connects to the Google Cloud Pub/Sub service:
{
    "type": "gcp_pubsub",
    "authbundle_id": "my_authbundle_id",
    "project_id": "my_project_id",
    "topic_id": "my_topic_id",
    "attributes": {
        "deviceId": "{{topic[2]}}",
        "projectId": "G-NODE"
    }
}
Properties¶
Common properties: authbundle_id, type.
- project_id (string)
Specifies the GCP project identifier.
- topic_id (string)
Specifies the Pub/Sub topic. The connector-out publishes to this topic. The connector-in creates a subscription to this topic if subscription_id is not specified.
- subscription_id (string)
Specifies the subscription that the connector-in subscribes to. If the subscription does not exist, it is created. The connector-out ignores this property.
GCP Cloud Storage Connector¶
Connects to the Google Cloud Storage service:
{
    "type": "gcp_storage",
    "authbundle_id": "my_service_key",
    "project_id": "my_project_id",
    "bucket_name": "my_bucket_name",
    "object_name": "my_object_name"
}
Properties¶
Common properties: authbundle_id, type.
- project_id (string)
Specifies the GCP project identifier.
- bucket_name (string)
Specifies the bucket name to which the connector-out uploads messages or from which the connector-in downloads messages. If the bucket does not exist, the connector-out creates it, whereas the connector-in does not.
- object_name (string)
Specifies the name of the bucket object where the message will be stored. If it is a plain string, it will be overwritten with each new message. Dynamic substitutions can be used to generate unique object names for every message.
AWS S3 Connector¶
Connects to Amazon Web Services Simple Storage Service (S3):
{
    "type": "aws_s3",
    "authbundle_id": "my_access_key",
    "bucket_name": "feisty",
    "object_name": "sensor-data"
}
Properties¶
Common properties: authbundle_id, type.
- bucket_name (string)
Specifies the bucket name to which the connector-out uploads messages or from which the connector-in downloads messages. If the bucket does not exist, the connector-out creates it, whereas the connector-in does not.
- object_name (string)
Specifies the name of the bucket object where the message will be stored. If it is a plain string, it will be overwritten with each new message. Dynamic substitutions can be used to generate unique object names for every message.
Email Service Connector¶
Connects to an email server via SMTP. Currently, only the connector-out is supported:
{
    "type": "email",
    "authbundle_id": "my_authbundle_id",
    "to": "my_recipient",
    "smtp_server": "my_smtp_server",
    "smtp_port": "my_smtp_port"
}
Properties¶
Common properties: authbundle_id, type.
- to (string)
Specifies the email address to which the message will be sent.
- smtp_server (string)
Specifies the URL of the SMTP server.
- smtp_port (number)
Specifies the port of the SMTP server.
Internal Queue Connector¶
Connects to an internal queue and serves as a communication channel, allowing pipelines to exchange messages with each other:
{
    "type": "queue",
    "name": "my_queue_name"
}
Properties¶
Common properties: type.
- name (string)
Specifies the name of the queue.
Each Filtra can have a queues property that, in addition to normal message processing in the pipeline, allows it to direct messages to internal queues. These messages become available to other pipelines if those pipelines are configured with an Internal Queue Connector as the connector-in.
Filtras¶
A filtra is a message-processing unit that functions as either a filter or a transformer.
Filters are units that do not modify the original message received through the connector-in. Their primary purpose is to filter out messages that do not meet specific criteria.
In contrast, transformers are units that can modify the original message, though some transformers may pass certain messages unchanged.
Pipelines can include any number of filtras, which can be linked to create complex data-processing flows using goto properties.
Filtra Properties¶
All filtras require a type property and may include other optional properties. Common properties for all filtras are described here, while type-specific properties are covered in their respective sections.
Common Properties¶
- msg_format (string)
Specifies the format used for decoding and encoding the message payload. Supported values:
json
cbor
- goto_accepted (string)
Specifies the name of the next filtra in the pipeline if the current filtra allows the message to pass.
- goto_rejected (string)
Specifies the name of the next filtra in the pipeline if the message is rejected by the current filtra.
- goto (string)
Equivalent to goto_accepted.
- name (string)
Specifies a name for the filtra to be used in goto properties. The names self and out are reserved.
- logical_negation (boolean)
Specifies that logical negation is applied to the result, meaning an accepted message becomes rejected, and a rejected message becomes accepted.
- metadata (dictionary)
Specifies a dictionary of attributes that can be used for various purposes, such as Dynamic substitutions in property values.
- queues (array of strings)
Specifies the names of internal queues used for message interchange between pipelines.
Builder Filtra¶
Creates a new message:
{
    "type": "builder",
    "payload": {
        "key": "value"
    }
}
Properties¶
Common properties: name, goto_accepted, goto, queues, metadata, msg_format.
- payload (dictionary)
Specifies a dictionary that serves as the payload for the new message, completely discarding the incoming message.
Comparator Filtra¶
Compares numerical values:
{
    "type": "comparator",
    "operator": "gt",
    "msg_format": "json",
    "value_key": "temp",
    "comparand": 5.4
}
Properties¶
Common properties: name, goto_accepted, goto, queues, metadata, msg_format, logical_negation.
- operator (string)
Specifies the comparison operator. Possible values include:
gt - greater than
gte - greater than or equal
lt - less than
lte - less than or equal
eq - equal
- value_key (string)
Specifies the key in the payload whose value is compared with the comparand.
- comparand (number)
Specifies a numerical constant to be compared with a value in the payload.
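As an illustration of how these properties combine, here is a minimal sketch of the accept/reject decision for a JSON payload. The function name comparator_accepts and the operator table are hypothetical, only the json format is modeled, and treating a missing or non-numeric value as a rejection is an assumption rather than documented behavior.

```python
import json
import operator

# Operator names are taken from the list above; the mapping is illustrative.
OPERATORS = {
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "eq": operator.eq,
}


def comparator_accepts(config, payload):
    """Return True if a JSON payload passes a Comparator configured by `config`."""
    decoded = json.loads(payload)
    value = decoded.get(config["value_key"])
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if not is_number:
        result = False  # assumption: missing or non-numeric values are rejected
    else:
        result = OPERATORS[config["operator"]](value, config["comparand"])
    if config.get("logical_negation", False):
        result = not result  # accepted becomes rejected and vice versa
    return result


config = {
    "type": "comparator",
    "operator": "gt",
    "msg_format": "json",
    "value_key": "temp",
    "comparand": 5.4,
}
```

With this configuration, a payload of {"temp": 6.0} is accepted, while {"temp": 5.4} is rejected because gt is a strict comparison.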
Eraser Filtra¶
Removes specified keys from the payload:
{
    "type": "eraser",
    "keys": ["key1", "keyN"],
    "msg_format": "json"
}
Properties¶
Common properties: name, goto_accepted, goto, queues, metadata, msg_format.
- keys (array)
Specifies the keys to be removed from the message payload.
Finder Filtra¶
Searches for a specific part in the message based on the finder configuration. Three variants are supported:
Variant 1. Searches for text within the payload, or for the payload within text. The text and operator properties must be specified:
{ "type": "finder", "operator": "contain", "text": "LOG" }
Variant 2. Searches for the specified keys in the decoded payload. The keys property must be specified:
{ "type": "finder", "keys": ["key1", "keyN"] }
Variant 3. Performs the same type of search as Variant 1, but instead of analyzing the entire payload, it examines the value of a specified key in the decoded payload. The text, operator, and value_key properties must be specified:
{ "type": "finder", "operator": "contain", "text": "LOG", "value_key": "key1" }
Properties¶
Common properties: name, goto_accepted, goto_rejected, goto, queues, metadata, msg_format, logical_negation.
- operator (string)
Specifies the method for searching for a substring. Possible values include:
contain - searches for text within the payload;
contained - searches for the payload within text;
match - checks if text exactly matches the payload.
- text (string)
Specifies a string that can either be searched as a substring within the payload or used to search for the payload as a substring.
- keys (array)
Specifies the keys in the decoded payload dictionary. All keys must be present in the payload; otherwise, the message is rejected.
- value_key (string)
Specifies a key in the decoded payload whose value is used for the text search.
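The three Finder variants and their operators can be sketched as follows. This is an illustrative model only: finder_accepts is a hypothetical name, the payload is assumed to be a text string decoded as JSON where needed, and rejecting a message whose value_key is missing or not a string is an assumption.

```python
import json


def finder_accepts(config, payload):
    """Return True if the payload (a str) passes a Finder configured by `config`."""
    if "text" not in config:
        # Keys variant: every listed key must be present in the decoded payload.
        decoded = json.loads(payload)
        result = all(key in decoded for key in config["keys"])
    else:
        subject = payload                      # text search over the whole payload
        if "value_key" in config:              # text search over one decoded value
            decoded = json.loads(payload)
            subject = decoded.get(config["value_key"])
        if not isinstance(subject, str):
            result = False                     # assumption: nothing to search in
        else:
            text = config["text"]
            result = {
                "contain": text in subject,    # text inside the payload
                "contained": subject in text,  # payload inside text
                "match": text == subject,      # exact equality
            }[config["operator"]]
    if config.get("logical_negation", False):
        result = not result
    return result
```

For example, a Finder with operator contain and text LOG accepts the payload "LOG: boot ok", and the same Finder with logical_negation set to true accepts only payloads that lack the substring.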
Limiter Filtra¶
Rejects messages that exceed the specified size:
{
    "type": "limiter",
    "size": 1024
}
Properties¶
Common properties: name, goto_accepted, goto_rejected, goto, queues, metadata, msg_format, logical_negation.
- size (number)
Specifies a size limit in bytes for the message. Messages exceeding this limit are rejected.
Nop Filtra¶
Performs no action; however, it can be used, for example, to attach metadata or direct messages to internal queues:
{
    "type": "nop"
}
Properties¶
Common properties: name, goto_accepted, goto, queues, metadata.
Splitter Filtra¶
Splits messages that exceed a specified size into chunks. If the message size does not exceed chunk_size, the message passes through the Splitter unmodified; otherwise, it is split into chunks of chunk_size bytes (the last chunk may be smaller). Each chunk is sent as a separate message. It is the receiver's responsibility to reconstruct the original message:
{
    "type": "splitter",
    "chunk_size": 1024
}
Each chunk is encoded as a CBOR array containing the following elements:
____SPL - chunk marker
Message ID - unique identifier for the original message (generated by the Splitter)
Original Message Size - total size of the original message
Chunk Number - sequential number of the chunk
Array of Bytes - portion of the original message, with a length of chunk_size or smaller
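Since reconstruction is left to the receiver, the following sketch shows one way a receiver might reassemble messages from chunks that have already been CBOR-decoded into five-element lists in the order given above. The function name reassemble is hypothetical, CBOR decoding itself is out of scope here, and the sketch assumes the marker decodes to a text string and that chunk numbers increase in message order (their starting value does not matter, because chunks are sorted).

```python
from collections import defaultdict

SPLITTER_MARKER = "____SPL"  # assumed to decode as a text string


def reassemble(chunks):
    """Rebuild original messages from decoded Splitter chunks.

    `chunks` is an iterable of [marker, message_id, original_size,
    chunk_number, data] lists, in any order. Returns a dict mapping
    message_id to bytes for every message whose chunks are all present.
    """
    parts = defaultdict(dict)   # message_id -> {chunk_number: data}
    sizes = {}                  # message_id -> original message size
    for marker, message_id, original_size, chunk_number, data in chunks:
        if marker != SPLITTER_MARKER:
            continue            # not a Splitter chunk
        parts[message_id][chunk_number] = bytes(data)
        sizes[message_id] = original_size
    complete = {}
    for message_id, by_number in parts.items():
        body = b"".join(by_number[n] for n in sorted(by_number))
        if len(body) == sizes[message_id]:   # all bytes have arrived
            complete[message_id] = body
    return complete
```

The Original Message Size element is what lets the receiver tell a complete message from one that is still missing chunks, so incomplete messages are simply left out of the result.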
Properties¶
Common properties: name, goto_accepted, goto, queues, metadata, msg_format.
- chunk_size (number)
Specifies the maximum size of each message chunk in bytes.
Dynamic substitutions¶
Dynamic substitutions are supported in certain property values. Placeholders for dynamic substitutions are defined using double curly braces: {{dynamic_value}}.
"filtras": [
    {
        "type": "nop",
        "metadata": {
            "topic": "/big"
        }
    }
],
"connector_out": {
    "type": "mqtt",
    "topic": "{{topic}}",
    "server": "mqtt://mqtt.iotplan.io:1883"
}
In the example above, the Nop Filtra defines metadata that is used by the connector-out to replace {{topic}} with the actual value from the metadata dictionary. As a result, the connector-out sends each message to the topic /big.
It is not always necessary to explicitly define the metadata property, as some metadata key-value pairs are automatically derived:
"connector_in": {
    "type": "mqtt",
    "topic": "/topic/+/event",
    "server": "mqtt://mqtt.iotplan.io:1883"
},
"connector_out": {
    "type": "gcp_pubsub",
    "authbundle_id": "my_authbundle_id",
    "project_id": "my_project_id",
    "topic_id": "my_topic_id",
    "attributes": {
        "deviceId": "{{topic[2]}}"
    }
}
In the example above, the topic in the connector-out refers to the topic that the connector-in is subscribed to. This also demonstrates indexing in dynamic substitutions. The connector-in subscribes to an MQTT topic composed of multiple levels, and dynamic substitution with indexing allows extracting a specific level of interest from the topic to replace the placeholder {{topic[2]}}. In this case, the connector-in is subscribed to a topic with a wildcard character. As a result, the value of {{topic[2]}} varies from message to message, depending on the actual topic to which the message was published.
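The placeholder resolution described above can be sketched in a few lines. This is a hedged illustration, not the actual implementation: the substitute function and the PLACEHOLDER pattern are hypothetical, and the sketch assumes that an indexed placeholder splits the value on "/" with zero-based indexing, so that the leading slash yields an empty level at index 0 and topic[2] lands on the wildcard position of /topic/+/event.

```python
import re

# Matches {{key}} and {{key[N]}}; the exact placeholder grammar is an assumption.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)(?:\[(\d+)\])?\s*\}\}")


def substitute(template, metadata):
    """Replace every placeholder in `template` with a value from `metadata`."""

    def resolve(match):
        key, index = match.group(1), match.group(2)
        value = metadata[key]
        if index is not None:
            # Indexed form: pick one level of a slash-separated value.
            value = value.split("/")[int(index)]
        return value

    return PLACEHOLDER.sub(resolve, template)


# Metadata as it might look for a message published to /topic/device42/event.
metadata = {"topic": "/topic/device42/event"}
device_id = substitute("{{topic[2]}}", metadata)
```

Under these assumptions, device_id is "device42" for this message, and a message published to /topic/device7/event would yield "device7" from the same configuration.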