Quickstart

Dependencies

Spouts and Bolts

The general flow for creating new spouts and bolts using pystorm is to add them to your src folder and update the corresponding topology definition.

Let’s create a spout that emits sentences until the end of time:

import itertools

from pystorm.spout import Spout


class SentenceSpout(Spout):

    def initialize(self, stormconf, context):
        self.sentences = [
            "She advised him to take a long holiday, so he immediately quit work and took a trip around the world",
            "I was very glad to get a present from her",
            "He will be here in half an hour",
            "She saw him eating a sandwich",
        ]
        self.sentences = itertools.cycle(self.sentences)

    def next_tuple(self):
        sentence = next(self.sentences)
        self.emit([sentence])

    def ack(self, tup_id):
        pass  # if a tuple is processed properly, do nothing

    def fail(self, tup_id):
        pass  # if a tuple fails to process, do nothing

The magic in the code above happens in the initialize() and next_tuple() functions. Once the spout enters the main run loop, pystorm will call your spout’s initialize() method. After initialization is complete, pystorm will continually call the spout’s next_tuple() method where you’re expected to emit tuples that match whatever you’ve defined in your topology definition.

Now let’s create a bolt that takes in sentences, and spits out words:

import re

from pystorm.bolt import Bolt

class SentenceSplitterBolt(Bolt):

    def process(self, tup):
        sentence = tup.values[0]  # extract the sentence
        sentence = re.sub(r"[,.;!\?]", "", sentence)  # get rid of punctuation
        words = [[word.strip()] for word in sentence.split(" ") if word.strip()]
        if not words:
            # no words to process in the sentence, fail the tuple
            self.fail(tup)
            return

        for word in words:
            self.emit([word])
        # tuple acknowledgement is handled automatically

The bolt implementation is even simpler. We simply override the default process() method which pystorm calls when a tuple has been emitted by an incoming spout or bolt. You are welcome to do whatever processing you would like in this method and can further emit tuples or not depending on the purpose of your bolt.

If your process() method completes without raising an Exception, pystorm will automatically ensure any emits you have are anchored to the current tuple being processed and acknowledged after process() completes.

If an Exception is raised while process() is called, pystorm automatically fails the current tuple prior to killing the Python process.

Failed Tuples

In the example above, we added the ability to fail a sentence tuple if it did not provide any words. What happens when we fail a tuple? Storm will send a “fail” message back to the spout where the tuple originated from (in this case SentenceSpout) and pystorm calls the spout’s fail() method. It’s then up to your spout implementation to decide what to do. A spout could retry a failed tuple, send an error message, or kill the topology.

Bolt Configuration Options

You can disable the automatic acknowleding, anchoring or failing of tuples by adding class variables set to false for: auto_ack, auto_anchor or auto_fail. All three options are documented in pystorm.bolt.Bolt.

Example:

from pystorm.bolt import Bolt

class MyBolt(Bolt):

    auto_ack = False
    auto_fail = False

    def process(self, tup):
        # do stuff...
        if error:
          self.fail(tup)  # perform failure manually
        self.ack(tup)  # perform acknowledgement manually

Handling Tick Tuples

Ticks tuples are built into Storm to provide some simple forms of cron-like behaviour without actually having to use cron. You can receive and react to tick tuples as timer events with your python bolts using pystorm too.

The first step is to override process_tick() in your custom Bolt class. Once this is overridden, you can set the storm option topology.tick.tuple.freq.secs=<frequency> to cause a tick tuple to be emitted every <frequency> seconds.

You can see the full docs for process_tick() in pystorm.bolt.Bolt.

Example:

from pystorm.bolt import Bolt

class MyBolt(Bolt):

    def process_tick(self, freq):
        # An action we want to perform at some regular interval...
        self.flush_old_state()

Then, for example, to cause process_tick() to be called every 2 seconds on all of your bolts that override it, you can launch your topology under sparse run by setting the appropriate -o option and value as in the following example:

$ sparse run -o "topology.tick.tuple.freq.secs=2" ...