Pipelines Using Fibers in Ruby 1.9
Users of the command line are familiar with the idea of building pipelines: a chain of simple commands strung together to the output of one becomes the input of the next. Using pipelines and a basic set of primitives, shell users can accomplish some sophisticated tasks. Here’s a basic Unix shell pipeline that reports the ten longest .tip files in the current directory, based on the number of lines in each file:
wc -l *.tip | grep \.tip | sort -n | tail -10
Let’s see how to add something similar to Ruby. By the end of this set of two articles, we’ll be able to write things like
puts (even_numbers | tripler | incrementer | multiple_of_five ).resume
and a palindrome finder using blocks:
= Pump.new %w{Madam, the civic radar rotator is not level.}
words = Filter.new {|word| word == word.reverse}
is_palindrome
= words .| {|word| word.downcase.tr("^a-z", '') } .| is_palindrome
pipeline
while word = pipeline.resume
puts word
end
Great code? Nope. But getting there is fun. And, who knows? The techniques might well be useful in your next project.
A Daily Dose of Fiber
Ruby 1.9 adds support for Fibers. At their most basic, let you create simple generators (much as you could do previously with blocks. Here’s a trivial example: a fiber that generates successive Fibonacci numbers:
= Fiber.new do
fib = f2 = 1
f1 loop do
Fiber.yield f1
= f2, f1 + f2
f1, f2 end
end
10.times { puts fib.resume }
A fiber is somewhat like a thread, except you have control over when it gets scheduled. Initially, a fiber is suspended. When you resume it, it runs the block until the block finishes, or it hits a Fiber.yield
. This is similar to a regular block yield: it suspends the fiber and passes control back to the resume
. Any value passed to Fiber.yield
becomes the value returned by resume
.
By default, a fiber can only yield back to the code that resumed it. However, if you require the "fiber"
library, Fibers get extended with a transfer
method that allows one fiber to transfer control to another. Fibers then become fully fledged coroutines. However, we won’t be needing all that power today.
Instead, let’s get back to the idea of creating pipelines of functionality in code, much as you can create pipelines in the shell.
As a starting point, let’s write two fibers. One’s a generator—it creates a list of even numbers. The second is a consumer. All it does it accept values from the generator and print them. We’ll make the consumer stop after printing 10 numbers.
= Fiber.new do
evens = 0
value loop do
Fiber.yield value
+= 2
value end
end
= Fiber.new do
consumer 10.times do
= evens.resume
next_value puts next_value
end
end
.resume consumer
Note how we had to use resume
to kick off the consumer. Technically, the consumer doesn’t have to be a Fiber, but, as we’ll see in a minute, making it one gives us some flexibility.
As a next step, notice how we’ve created some coupling in this code. Our consumer
fiber has the name of the evens generator coded into it. Let’s wrap both fibers in a method, and pass the name of the generator into the consumer
method.
def evens
Fiber.new do
= 0
value loop do
Fiber.yield value
+= 2
value end
end
end
def consumer(source)
Fiber.new do
10.times do
= source.resume
next_value puts next_value
end
end
end
.resume consumer(evens)
OK. Let’s add one more fiber to the weave. We’ll create a filter that only passes on numbers that are multiples of three. Again, we’ll wrap it in a method.
def evens
Fiber.new do
= 0
value loop do
Fiber.yield value
+= 2
value end
end
end
def multiples_of_three(source)
Fiber.new do
loop do
= source.resume
next_value Fiber.yield next_value if next_value % 3 == 0
end
end
end
def consumer(source)
Fiber.new do
10.times do
= source.resume
next_value puts next_value
end
end
end
.resume consumer(multiples_of_three(evens))
Running this, we get the output
0
6
12
18
. . .
This is getting cool. We write little chunks of code, and then combine them to get work done. Just like a pipeline. Except…
We can do better. First, the composition looks backwards. Because we’re passing methods to methods, we write
consumer(multiples_of_three(evens))
Instead, we’d like to write
| multiples_of_three | consumer evens
Also, there’s a fair amount of duplication in this code. Each of our little pipeline methods has the same overall structure, and each is coupled to the implementation of fibers. Let’s see if we can fix this.
Wrapping Fibers
As is usual when we’re refactoring towards a solution, we’re about to get really messy. Don’t worry, though. It will all wash off, and we’ll end up with something a lot neater.
First, let’s create a class that represents something that can appear in our pipeline. At it’s heart is theprocess
method. This reads something from the input side of the pipe, then “handles” that value. The default handling is to write that value to the output side of the pipeline, passing it on to the next element in the chain.
class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
processend
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)end
end
def handle_value(value)
output(value)end
def input
.resume
sourceend
def output(value)
Fiber.yield(value)
end
end
When I first wrote this, I was tempted to make PipelineElement
a subclass of Fiber
, but that leads to coupling. In the end, the pipeline elements delegate to a separate Fiber
object.
The first element of the pipeline doesn’t receive any input from prior elements (because there are no prior elements), so we need to override its process
method.
class Evens < PipelineElement
def process
= 0
value loop do
output(value)+= 2
value end
end
end
= Evens.new evens
Just to make things more interesting, we’ll create a generic MultiplesOf filter, so we can filter based on any number, and not just 3:
class MultiplesOf < PipelineElement
def initialize(factor)
@factor = factor
super()
end
def handle_value(value)
if value % @factor == 0
output(value) end
end
= MultiplesOf.new(3)
multiples_of_three = MultiplesOf.new(7) multiples_of_seven
Then we just knit it all together into a pipeline:
.source = evens
multiples_of_three.source = multiples_of_three
multiples_of_seven
10.times do
puts multiples_of_seven.resume
end
We get 0, 42, 84, 126, 168, and so on as output. (Any output stream that contains 42 must be correct, so no need for any unit tests here.)
But we’re still a little way from our ideal of being able to pipe these puppies together. It’s a good thing that Ruby let’s us override the “|” operator. Up in classPipelineElement
, define a new method:
def |(other)
.source = self
other
otherend
This allows us to write:
10.times do
puts (evens | multiples_of_three | multiples_of_seven).resume
end
or even:
= evens | multiples_of_three | multiples_of_seven
pipeline
10.times do
puts pipeline.resume
end
Cool, or what?
In The Next Thrilling Installment
The next post will take these basic ideas and tart them up a bit, allowing us to use blocks directly in pipelines. We’ll also reveal why our PipelineElement
class I just wrote is somewhat more complicated than might seem necessary. In the meantime, here’s the full source of the code so far.
class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
processend
end
def |(other)
.source = self
other
otherend
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)end
end
def handle_value(value)
output(value)end
def input
.resume
sourceend
def output(value)
Fiber.yield(value)
end
end
##
# The classes below are the elements in our pipeline
#
class Evens < PipelineElement
def process
= 0
value loop do
output(value)+= 2
value end
end
end
class MultiplesOf < PipelineElement
def initialize(factor)
@factor = factor
super()
end
def handle_value(value)
if value % @factor == 0
output(value) end
end
= Evens.new
evens = MultiplesOf.new(3)
multiples_of_three = MultiplesOf.new(7)
multiples_of_seven
= evens | multiples_of_three | multiples_of_seven
pipeline
10.times do
puts pipeline.resume
end