Pipelines

Pipelines process data through a sequence of filtering and transforming blocks, known as Filtras. Data flows into and out of pipelines through Connectors. A single data unit traveling through the pipeline is called a message.

Each pipeline has a connector-in (input connector) and a connector-out (output connector), along with an optional array of filtras.

Here is an example JSON configuration for the simplest pipeline, which receives MQTT messages and forwards them unchanged to GCP Pub/Sub:

"pipeline_1": {
  "connector_in": {
    "type": "mqtt",
    "topic": "/topic/+/event"
  },
  "connector_out":{
    "type":"gcp_pubsub",
    "authbundle_id":"my_authbundle_id",
    "project_id":"my_project_id",
    "topic_id":"my_gcp_topic_id"
  }
}

This configuration extends the first example. In addition to transferring messages from MQTT to Pub/Sub, it inspects each message and passes only those whose temp value is above 5.4 and that do not contain the substring LOG:

"pipeline_1":{
  "connector_in":{
    "type": "mqtt",
    "topic": "/topic/+/event"
  },
  "connector_out":{
    "type":"gcp_pubsub",
    "authbundle_id":"my_authbundle_id",
    "project_id":"my_project_id",
    "topic_id":"my_gcp_topic_id"
  },
  "filtras":[
    {
      "type": "comparator",
      "operator": "gt",
      "msg_format": "json",
      "value_key": "temp",
      "comparand": 5.4
    },
    {
      "type": "finder",
      "operator": "contain",
      "logical_negation": true,
      "text": "LOG"
    }
  ]
}

By default, each pipeline processes data sequentially. Messages move through each filtra until they reach the connector-out or are rejected by one of the filtras. Complex flow control, including if/else logic and loops, can be implemented using filtra goto properties, which allow messages to move forward or backward in the sequence of filtras.

Connectors

Connectors handle the input and output of messages in a pipeline. Most connectors support in and out modes. A connector-in is defined under the connector_in key in the pipeline config, while the connector_out key defines the connector-out.

Authentication bundles

Authentication bundles, or authbundles, store authentication data for connecting to external services. The required data varies based on the service type. Currently, supported types include:

  • gcp - GCP service key;

  • aws - AWS access key;

  • mqtt311, mqtt50 - MQTT authentication based on the CONNECT message. Supported options are:

    • Username/Password - Plain string literals for Username and Password;

    • JWT - Password serves as a JWT token;

    Note: In MQTT v3.1.1, Username cannot be empty, unlike in v5.0.

Connector Properties

Each connector must define the type property. While most other properties are specific to each connector type, a few are common across many connectors:

authbundle_id (string)

Specifies the identifier of an authentication bundle to be used in the authentication process with the service the connector connects to.

type (string)

Specifies the connector type. Valid options include:

  • mqtt

  • gcp_pubsub

  • gcp_bucket

  • aws_s3

  • email

  • queue

A connector may require different properties depending on whether it operates as a connector-in or a connector-out.

MQTT Connector

Connects to an MQTT Broker:

{
  "type": "mqtt",
  "topic": "/topic/+/event",
  "server": "mqtt://mqtt.iotplan.io:1883"
}

Properties

Common properties: authbundle_id, type.

topic (string)

The MQTT topic to subscribe to for connector-in or to publish to for connector-out.

server (string)

URL of the MQTT Broker.

qos (number)

MQTT Quality of Service (QoS) level. Possible values are:

  • 0 - At Most Once

  • 1 - At Least Once

  • 2 - Exactly Once
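
As an illustration of the properties above, a connector-out that publishes with QoS 1 and authenticates through an authbundle might look like the following sketch. The topic and the authbundle identifier are placeholders, not values defined by this documentation:

```json
{
  "type": "mqtt",
  "topic": "/alerts/temperature",
  "server": "mqtt://mqtt.iotplan.io:1883",
  "qos": 1,
  "authbundle_id": "my_mqtt_authbundle_id"
}
```

QoS 1 accepts possible duplicate deliveries in exchange for each message reaching the broker at least once.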

GCP Pub/Sub Connector

Connects to the Google Cloud Pub/Sub service:

{
  "type":"gcp_pubsub",
  "authbundle_id":"my_authbundle_id",
  "project_id":"my_project_id",
  "topic_id":"my_topic_id",
  "attributes":{
    "deviceId":"{{topic[2]}}",
    "projectId": "G-NODE"
  }
}

Properties

Common properties: authbundle_id, type.

project_id (string)

Specifies the GCP project identifier.

topic_id (string)

Specifies the Pub/Sub topic. The connector-out publishes to this topic. The connector-in creates a subscription to this topic if subscription_id is not specified.

subscription_id (string)

Specifies the subscription that the connector-in subscribes to. If the subscription does not exist, it is created. The connector-out ignores this property.
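
A connector-in that reads from a named subscription could be configured as sketched below. All identifiers are placeholders. Whether topic_id may be omitted when subscription_id is given is not stated here, so the sketch keeps both:

```json
{
  "type": "gcp_pubsub",
  "authbundle_id": "my_authbundle_id",
  "project_id": "my_project_id",
  "topic_id": "my_topic_id",
  "subscription_id": "my_subscription_id"
}
```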

GCP Cloud Storage Connector

Connects to the Google Cloud Storage service:

{
  "type":"gcp_storage",
  "authbundle_id":"my_service_key",
  "project_id":"my_project_id",
  "bucket_name":"my_bucket_name",
  "object_name": "my_object_name"
}

Properties

Common properties: authbundle_id, type.

project_id (string)

Specifies the GCP project identifier.

bucket_name (string)

Specifies the bucket name to which the connector-out uploads messages or from which the connector-in downloads messages. If the bucket does not exist, the connector-out creates it, whereas the connector-in does not.

object_name (string)

Specifies the name of the bucket object in which the message is stored. If the name is a plain string, the object is overwritten by each new message. Dynamic substitutions can be used to generate a unique object name for every message.
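
For instance, a connector-out paired with an MQTT connector-in could derive the object name from the incoming topic, as in the sketch below. It assumes that the automatically derived topic metadata described under Dynamic substitutions is available in object_name and that a placeholder may be embedded in a longer string; the events/ prefix and .json suffix are illustrative only:

```json
{
  "type": "gcp_storage",
  "authbundle_id": "my_service_key",
  "project_id": "my_project_id",
  "bucket_name": "my_bucket_name",
  "object_name": "events/{{topic[2]}}.json"
}
```

Under these assumptions, the result is one object per distinct topic level value (for example, one per device). Later messages with the same value still overwrite that object.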

AWS S3 Connector

Connects to Amazon Web Services Simple Storage Service (S3):

{
  "type":"aws_s3",
  "authbundle_id":"my_access_key",
  "bucket_name":"feisty",
  "object_name":"sensor-data"
}

Properties

Common properties: authbundle_id, type.

bucket_name (string)

Specifies the bucket name to which the connector-out uploads messages or from which the connector-in downloads messages. If the bucket does not exist, the connector-out creates it, whereas the connector-in does not.

object_name (string)

Specifies the name of the bucket object in which the message is stored. If the name is a plain string, the object is overwritten by each new message. Dynamic substitutions can be used to generate a unique object name for every message.

Email Service Connector

Connects to an email server via SMTP. Currently, only the connector-out is supported:

{
  "type":"email",
  "authbundle_id":"my_authbundle_id",
  "to": "my_recipient",
  "smtp_server": "my_smtp_server",
  "smtp_port": "my_smtp_port"
}

Properties

Common properties: authbundle_id, type.

to (string)

Specifies the email address to which the message will be sent.

smtp_server (string)

Specifies the URL of the SMTP server.

smtp_port (number)

Specifies the port of the SMTP server.

Internal Queue Connector

Connects to an internal queue and serves as a communication channel, allowing pipelines to exchange messages with each other:

{
  "type":"queue",
  "name": "my_queue_name"
}

Properties

Common properties: type.

name (string)

Specifies the name of the queue.

Each Filtra can have a queues property that, in addition to normal message processing in the pipeline, allows it to direct messages to internal queues. These messages become available to other pipelines if those pipelines are configured with an Internal Queue Connector as the connector-in.
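
The fragment below sketches how two pipelines might be linked through an internal queue. The pipeline names, the queue name shared_events, and the topics are illustrative. The first pipeline processes each message as usual and, through a Nop Filtra, also directs it to the queue; the second pipeline reads from that queue:

```json
"pipeline_a": {
  "connector_in": {
    "type": "mqtt",
    "topic": "/topic/+/event"
  },
  "filtras": [
    {
      "type": "nop",
      "queues": ["shared_events"]
    }
  ],
  "connector_out": {
    "type": "gcp_pubsub",
    "authbundle_id": "my_authbundle_id",
    "project_id": "my_project_id",
    "topic_id": "my_gcp_topic_id"
  }
},
"pipeline_b": {
  "connector_in": {
    "type": "queue",
    "name": "shared_events"
  },
  "connector_out": {
    "type": "mqtt",
    "topic": "/archive/event",
    "server": "mqtt://mqtt.iotplan.io:1883"
  }
}
```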

Filtras

A filtra is a message-processing unit that functions as either a filter or a transformer.

Filters are units that do not modify the original message received through the connector-in. Their primary purpose is to filter out messages that do not meet specific criteria.

In contrast, transformers are units that can modify the original message, though some transformers may pass certain messages unchanged.

Pipelines can include any number of filtras, which can be linked to create complex data-processing flows using goto properties.

Filtra Properties

All filtras require a type property and may include other optional properties. Common properties for all filtras are described here, while type-specific properties are covered in their respective sections.

Common Properties

msg_format (string)

Specifies the format used to decode and encode the message payload. Supported values:

  • json

  • cbor

goto_accepted (string)

Specifies the name of the next filtra in the pipeline if the current filtra allows the message to pass.

goto_rejected (string)

Specifies the name of the next filtra in the pipeline if the message is rejected by the current filtra.

goto (string)

Equivalent to goto_accepted.

name (string)

Specifies a name for the filtra to be used in goto properties. The names self and out are reserved.

logical_negation (boolean)

Specifies that logical negation is applied to the result, meaning an accepted message becomes rejected, and a rejected message becomes accepted.

metadata (dictionary)

Specifies a dictionary of attributes that can be used for various purposes, such as Dynamic substitutions in property values.

queues (array of strings)

Specifies the names of internal queues used for message interchange between pipelines.
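
As a sketch of if/else routing with these properties, the fragment below sends messages that contain a debug key to an Eraser Filtra and routes all other messages past it. The filtra names and the debug key are illustrative. The sketch also assumes that the reserved name out denotes the connector-out, which this section does not spell out:

```json
"filtras": [
  {
    "type": "finder",
    "name": "has_debug",
    "keys": ["debug"],
    "msg_format": "json",
    "goto_accepted": "strip_debug",
    "goto_rejected": "out"
  },
  {
    "type": "eraser",
    "name": "strip_debug",
    "keys": ["debug"],
    "msg_format": "json"
  }
]
```

Under that assumption, a rejected message is not dropped. It skips the eraser and is delivered unchanged.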

Builder Filtra

Creates a new message:

{
  "type": "builder",
  "payload": {
    "key": "value"
  }
}

Properties

Common properties: name, goto_accepted, goto, queues, metadata, msg_format.

payload (dictionary)

Specifies a dictionary that serves as the payload for the new message, completely discarding the incoming message.

Comparator Filtra

Compares numerical values:

{
  "type": "comparator",
  "operator": "gt",
  "msg_format": "json",
  "value_key": "temp",
  "comparand": 5.4
}

Properties

Common properties: name, goto_accepted, goto, queues, metadata, msg_format, logical_negation.

operator (string)

Specifies the comparison operator. Possible values include:

  • gt - greater than

  • gte - greater than or equal

  • lt - less than

  • lte - less than or equal

  • eq - equal

value_key (string)

Specifies the key in the payload whose value is compared with the comparand.

comparand (number)

Specifies a numerical constant to be compared with a value in the payload.
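
Because filtras run in sequence and a message rejected by any of them goes no further, a range check can be expressed as two comparators. The sketch below passes only messages whose temp is above 5.4 and at most 30; the upper bound is an illustrative value:

```json
"filtras": [
  {
    "type": "comparator",
    "operator": "gt",
    "msg_format": "json",
    "value_key": "temp",
    "comparand": 5.4
  },
  {
    "type": "comparator",
    "operator": "lte",
    "msg_format": "json",
    "value_key": "temp",
    "comparand": 30
  }
]
```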

Eraser Filtra

Removes specified keys from the payload:

{
  "type": "eraser",
  "keys": ["key1", "keyN"],
  "msg_format": "json"
}

Properties

Common properties: name, goto_accepted, goto, queues, metadata, msg_format.

keys (array)

Specifies the keys to be removed from the message payload.

Finder Filtra

Searches for a specific part in the message based on the finder configuration. Three variants are supported:

  1. Searches for text within the payload or searches for the payload within the text. Properties text and operator must be specified:

    {
        "type": "finder",
        "operator": "contain",
        "text": "LOG"
    }
    
  2. Searches for specified keys in the decoded payload. The keys property must be specified:

    {
        "type": "finder",
        "keys": ["key1", "keyN"]
    }
    
  3. Performs the same type of search as in Variant 1, but instead of analyzing the entire payload, it examines the value of a specified key in the decoded payload. Properties text, operator, value_key must be specified:

    {
        "type": "finder",
        "operator": "contain",
        "text": "LOG",
        "value_key": "key1"
    }
    

Properties

Common properties: name, goto_accepted, goto_rejected, goto, queues, metadata, msg_format, logical_negation.

operator (string)

Specifies the method for searching for a substring. Possible values include:

  • contain - searches for text within the payload;

  • contained - searches for the payload within text;

  • match - checks if text exactly matches the payload.

text (string)

Specifies a string that can either be searched as a substring within the payload or used to search for the payload as a substring.

keys (array)

Specifies the keys in the decoded payload dictionary. All keys must be present in the payload; otherwise, the message is rejected.

value_key (string)

Specifies a key in the decoded payload whose value is used for the text search.
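
The variants can be combined with the common properties. As a sketch, the filtra below rejects every message whose level key is exactly DEBUG and accepts all others. The key name and the text are illustrative; the behavior follows from match combined with logical_negation as described above:

```json
{
  "type": "finder",
  "operator": "match",
  "text": "DEBUG",
  "value_key": "level",
  "msg_format": "json",
  "logical_negation": true
}
```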

Limiter Filtra

Rejects messages that exceed the specified size:

{
    "type": "limiter",
    "size": 1024
}

Properties

Common properties: name, goto_accepted, goto_rejected, goto, queues, metadata, msg_format, logical_negation.

size (number)

Specifies a size limit in bytes for the message. Messages exceeding this limit are rejected.

Nop Filtra

Performs no action; however, it can be used, for example, to attach metadata or direct messages to internal queues:

{
    "type": "nop"
}

Properties

Common properties: name, goto_accepted, goto, queues, metadata.

Splitter Filtra

Splits messages that exceed a specified size into chunks. If the message size is below chunk_size, it passes through the Splitter unmodified; otherwise, it is split into chunks of chunk_size bytes (with the last chunk possibly smaller). Each chunk is sent as a separate message. It is the receiver’s responsibility to reconstruct the original message:

{
    "type": "splitter",
    "chunk_size": 1024
}

Each chunk is encoded as a CBOR array containing the following elements:

  • ____SPL - chunk marker

  • Message ID - unique identifier for the original message (generated by the Splitter)

  • Original Message Size - total size of the original message

  • Chunk Number - sequential number of the chunk

  • Array of Bytes - portion of the original message, with a length of chunk_size or smaller

Properties

Common properties: name, goto_accepted, goto, queues, metadata, msg_format.

chunk_size (number)

Specifies the maximum size of each message chunk in bytes.

Dynamic substitutions

Dynamic substitutions are supported in certain property values. Placeholders are written in double curly braces: {{dynamic_value}}. For example:

"filtras":[
  {
    "type": "nop",
    "metadata":{
      "topic": "/big"
    }
  }
],
"connector_out": {
  "type": "mqtt",
  "topic": "{{topic}}",
  "server": "mqtt://mqtt.iotplan.io:1883"
}

In the example above, the Nop Filtra defines metadata that is used by the connector-out to replace {{topic}} with the actual value from the metadata dictionary. As a result, the connector-out sends each message to the topic /big.

It is not always necessary to explicitly define the metadata property, as some metadata key-value pairs are automatically derived:

"connector_in": {
  "type": "mqtt",
  "topic": "/topic/+/event",
  "server": "mqtt://mqtt.iotplan.io:1883"
},
"connector_out":{
  "type":"gcp_pubsub",
  "authbundle_id":"my_authbundle_id",
  "project_id":"my_project_id",
  "topic_id":"my_topic_id",
  "attributes":{
    "deviceId":"{{topic[2]}}"
  }
}

In the example above, the topic in the connector-out refers to the topic that the connector-in is subscribed to. This also demonstrates indexing in dynamic substitutions. The connector-in subscribes to an MQTT topic composed of multiple levels, and dynamic substitution with indexing allows extracting a specific level of interest from the topic to replace the placeholder {{topic[2]}}. In this case, the connector-in is subscribed to a topic with a wildcard character. As a result, the value of {{topic[2]}} varies from message to message, depending on the actual topic to which the message was published.
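
Both sources of values can appear together. The fragment below sketches a pipeline in which a Nop Filtra supplies a static site value while deviceId is still taken from the incoming topic. The site key and its value plant-7 are hypothetical, and the sketch assumes that metadata keys other than topic can be referenced in attributes in the same way:

```json
"connector_in": {
  "type": "mqtt",
  "topic": "/topic/+/event",
  "server": "mqtt://mqtt.iotplan.io:1883"
},
"filtras": [
  {
    "type": "nop",
    "metadata": {
      "site": "plant-7"
    }
  }
],
"connector_out": {
  "type": "gcp_pubsub",
  "authbundle_id": "my_authbundle_id",
  "project_id": "my_project_id",
  "topic_id": "my_topic_id",
  "attributes": {
    "deviceId": "{{topic[2]}}",
    "site": "{{site}}"
  }
}
```

Under these assumptions, a message published to /topic/sensor-17/event would be delivered with the attributes deviceId set to sensor-17 and site set to plant-7.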