This article is a short introduction into programming Hadoop using Clojure language.


The Hadoop is free implementation of infrastructure for scalable, distributed computing. It was started as implementation of ideas of MapReduce and GFS, that was introduced by Google, but later, many different projects were included into it. Hadoop is actively used by many projects, including commercial companies — Yahoo, LinkedIn, etc., and it allows to process big amounts of data using "standard" equipment.

Hadoop is written in Java, and this allows to easily use it from Clojure. To simplify Hadoop programming in Clojure, Stuart Sierra developed simple, but powerful package clojure-hadoop, that will be described in this article.

Installation and configuration

Process of minimal Hadoop installation is pretty simple and described in documentation. Customization of Hadoop for work in clustered environment is slightly complicated, but described in details in following document.

I want to mention, that the Cloudera company provides ready-to-work packages for different Linux distributions, so you can setup Hadoop and other packages using your favorite package manager. Besides this, Cloudera provides ready-to-use installation as disk image for VMWare, so you can download one archive, and get working environment.

The source code of clojure-hadoop is available from github — this version works with Hadoop version 0.19 and Clojure 1.1.0. If you want to use Hadoop 0.20, or newer, and/or Clojure 1.2.0, then you can take version with my changes. Build and installation is performed with Maven — you just need to execute the mvn install command.

Parts of clojure-hadoop

clojure-hadoop has several levels of abstraction. Each of these levels provided as separate namespace, that are described in abstraction increase order:

implements set of macros, that are used to generate classes for definition of MapReduce job. The mapper and reducer functions have the same set of arguments, as Java functions, and should have fixed names — mapper-map and reduce-reducer. The initialization of job, specification of input and output data, and other options, should be implemented inside the tool-run function.
implements wrapper functions, that simplify conversion of input and output data, making mapper and reducer functions more Clojurish. All other things are implemented the same way as in previous case.
implements all necessary functions (mapper-map, reduce-reducer and tool-run) and allows to use command line options to specify any mapper and reducer functions (written in Clojure), input and output data, and other parameters.
defines the defjob macro, that allows to define MapReduce job using Clojure code. You can specify some parameters using command line options (usually this is input and output data), while using this macro you can specify map and reduce functions, input/output data conversion functions, etc.

Besides these namespaces, there is also clojure-hadoop.imports namespace, that provides functions for importing of Hadoop's classes and interfaces, that makes developer's life much easier — these functions are used in all programs (all, or only some of them), independent on selected level of abstraction. Now following functions are defined:

How to program with clojure-hadoop

Depending of selected level of abstraction, you need to use different styles of programming. In most cases it's enough to use defjob, that hides most of the low-level details behind its interface, so you only need to implement logic as two functions. But I put here description of all namespaces, because sometimes you'll need to get access to low-level interfaces. All sections have links to examples, that you can find in clojure-hadoop's distribution.

Using clojure-hadoop.gen

Only two objects are defined in clojure-hadoop.gen namespace: the gen-job-classes macro and the gen-main-method function.

The gen-job-classes creates three classes, that are implementing org.apache.hadoop.mapred.Mapper, org.apache.hadoop.mapred.Reducer and org.apache.hadoop.util.Tool interfaces, correspondingly. Developer must define three functions: mapper-map, reducer-reduce and tool-run, that will be used as implementation of corresponding methods in concrete interfaces. The mapper-map function implements mapping of input data into intermediate data (the map part of MapReduce), the reducer-reduce function receives intermediate data and produces output data (the reduce part of MapReduce), and the tool-run function defines Hadoop's job options, prepares input and output parameters, etc. All function's parameters match to corresponding parameters of functions, defined in Hadoop interfaces. So, for detail information it's better to look into Hadoop's API.

The gen-main-method function creates standard main method, that executes the tool-run function using run method from org.apache.hadoop.util.ToolRunner class.

Complete example of code you can find in wordcount1 example.

Using clojure-hadoop.wrap

Use of functions, defined in clojure-hadoop.wrap allows to simplify implementation of mapper and reducer functions, make them more clojurish, because you'll work with Clojure data types, not with Hadoop classes. The wrap-map function implements wrapper for map function, and wrap-reduce — for reduce. Each of these functions could have from 1 to 3 arguments. First, mandatory argument — function to wrap. Second argument — function, that implements reading of data, and third argument — function, that implements writing of data. Library provides number of functions for reading and writing, and you can easily use them in your code.

Function, that implements the map part, receives two parameters — key and value, while reducer function receives key and list of values, generated by mapper function.

I need to mention, that the tool-run function should be implemented the same way as in previous case, and results of wrapped functions should have names mapper-map and reducer-reduce.

Example of code you can find in wordcount2.

Using clojure-hadoop.job

The code, implemented in clojure-hadoop.job namespace, additionally simplifies implementation of Hadoop jobs. It implements all necessary functions, so all you need — is to implement of map and reduce function, and specify them in command line options, together with other parameters. All details are handled by the clojure_hadoop.job class, that performs analysis of command line options and bind them with corresponding job's parameter.

Command line options are specified as -key value pairs separated by space. Following options are required:

comma-separated list of input paths;
directory name for output data;
full name of mapper function as namespace/name, or as class name, that implements the org.apache.hadoop.mapred.Mapper interface. You can also use the identity function, and input data will passed to reducer without processing;
full name of mapper function as namespace/name, or as class name, that implements the org.apache.hadoop.mapred.Reducer interface. You can also use the identity or none functions, and output data will written without any processing.

There are also optional command line parameters:

