One of the more compelling aspects of Riak is the general concept of moving processing to the data. In a distributed data store like Riak, this makes a lot of sense since each node can process large chunks of data, thereby speeding up the processing. The primary interface for this is to run map/reduce jobs. In Riak, m/r can be used to do pretty much any operation over data that you wish, given that you adhere to a few basic principles. This post moves beyond the basics and discusses some tips and tricks for running complex map/reduce jobs within Riak. Most of this discussion is centered around the erlang API, but some aspects apply to the Javascript world as well.

Map Phases

All jobs must start with a map phase.  From the wiki, “[t]he input list to a map phase must be a list of (possibly annotated) bucket-key pairs” [1]. This description focuses on the call to the map runner. The corresponding function API of the actual job is slightly different and is best shown via example.

Non-Annotated Jobs

Running non-annotated jobs is the simplest and most straightforward way to run a map/reduce job. The only requirement is that every object referenced in the list of tuples actually exists.

[ {<<"bucket.1">>, <<"key.1">>},
  {<<"bucket.1">>, <<"key.2">>},
  {<<"bucket.1">>, <<"key.3">>} ]

Upon execution, Riak will attempt to retrieve each object described in the input definition above and call the map function as

MapFunction(RiakObject, undefined, none)

where the object identified by {Bucket,Key} is passed to the function as a RiakObject with the contents accessible via riak_object:get_value(RiakObject). The second argument is the KeyData, which is undefined since this job is not annotated. The third argument is a static argument that is passed to the function. This argument must be defined in the phase spec prior to executing the job.

The map output is expected to be a list, which is concatenated to the results of all the nodes processing the same map phase. In a simple map/reduce job, the output can be an arbitrary list that is passed to the reduce phase.
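
To make this concrete, here is a rough sketch of how such a job might be defined and run against Riak’s local erlang client. The module and function names (mr_example, value_size) are purely illustrative, and the phase spec shown follows the {map, {modfun, Module, Function}, StaticArg, Accumulate} form of the native erlang interface.

%% mr_example.erl (illustrative)
value_size(RiakObject, _KeyData, {unit, bytes}) ->
  %% emit the size of the stored value; {unit, bytes} is the static
  %% argument supplied in the phase spec below
  [ byte_size(riak_object:get_value(RiakObject)) ].

Inputs = [ {<<"bucket.1">>, <<"key.1">>},
           {<<"bucket.1">>, <<"key.2">>},
           {<<"bucket.1">>, <<"key.3">>} ],
Query  = [ {map, {modfun, mr_example, value_size}, {unit, bytes}, true} ],
{ok, Client} = riak:local_client(),
{ok, Results} = Client:mapred(Inputs, Query).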

Annotated Jobs

An annotated bucket-key pair simply means that you are passing key data along with the bucket-key tuple.

[ {{<<"bucket.1">>, <<"key.1">>}, [{property_1,value_1}, {property_2,value_2}] },
  {{<<"bucket.1">>, <<"key.2">>}, [{property_1,value_1}, {property_2,value_2}] },
  {{<<"bucket.1">>, <<"key.3">>}, [{property_1,value_1}, {property_2,value_2}] } ]

This in turn is used in the function call as

MapFunction(RiakObject, [{property_1,value_1}, {property_2,value_2}], none)

Given the inclusion of a static argument, the KeyData term seems a bit redundant. However, it turns out to be an indispensable part of the system due to the requirement that all inputs to map jobs must be valid bucket-key pairs*. In practice the input list often just controls which objects are processed and how many, while the actual data passed between phases travels in the KeyData.

Reduce

Unlike map phases, reduce phases are fairly low maintenance, requiring only a list as input; the rest is up to you.

The important thing to understand is that the function defining the reduce phase may be evaluated multiple times, and the input of later evaluations will include the output of earlier evaluations. [1]

Basho presents a few approaches to handling this situation, with the right choice depending on the type of reduce operation you are performing. For something like a set union (their example), the input shape and the output shape are the same (e.g. a list of numbers). Clearly this doesn’t apply to all situations, and in that event, it’s best to use pattern matching or some other mechanism to detect the shape of each input element.

No KeyData is passed to reduce phases, so these functions only have an arity of 2. Like map phases, a static argument is available and set when declaring the job.

ReduceFunction(List, none)

The output of the function must also be a list, both because it may be passed to the next phase and because it may be fed back in as part of the input to a later evaluation of the same reduce.
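
As an illustration (this is not one of Basho’s examples), a reduce phase that sums numbers might tag its own output so that a re-evaluation can tell earlier partial results apart from raw values arriving from the map phase:

sum_values(List, _Arg) ->
  Total = lists:foldl(
    fun({partial_sum, N}, Acc) -> Acc + N;    % output of an earlier evaluation
       (N, Acc) when is_number(N) -> Acc + N  % raw values from the map phase
    end, 0, List),
  [ {partial_sum, Total} ].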

Arbitrary Chains

Map and reduce comprise a limited vocabulary of functional operations, so describing a computation exclusively in these terms can result in complex chains of phases. Such chains can be run safely in Riak by following a few simple guidelines. For example, suppose a job is defined as

map -> reduce -> reduce -> map -> reduce

This is not contrived: I am working on a benchmarking tool that needs to calculate aggregate statistics over multiple iterations of a process. To accomplish arbitrary chains of map/reduce phases, it is critical that the data passed to each phase be compatible with the phases adjacent to it. Based on the APIs described above, some sequences come for free. Anything that looks like

map -> reduce -> … -> reduce

will likely work without a lot of data massaging. Things aren’t so rosy when any of the following situations arises:

map -> map

map -> reduce -> map

reduce -> map -> reduce

Jobs structured like these will likely fail due to data mismatches. Under these circumstances, the KeyData field pulls everything together.

Map to Map

When chaining map phases together, the output of the first map phase must adhere to the input rules of a map phase; that is, it must be a list of (possibly annotated) bucket-key pairs, where a pair of the form {none,none} is also accepted. Consequently you cannot use the RiakObject to hold any data generated in the first map phase; instead you must pass that data along as the annotation. In code, this would look like

map_1(RiakObject, _, _) ->
  Data = riak_object:get_value(RiakObject),
  Result = do_stuff(Data),
  [ {{none,none}, Result} ].

map_2(_RiakObject, KeyData, _) ->
  Result = do_other_stuff(KeyData),
  [ Result ].

Hence, the usable input of the second phase is the KeyData, which contains the result of the first map phase. The RiakObject in the second phase is essentially garbage and is ignored.
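
As phase specs, a two-map chain like this would be declared something like the following (modfun form, illustrative module name mr_example); the intermediate phase sets its accumulate flag to false so that only the final phase contributes to the job’s result:

Query = [ {map, {modfun, mr_example, map_1}, none, false},
          {map, {modfun, mr_example, map_2}, none, true} ].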

Reduce to Map

If an intermediate reduce phase needs to pass data on to another map phase, the same data format issues present themselves. The solution is not so different from the map -> map example, except that there is no RiakObject to give you buckets and keys. Thankfully, since {none,none} is considered valid, that placeholder is usually sufficient, assuming the KeyData is used exclusively for further processing.

reduce_1(List, _) ->
  Result = do_stuff(List),
  [ {{none,none}, Result} ].

map_2(_RiakObject, Data, _) ->
  Result = do_other_stuff(Data),
  [ Result ].

In my branch of rekon, I provide a reduce phase called as_keydata.red that transforms any list into a list of annotated bucket-key pairs. This removes data massaging from user map/reduce phases, increasing the probability that the phases can be reused.
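
The actual phase lives in the rekon repository, but the idea is roughly this (a sketch, not necessarily the exact code):

as_keydata(List, _Arg) ->
  [ {{none, none}, Element} || Element <- List ].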

Starting Jobs with Reduce

Sometimes you need to reduce data just to prepare it for the actual data processing work. Since jobs must begin with a map phase and the map and reduce functions have mismatched arities and input formats, it is rather challenging to start a job with a reduce phase. For these situations, you can use the binary_to_term.map phase, again provided in my Rekon branch, to convert RiakObjects into a list of raw values suitable for a reduce phase. Clearly this only works for values stored as erlang binaries, but the pattern is the same for other data formats.
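
The idea behind such a phase is roughly the following (a sketch, not necessarily the exact code in my branch; it assumes the stored values were produced with term_to_binary/1):

binary_to_term_map(RiakObject, _KeyData, _Arg) ->
  [ binary_to_term(riak_object:get_value(RiakObject)) ].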

Integrating with Rekon

My branch of Rekon supports running map/reduce jobs from within Rekon. All of these jobs have a JSON conversion step that is applied to the final output of the job. If you are working in erlang, this can present parsing issues. To get around it, I provide a sample phase (erl_to_string.red) that converts the result into a string. Clearly there are more sophisticated ways of transforming the data, but this gives a one-size-fits-all method for viewing your job output.
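
Such a conversion can be as simple as pretty-printing the term (again, a sketch of the idea rather than the exact phase):

erl_to_string(List, _Arg) ->
  [ lists:flatten(io_lib:format("~p", [List])) ].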

Notes

* I recently discovered that the bucket and key do not have to be strictly valid, in that passing the atom ‘none’ for both will still allow Riak to continue processing.

References

[1] http://wiki.basho.com/MapReduce.html