A streaming hub. Sort of.
Señor Octopus is a streaming hub: based on a declarative configuration, it fetches data from APIs, transforms it, filters it, and stores it.
Confused? Keep reading.
A simple example
Señor Octopus reads a pipeline definition from a YAML configuration file like this:
# generate random numbers and send them to "check" and "normal"
random:
  plugin: source.random
  flow: -> check, normal
  schedule: "* * * * *"  # every minute

# filter numbers from "random" that are > 0.5 and send to "high"
check:
  plugin: filter.jsonpath
  flow: random -> high
  filter: "$.events[?(@.value>0.5)]"

# log all the numbers coming from "random" at the default level
normal:
  plugin: sink.log
  flow: "* ->"
  batch: 5 minutes

# log all the numbers coming from "check" at the warning level
high:
  plugin: sink.log
  flow: check ->
  level: warning
The example above has a source called “random”, which generates random numbers every minute (its schedule). It’s connected to two other nodes, “check” and “normal” (flow = -> check, normal). Each random number is sent in an event that looks like this:
{
  "timestamp": "2021-01-01T00:00:00+00:00",
  "name": "hub.random",
  "value": 0.6394267984578837
}
The node check is a filter that tests whether the value of each event is greater than 0.5. Events that pass the filter are sent to the high node (the filter connects the two nodes, according to flow = random -> high).
The node normal is a sink that logs events. It receives events from any other node (flow = * ->), and stores them in a queue, logging them at the INFO level (the default) every 5 minutes (batch = 5 minutes). The node high, on the other hand, receives events only from check, and logs them immediately at the WARNING level.
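To make the filter semantics concrete, here is a minimal sketch in plain Python of what the check node effectively does with a batch of events. This is illustrative only (the real plugin evaluates the JSONPath expression with a library), and the sample events are made up:

events = [
    {"timestamp": "2021-01-01T00:00:00+00:00", "name": "hub.random", "value": 0.64},
    {"timestamp": "2021-01-01T00:01:00+00:00", "name": "hub.random", "value": 0.31},
]

# Rough equivalent of the JSONPath filter "$.events[?(@.value>0.5)]":
# only events whose value is greater than 0.5 are forwarded to "high".
high_events = [event for event in events if event["value"] > 0.5]
print(high_events)  # the 0.64 event passes; the 0.31 event is dropped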
To run it:
$ srocto config.yaml -vv
[2021-03-25 14:28:26] INFO:senor_octopus.cli:Reading configuration
[2021-03-25 14:28:26] INFO:senor_octopus.cli:Building DAG
[2021-03-25 14:28:26] INFO:senor_octopus.cli:
* random
|\
* | check
| * normal
* high
[2021-03-25 14:28:26] INFO:senor_octopus.cli:Running Sr. Octopus
[2021-03-25 14:28:26] INFO:senor_octopus.scheduler:Starting scheduler
[2021-03-25 14:28:26] INFO:senor_octopus.scheduler:Scheduling random to run in 33.76353 seconds
[2021-03-25 14:28:26] DEBUG:senor_octopus.scheduler:Sleeping for 5 seconds
To stop the scheduler, press Ctrl+C. Any batched events will be processed before it terminates.
A concrete example
Now for a more realistic example. I wanted to monitor the air quality in my bedroom using an Awair Element. Since their API is throttled, I want to read values once every 5 minutes and store everything in a Postgres database. If the CO2 value is higher than 1000 ppm, I want to receive a notification on my phone, limited to one message every 30 minutes.
This is the config I use for that:
awair:
  plugin: source.awair
  flow: -> *
  schedule: "*/5 * * * *"
  prefix: hub.awair
  access_token: XXX
  device_type: awair-element
  device_id: 12345

high_co2:
  plugin: filter.jsonpath
  flow: awair -> pushover
  filter: '$.events[?(@.name=="hub.awair.co2" and @.value>1000)]'

pushover:
  plugin: sink.pushover
  flow: high_co2 ->
  throttle: 30 minutes
  app_token: XXX
  user_token: johndoe

db:
  plugin: sink.db.postgresql
  flow: "* ->"
  batch: 15 minutes
  dbname: dbname
  user: user
  password: password
  host: host
  port: 5432
I’m using Pushover to send notifications to my phone.
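For context, a Pushover notification is just an HTTP POST to their API. Here is a hedged sketch of roughly what such a call looks like, using the requests library; the actual sink implementation may differ, and the tokens and message below are placeholders:

import requests

# Hypothetical values; substitute your own Pushover application and user tokens.
response = requests.post(
    "https://api.pushover.net/1/messages.json",
    data={
        "token": "XXX",      # app_token in the config above
        "user": "johndoe",   # user_token in the config above
        "message": "CO2 is above 1000 ppm",
    },
)
response.raise_for_status()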
Will it rain?
Here’s another example: a pipeline that notifies you if it’s going to rain tomorrow:
weather:
  plugin: source.weatherapi
  flow: -> will_it_rain
  schedule: "0 12 * * *"
  location: London
  access_token: XXX

will_it_rain:
  plugin: filter.jsonpath
  flow: weather -> pushover
  filter: '$.events[?(@.name=="hub.weatherapi.forecast.forecastday.daily_will_it_rain" and @.value==1)]'

pushover:
  plugin: sink.pushover
  flow: will_it_rain ->
  throttle: 30 minutes
  app_token: XXX
  user_token: johndoe
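The long dotted name in the filter suggests that the source flattens the nested API response into dotted event names. Assuming that naming scheme (an assumption, not something documented here), a small Python sketch of how such flattening could work:

# Illustrative only: one way a nested API response could map to dotted event
# names such as "hub.weatherapi.forecast.forecastday.daily_will_it_rain".
def flatten(prefix, obj):
    """Yield (name, value) pairs for every leaf of a nested dict."""
    for key, value in obj.items():
        name = f"{prefix}.{key}"
        if isinstance(value, dict):
            yield from flatten(name, value)
        else:
            yield name, value

response = {"forecast": {"forecastday": {"daily_will_it_rain": 1}}}
print(dict(flatten("hub.weatherapi", response)))
# {'hub.weatherapi.forecast.forecastday.daily_will_it_rain': 1}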
Event-driven sources
Señor Octopus also supports event-driven sources. Unlike the scheduled sources in the previous examples, these run continuously and react to events as soon as they arrive. An example is the MQTT source:
mqtt:
  plugin: source.mqtt
  flow: -> log
  topics: test/#
  host: mqtt.example.org

log:
  plugin: sink.log
  flow: mqtt ->
With the pipeline above running, whenever a message arrives on a topic matching test/# (e.g., test/1) it is immediately sent to the log node.
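One way to try this out is to publish a test message yourself, for example with the paho-mqtt Python library (not part of Señor Octopus; the topic and host below match the config above):

import paho.mqtt.publish as publish

# Publish a single message to a topic matched by "test/#";
# the running pipeline should log it immediately.
publish.single("test/1", payload="hello", hostname="mqtt.example.org")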
There’s also an MQTT sink, which publishes events to a given topic:
mqtt:
  plugin: sink.mqtt
  flow: "* ->"
  topic: test/1
  host: mqtt.example.org
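To verify that the sink is publishing, you can subscribe to the topic from another process, again with paho-mqtt (a sketch; it blocks until one message arrives):

import paho.mqtt.subscribe as subscribe

# Wait for a single message on the sink's topic and print it.
message = subscribe.simple("test/1", hostname="mqtt.example.org")
print(message.topic, message.payload)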