defines format for input data. Possible values — text for text data, seq for SequenceFile, or class name that implements corresponding format;
defines format for output data with same possible values as for -input-format;
class name for output data keys (output of reduce function);
class name for output data values (output of reduce function);
class name for intermediate data keys (output of map function);
class name for intermediate data keys values (output of map function);
name of function that performs reading of input data (input of map function), as namespace/name;
name of function that performs writing of intermediate data (output of map function), as namespace/name;
name of function that performs reading of intermediate data (input of reduce function), as namespace/name;
name of function that performs writing of output data (output of reduce function) , as namespace/name;
full name of combiner function as namespace/name, or as class name, that implements the org.apache.hadoop.mapred.Reducer interface. This function works like reduce, but it's used to combine map's results only on local node. This allows to decrease amount of data, transferred via network, and increase speed of processing. This function should accept data of the same type as reduce, and output data in the same format as a map! (This functionality is implemented only in my version of clojure-hadoop);
set job's name that will displayed in Hadoop's administrative interface;
if it set to true then job will remove directory with output data;
if it set to true job will perform compression of output data;
compression method — gzip, bzip2, default or class name;
compression type for SequenceFileblock, record or none.

You can find example of this approach in wordcount3 file — it consists only from two functions, and all parameters are specified via command-line options.

Using clojure-hadoop.defjob

The defjob macro, defined in namespace with the same name, allows to specify some part of clojure-hadoop.job's configuration parameters directly in source code, while rest of parameters could be specified from command line on invocation. defjob's options are specified as keywords wit the same names as corresponding command line options, but without minus sign at start of option. The only required parameter for macro is name of job.

For example, following code could be used to define job with name job:

(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :reduce my-reduce
  :input-format :text)

and after definition, we can run this job using command line option -job job-name (as full with namespace), instead of specifying separate options -map and -reduce.

Usage of defjob macro is shown in wordcount4 and wordcount5 examples. The only different between them is that in wordcount5 in job's definition the reader and writer functions are specified. Full example with defjob you can see below.

Full example

As example, I want to show small program, that generates sets of N-Grams from given files — I use generated databases for document classification tasks. The source code is available on github, in hadoop1 directory. This example uses defjob macro to describe job and user-defined mapper & reducer functions.

All example consists from one file with source code, that implements functions my-map and my-reduce, that are used in job's specification together with input and output formats specifications, etc. As usual in MapReduce, the my-map function accepts two parameters — key (integer number) and value (string) and produces list of pairs of string/integer, where string is n-gram. The my-reduce function is very simple — it just sum all entries for given key.

(ns hadoop1
    (:require [clojure-hadoop.wrap :as wrap]
              [clojure-hadoop.defjob :as defjob]
              [clojure-hadoop.imports :as imp])
    (:use clojure.contrib.seq-utils)
    (:require [clojure.contrib.str-utils2 :as str2])
    (:import (java.util StringTokenizer)))


(def delimiters "0123456789[ \t\n\r!\"#$%&'()*+,./:;<=>?@\\^`{|}~-]+")

(defn gen-n-grams [#^String s #^Integer n]
  (when (> (.length s) 0)
      (let [str (str " " s (String. ) (str2/repeat " " (- n 1)))]
        (reduce (fn [val elem]
                  (conj val (.substring str elem (+ elem n))))
                (range 0 (+ 1 (.length s)))))))

(defn my-map [key #^String value]
  (map (fn [token] [token 1])
       (flatten (map #(gen-n-grams %1 3)
                     (enumeration-seq (StringTokenizer. value delimiters))))))

(defn my-reduce [key values-fn]
  [ [key (reduce + (values-fn))] ])

(defn string-long-writer [#^OutputCollector output
                          #^String key value]
  (.collect output (Text. key) (LongWritable. value)))

(defn string-long-reduce-reader [#^Text key wvalues]
  [(.toString key)
   (fn [] (map (fn [#^LongWritable v] (.get v))
               (iterator-seq wvalues)))])

(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :map-writer string-long-writer
  :reduce my-reduce
  :reduce-reader string-long-reduce-reader
  :reduce-writer string-long-writer
  :output-key Text
  :output-value LongWritable
  :input-format :text
  :output-format :text
  :compress-output false)

This code is based on wordcount5 example from clojure-hadoop distribution. To build it we're using following project for Leiningen:

(defproject hadoop1 "1.0"
  :description "hadoop1"
  :dependencies [[org.clojure/clojure "1.1.0"]
                 [org.clojure/clojure-contrib "1.1.0"]
                 [com.stuartsierra/clojure-hadoop "1.2.0-SNAPSHOT"]]

To run this job, we need to combine all code into one archive, so we need to use lein uberjar to combine our Clojure code with all necessary libraries1. After this, you can run this job either in autonomous mode, without running Hadoop and working with local files2, or in cluster mode. To run job in autonomous mode you can use following command:

java -cp hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input FILE -output out

specify any text file instead of FILE argument, and after execution, you'll get generated file with n-grams in the out directory. This file can be used as database for language detection & text classification tasks.

To run your code in Hadoop cluster, you need to put files onto HDFS (into input directory, in our example), and run task with following command:

hadoop jar hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input input -output output

And after finishing of task, data will put into the output directory3 on HDFS, and you can get access to them with standard HDFS commands.

Additional information

This information is just an introduction into use of Clojure for Hadoop programming. Additional examples of clojure-hadoop usage you can find in following materials:

Additional information about programming for Hadoop and MapReduce you can find in following materials:

It also worth to mention the Cascalog project, that implements DSL, that allows to query data in Hadoop using Clojure.

1. You can also use the lein hadoop command (implemented by lein-hadoop plugin), that creates correct archive with Hadoop task (packing all dependencies and necessary information).

2. Execution of job in autonomous is very handy for debugging your code, because it executed in separate JVM instance and works with local files.

3. Output directory shouldn't exists, or job will not run. If you don't need files output directory, then you can use command line option -replace true.

Last change: 05.03.2013 16:54

blog comments powered by Disqus