Данная статья является небольшим введением в программирование Hadoop с помощью Clojure.
Проект Hadoop является свободной реализацией инфраструктуры для распределенных, масштабируемых вычислений. Он начался как реализация идей MapReduce и GFS, введенных Google, но со временем, в рамках проекта были реализованы и другие проекты. Hadoop активно используется во многих проектах, включая коммерческие — Yahoo, LinkedIn, и т.д., и позволяет обрабатывать огромные объемы данных используя "стандартное" оборудование.
Hadoop написан на языке Java, что позволяет достаточно просто интегрировать его с Clojure.
Для упрощения программирования на Clojure для Hadoop, Stuart Sierra разработал достаточно
простой, но мощный пакет clojure-hadoop
, о котором и пойдет речь в данной статье.
Установка Hadoop в минимальной конфигурации, необходимой для наших экспериментов, достаточно проста и описана в документации. Настройка Hadoop для работы в кластере немного сложнее, но тоже описана достаточно подробно.
Хочется отметить, что компания Cloudera предоставляет готовые пакеты для разных версий Linux, так что вы можете установить Hadoop и другие пакеты используя ваш любимый пакетный менеджер. Кроме того, они распространяют готовую к работе инсталляцию в виде образа для VMWare, так что вы можете загрузить нужный архив и получить готовую рабочую среду.
Исходные тексты clojure-hadoop
можно взять с github — она работает с Hadoop версии 0.19 и
Clojure 1.1.0. Если вам нужен Hadoop 0.20 и старше и/или Clojure 1.2.0, то вы можете
взять версию с моими изменениями. Сборка и установка производится с помощью Maven — для
этого надо всего лишь выполнить команду mvn install
.
clojure-hadoop
состоит из нескольких уровней абстракции, которые реализуются с помощью
макросов. Каждый из уровней находится в отдельном пространстве имен, перечисленных в
порядке увеличения абстракции.
clojure-hadoop.gen
map
и reduce
имеют точно такой же набор
параметров, как и функции Java и имеют фиксированные имена —
mapper-map
и
reduce-reducer
. А инициализация задания, указание входных и выходных файлов и т.п.,
должно производиться функцией tool-run
.clojure-hadoop.wrap
map
и reduce
в более натуральном для Clojure
стиле. Все остальное реализуется также как и в предыдущем случае.clojure-hadoop.job
mapper-map
, reduce-reducer
и
tool-run
) и позволяя указывать произвольные функции map
и reduce
(написанные на
Clojure), входные и выходные данные и т.п., используя опции командной строки.clojure-hadoop.defjob
defjob
, который позволяет определять задания
MapReduce используя Clojure. При этом сохраняется возможность указания части данных
(обычно это входные и выходные данные) через опции командной строки. А с помощью
макроса указываются функции map
и reduce
, а также функции для преобразования входных и
выходных данных.Кроме того, имеется пространство имен clojure-hadoop.imports
, в котором определяются
функции для импорта определений классов и интерфейсов Hadoop, что делает жизнь
разработчика легче — эти функции используются во всех программах (все или только
некоторые), независимо от выбранного уровня абстракции. В данный момент определены
следующие функции:
import-io
— для импорта классов и интерфейсов из пакета org.apache.hadoop.io
;import-io-compress
— для импорта классов и интерфейсов из пакета
org.apache.hadoop.io.compress
;import-fs
— для импорта классов и интерфейсов из пакета org.apache.hadoop.fs
;import-mapred
— для импорта классов и интерфейсов из пакета org.apache.hadoop.mapred
;import-mapred-lib
— для импорта классов и интерфейсов из пакета
org.apache.hadoop.mapred.lib
.В зависимости от выбранного вами уровня абстракции, вам необходимо использовать разные
подходы к программированию. В большинстве случаев достаточно использовать defjob
, который
прячет большую часть деталей за своим интерфейсом, так что программисту достаточно
реализовать логику в виде двух функций. Однако, в некоторых случаях может понадобиться
использовать низкоуровневые интерфейсы, поэтому тут приводится описание всех уровней. В
каждом из разделов имеются ссылки на примеры, которые можно найти в
поставке clojure-hadoop.
В пространстве имен clojure-hadoop.gen
определено всего два объекта: макрос
gen-job-classes
и функция gen-main-method
.
Макрос gen-job-classes
создает три класса, которые реализуют интерфейсы
org.apache.hadoop.mapred.Mapper
, org.apache.hadoop.mapred.Reducer
и
org.apache.hadoop.util.Tool
, соответственно. При этом пользователь обязан определить
функции mapper-map
, reducer-reduce
и tool-run
, поскольку эти функции являются реализациями
методов в соответствующих интерфейсах. Функция mapper-map
реализует отображение входных
данных в промежуточные данные (часть map
в схеме MapReduce), функция reducer-reduce
реализует обработку промежуточных данных и превращение их в окончательный результат
(часть reduce
в схеме MapReduce), а функция tool-run
определяет опции задания Hadoop и
подготавливает входные и выходные данные. Все параметры функций напрямую соответствуют
параметрам функций в интерфейсах, так что для детальной информации стоит обратиться к
документации на соответствующие интерфейсы.
Функция gen-main-method
создает стандартный метод main
, который запускает функцию tool-run
используя метод run
класса org.apache.hadoop.util.ToolRunner
.
Пример реализации всех этих функций можно увидеть в примере wordcount1.
Использование функций из clojure-hadoop.wrap
позволяют упростить написание функций map
и
reduce
, сделать их более простыми за счет работы не с классами Hadoop, а используя типы
Clojure для ввода и вывода данных. Функция wrap-map
реализует обертку для map
, а
wrap-reduce
— для reduce
. Каждая из функций-оберток имеет по три варианта реализации —
с одним, двумя или тремя аргументами. Первый, обязательный аргумент — обертываемая
функция. Второй аргумент — функция, реализующая чтение данных. А третий аргумент —
функция, реализующая запись данных. В составе библиотеки реализован набор функций чтения
и записи, которые вы можете использовать в своем коде.
Функция, реализующая часть map
, принимает на вход два параметра — ключ и значение, а
функция, реализующая часть reduce
также принимает на вход два параметра — ключ и список
значений, сгенерированных функций map
.
При этом, стоит отметить что функция tool-run
должна быть реализована точно также как и в
предыдущем случае, а результат применения функций-оберток должен иметь имена mapper-map
и
reducer-reduce
, соответственно.
Пример реализации можно увидеть в примере wordcount2.
Код, реализованный в пространстве имен clojure-hadoop.job
еще более упрощает реализацию
функций map
и reduce
, реализуя остальные необходимые функции и позволяя пользователю
указывать свои функции используя параметры командной строки. Все это производится
автоматически, используя класс clojure_hadoop.job
, который выполняет разбор командной
строки и связывает переданные параметры с соответствующими настройками задания.
Опции командной строки указываются как пары -имя_опции значение
, разделенные пробелом.
При запуске задания следующие опции являются обязательными:
-input
-output
-map
map
, в виде namespace/название
или имя класса,
реализующего интерфейс org.apache.hadoop.mapred.Mapper
. В качестве функции можно
указать identity
, тогда данные будут переданы в функцию reduce
без обработки;-reduce
reduce
, в виде namespace/название
или имя класса,
реализующего интерфейс org.apache.hadoop.mapred.Reducer
. В качестве функции можно
указать identity
или none
, тогда данные будут выданы без обработки.Существует также набор необязательных опций, которые могут указаны при запуске задания:
-input-format
text
, для
текстовых данных, seq
для SequenceFile
или имя класса, реализующего нужный формат;-output-format
-input-format
;-output-key
reduce
);-output-value
reduce
);-map-output-key
map
);-map-output-value
map
);-map-reader
map
), в виде
namespace/имя
;-map-writer
map
), в
виде namespace/имя
;-reduce-reader
reduce
),
в виде namespace/имя
;-reduce-writer
reduce
), в
виде namespace/имя
;-combine
combine
, в виде namespace/название
или имя класса,
реализующего интерфейс org.apache.hadoop.mapred.Reducer
. Эта функция работает также как
reduce
, но используется для объединения результатов, выданных map
только на локальной
ноде. Это позволяет уменьшить объем передаваемых по сети данных, и ускорить обработку
данных. Функция объединения должна принимать данные того же типа, что и reduce
, а
выдавать результаты того же типа, что и map
! (Эта функциональность имеется только в
моей версии clojure-hadoop);-name
-replace
true
приводит к удалению выходного каталога;-compress-output
true
производит сжатие выходных данных;-output-compressor
gzip
, bzip2
, default
или имя класса;-compression-type
SequenceFile
—
block
, record
или none
.Пример использования данного подхода можно увидеть в примере wordcount3 — он состоит всего из двух функций, а все параметры указываются через командную строку.
Макрос defjob
, определенный в одноименном пространстве имен, еще больше упрощает написание
заданий для Hadoop. Этот макрос позволяет указать часть опций для clojure-hadoop.job
в
исходном коде, а потом лишь указать какое задание нужно использовать. Опции указываются
как ключевые слова (keywords), их названия совпадают с названиями соответствующих опций
командной строки, только начинаются с двоеточия, а не со знака минус. Единственным
обязательным аргументом этого макроса является имя задания.
Например, вот такой код используется для определения задания и именем job
:
(defjob/defjob job :map my-map :map-reader wrap/int-string-map-reader :reduce my-reduce :input-format :text)
И после этого, можно запускать код на выполнение указывая название задания используя
опцию -job
(как полное имя, вместе с пространством имен) вместо опций -map
и -reduce
.
Использование макроса defjob
демонстрируется в примерах wordcount4 и wordcount5. Отличие
между ними заключается в том, что в wordcount5
в определении задания указываются функции
чтения и записи. Полный пример с defjob
можно увидеть ниже.
В качестве примера хочу показать небольшую программу, которая генерирует наборы N-Gram из
заданных файлов — я использую сгенерированные базы для задач классификации документов.
Исходный код доступен на github, каталог hadoop1
. Данный пример использует defjob
для
объявления задания и пользовательских функций map
и reduce
.
Весь проект состоит из одного файла с исходным кодом, в котором реализуются функции my-map
и my-reduce
, которые затем указываются в описании задания (входные и выходные форматы
функций, исходных и результирующих файлов, и т.д.), для чего используется макрос defjob
.
Как обычно для MapReduce, функция my-map
принимает на вход ключ (целое число) и строку, и
выдает список пар строка/целое число, где строка — выделенный участок слова. Функция
my-reduce
очень проста — она просто суммирует количество вхождений каждого ключа.
(ns hadoop1 (:require [clojure-hadoop.wrap :as wrap] [clojure-hadoop.defjob :as defjob] [clojure-hadoop.imports :as imp]) (:use clojure.contrib.seq-utils) (:require [clojure.contrib.str-utils2 :as str2]) (:import (java.util StringTokenizer))) (imp/import-io) (imp/import-mapred) (def delimiters "0123456789[ \t\n\r!\"#$%&'()*+,./:;<=>?@\\^`{|}~-]+") (defn gen-n-grams [#^String s #^Integer n] (when (> (.length s) 0) (let [str (str " " s (String. ) (str2/repeat " " (- n 1)))] (reduce (fn [val elem] (conj val (.substring str elem (+ elem n)))) [] (range 0 (+ 1 (.length s))))))) (defn my-map [key #^String value] (map (fn [token] [token 1]) (flatten (map #(gen-n-grams %1 3) (enumeration-seq (StringTokenizer. value delimiters)))))) (defn my-reduce [key values-fn] [ [key (reduce + (values-fn))] ]) (defn string-long-writer [#^OutputCollector output #^String key value] (.collect output (Text. key) (LongWritable. value))) (defn string-long-reduce-reader [#^Text key wvalues] [(.toString key) (fn [] (map (fn [#^LongWritable v] (.get v)) (iterator-seq wvalues)))]) (defjob/defjob job :map my-map :map-reader wrap/int-string-map-reader :map-writer string-long-writer :reduce my-reduce :reduce-reader string-long-reduce-reader :reduce-writer string-long-writer :output-key Text :output-value LongWritable :input-format :text :output-format :text :compress-output false)
Данный код основан на примере wordcount5 из поставки пакета clojure-hadoop. Для сборки кода используется следующий проект Leiningen:
(defproject hadoop1 "1.0" :description "hadoop1" :dependencies [[org.clojure/clojure "1.1.0"] [org.clojure/clojure-contrib "1.1.0"] [com.stuartsierra/clojure-hadoop "1.2.0-SNAPSHOT"]] )
Для запуска проекта необходимо собрать все библиотеки в один архив, поэтому для сборки
должна использоваться команда lein uberjar
, которая упакует код на Clojure, вместе со
всеми необходимыми библиотеками1. После сборки вы можете запустить задачу на выполнение
либо в автономном режиме — без запущенного Hadoop, и работающего с локальными файлами2,
либо в кластерном режиме.
Запуск в автономном режиме производится следующей командой:
java -cp hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input FILE -output out
в качестве аргумента FILE
подставьте нужный текстовый файл, и после выполнения программы,
в каталоге out
будет создан файл, содержащий список N-Gram и их количество в тексте.
Данный файл можно затем использовать в качестве базы данных для задач определения языков и
т.п.
Для запуска вашего кода в кластере Hadoop вы должны поместить анализируемые файлы на HDFS
(в каталог input
, в нашем примере), и запустить задачу на выполнение с помощью следующей
команды:
hadoop jar hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input input -output output
После окончания выполнения задания, данные будут помещены в каталог output
3 на HDFS, и
вы можете получить доступ к ним используя стандартные команды работы с HDFS.
Данная статья является лишь введением в использование Clojure для программирования на базе Hadoop. Дополнительные примеры применения clojure-hadoop вы можете найти в следующих материалах:
clojure-hadoop
.Дополнительную информацию о программировании Hadoop и Map/Reduce вы можете найти в следующих материалах:
Также стоит упомянуть проект Cascalog, который реализует DSL, позволяющий производить выборки данных из Hadoop, используя Clojure.
1. Вы также можете использовать для сборки команду lein hadoop
, реализованную плагином
lein-hadoop, которая создает архив, соответствующий правилам упаковки заданий Hadoop.
2. Запуск в автономном режиме удобен для отладки вашего кода, поскольку он выполняется в отдельном инстансе JVM, и работает с локальными файлами.
3. Каталог для результатов не должен существовать, задание просто не запустится если
каталог уже существует. Если вам не нужны данные в существующем каталоге, то вы
можете использовать опцию командной строки -replace true
.
Last change: 05.03.2013 16:54