.. _Pipeline:

#########
Pipelines
#########

Pipelines process data through a sequence of filtering and transforming blocks, known as Filtras_. Data flows into and out of pipelines through Connectors_. A single data unit traveling through the pipeline is called a *message*.

Each pipeline has a *connector-in* (input connector) and a *connector-out* (output connector), along with an optional array of *filtras*.

Here is an example of a JSON configuration for the simplest pipeline, which receives MQTT messages and sends them as-is to GCP Pub/Sub::

    "pipeline_1": {
        "connector_in": {
            "type": "mqtt",
            "topic": "/topic/+/event"
        },
        "connector_out": {
            "type": "gcp_pubsub",
            "authbundle_id": "my_authbundle_id",
            "project_id": "my_project_id",
            "topic_id": "my_gcp_topic_id"
        }
    }

The next configuration extends the first example. In addition to transferring messages from MQTT to Pub/Sub, it inspects each message and lets through only those that have a *temp* value above *5.4* and do not contain the substring *LOG*::

    "pipeline_1": {
        "connector_in": {
            "type": "mqtt",
            "topic": "/topic/+/event"
        },
        "connector_out": {
            "type": "gcp_pubsub",
            "authbundle_id": "my_authbundle_id",
            "project_id": "my_project_id",
            "topic_id": "my_gcp_topic_id"
        },
        "filtras": [
            {
                "type": "comparator",
                "operator": "gt",
                "msg_format": "json",
                "value_key": "temp",
                "comparand": 5.4
            },
            {
                "type": "finder",
                "operator": "contain",
                "logical_negation": true,
                "text": "LOG"
            }
        ]
    }

By default, each pipeline processes data sequentially. Messages move through each filtra until they reach the connector-out or are rejected by one of the filtras. Complex flow control, including if/else logic and loops, can be implemented using filtra `goto properties`_, which allow messages to move forward or backward in the sequence of filtras.

**********
Connectors
**********

Connectors handle the input and output of messages in a pipeline. Most connectors support *in* and *out* modes.
A **connector-in** is defined under the *connector_in* key in the pipeline config, while the *connector_out* key defines the **connector-out**.

++++++++++++++++++++++
Authentication bundles
++++++++++++++++++++++

Authentication bundles, or *authbundles*, store authentication data for connecting to external services. The required data varies based on the service type. Currently supported types are:

* *gcp* - GCP service key;
* *aws* - AWS access key;
* *mqtt311*, *mqtt50* - MQTT authentication based on the CONNECT message. Supported options are:

  * *Username/Password* - plain string literals for Username and Password;
  * *JWT* - Password serves as a JWT token.

  **Note**: In MQTT v3.1.1, *Username* cannot be empty, unlike in v5.0.

+++++++++++++++++++++
Connector Properties
+++++++++++++++++++++

Each connector must define the *type* property. While most other properties are specific to each connector type, a few are common across many connectors:

.. _authbundle_id:

authbundle_id : string
    Specifies the identifier of the authentication bundle used to authenticate with the service the connector connects to.

.. _type:

type : string
    Specifies the connector type. Valid options include:

    * mqtt
    * gcp_pubsub
    * gcp_bucket
    * aws_s3
    * email
    * queue
    * slack

A connector may require different properties depending on whether it is in *in* or *out* mode.

.. _MQTT Connectors:
.. _MQTT Connector:

++++++++++++++
MQTT Connector
++++++++++++++

.. _MQTT: https://mqtt.org/

Connects to an `MQTT`_ Broker::

    {
        "type": "mqtt",
        "topic": "/topic/+/event",
        "server": "mqtt://mqtt.iotplan.io:1883"
    }

==========
Properties
==========

Common properties: authbundle_id_, type_.

topic : string
    The MQTT topic to subscribe to for connector-in or to publish to for connector-out.

server : string
    URL of the MQTT Broker.

qos : number
    MQTT Quality of Service (QoS) level.
    Possible values are:

    * 0 - At Most Once
    * 1 - At Least Once
    * 2 - Exactly Once

+++++++++++++++++++++
GCP Pub/Sub Connector
+++++++++++++++++++++

.. _Pub/Sub service: https://cloud.google.com/pubsub/docs/overview

Connects to the Google Cloud `Pub/Sub service`_::

    {
        "type": "gcp_pubsub",
        "authbundle_id": "my_authbundle_id",
        "project_id": "my_project_id",
        "topic_id": "my_topic_id",
        "attributes": {
            "deviceId": "{{topic[2]}}",
            "projectId": "G-NODE"
        }
    }

==========
Properties
==========

Common properties: authbundle_id_, type_.

project_id : string
    Specifies the GCP project identifier.

topic_id : string
    Specifies the Pub/Sub topic. The connector-out publishes to this topic. The connector-in creates a subscription to this topic if *subscription_id* is not specified.

subscription_id : string
    Specifies the subscription that the connector-in subscribes to. If the subscription does not exist, it is created. The connector-out ignores this property.

+++++++++++++++++++++++++++
GCP Cloud Storage Connector
+++++++++++++++++++++++++++

.. _Cloud Storage service: https://cloud.google.com/storage/docs/introduction

Connects to the Google Cloud `Cloud Storage service`_::

    {
        "type": "gcp_storage",
        "authbundle_id": "my_service_key",
        "project_id": "my_project_id",
        "bucket_name": "my_bucket_name",
        "object_name": "my_object_name"
    }

==========
Properties
==========

Common properties: authbundle_id_, type_.

project_id : string
    Specifies the GCP project identifier.

bucket_name : string
    Specifies the bucket to which the connector-out uploads messages or from which the connector-in downloads messages. If the bucket does not exist, the connector-out creates it, whereas the connector-in does not.

object_name : string
    Specifies the name of the bucket object where the message will be stored. If it is a plain string, the object is overwritten with each new message. `Dynamic substitutions`_ can be used to generate unique object names for every message.

++++++++++++++++
AWS S3 Connector
++++++++++++++++

.. _Simple Storage Service: https://aws.amazon.com/s3/

Connects to Amazon Web Services `Simple Storage Service`_ (S3)::

    {
        "type": "aws_s3",
        "authbundle_id": "my_access_key",
        "bucket_name": "feisty",
        "object_name": "sensor-data"
    }

==========
Properties
==========

Common properties: authbundle_id_, type_.

bucket_name : string
    Specifies the bucket to which the connector-out uploads messages or from which the connector-in downloads messages. If the bucket does not exist, the connector-out creates it, whereas the connector-in does not.

object_name : string
    Specifies the name of the bucket object where the message will be stored. If it is a plain string, the object is overwritten with each new message. `Dynamic substitutions`_ can be used to generate unique object names for every message.

+++++++++++++++++++++++
Email service connector
+++++++++++++++++++++++

Connects to an email server via SMTP. Currently, only the connector-out is supported::

    {
        "type": "email",
        "authbundle_id": "my_authbundle_id",
        "to": "my_recipient",
        "smtp_server": "my_smtp_server",
        "smtp_port": "my_smtp_port"
    }

==========
Properties
==========

Common properties: authbundle_id_, type_.

to : string
    Specifies the email address to which the message will be sent.

smtp_server : string
    Specifies the URL of the SMTP server.

smtp_port : number
    Specifies the port of the SMTP server.

.. _Internal Queue Connector:

++++++++++++++++++++++++
Internal Queue Connector
++++++++++++++++++++++++

Connects to an internal queue and serves as a communication channel, allowing pipelines to exchange messages with each other::

    {
        "type": "queue",
        "name": "my_queue_name"
    }

==========
Properties
==========

Common properties: type_.

name : string
    Specifies the name of the queue.

Each Filtra_ can have a *queues* property that, in addition to normal message processing in the pipeline, allows it to direct messages to internal queues.
These messages become available to other pipelines if those pipelines are configured with an `Internal Queue Connector`_ as the connector-in.

+++++++++++++++
Slack connector
+++++++++++++++

Sends messages to a Slack channel via incoming webhooks. Only connector-out is supported::

    {
        "type": "slack",
        "authbundle_id": "my_authbundle_id"
    }

==========
Properties
==========

Common properties: type_, authbundle_id_.

.. _Filtra:
.. _Filtras:

*******
Filtras
*******

A filtra is a message-processing unit that functions as either a *filter* or a *transformer*. *Filters* do not modify the original message received through the connector-in; their purpose is to filter out messages that do not meet specific criteria. *Transformers*, in contrast, can modify the original message, though some transformers may pass certain messages unchanged.

Pipelines can include any number of filtras, which can be linked to create complex data-processing flows using `goto properties`_.

+++++++++++++++++
Filtra Properties
+++++++++++++++++

All filtras require a **type** property and may include other optional properties. Common properties for all filtras are described here, while type-specific properties are covered in their respective sections.

=================
Common Properties
=================

.. _msg_format:

msg_format : string
    Specifies the format used for decoding/encoding the message payload. Supported values:

    * json
    * cbor

.. _goto properties:
.. _goto_accepted:

goto_accepted : string
    Specifies the name of the next filtra in the pipeline if the current filtra allows the message to pass.

.. _goto_rejected:

goto_rejected : string
    Specifies the name of the next filtra in the pipeline if the message is rejected by the current filtra.

.. _goto:

goto : string
    Equivalent to goto_accepted_.

.. _name:

name : string
    Specifies a name for the filtra to be used in *goto* properties. The names *self* and *out* are reserved.

.. _logical_negation:

logical_negation : boolean
    Specifies that logical negation is applied to the result, meaning an accepted message becomes rejected, and a rejected message becomes accepted.

.. _metadata:

metadata : dictionary
    Specifies a dictionary of attributes that can be used for various purposes, such as `Dynamic substitutions`_ in property values.

.. _queues:

queues : array of strings
    Specifies the names of internal queues used for message interchange between pipelines.

++++++++++++++
Builder Filtra
++++++++++++++

Creates a new message::

    {
        "type": "builder",
        "payload": {
            "key": "value"
        }
    }

==========
Properties
==========

Common properties: name_, goto_accepted_, goto_, queues_, metadata_, msg_format_.

payload : dictionary
    Specifies a dictionary that serves as the payload for the new message, completely discarding the incoming message.

.. _Comparator Filtras:

+++++++++++++++++
Comparator Filtra
+++++++++++++++++

Compares numerical values::

    {
        "type": "comparator",
        "operator": "gt",
        "msg_format": "json",
        "value_key": "temp",
        "comparand": 5.4
    }

==========
Properties
==========

Common properties: name_, goto_accepted_, goto_, queues_, metadata_, msg_format_, logical_negation_.

operator : string
    Specifies the comparison operator. Possible values include:

    * gt - greater than
    * gte - greater than or equal
    * lt - less than
    * lte - less than or equal
    * eq - equal

value_key : string
    Specifies the key in the payload whose value is compared with the comparand.

comparand : number
    Specifies a numerical constant to be compared with a value in the payload.

+++++++++++++
Eraser Filtra
+++++++++++++

Removes specified keys from the payload::

    {
        "type": "eraser",
        "keys": ["key1", "keyN"],
        "msg_format": "json"
    }

==========
Properties
==========

Common properties: name_, goto_accepted_, goto_, queues_, metadata_, msg_format_.

keys : array
    Specifies the keys to be removed from the message payload.
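
As an illustrative sketch (not taken from a real deployment), a filter and a transformer can be chained in one *filtras* array. The fragment below assumes JSON payloads with the hypothetical keys *temp*, *debug* and *raw*: the Comparator Filtra passes only messages whose *temp* exceeds 5.4, and the Eraser Filtra then strips the two diagnostic keys before the message reaches the connector-out::

    "filtras": [
        {
            "type": "comparator",
            "operator": "gt",
            "msg_format": "json",
            "value_key": "temp",
            "comparand": 5.4
        },
        {
            "type": "eraser",
            "keys": ["debug", "raw"],
            "msg_format": "json"
        }
    ]

Because no *goto* properties are set, the two filtras run in the order in which they are listed.
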
+++++++++++++
Finder Filtra
+++++++++++++

Searches for a specific part in the message based on the finder configuration. Three variants are supported:

.. __:

1. Searches for text within the payload or searches for the payload within the text. The *text* and *operator* properties must be specified::

       {
           "type": "finder",
           "operator": "contain",
           "text": "LOG"
       }

2. Searches for specified keys in the decoded payload. The *keys* property must be specified::

       {
           "type": "finder",
           "keys": ["key1", "keyN"]
       }

3. Performs the same type of search as in `Variant 1`__, but instead of analyzing the entire payload, it examines the value of a specified key in the decoded payload. The *text*, *operator* and *value_key* properties must be specified::

       {
           "type": "finder",
           "operator": "contain",
           "text": "LOG",
           "value_key": "key1"
       }

==========
Properties
==========

Common properties: name_, goto_accepted_, goto_rejected_, goto_, queues_, metadata_, msg_format_, logical_negation_.

operator : string
    Specifies the method for searching for a substring. Possible values include:

    * contain - searches for *text* within the payload;
    * contained - searches for the payload within *text*;
    * match - checks if *text* exactly matches the payload.

text : string
    Specifies a string that can either be searched for as a substring within the payload or used to search for the payload as a substring.

keys : array
    Specifies the keys to look for in the decoded payload dictionary. All keys must be present in the payload; otherwise, the message is rejected.

value_key : string
    Specifies a key in the decoded payload whose value is used for the text search.

++++++++++++++
Limiter Filtra
++++++++++++++

Rejects messages that exceed the specified size::

    {
        "type": "limiter",
        "size": 1024
    }

==========
Properties
==========

Common properties: name_, goto_accepted_, goto_rejected_, goto_, queues_, metadata_, msg_format_, logical_negation_.

size : number
    Specifies a size limit in bytes for the message. Messages exceeding this limit are rejected.
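
The *goto* properties make it possible to branch on a filtra's verdict. The following fragment is a sketch under two assumptions that this document does not spell out: that the reserved name *out* refers to the connector-out, and that a filtra without *goto* properties hands the message to the next step in sequence. The names *size_check*, *small* and *large* and the topic ``/small`` are hypothetical. Messages of up to 1024 bytes are tagged with one topic, larger ones with another, and the connector-out picks the topic up through `Dynamic substitutions`_::

    "filtras": [
        {
            "name": "size_check",
            "type": "limiter",
            "size": 1024,
            "goto_accepted": "small",
            "goto_rejected": "large"
        },
        {
            "name": "small",
            "type": "nop",
            "metadata": { "topic": "/small" },
            "goto": "out"
        },
        {
            "name": "large",
            "type": "nop",
            "metadata": { "topic": "/big" }
        }
    ],
    "connector_out": {
        "type": "mqtt",
        "topic": "{{topic}}",
        "server": "mqtt://mqtt.iotplan.io:1883"
    }

The explicit ``"goto": "out"`` on *small* is what keeps accepted messages from falling through into *large*; if *out* behaves differently in a given deployment, the branch needs to be terminated another way.
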
++++++++++
Nop Filtra
++++++++++

Performs no action; however, it can be used, for example, to attach metadata or direct messages to internal queues::

    {
        "type": "nop"
    }

==========
Properties
==========

Common properties: name_, goto_accepted_, goto_, queues_, metadata_.

.. _Splitter:

+++++++++++++++
Splitter Filtra
+++++++++++++++

Splits messages that exceed a specified size into chunks. If the message size is below *chunk_size*, it passes through the Splitter_ unmodified; otherwise, it is split into chunks of *chunk_size* bytes (with the last chunk possibly smaller). Each chunk is sent as a separate message. It is the receiver's responsibility to reconstruct the original message::

    {
        "type": "splitter",
        "chunk_size": 1024
    }

Each chunk is encoded as a CBOR array containing the following elements:

* ``____SPL`` - chunk marker
* Message ID - unique identifier for the original message (generated by the Splitter_)
* Original Message Size - total size of the original message
* Chunk Number - sequential number of the chunk
* Array of Bytes - portion of the original message, with a length of *chunk_size* or smaller

==========
Properties
==========

Common properties: name_, goto_accepted_, goto_, queues_, metadata_, msg_format_.

chunk_size : number
    Specifies the maximum size of each message chunk in bytes.

*********************
Dynamic substitutions
*********************

Dynamic substitutions are supported in certain property values. Placeholders for dynamic substitutions are defined using double curly braces: ``{{dynamic_value}}``.

::

    "filtras": [
        {
            "type": "nop",
            "metadata": {
                "topic": "/big"
            }
        }
    ],
    "connector_out": {
        "type": "mqtt",
        "topic": "{{topic}}",
        "server": "mqtt://mqtt.iotplan.io:1883"
    }

In the example above, the `Nop Filtra`_ defines *metadata* that is used by the *connector-out* to replace ``{{topic}}`` with the actual value from the *metadata* dictionary. As a result, the *connector-out* sends each message to the topic ``/big``.
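
The same mechanism plausibly applies to other properties that accept placeholders. The fragment below is a sketch, assuming the *attributes* values of the GCP Pub/Sub connector are resolved against *metadata* in the same way as the MQTT *topic* above; the key *site* and its value are hypothetical::

    "filtras": [
        {
            "type": "nop",
            "metadata": { "site": "plant-7" }
        }
    ],
    "connector_out": {
        "type": "gcp_pubsub",
        "authbundle_id": "my_authbundle_id",
        "project_id": "my_project_id",
        "topic_id": "my_topic_id",
        "attributes": { "site": "{{site}}" }
    }

Under that assumption, every published message would carry the attribute *site* with the value ``plant-7``.
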
It is not always necessary to explicitly define the *metadata* property, as some *metadata* key-value pairs are automatically derived::

    "connector_in": {
        "type": "mqtt",
        "topic": "/topic/+/event",
        "server": "mqtt://mqtt.iotplan.io:1883"
    },
    "connector_out": {
        "type": "gcp_pubsub",
        "authbundle_id": "my_authbundle_id",
        "project_id": "my_project_id",
        "topic_id": "my_topic_id",
        "attributes": {
            "deviceId": "{{topic[2]}}"
        }
    }

In the example above, the ``topic`` in the *connector-out* refers to the topic that the *connector-in* is subscribed to. This also demonstrates indexing in dynamic substitutions. The *connector-in* subscribes to an MQTT topic composed of multiple levels, and dynamic substitution with indexing extracts a specific level of interest from the topic to replace the placeholder ``{{topic[2]}}``.

In this case, the *connector-in* is subscribed to a topic with a wildcard character. As a result, the value of ``{{topic[2]}}`` varies from message to message, depending on the actual topic to which the message was published.
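
Indexed placeholders are also a natural fit for the *object_name* property of the storage connectors, which otherwise overwrite a single object. The following is a minimal sketch, assuming *object_name* accepts ``{{topic[2]}}`` in the same way as the Pub/Sub *attributes* above and that the placeholder makes up the whole value::

    "pipeline_1": {
        "connector_in": {
            "type": "mqtt",
            "topic": "/topic/+/event",
            "server": "mqtt://mqtt.iotplan.io:1883"
        },
        "connector_out": {
            "type": "gcp_storage",
            "authbundle_id": "my_service_key",
            "project_id": "my_project_id",
            "bucket_name": "my_bucket_name",
            "object_name": "{{topic[2]}}"
        }
    }

With this configuration, messages published to different topics matching the wildcard would be stored under different object names, although successive messages on the same topic would still overwrite each other.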