This article is a short introduction to programming Hadoop in the Clojure language.
Hadoop is a free implementation of an infrastructure for scalable, distributed computing. It started as an implementation of the MapReduce and GFS ideas introduced by Google, but many other projects have since been added to it. Hadoop is actively used by many projects, including commercial companies such as Yahoo and LinkedIn, and it makes it possible to process large amounts of data on "standard" commodity hardware.
Hadoop is written in Java, which makes it easy to use from Clojure. To simplify Hadoop programming in Clojure, Stuart Sierra developed a simple but powerful package, clojure-hadoop, which is described in this article.
A minimal Hadoop installation is fairly simple and is described in the documentation. Configuring Hadoop for a clustered environment is slightly more complicated, but it is described in detail in the corresponding document.
It is worth mentioning that Cloudera provides ready-to-use packages for different Linux distributions, so you can set up Hadoop and related packages with your favorite package manager. In addition, Cloudera provides a ready-to-use installation as a VMware disk image, so you can download a single archive and get a working environment.
The source code of clojure-hadoop is available on GitHub; that version works with Hadoop 0.19 and Clojure 1.1.0. If you want to use Hadoop 0.20 or newer, and/or Clojure 1.2.0, you can take the version with my changes. Build and installation are performed with Maven: you just need to run the mvn install command.
clojure-hadoop provides several levels of abstraction. Each level lives in a separate namespace; they are described below in order of increasing abstraction:

clojure-hadoop.gen — the lowest level. The developer implements the functions mapper-map and reducer-reduce, while job initialization, specification of input and output data, and other options have to be implemented inside the tool-run function.

clojure-hadoop.wrap — provides wrapper functions that convert between Hadoop classes and Clojure data structures, so the map and reduce logic can be written in terms of plain Clojure data.

clojure-hadoop.job — implements all the necessary functions (mapper-map, reducer-reduce and tool-run) and lets you use command-line options to specify any mapper and reducer functions (written in Clojure), input and output data, and other parameters.

clojure-hadoop.defjob — provides the defjob macro, which allows you to define a MapReduce job in Clojure code. Some parameters can still be given as command-line options (usually the input and output data), while with this macro you can specify the map and reduce functions, input/output data conversion functions, etc.

Besides these namespaces, there is also the clojure-hadoop.imports namespace, which provides functions for importing Hadoop's classes and interfaces. These functions make the developer's life much easier and are used in all programs (all of them, or only some), regardless of the selected level of abstraction. The following functions are currently defined:
import-io — imports classes and interfaces from the org.apache.hadoop.io package;
import-io-compress — imports classes and interfaces from the org.apache.hadoop.io.compress package;
import-fs — imports classes and interfaces from the org.apache.hadoop.fs package;
import-mapred — imports classes and interfaces from the org.apache.hadoop.mapred package;
import-mapred-lib — imports classes and interfaces from the org.apache.hadoop.mapred.lib package.

Depending on the selected level of abstraction, you need to use different styles of programming. In most cases it is enough to use defjob, which hides most of the low-level details behind its interface, so you only need to implement the logic as two functions. I nevertheless describe all the namespaces here, because sometimes you will need access to the low-level interfaces. All sections contain links to examples that you can find in the clojure-hadoop distribution.
Only two things are defined in the clojure-hadoop.gen namespace: the gen-job-classes macro and the gen-main-method function.
The gen-job-classes macro creates three classes that implement the org.apache.hadoop.mapred.Mapper, org.apache.hadoop.mapred.Reducer and org.apache.hadoop.util.Tool interfaces, respectively. The developer must define three functions, mapper-map, reducer-reduce and tool-run, which will be used as the implementations of the corresponding methods of those interfaces. The mapper-map function maps input data into intermediate data (the map part of MapReduce), the reducer-reduce function receives intermediate data and produces output data (the reduce part of MapReduce), and the tool-run function defines the Hadoop job options, prepares input and output parameters, etc. The functions' parameters match the corresponding parameters of the methods declared in the Hadoop interfaces, so for detailed information it is best to look at Hadoop's API.
The gen-main-method function creates a standard main method that executes the tool-run function using the run method of the org.apache.hadoop.util.ToolRunner class.
You can find a complete code example in the wordcount1 example.
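The following shortened sketch shows how these pieces fit together (it is modeled on wordcount1; the namespace word-count.example and the generated class names referenced in tool-run are illustrative, so check the real example for the exact names):

(ns word-count.example
  (:require [clojure-hadoop.gen :as gen]
            [clojure-hadoop.imports :as imp])
  (:import (java.util StringTokenizer)))

(imp/import-io)      ; Text, LongWritable, ...
(imp/import-fs)      ; Path
(imp/import-mapred)  ; JobConf, JobClient, TextInputFormat, ...

(gen/gen-job-classes)  ; generates the Mapper, Reducer and Tool classes
(gen/gen-main-method)  ; generates a main method that calls tool-run

;; the map part: emit the pair [word 1] for every word of the input line
(defn mapper-map [this key value output reporter]
  (doseq [word (enumeration-seq (StringTokenizer. (str value)))]
    (.collect output (Text. word) (LongWritable. 1))))

;; the reduce part: sum all counts collected for a word
(defn reducer-reduce [this key values output reporter]
  (.collect output key
            (LongWritable. (reduce + (map #(.get %) (iterator-seq values))))))

;; job configuration; the generated class names are derived from the namespace name
(defn tool-run [this args]
  (doto (JobConf. (.getConf this) (.getClass this))
    (.setJobName "word-count")
    (.setOutputKeyClass Text)
    (.setOutputValueClass LongWritable)
    (.setMapperClass (Class/forName "word_count.example_mapper"))
    (.setReducerClass (Class/forName "word_count.example_reducer"))
    (.setInputFormat TextInputFormat)
    (.setOutputFormat TextOutputFormat)
    (FileInputFormat/setInputPaths (first args))
    (FileOutputFormat/setOutputPath (Path. (second args)))
    (JobClient/runJob))
  0)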
Using the functions defined in clojure-hadoop.wrap allows you to simplify the implementation of the mapper and reducer functions and make them more Clojure-like, because you work with Clojure data types rather than Hadoop classes. The wrap-map function implements a wrapper for the map function, and wrap-reduce for the reduce function. Each of these functions can take from one to three arguments. The first, mandatory argument is the function to wrap. The second argument is a function that implements reading of data, and the third is a function that implements writing of data. The library provides a number of reading and writing functions that you can easily use in your code.
The function that implements the map part receives two parameters, a key and a value, while the reducer function receives a key and a list of values generated by the mapper function.
Note that the tool-run function should be implemented the same way as in the previous case, and the results of the wrapped functions should be named mapper-map and reducer-reduce.
You can find example code in wordcount2.
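A shortened sketch of this approach might look as follows (modeled on wordcount2; my-map and my-reduce are illustrative names, the library's default Clojure readers and writers are assumed where none is given, and tool-run is written exactly as in the previous section, so it is omitted):

(ns word-count.example2
  (:require [clojure-hadoop.gen :as gen]
            [clojure-hadoop.wrap :as wrap]
            [clojure-hadoop.imports :as imp])
  (:import (java.util StringTokenizer)))

(imp/import-io)
(imp/import-mapred)

(gen/gen-job-classes)
(gen/gen-main-method)

;; works with plain Clojure data: takes a key and a line, returns [token 1] pairs
(defn my-map [key value]
  (map (fn [token] [token 1])
       (enumeration-seq (StringTokenizer. value))))

;; takes a key and a function that returns the sequence of values for that key
(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

;; the wrapped functions must keep the names expected by gen-job-classes
(def mapper-map
  (wrap/wrap-map my-map wrap/int-string-map-reader))

(def reducer-reduce
  (wrap/wrap-reduce my-reduce))

;; tool-run is implemented in the same way as in the previous example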
The code in the clojure-hadoop.job namespace simplifies the implementation of Hadoop jobs even further. It implements all the necessary functions, so all you need to do is implement the map and reduce functions and specify them in command-line options, together with the other parameters. All the details are handled by the clojure_hadoop.job class, which parses the command-line options and binds them to the corresponding job parameters.
Command-line options are specified as -key value pairs separated by spaces. The following options are required:

-input — path to the input data (a comma-separated list of input paths);
-output — path to the directory for the output data;
-map — the map function, specified as namespace/name, or as the name of a class that implements the org.apache.hadoop.mapred.Mapper interface. You can also use the identity function, and the input data will be passed to the reducer without processing;
-reduce — the reduce function, specified as namespace/name, or as the name of a class that implements the org.apache.hadoop.mapred.Reducer interface. You can also use the identity or none functions, and the output data will be written without any processing.

There are also optional command-line parameters:

-input-format — format of the input data: text for text data, seq for SequenceFile, or the name of a class that implements the corresponding format;
-output-format — format of the output data, with the same possible values as -input-format;
-output-key — class of the output keys (produced by the reduce function);
-output-value — class of the output values (produced by the reduce function);
-map-output-key — class of the intermediate keys (produced by the map function);
-map-output-value — class of the intermediate values (produced by the map function);
-map-reader — reader function (for the map function), as namespace/name;
-map-writer — writer function (for the map function), as namespace/name;
-reduce-reader — reader function (for the reduce function), as namespace/name;
-reduce-writer — writer function (for the reduce function), as namespace/name;
-combine — the combiner function, specified as namespace/name, or as the name of a class that implements the org.apache.hadoop.mapred.Reducer interface. This function works like reduce, but it is used to combine the map results only on the local node. This decreases the amount of data transferred over the network and increases processing speed. This function should accept data of the same type as reduce, but output data in the same format as map! (This functionality is implemented only in my version of clojure-hadoop);
-name — name of the job;
-replace — if true, the job will remove the existing directory with output data;
-compress-output — if true, the job will compress the output data;
-output-compressor — compression codec for the output: gzip, bzip2, default, or a class name;
-compression-type — compression type for SequenceFile output: block, record or none.

You can find an example of this approach in the wordcount3 file; it consists of only two functions, and all parameters are specified via command-line options, as in the invocation sketched below.
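For example, a run of such a job might look like the following (the jar name and the my.namespace functions are illustrative; clojure_hadoop.job is the class that parses the options):

java -cp myjob-standalone.jar clojure_hadoop.job \
     -input books.txt -output out \
     -map my.namespace/my-map -reduce my.namespace/my-reduce \
     -input-format text -output-format text -replace true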
The defjob macro, defined in the namespace of the same name, allows you to specify some of clojure-hadoop.job's configuration parameters directly in the source code, while the rest of the parameters can be given on the command line at invocation time. defjob's options are specified as keywords with the same names as the corresponding command-line options, but without the leading minus sign. The only required parameter of the macro is the name of the job.
For example, the following code could be used to define a job with the name job:
(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :reduce my-reduce
  :input-format :text)
After such a definition, we can run this job using the command-line option -job job-name (with the full, namespace-qualified name), instead of specifying the separate -map and -reduce options.
Usage of the defjob macro is shown in the wordcount4 and wordcount5 examples. The only difference between them is that in wordcount5 the reader and writer functions are specified in the job definition. A full example with defjob is shown below.
As an example, I want to show a small program that generates sets of n-grams from given files; I use the generated databases for document classification tasks. The source code is available on GitHub, in the hadoop1 directory. This example uses the defjob macro to describe the job, together with user-defined mapper and reducer functions.
The whole example consists of a single source file that implements the functions my-map and my-reduce, which are used in the job specification together with input and output format specifications, etc. As usual in MapReduce, the my-map function accepts two parameters, a key (an integer) and a value (a string), and produces a list of string/integer pairs, where the string is an n-gram. The my-reduce function is very simple: it just sums all the entries for a given key.
(ns hadoop1
  (:require [clojure-hadoop.wrap :as wrap]
            [clojure-hadoop.defjob :as defjob]
            [clojure-hadoop.imports :as imp])
  (:use clojure.contrib.seq-utils)
  (:require [clojure.contrib.str-utils2 :as str2])
  (:import (java.util StringTokenizer)))

(imp/import-io)
(imp/import-mapred)

(def delimiters "0123456789[ \t\n\r!\"#$%&'()*+,./:;<=>?@\\^`{|}~-]+")

(defn gen-n-grams [#^String s #^Integer n]
  (when (> (.length s) 0)
    (let [str (str " " s (String.) (str2/repeat " " (- n 1)))]
      (reduce (fn [val elem] (conj val (.substring str elem (+ elem n))))
              []
              (range 0 (+ 1 (.length s)))))))

(defn my-map [key #^String value]
  (map (fn [token] [token 1])
       (flatten (map #(gen-n-grams %1 3)
                     (enumeration-seq (StringTokenizer. value delimiters))))))

(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

(defn string-long-writer [#^OutputCollector output #^String key value]
  (.collect output (Text. key) (LongWritable. value)))

(defn string-long-reduce-reader [#^Text key wvalues]
  [(.toString key)
   (fn [] (map (fn [#^LongWritable v] (.get v)) (iterator-seq wvalues)))])

(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :map-writer string-long-writer
  :reduce my-reduce
  :reduce-reader string-long-reduce-reader
  :reduce-writer string-long-writer
  :output-key Text
  :output-value LongWritable
  :input-format :text
  :output-format :text
  :compress-output false)
This code is based on the wordcount5 example from the clojure-hadoop distribution. To build it, we use the following Leiningen project:
(defproject hadoop1 "1.0"
  :description "hadoop1"
  :dependencies [[org.clojure/clojure "1.1.0"]
                 [org.clojure/clojure-contrib "1.1.0"]
                 [com.stuartsierra/clojure-hadoop "1.2.0-SNAPSHOT"]])
To run this job, we need to combine all the code into one archive, so we use lein uberjar to bundle our Clojure code with all the necessary libraries1. After this, you can run the job either in standalone mode, without a running Hadoop installation and working with local files2, or in cluster mode. To run the job in standalone mode you can use the following command:
java -cp hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input FILE -output out
Specify any text file instead of the FILE argument; after execution you will get a generated file with n-grams in the out directory. This file can be used as a database for language detection and text classification tasks.
To run your code on a Hadoop cluster, you need to put the files onto HDFS (into the input directory in our example) and launch the task with the following command:
hadoop jar hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input input -output output
After the task finishes, the data will be put into the output directory3 on HDFS, and you can access it with standard HDFS commands.
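For example, you could list and inspect the results like this (part-00000 is the default name of the first reducer's output file):

hadoop fs -ls output
hadoop fs -cat output/part-00000 | head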
This information is just an introduction to using Clojure for Hadoop programming. Additional examples of clojure-hadoop usage can be found in the clojure-hadoop distribution. Additional information about programming for Hadoop and MapReduce can be found in the following materials:
It is also worth mentioning the Cascalog project, which implements a DSL for querying data in Hadoop using Clojure.
1. You can also use the lein hadoop command (implemented by the lein-hadoop plugin), which creates a correct archive with the Hadoop task (packaging all dependencies and necessary information).
2. Executing a job in standalone mode is very handy for debugging your code, because it is executed in a separate JVM instance and works with local files.
3. The output directory must not exist, or the job will not run. If you do not need the files in the output directory, you can use the command-line option -replace true.