A streaming hub. Sort of.

Project description

Señor Octopus is a streaming hub: it fetches data from APIs, transforms it, filters it, and stores it, all based on a declarative configuration.

Confused? Keep reading.

A simple example

Señor Octopus reads a pipeline definition from a YAML configuration file like this:

# generate random numbers and send them to "check" and "normal"
random:
  plugin: source.random
  flow: -> check, normal
  schedule: "* * * * *"  # every minute

# filter numbers from "random" that are > 0.5 and send to "high"
check:
  plugin: filter.jsonpath
  flow: random -> high
  filter: "$.events[?(@.value>0.5)]"

# log all the numbers coming from "random" at the default level
normal:
  plugin: sink.log
  flow: "* ->"
  batch: 5 minutes

# log all the numbers coming from "check" at the warning level
high:
  plugin: sink.log
  flow: check ->
  level: warning

The example above has a source called “random” that generates random numbers every minute (its schedule). It’s connected to two other nodes, “check” and “normal” (flow = -> check, normal). Each random number is sent as an event that looks like this:

{
    "timestamp": "2021-01-01T00:00:00+00:00",
    "name": "hub.random",
    "value": 0.6394267984578837
}

The node check is a filter that keeps only events whose value is greater than 0.5. Events that pass the filter are sent to the high node (the filter connects the two nodes, according to flow = random -> high).

The node normal is a sink that logs events. It receives events from any other node (flow = * ->), stores them in a queue, and logs them at the INFO level (the default) every 5 minutes (batch = 5 minutes). The node high, on the other hand, receives events only from check, and logs them immediately at the WARNING level.
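To make the routing concrete, here is a rough plain-Python sketch of what this pipeline does. The names make_event and check are illustrative only, not Señor Octopus’s actual API:

```python
import random

def make_event(value):
    """Build an event shaped like the one shown above."""
    return {
        "timestamp": "2021-01-01T00:00:00+00:00",
        "name": "hub.random",
        "value": value,
    }

def check(events):
    """What the jsonpath filter $.events[?(@.value>0.5)] selects."""
    return [e for e in events if e["value"] > 0.5]

# "random" produces events; "check" forwards only the high ones
events = [make_event(random.random()) for _ in range(5)]
high = check(events)  # these would be sent on to the "high" sink
```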

To run it:

$ srocto config.yaml -vv
[2021-03-25 14:28:26] INFO:senor_octopus.cli:Reading configuration
[2021-03-25 14:28:26] INFO:senor_octopus.cli:Building DAG
[2021-03-25 14:28:26] INFO:senor_octopus.cli:
*   random
|\
* | check
| * normal
* high

[2021-03-25 14:28:26] INFO:senor_octopus.cli:Running Sr. Octopus
[2021-03-25 14:28:26] INFO:senor_octopus.scheduler:Starting scheduler
[2021-03-25 14:28:26] INFO:senor_octopus.scheduler:Scheduling random to run in 33.76353 seconds
[2021-03-25 14:28:26] DEBUG:senor_octopus.scheduler:Sleeping for 5 seconds

To stop running, press Ctrl+C. Any batched events will be processed before the scheduler terminates.
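The batching behavior can be modeled roughly like this. This is an illustrative sketch, not the actual implementation; BatchingSink is a made-up name:

```python
from datetime import datetime, timedelta

class BatchingSink:
    """Sketch of a sink with `batch: 5 minutes`: events are
    queued and processed together once the interval elapses."""

    def __init__(self, interval=timedelta(minutes=5)):
        self.interval = interval
        self.queue = []
        self.last_flush = datetime.min  # so the first event flushes

    def send(self, event, now):
        self.queue.append(event)
        if now - self.last_flush >= self.interval:
            self.flush(now)

    def flush(self, now):
        batch, self.queue = self.queue, []
        self.last_flush = now
        return batch  # a real sink would log or store each event here
```

On shutdown, a final flush() would drain whatever is still queued, matching the behavior described above.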

A concrete example

Now for a more realistic example. I wanted to monitor the air quality in my bedroom using an Awair Element. Since their API is throttled, I want to read values only once every 5 minutes and store everything in a Postgres database. If the CO2 level rises above 1000 ppm I want to receive a notification on my phone, limited to one message every 30 minutes.

This is the config I use for that:

awair:
  plugin: source.awair
  flow: -> *
  schedule: "*/5 * * * *"
  prefix: hub.awair
  access_token: XXX
  device_type: awair-element
  device_id: 12345

high_co2:
  plugin: filter.jsonpath
  flow: awair -> pushover
  filter: '$.events[?(@.name=="hub.awair.co2" and @.value>1000)]'

pushover:
  plugin: sink.pushover
  flow: high_co2 ->
  throttle: 30 minutes
  app_token: XXX
  user_token: johndoe

db:
  plugin: sink.db.postgresql
  flow: "* ->"
  batch: 15 minutes
  dbname: dbname
  user: user
  password: password
  host: host
  port: 5432

I’m using Pushover to send notifications to my phone.
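The throttle option above can be modeled roughly like this. This is a hypothetical sketch, not Señor Octopus’s code; Throttle is a made-up class:

```python
from datetime import datetime, timedelta

class Throttle:
    """Sketch of `throttle: 30 minutes`: after one event is let
    through, later events are dropped until the window elapses."""

    def __init__(self, window=timedelta(minutes=30)):
        self.window = window
        self.last_sent = None

    def allow(self, now):
        if self.last_sent is None or now - self.last_sent >= self.window:
            self.last_sent = now
            return True  # forward this event to the sink
        return False     # drop it: still inside the window
```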

Will it rain?

Here’s another example, a pipeline that notifies you if it will rain tomorrow:

weather:
  plugin: source.weatherapi
  flow: -> will_it_rain
  schedule: 0 12 * * *
  location: London
  access_token: XXX

will_it_rain:
  plugin: filter.jsonpath
  flow: weather -> pushover
  filter: '$.events[?(@.name=="hub.weatherapi.forecast.forecastday.daily_will_it_rain" and @.value==1)]'

pushover:
  plugin: sink.pushover
  flow: will_it_rain ->
  throttle: 30 minutes
  app_token: XXX
  user_token: johndoe

Event-driven sources

Señor Octopus also supports event-driven sources. Unlike the sources in the previous examples, these run constantly and respond to events immediately. An example is the MQTT source:

mqtt:
  plugin: source.mqtt
  flow: -> log
  topics: test/#
  host: mqtt.example.org

log:
  plugin: sink.log
  flow: mqtt ->

With the pipeline above running, when a message arrives on a topic matching test/# (e.g., test/1) it is immediately sent to the log sink.
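The test/# wildcard follows standard MQTT topic-filter rules: # matches any remaining levels and + matches exactly one level. A sketch of the matching in plain Python (topic_matches is an illustrative helper, not part of Señor Octopus):

```python
def topic_matches(pattern, topic):
    """Check an MQTT topic against a filter such as `test/#`."""
    p_levels = pattern.split("/")
    t_levels = topic.split("/")
    for i, p in enumerate(p_levels):
        if p == "#":               # matches this level and everything below
            return True
        if i >= len(t_levels):     # topic is shorter than the filter
            return False
        if p != "+" and p != t_levels[i]:  # literal level must match exactly
            return False
    return len(p_levels) == len(t_levels)
```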

There’s also an MQTT sink, which publishes events to a given topic:

mqtt:
  plugin: sink.mqtt
  flow: "* ->"
  topic: test/1
  host: mqtt.example.org

