Skip to main content

A streaming hub. Sort of.

Project description

Señor Octopus is a streaming hub, fetching data from APIs, transforming it, filtering it, and storing it, based on a declarative configuration.

Confused? Keep reading.

A simple example

Señor Octopus reads a pipeline defintion from a configuration file like this:

# generate random numbers and send them to "check" and "normal"
[random]
plugin = source.random
flow = -> check, normal
schedule = * * * * *

# filter numbers from "random" that are > 0.5 and send to "high"
[check]
plugin = filter.jsonpath
flow = random -> high
filter = $.events[?(@.value>0.5)]

# log all the numbers coming from "random" at the default level
[normal]
plugin = sink.log
flow = * ->
batch = 5 minutes

# log all the numbers coming from "check" at the warning level
[high]
plugin = sink.log
flow = check ->
level = warning

The example above has a source called “random”, that generates random numbers every minute (its schedule). It’s connected to 2 other nodes, “check” and “normal” (flow = -> check, normal). Each random number is an event that looks like this:

{
    "timestamp": "2021-01-01T00:00:00+00:00",
    "name": "hub.random",
    "value": 0.6394267984578837
}

The node check is a filter that verifies that the value of each number is greater than 0.5. Events that pass the filter are sent to the high node (the filter connects the two nodes, according to flow = random -> high).

The node normal is a sink that logs events. It receives events from any other node (flow = * ->), and stores them in a queue, logging them at the INFO level (the default) every 5 minutes (batch = 5 minutes). The node high, on the other hand, receives events only from check, and logs them immediately at the WARNING level.

To run it:

$ srocto config.ini -vv
[2021-03-25 14:28:26] INFO:senor_octopus.cli:Reading configuration
[2021-03-25 14:28:26] INFO:senor_octopus.cli:Building DAG
[2021-03-25 14:28:26] INFO:senor_octopus.cli:
*   random
|\
* | check
| * normal
* high

[2021-03-25 14:28:26] INFO:senor_octopus.cli:Running Sr. Octopus
[2021-03-25 14:28:26] INFO:senor_octopus.scheduler:Starting scheduler
[2021-03-25 14:28:26] INFO:senor_octopus.scheduler:Scheduling random to run in 33.76353 seconds
[2021-03-25 14:28:26] DEBUG:senor_octopus.scheduler:Sleeping for 5 seconds

A concrete example

Now for a more realistic example. I wanted to monitor the air quality in my bedroom, using an Awair Element. Since their API is throttled I want to read values once every 5 minutes, and store everything in a Postgres database. If the CO2 value is higher than 1000 ppm I want to receive a notification on my phone, limited to one message every 30 minutes.

This is the config I use for that:

[awair]
plugin = source.awair
flow = -> *
schedule = */5 * * * *
prefix = hub.awair
AWAIR_ACCESS_TOKEN = XXX
AWAIR_DEVICE_TYPE = awair-element
AWAIR_DEVICE_ID = 12345

[high_co2]
plugin = filter.jsonpath
flow = awair -> pushover
filter = $.events[?(@.name=="hub.awair.co2" and @.value>1000)]

[pushover]
plugin = sink.pushover
flow = high_co2 ->
throttle = 30 minutes
PUSHOVER_APP_TOKEN = XXX
PUSHOVER_USER_TOKEN = johndoe

[db]
plugin = sink.db.postgresql
flow = * ->
batch = 15 minutes
POSTGRES_DBNAME = dbname
POSTGRES_USER = user
POSTGRES_PASSWORD = password
POSTGRES_HOST = host
POSTGRES_PORT = 5432

I’m using Pushover to send notifications to my phone.

Will it rain?

Here’s another example, a pipeline that will notify you if tomorrow will rain:

[weather]
plugin = source.weatherapi
flow = -> will_it_rain
schedule = 0 12 * * *
location = London
WEATHERAPI_TOKEN = XXX

[will_it_rain]
plugin = filter.jsonpath
flow = weather -> pushover
filter = $.events[?(@.name=="hub.weatherapi.forecast.forecastday.daily_will_it_rain" and @.value==1)]

[pushover]
plugin = sink.pushover
flow = will_it_rain ->
throttle = 30 minutes
PUSHOVER_APP_TOKEN = XXX
PUSHOVER_USER_TOKEN = johndoe

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

senor-octopus-0.1.10.tar.gz (39.2 kB view details)

Uploaded Source

Built Distribution

senor_octopus-0.1.10-py2.py3-none-any.whl (20.8 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file senor-octopus-0.1.10.tar.gz.

File metadata

  • Download URL: senor-octopus-0.1.10.tar.gz
  • Upload date:
  • Size: 39.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/3.9.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.9.2

File hashes

Hashes for senor-octopus-0.1.10.tar.gz
Algorithm Hash digest
SHA256 ccfa5eb3a553d1eddbb5bf32ecb9a4b67e89ba231130a5c2055b2ede9a284dd9
MD5 0e8c2a4e86ae0dbaa5f861cf5744a322
BLAKE2b-256 5beb5f1dae65d9c801f187ef21580562a10ec3046b80dccf00f3280b1c419be2

See more details on using hashes here.

Provenance

File details

Details for the file senor_octopus-0.1.10-py2.py3-none-any.whl.

File metadata

  • Download URL: senor_octopus-0.1.10-py2.py3-none-any.whl
  • Upload date:
  • Size: 20.8 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/3.9.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.9.2

File hashes

Hashes for senor_octopus-0.1.10-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 67dd1015ef0c200484ab9c451d5ef7690ffc13d76e3a0efa18f1554dd3ce0a8a
MD5 f76bde6307150c92e97bf528946a641c
BLAKE2b-256 a30c76f17c4804f5016e6455632639189f595a5fd5f85b64767e5481d0e6a0d4

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page