Awesome
MQTT TRANSFORMER
This is a small service which subscribes to given topics, transforms the jsons with the given patterns, and emits them back to other topics.
Use-case;
- you own some mqtt emitters (for ex. measure type iot devices)
- you own some mqtt displayers (for ex. services, or physical devices)
- the emitters and the displayers are not compatible out of the box
- you need some bridge to convert the messages
- you want to do some simple http->mqtt or mqtt->http transforms/bridging
Config syntax
The app works with one conf.json
which looks like this;
{
"transforms": [
{
"fromTopic": "tele/tasmota/STATE",
"toTopic": "transformed/tasmota-state",
"emitInterval": 60,
"emitType": "repeat",
"template": {
"uptime": {
"$eval": "UptimeSec"
}
}
}
],
"io": []
}
You can have multiple transforms in the array!
The required fields are; toTopic
(or toTopicTemplate
see below the templates), emitType
, and either fromTopic
as a string or fromTopics
as an array of strings. The other fields may vary based on the choosen emitType
.
The template
or filterTemplate
parameter is the trickiest. The app uses json-e underneath, so you need to cook up a valid json-e transformation.
Read the docs (or the site) for reference.
(For concrete examples and use-cases; check the recipes or help me by opening an issue or PR. For minimal examples you can check the example conf or the tests.)
Emit types - "transforms"
Transformation - map
Maps with the given template
.
Data may optionally be wrapped in a root object. This allows transformations on raw values like strings and numbers. The name of the value property in the root object is taken from the configuration parameter wrapper
. Check the recipe Power consumed -> Power provided for an example.
Filter - filter and collect
Filters with the filterTemplate
. The filterTemplate
must return with true/false for correctly describe your intention.
(It's javascript and I filter with if(result)
so technically you can return false/null/undefined/0/empty-string/empty-array for false values and anything else for true, but still...)
Filter will emit the given message as is if the filterTemplate
returns true, while collect will do a map on it with the template
.
If the filterTemplate
returns false it will not emit.
Data may optionally be wrapped in a root object. This allows filtering of raw values like strings and numbers. The name of the value property in the root object is taken from the configuration parameter wrapper
. Check the recipe Power consumed -> Power provided for examples.
Time driven types - repeat and once
The emitInterval
is the minimum time in seconds between message emits to the toTopic
.
Both will emit for the first seen message, but while
once
will not emit until the emitInterval
is zeroed (and then next emit when it sees a new message in the topic),
repeat
will send the last seen message whenever the interval is passed.
Both maps with the given template
.
Combine multiple topics - zipLast and combineLatest
CombineLatest works the same as reactiveX defines it.
ZipLast kinda works like zip. It waits till it gets at least one element in all of its topics, BUT always keeping the last element if it gets more than one in the same topic.
When it gets an element in each topic, it calls the template
, emits the output, and clears all of the saved elements.
The template
will get a {messages: []}
object, the indexes will match to the topic indexes.
Reduce
Gets the messages from the given topic(s), and the old state from the last emitted message, and combines them.
The template
will get a {message: {}, state: {}}
object.
toTopic
should be used, toTopicTemplate
will not work!
Constants
You can add commonly used constants as a "transformation".
For usage you need to add the useConstants
as an optional parameter, and you can rename the constant there newName: constantName
.
The defined name will be written to the input data before the transformations, so it could potentially override values from the incoming data!
[
{
"emitType": "constant",
"name": "secToHour",
"value": 3600
},
{
"emitType": "map",
"fromTopic": "t",
"toTopic": "out",
"template": {"message": "we have ${sTh} secs in on hour"},
"useConstants": {"sTh": "secToHour"}
}
]
Additional values
Topic
You can add the topicKeyToMessage
key to the config, and the incoming messages will be enhanced with the key and the topic name.
{
"fromTopic": "tele/+/STATE",
"toTopicTemplate": "${topic}_NEW",
"emitInterval": 60,
"topicKeyToMessage": "topic",
"emitType": "repeat",
"template": {"uptime": {"$eval": "UptimeSec"}, "topic": {"$eval": "topic"}}
}
(If the message has the topic tele/test/STATE
, the above example will produce messages to the tele/test/STATE_NEW
topic.)
The toTopicTemplate
could be dynamically generate a toTopicName.
Metrics
You can add useMetrics
the same way as useConstants
. The available metrics are described in the types.ts or you can check the example below.
{
"fromTopic": "computed/hooks/response1",
"toTopic": "computed/map",
"emitType": "map",
"useMetrics": {
"lT": "lastMessageTime",
"fT": "firstMessageTime",
"pT": "prevMessageTime",
"mC": "messageCount"
},
"template": {
"responseStatus": {
"$eval": "responseStatus"
},
"avgMsgPerSec": {
"$eval": "floor((lT-fT)/mC/1000)"
},
"msgDelay": {
"$eval": "floor((lT-pT)/1000)"
}
}
}
(In the above example the avg will converge to the actual number but it will be off a bit. But mC-1
would cause a division by zero!)
IOs
The app by default uses the env params as an mqtt connection. But it could bridge multiple mqtt and/or webservers.
All the ios has a type (see below) and a topicPrefix
which is optional.
The prefix could help on the routings, like if you have two mqtt servers, you can add the prefix as mqtt1/
and mqtt2/
respectively,
and can bridge the messages from mqtt1/test
to mqtt2/test
.
The empty topicPrefix will get ALL the messages! (So if you have 3 mqtt servers ""
will get all the messages and "first/"
will get "first/second"
messages if you prefix them in the same order.
We are using the topicPrefix
only for routing, it will be dropped from the topic name before delivery.
MQTT
{
"type": "mqtt",
"topicPrefix": "",
"url": "mqtt://localhost:1883",
"user": "optional string",
"password": "optional string",
"clientId": "optional string"
}
Gets and sends messages from/to mqtt topics.
Webserver
{
"type": "webserver",
"topicPrefix": "ws/",
"port": 3000
}
Starts a webserver on the given port.
The command;
curl -d '{"stringParam":"stringValue", "numParam": 5}' -H "Content-Type: application/json" -X POST localhost:3000/test1
Will fire the ws/test1
from topics, with the given json in the body. (Only handles POST requests.)
HookCall
{
"type": "hookCall",
"url": "http://localhost:3000/hook",
"topicPrefix": "hook/",
"responseTopic": "computed/hooks/response1"
}
Calls the given hook url. Optionally writes the response to a responseTopic
.
Running the app
Local install / dev
For enable debugging you can export IS_VERBOSE=true
npm i
docker-compose up -d
ts-node-dev --respawn --watch src src/app.ts
# from other consoles
curl -d '{"stringParam":"stringValue", "numParam": 5}' -H "Content-Type: application/json" -X POST localhost:3000/test1
curl -d '{"stringParam":"stringValue", "numParam": 7}' -H "Content-Type: application/json" -X POST localhost:3000/test2
Docker and compose
For docker you can run;
docker run -e MQTT_URL="mqtt://myserver:1883" -v ${PWD}/conf:/home/node/app/conf ghcr.io/tg44/mqtt-transformer
For docker compose;
version: '3.1'
services:
mqtt-transformer:
image: ghcr.io/tg44/mqtt-transformer
restart: unless-stopped
volumes:
- /otp/mqtt-transformer/:/home/node/app/conf
environment:
- MQTT_URL=mqtt://myserver:1883
In the early config/template writing/testing phase, you can add the IS_VERBOSE
env var too.
That will log all the incoming messages alongside with the rule id, the applied template, and the resulting output.
MQTT_USER
, MQTT_PW
, MQTT_CLIENT_ID
can be set as env vars too.
Breaking changes
- 2022.01.06
- we will permanently move away from dockerhub, the latest images will be pushed, but the documentation and the other infos will only be updated here
- DH freezes the free builds, while GH-Actions not only build free, but gives us public repositories too
- we will permanently move away from dockerhub, the latest images will be pushed, but the documentation and the other infos will only be updated here
Contribution
If you have any idea about the base functionality or the config/emitter syntax, just start a new issue/pr and we can talk about the use-cases, pros and cons!