Streaming MapReduce
This document walks through how to implement a simple streaming application using Ray's actor capabilities. It implements a streaming MapReduce that computes word counts on Wikipedia articles.
You can view the code for this example.
To run the example, you need to install the dependencies:
pip install wikipedia
and then execute the script as follows:
python ray/doc/examples/streaming/streaming.py
For each round of articles read, the script will output the top 10 words in these articles together with their word count:
article index = 0
the 2866
of 1688
and 1448
in 1101
to 593
a 553
is 509
as 325
are 284
by 261
article index = 1
the 3597
of 1971
and 1735
in 1429
to 670
a 623
is 578
as 401
by 293
for 285
article index = 2
the 3910
of 2123
and 1890
in 1468
to 658
a 653
is 488
as 364
by 362
for 297
article index = 3
the 2962
of 1667
and 1472
in 1220
a 546
to 538
is 516
as 307
by 253
for 243
article index = 4
the 3523
of 1866
and 1690
in 1475
to 645
a 583
is 572
as 352
by 318
for 306
...
Note that this example uses distributed actor handles, which are still considered experimental.
There is a Mapper actor, which has a method get_range used to retrieve word counts for words in a certain range:
@ray.remote
class Mapper(object):
    def __init__(self, title_stream):
        # Constructor. The title_stream parameter is a stream of Wikipedia
        # article titles that will be read by this mapper.
        pass

    def get_range(self, article_index, keys):
        # Return counts of all the words with first letter between keys[0]
        # and keys[1] in the articles that haven't been read yet, with
        # index up to article_index.
        pass
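A minimal implementation of this interface might look like the following sketch. It assumes the stream object exposes a next() method that returns the next article title (see the stream sketch at the end of this document), fetches article text with the wikipedia package, and counts words with collections.Counter; these details are illustrative rather than taken verbatim from the example source.

from collections import Counter

import ray
import wikipedia

@ray.remote
class Mapper(object):
    def __init__(self, title_stream):
        # Assumption: title_stream exposes a next() method that returns
        # the next Wikipedia article title.
        self.title_stream = title_stream
        self.word_counts = []  # One Counter per article read so far.

    def get_range(self, article_index, keys):
        # Read from the stream until we have counts for every article
        # with index up to article_index.
        while len(self.word_counts) < article_index + 1:
            content = wikipedia.page(self.title_stream.next()).content
            self.word_counts.append(Counter(content.split()))
        # Return the counts for words whose first letter falls between
        # keys[0] and keys[1].
        counts = self.word_counts[article_index]
        return [(word, count) for word, count in counts.items()
                if word and keys[0] <= word.lower()[0] <= keys[1]]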
The Reducer actor holds a list of mappers, calls get_range on them, and accumulates the results:
@ray.remote
class Reducer(object):
    def __init__(self, keys, *mappers):
        # Constructor for a reducer that gets input from the list of mappers
        # in the argument and accumulates word counts for words with first
        # letter between keys[0] and keys[1].
        pass

    def next_reduce_result(self, article_index):
        # Get articles up to article_index that haven't been read yet,
        # accumulate the word counts, and return them.
        pass
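In the same spirit, a sketch of the reducer can fan a get_range call out to every mapper in parallel and merge the per-mapper counts; the merging code below is illustrative.

from collections import defaultdict

import ray

@ray.remote
class Reducer(object):
    def __init__(self, keys, *mappers):
        self.keys = keys
        self.mappers = list(mappers)

    def next_reduce_result(self, article_index):
        # Ask every mapper for its counts for this article index in
        # parallel, then merge the per-mapper results.
        futures = [mapper.get_range.remote(article_index, self.keys)
                   for mapper in self.mappers]
        totals = defaultdict(int)
        for counts in ray.get(futures):
            for word, count in counts:
                totals[word] += count
        return dict(totals)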
On the driver, we then create a number of mappers and reducers and run the streaming MapReduce:
ray.init()

streams = ...  # Create a list of num_mappers streams.
keys = ...  # Partition the keys among the reducers.

# Create a number of mappers.
mappers = [Mapper.remote(stream) for stream in streams]

# Create a number of reducers, each responsible for a different range of
# keys. This gives each Reducer actor a handle to each Mapper actor.
reducers = [Reducer.remote(key, *mappers) for key in keys]

article_index = 0
while True:
    counts = ray.get([reducer.next_reduce_result.remote(article_index)
                      for reducer in reducers])
    article_index += 1
The actual example reads a list of articles and creates a stream object that produces an infinite stream of articles from the list. This is a toy example meant to illustrate the idea; in practice we would produce a stream of non-repeating items for each mapper.
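For illustration, such a stream object could be as simple as the following sketch, which cycles through a fixed list of titles forever; the class name and the next() interface are assumptions chosen to match the mapper sketch above.

class Stream(object):
    def __init__(self, titles):
        self.titles = titles
        self.index = 0

    def next(self):
        # Return the next title, wrapping around so the stream never
        # runs out. This wrap-around is what makes the toy example
        # repeat articles.
        title = self.titles[self.index % len(self.titles)]
        self.index += 1
        return title

With this, the driver's list of streams could be built as, for example, streams = [Stream(titles) for _ in range(num_mappers)].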