This article is a short introduction to programming Hadoop using the Clojure language.
Hadoop is a free implementation of an infrastructure for scalable, distributed computing. It started as an implementation of the MapReduce and GFS ideas introduced by Google, but later many other projects were included in it. Hadoop is actively used by many projects, including commercial companies — Yahoo, LinkedIn, etc., and it allows processing large amounts of data on "standard" hardware.
Hadoop is written in Java, which makes it easy to use from Clojure. To simplify Hadoop programming in Clojure, Stuart Sierra developed a simple but powerful package, clojure-hadoop, which is described in this article.
Installing a minimal Hadoop setup is pretty simple and is described in the documentation. Setting up Hadoop for work in a clustered environment is slightly more complicated, but it is described in detail in the following document.
I want to mention that the Cloudera company provides ready-to-use packages for different Linux distributions, so you can set up Hadoop and related packages using your favorite package manager. Besides this, Cloudera provides a ready-to-use installation as a disk image for VMWare, so you can download one archive and get a working environment.
The source code of clojure-hadoop is available from GitHub — this version works with Hadoop version 0.19 and Clojure 1.1.0. If you want to use Hadoop 0.20 or newer, and/or Clojure 1.2.0, then you can take the version with my changes. Build and installation are performed with Maven — you just need to execute the mvn install command.
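Assuming Git and Maven are installed locally, getting and building the library looks roughly like this (the URL below is the upstream repository; use the fork with the patches if you need Hadoop 0.20 / Clojure 1.2.0 support):

git clone https://github.com/stuartsierra/clojure-hadoop.git
cd clojure-hadoop
mvn install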
clojure-hadoop has several levels of abstraction. Each of these levels is provided as a separate namespace; they are described below in order of increasing abstraction:
clojure-hadoop.gen — the lowest level of abstraction. The developer has to implement the mapper-map and reducer-reduce functions, while the initialization of the job, the specification of input and output data, and other options should be implemented inside the tool-run function.
clojure-hadoop.wrap — wrapper functions that convert data between Hadoop classes and Clojure data types, so the map and reduce functions can work with plain Clojure values.
clojure-hadoop.job — implements all the necessary functions (mapper-map, reducer-reduce and tool-run) and allows the use of command line options to specify any mapper and reducer functions (written in Clojure), input and output data, and other parameters.
clojure-hadoop.defjob — provides the defjob macro, which allows defining a MapReduce job using Clojure code. You can specify some parameters using command line options (usually the input and output data), while with this macro you can specify the map and reduce functions, input/output data conversion functions, etc.
Besides these namespaces, there is also the clojure-hadoop.imports namespace, which provides functions for importing Hadoop's classes and interfaces. These functions make the developer's life much easier and are used (all of them, or only some) in practically all programs, independent of the selected level of abstraction. Currently the following functions are defined:
import-io — to import classes and interfaces from the org.apache.hadoop.io package;
import-io-compress — to import classes and interfaces from the org.apache.hadoop.io.compress package;
import-fs — to import classes and interfaces from the org.apache.hadoop.fs package;
import-mapred — to import classes and interfaces from the org.apache.hadoop.mapred package;
import-mapred-lib — to import classes and interfaces from the org.apache.hadoop.mapred.lib package.
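For example, a job namespace typically starts by pulling these classes in (a small sketch; the namespace name my-job is just an illustration):

(ns my-job
  (:require [clojure-hadoop.imports :as imp]))

(imp/import-io)      ;; Text, LongWritable, IntWritable, ...
(imp/import-mapred)  ;; JobConf, OutputCollector, Reporter, ...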
Depending on the selected level of abstraction, you need to use a different style of programming. In most cases it's enough to use defjob, which hides most of the low-level details behind its interface, so you only need to implement the logic as two functions. But I describe all the namespaces here, because sometimes you'll need access to the low-level interfaces. All sections contain links to examples that you can find in clojure-hadoop's distribution.
Only two objects are defined in the clojure-hadoop.gen namespace: the gen-job-classes macro and the gen-main-method function.
gen-job-classes creates three classes that implement the org.apache.hadoop.mapred.Mapper, org.apache.hadoop.mapred.Reducer and org.apache.hadoop.util.Tool interfaces, respectively. The developer must define three functions: mapper-map, reducer-reduce and tool-run; these will be used as the implementations of the corresponding methods of the interfaces. The mapper-map function implements the mapping of input data into intermediate data (the map part of MapReduce), the reducer-reduce function receives intermediate data and produces output data (the reduce part of MapReduce), and the tool-run function defines the Hadoop job's options, prepares input and output parameters, etc. The parameters of these functions match the parameters of the corresponding methods defined in the Hadoop interfaces, so for detailed information it's better to look at Hadoop's API.
The gen-main-method function creates a standard main method that executes the tool-run function using the run method of the org.apache.hadoop.util.ToolRunner class.
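A stripped-down skeleton of this approach might look roughly like the following (the namespace, the word-counting logic and especially the names of the generated classes are assumptions on my side; the real, working code is in the wordcount1 example):

(ns my-wordcount
  (:require [clojure-hadoop.gen :as gen]
            [clojure-hadoop.imports :as imp])
  (:import (java.util StringTokenizer)))

(imp/import-io)      ;; Text, IntWritable, ...
(imp/import-fs)      ;; Path, ...
(imp/import-mapred)  ;; JobConf, JobClient, OutputCollector, formats, ...

(gen/gen-job-classes)  ;; generates the Mapper, Reducer and Tool classes
(gen/gen-main-method)  ;; generates the standard main method

(defn mapper-map
  "The map part: emits a [word 1] pair for every word of the input line."
  [this key value #^OutputCollector output reporter]
  (doseq [word (enumeration-seq (StringTokenizer. (str value)))]
    (.collect output (Text. word) (IntWritable. 1))))

(defn reducer-reduce
  "The reduce part: sums all the counts collected for a word."
  [this key values #^OutputCollector output reporter]
  (.collect output key
            (IntWritable. (reduce + (map (fn [#^IntWritable v] (.get v))
                                         (iterator-seq values))))))

(defn tool-run
  "Configures and runs the job; args are the command line arguments."
  [this args]
  (doto (JobConf. (.getConf this) (class this))
    (.setJobName "my-wordcount")
    (.setOutputKeyClass Text)
    (.setOutputValueClass IntWritable)
    ;; the class names below are produced by gen-job-classes from the
    ;; namespace name; this naming is an assumption, check wordcount1
    (.setMapperClass (Class/forName "my_wordcount_mapper"))
    (.setReducerClass (Class/forName "my_wordcount_reducer"))
    (.setInputFormat TextInputFormat)
    (.setOutputFormat TextOutputFormat)
    (FileInputFormat/setInputPaths #^String (first args))
    (FileOutputFormat/setOutputPath (Path. (second args)))
    (JobClient/runJob))
  0)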
A complete example of this approach can be found in the wordcount1 example.
Using the functions defined in clojure-hadoop.wrap allows simplifying the implementation of the mapper and reducer functions and makes them more Clojure-like, because you work with Clojure data types instead of Hadoop classes. The wrap-map function implements a wrapper for the map function, and wrap-reduce — for reduce. Each of these functions can take from one to three arguments. The first, mandatory argument is the function to wrap. The second argument is a function that implements reading of the data, and the third argument is a function that implements writing of the data. The library provides a number of functions for reading and writing, and you can easily use them in your code.
The function that implements the map part receives two parameters — a key and a value, while the reducer function receives a key and the list of values generated for it by the mapper function.
I need to mention that the tool-run function should be implemented in the same way as in the previous case, and the results of the wrapped functions should be given the names mapper-map and reducer-reduce.
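A minimal sketch of this approach might look like the following (my-map and my-reduce are illustrative names, and I assume here, as in the full n-gram example at the end of the article, that the reduce reader hands the values over as a function returning a sequence):

(require '[clojure-hadoop.wrap :as wrap])

(defn my-map
  "Works with plain Clojure values: key is a number, value a String."
  [key value]
  (map (fn [word] [word 1]) (re-seq #"\w+" value)))

(defn my-reduce
  "Receives the key and its values; the values arrive wrapped in a function."
  [key values-fn]
  [[key (reduce + (values-fn))]])

;; The wrapped functions must be named mapper-map and reducer-reduce:
(def mapper-map     (wrap/wrap-map my-map wrap/int-string-map-reader))
(def reducer-reduce (wrap/wrap-reduce my-reduce))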
A complete example of this code can be found in wordcount2.
The code implemented in the clojure-hadoop.job namespace simplifies the implementation of Hadoop jobs even further. It implements all the necessary functions, so all you need to do is implement the map and reduce functions and specify them via command line options, together with the other parameters. All the details are handled by the clojure_hadoop.job class, which parses the command line options and binds them to the corresponding job parameters.
Command line options are specified as -key value pairs separated by spaces. The following options are required:

-input — input data (a file or a directory);
-output — directory for the output data;
-map — name of the map function, either as namespace/name, or as the name of a class that implements the org.apache.hadoop.mapred.Mapper interface. You can also use the identity function, and the input data will be passed to the reducer without processing;
-reduce — name of the reduce function, either as namespace/name, or as the name of a class that implements the org.apache.hadoop.mapred.Reducer interface. You can also use the identity or none functions, and the output data will be written without any processing.

There are also optional command line parameters:

-input-format — format of the input data: text for text data, seq for SequenceFile, or the name of a class that implements the corresponding format;
-output-format — format of the output data, with the same values as -input-format;
-output-key — class of the output key (produced by the reduce function);
-output-value — class of the output value (produced by the reduce function);
-map-output-key — class of the key produced by the map function;
-map-output-value — class of the value produced by the map function;
-map-reader — reader function (for the map function), as namespace/name;
-map-writer — writer function (for the map function), as namespace/name;
-reduce-reader — reader function (for the reduce function), as namespace/name;
-reduce-writer — writer function (for the reduce function), as namespace/name;
-combine — name of the combiner function, either as namespace/name, or as the name of a class that implements the org.apache.hadoop.mapred.Reducer interface. This function works like reduce, but it is used to combine the map results on the local node only. This allows decreasing the amount of data transferred over the network and increases the speed of processing. This function should accept data of the same type as reduce, and output data in the same format as map! (This functionality is implemented only in my version of clojure-hadoop);
-name — name of the job;
-replace — if true, the job will remove the directory with the output data;
-compress-output — if true, the job will compress the output data;
-output-compressor — compression codec for the output: gzip, bzip2, default, or a class name;
-compression-type — compression type for SequenceFile output: block, record or none.

You can find an example of this approach in the wordcount3 file — it consists of only two functions, and all parameters are specified via command line options, as in the sample invocation shown below.
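For illustration, running wordcount3 this way might look roughly like the following (the jar name and the namespace of the example's functions are assumptions on my part; check the example's source for the exact names):

java -cp examples-standalone.jar clojure_hadoop.job \
     -input books.txt -output out \
     -map wordcount3/my-map -reduce wordcount3/my-reduce \
     -input-format text -output-format text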
The defjob macro, defined in the namespace with the same name, allows specifying some of clojure-hadoop.job's configuration parameters directly in the source code, while the rest of the parameters can be specified from the command line on invocation. defjob's options are specified as keywords with the same names as the corresponding command line options, but without the minus sign at the start of the option. The only required parameter for the macro is the name of the job.
For example, the following code could be used to define a job with the name job:
(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :reduce my-reduce
  :input-format :text)
After such a definition, we can run this job using the command line option -job job-name (as a full name with the namespace), instead of specifying the separate -map and -reduce options.
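For instance, if this job is defined in a namespace called my.namespace (an illustrative name; the jar name below is also just a placeholder), the invocation would look something like:

java -cp myjob-standalone.jar clojure_hadoop.job -job my.namespace/job -input FILE -output out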
Usage of the defjob macro is shown in the wordcount4 and wordcount5 examples. The only difference between them is that in wordcount5 the reader and writer functions are specified in the job's definition. A full example with defjob is shown below.
As an example, I want to show a small program that generates sets of N-grams from given files — I use the generated databases for document classification tasks. The source code is available on github, in the hadoop1 directory. This example uses the defjob macro to describe the job, together with user-defined mapper and reducer functions.
The whole example consists of one source file that implements the functions my-map and my-reduce, which are used in the job's specification together with the input and output format specifications, etc. As usual in MapReduce, the my-map function accepts two parameters, a key (an integer) and a value (a string), and produces a list of string/integer pairs, where the string is an n-gram. The my-reduce function is very simple — it just sums all entries for a given key.
(ns hadoop1
  (:require [clojure-hadoop.wrap :as wrap]
            [clojure-hadoop.defjob :as defjob]
            [clojure-hadoop.imports :as imp])
  (:use clojure.contrib.seq-utils)
  (:require [clojure.contrib.str-utils2 :as str2])
  (:import (java.util StringTokenizer)))

(imp/import-io)
(imp/import-mapred)

(def delimiters "0123456789[ \t\n\r!\"#$%&'()*+,./:;<=>?@\\^`{|}~-]+")

(defn gen-n-grams [#^String s #^Integer n]
  (when (> (.length s) 0)
    (let [str (str " " s (String. ) (str2/repeat " " (- n 1)))]
      (reduce (fn [val elem]
                (conj val (.substring str elem (+ elem n))))
              []
              (range 0 (+ 1 (.length s)))))))

(defn my-map [key #^String value]
  (map (fn [token] [token 1])
       (flatten (map #(gen-n-grams %1 3)
                     (enumeration-seq (StringTokenizer. value delimiters))))))

(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

(defn string-long-writer [#^OutputCollector output #^String key value]
  (.collect output (Text. key) (LongWritable. value)))

(defn string-long-reduce-reader [#^Text key wvalues]
  [(.toString key)
   (fn []
     (map (fn [#^LongWritable v] (.get v))
          (iterator-seq wvalues)))])

(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :map-writer string-long-writer
  :reduce my-reduce
  :reduce-reader string-long-reduce-reader
  :reduce-writer string-long-writer
  :output-key Text
  :output-value LongWritable
  :input-format :text
  :output-format :text
  :compress-output false)
This code is based on the wordcount5 example from the clojure-hadoop distribution. To build it we use the following Leiningen project definition:
(defproject hadoop1 "1.0"
  :description "hadoop1"
  :dependencies [[org.clojure/clojure "1.1.0"]
                 [org.clojure/clojure-contrib "1.1.0"]
                 [com.stuartsierra/clojure-hadoop "1.2.0-SNAPSHOT"]])
To run this job, we need to combine all the code into one archive, so we use lein uberjar to combine our Clojure code with all the necessary libraries [1]. After this, you can run the job either in standalone mode, without a running Hadoop installation and working with local files [2], or in cluster mode. To run the job in standalone mode you can use the following command:
java -cp hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input FILE -output out
Specify any text file instead of the FILE argument; after execution, you'll get a generated file with n-grams in the out directory. This file can be used as a database for language detection and text classification tasks.
To run your code on a Hadoop cluster, you need to put the files onto HDFS (into the input directory, in our example), and run the task with the following command:
hadoop jar hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input input -output output
After the task finishes, the data will be put into the output directory [3] on HDFS, and you can access it with standard HDFS commands.
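A typical sequence of HDFS commands for staging the input files and inspecting the result might look like this (local-texts is an illustrative directory name; part-00000 is the default name of the first reducer's output file):

hadoop fs -mkdir input
hadoop fs -put local-texts/*.txt input
# ... run the job as shown above ...
hadoop fs -cat output/part-00000 | head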
This information is just an introduction to using Clojure for Hadoop programming. Additional examples of clojure-hadoop usage can be found in the clojure-hadoop distribution, and more information about programming for Hadoop and MapReduce is available in many other materials.
It is also worth mentioning the Cascalog project, which implements a DSL that allows querying data in Hadoop using Clojure.
1. You can also use the lein hadoop command (implemented by the lein-hadoop plugin), which creates a correct archive with the Hadoop task (packing all dependencies and necessary information).
2. Executing a job in standalone mode is very handy for debugging your code, because it is executed in a separate JVM instance and works with local files.
3. The output directory must not exist, or the job will not run. If you don't need the files in the output directory, you can use the command line option -replace true.