Spouts and Bolts¶
The general flow for creating new spouts and bolts using pystorm is to add
them to your
src folder and update the corresponding topology definition.
Let’s create a spout that emits sentences until the end of time:
import itertools from pystorm.spout import Spout class SentenceSpout(Spout): def initialize(self, stormconf, context): self.sentences = [ "She advised him to take a long holiday, so he immediately quit work and took a trip around the world", "I was very glad to get a present from her", "He will be here in half an hour", "She saw him eating a sandwich", ] self.sentences = itertools.cycle(self.sentences) def next_tuple(self): sentence = next(self.sentences) self.emit([sentence]) def ack(self, tup_id): pass # if a tuple is processed properly, do nothing def fail(self, tup_id): pass # if a tuple fails to process, do nothing
The magic in the code above happens in the
next_tuple() functions. Once the spout enters the main run loop,
pystorm will call your spout’s
After initialization is complete, pystorm will continually call the spout’s
next_tuple() method where you’re expected to emit tuples that match
whatever you’ve defined in your topology definition.
Now let’s create a bolt that takes in sentences, and spits out words:
import re from pystorm.bolt import Bolt class SentenceSplitterBolt(Bolt): def process(self, tup): sentence = tup.values # extract the sentence sentence = re.sub(r"[,.;!\?]", "", sentence) # get rid of punctuation words = [[word.strip()] for word in sentence.split(" ") if word.strip()] if not words: # no words to process in the sentence, fail the tuple self.fail(tup) return for word in words: self.emit([word]) # tuple acknowledgement is handled automatically
The bolt implementation is even simpler. We simply override the default
process() method which pystorm calls when a tuple has been emitted by
an incoming spout or bolt. You are welcome to do whatever processing you would
like in this method and can further emit tuples or not depending on the purpose
of your bolt.
process() method completes without raising an Exception, pystorm
will automatically ensure any emits you have are anchored to the current tuple
being processed and acknowledged after
If an Exception is raised while
process() is called, pystorm
automatically fails the current tuple prior to killing the Python process.
In the example above, we added the ability to fail a sentence tuple if it did
not provide any words. What happens when we fail a tuple? Storm will send a
“fail” message back to the spout where the tuple originated from (in this case
SentenceSpout) and pystorm calls the spout’s
fail() method. It’s then up to your spout
implementation to decide what to do. A spout could retry a failed tuple, send
an error message, or kill the topology.
Bolt Configuration Options¶
You can disable the automatic acknowleding, anchoring or failing of tuples by
adding class variables set to false for:
auto_fail. All three options are documented in
from pystorm.bolt import Bolt class MyBolt(Bolt): auto_ack = False auto_fail = False def process(self, tup): # do stuff... if error: self.fail(tup) # perform failure manually self.ack(tup) # perform acknowledgement manually
Handling Tick Tuples¶
Ticks tuples are built into Storm to provide some simple forms of cron-like behaviour without actually having to use cron. You can receive and react to tick tuples as timer events with your python bolts using pystorm too.
The first step is to override
process_tick() in your custom
Bolt class. Once this is overridden, you can set the storm option
topology.tick.tuple.freq.secs=<frequency> to cause a tick tuple
to be emitted every
You can see the full docs for
from pystorm.bolt import Bolt class MyBolt(Bolt): def process_tick(self, freq): # An action we want to perform at some regular interval... self.flush_old_state()
Then, for example, to cause
process_tick() to be called every
2 seconds on all of your bolts that override it, you can launch
your topology under
sparse run by setting the appropriate -o
option and value as in the following example:
$ sparse run -o "topology.tick.tuple.freq.secs=2" ...