Данная статья является небольшим введением в программирование Hadoop с помощью Clojure.
Проект Hadoop является свободной реализацией инфраструктуры для распределенных, масштабируемых вычислений. Он начался как реализация идей MapReduce и GFS, введенных Google, но со временем, в рамках проекта были реализованы и другие проекты. Hadoop активно используется во многих проектах, включая коммерческие — Yahoo, LinkedIn, и т.д., и позволяет обрабатывать огромные объемы данных используя "стандартное" оборудование.
Hadoop написан на языке Java, что позволяет достаточно просто интегрировать его с Clojure.
Для упрощения программирования на Clojure для Hadoop, Stuart Sierra разработал достаточно
простой, но мощный пакет clojure-hadoop, о котором и пойдет речь в данной статье.
Установка Hadoop в минимальной конфигурации, необходимой для наших экспериментов, достаточно проста и описана в документации. Настройка Hadoop для работы в кластере немного сложнее, но тоже описана достаточно подробно.
Хочется отметить, что компания Cloudera предоставляет готовые пакеты для разных версий Linux, так что вы можете установить Hadoop и другие пакеты используя ваш любимый пакетный менеджер. Кроме того, они распространяют готовую к работе инсталляцию в виде образа для VMWare, так что вы можете загрузить нужный архив и получить готовую рабочую среду.
Исходные тексты clojure-hadoop можно взять с github — она работает с Hadoop версии 0.19 и
Clojure 1.1.0. Если вам нужен Hadoop 0.20 и старше и/или Clojure 1.2.0, то вы можете
взять версию с моими изменениями. Сборка и установка производится с помощью Maven — для
этого надо всего лишь выполнить команду mvn install.
clojure-hadoop состоит из нескольких уровней абстракции, которые реализуются с помощью
макросов. Каждый из уровней находится в отдельном пространстве имен, перечисленных в
порядке увеличения абстракции.
clojure-hadoop.genmap и reduce имеют точно такой же набор
параметров, как и функции Java и имеют фиксированные имена —
mapper-map и
reduce-reducer. А инициализация задания, указание входных и выходных файлов и т.п.,
должно производиться функцией tool-run.clojure-hadoop.wrapmap и reduce в более натуральном для Clojure
стиле. Все остальное реализуется также как и в предыдущем случае.clojure-hadoop.jobmapper-map, reduce-reducer и
tool-run) и позволяя указывать произвольные функции map и reduce (написанные на
Clojure), входные и выходные данные и т.п., используя опции командной строки.clojure-hadoop.defjobdefjob, который позволяет определять задания
MapReduce используя Clojure. При этом сохраняется возможность указания части данных
(обычно это входные и выходные данные) через опции командной строки. А с помощью
макроса указываются функции map и reduce, а также функции для преобразования входных и
выходных данных.Кроме того, имеется пространство имен clojure-hadoop.imports, в котором определяются
функции для импорта определений классов и интерфейсов Hadoop, что делает жизнь
разработчика легче — эти функции используются во всех программах (все или только
некоторые), независимо от выбранного уровня абстракции. В данный момент определены
следующие функции:
import-io — для импорта классов и интерфейсов из пакета org.apache.hadoop.io;import-io-compress — для импорта классов и интерфейсов из пакета
org.apache.hadoop.io.compress;import-fs — для импорта классов и интерфейсов из пакета org.apache.hadoop.fs;import-mapred — для импорта классов и интерфейсов из пакета org.apache.hadoop.mapred;import-mapred-lib — для импорта классов и интерфейсов из пакета
org.apache.hadoop.mapred.lib.В зависимости от выбранного вами уровня абстракции, вам необходимо использовать разные
подходы к программированию. В большинстве случаев достаточно использовать defjob, который
прячет большую часть деталей за своим интерфейсом, так что программисту достаточно
реализовать логику в виде двух функций. Однако, в некоторых случаях может понадобиться
использовать низкоуровневые интерфейсы, поэтому тут приводится описание всех уровней. В
каждом из разделов имеются ссылки на примеры, которые можно найти в
поставке clojure-hadoop.
В пространстве имен clojure-hadoop.gen определено всего два объекта: макрос
gen-job-classes и функция gen-main-method.
Макрос gen-job-classes создает три класса, которые реализуют интерфейсы
org.apache.hadoop.mapred.Mapper, org.apache.hadoop.mapred.Reducer и
org.apache.hadoop.util.Tool, соответственно. При этом пользователь обязан определить
функции mapper-map, reducer-reduce и tool-run, поскольку эти функции являются реализациями
методов в соответствующих интерфейсах. Функция mapper-map реализует отображение входных
данных в промежуточные данные (часть map в схеме MapReduce), функция reducer-reduce
реализует обработку промежуточных данных и превращение их в окончательный результат
(часть reduce в схеме MapReduce), а функция tool-run определяет опции задания Hadoop и
подготавливает входные и выходные данные. Все параметры функций напрямую соответствуют
параметрам функций в интерфейсах, так что для детальной информации стоит обратиться к
документации на соответствующие интерфейсы.
Функция gen-main-method создает стандартный метод main, который запускает функцию tool-run
используя метод run класса org.apache.hadoop.util.ToolRunner.
Пример реализации всех этих функций можно увидеть в примере wordcount1.
Использование функций из clojure-hadoop.wrap позволяют упростить написание функций map и
reduce, сделать их более простыми за счет работы не с классами Hadoop, а используя типы
Clojure для ввода и вывода данных. Функция wrap-map реализует обертку для map, а
wrap-reduce — для reduce. Каждая из функций-оберток имеет по три варианта реализации —
с одним, двумя или тремя аргументами. Первый, обязательный аргумент — обертываемая
функция. Второй аргумент — функция, реализующая чтение данных. А третий аргумент —
функция, реализующая запись данных. В составе библиотеки реализован набор функций чтения
и записи, которые вы можете использовать в своем коде.
Функция, реализующая часть map, принимает на вход два параметра — ключ и значение, а
функция, реализующая часть reduce также принимает на вход два параметра — ключ и список
значений, сгенерированных функций map.
При этом, стоит отметить что функция tool-run должна быть реализована точно также как и в
предыдущем случае, а результат применения функций-оберток должен иметь имена mapper-map и
reducer-reduce, соответственно.
Пример реализации можно увидеть в примере wordcount2.
Код, реализованный в пространстве имен clojure-hadoop.job еще более упрощает реализацию
функций map и reduce, реализуя остальные необходимые функции и позволяя пользователю
указывать свои функции используя параметры командной строки. Все это производится
автоматически, используя класс clojure_hadoop.job, который выполняет разбор командной
строки и связывает переданные параметры с соответствующими настройками задания.
Опции командной строки указываются как пары -имя_опции значение, разделенные пробелом.
При запуске задания следующие опции являются обязательными:
-input-output-mapmap, в виде namespace/название или имя класса,
реализующего интерфейс org.apache.hadoop.mapred.Mapper. В качестве функции можно
указать identity, тогда данные будут переданы в функцию reduce без обработки;-reducereduce, в виде namespace/название или имя класса,
реализующего интерфейс org.apache.hadoop.mapred.Reducer. В качестве функции можно
указать identity или none, тогда данные будут выданы без обработки.Существует также набор необязательных опций, которые могут указаны при запуске задания:
-input-formattext, для
текстовых данных, seq для SequenceFile или имя класса, реализующего нужный формат;-output-format-input-format;-output-keyreduce);-output-valuereduce);-map-output-keymap);-map-output-valuemap);-map-readermap), в виде
namespace/имя;-map-writermap), в
виде namespace/имя;-reduce-readerreduce),
в виде namespace/имя;-reduce-writerreduce), в
виде namespace/имя;-combinecombine, в виде namespace/название или имя класса,
реализующего интерфейс org.apache.hadoop.mapred.Reducer. Эта функция работает также как
reduce, но используется для объединения результатов, выданных map только на локальной
ноде. Это позволяет уменьшить объем передаваемых по сети данных, и ускорить обработку
данных. Функция объединения должна принимать данные того же типа, что и reduce, а
выдавать результаты того же типа, что и map! (Эта функциональность имеется только в
моей версии clojure-hadoop);-name-replacetrue приводит к удалению выходного каталога;-compress-outputtrue производит сжатие выходных данных;-output-compressorgzip, bzip2, default или имя класса;-compression-typeSequenceFile —
block, record или none.Пример использования данного подхода можно увидеть в примере wordcount3 — он состоит всего из двух функций, а все параметры указываются через командную строку.
Макрос defjob, определенный в одноименном пространстве имен, еще больше упрощает написание
заданий для Hadoop. Этот макрос позволяет указать часть опций для clojure-hadoop.job в
исходном коде, а потом лишь указать какое задание нужно использовать. Опции указываются
как ключевые слова (keywords), их названия совпадают с названиями соответствующих опций
командной строки, только начинаются с двоеточия, а не со знака минус. Единственным
обязательным аргументом этого макроса является имя задания.
Например, вот такой код используется для определения задания и именем job:
(defjob/defjob job :map my-map :map-reader wrap/int-string-map-reader :reduce my-reduce :input-format :text)
И после этого, можно запускать код на выполнение указывая название задания используя
опцию -job (как полное имя, вместе с пространством имен) вместо опций -map и -reduce.
Использование макроса defjob демонстрируется в примерах wordcount4 и wordcount5. Отличие
между ними заключается в том, что в wordcount5 в определении задания указываются функции
чтения и записи. Полный пример с defjob можно увидеть ниже.
В качестве примера хочу показать небольшую программу, которая генерирует наборы N-Gram из
заданных файлов — я использую сгенерированные базы для задач классификации документов.
Исходный код доступен на github, каталог hadoop1. Данный пример использует defjob для
объявления задания и пользовательских функций map и reduce.
Весь проект состоит из одного файла с исходным кодом, в котором реализуются функции my-map
и my-reduce, которые затем указываются в описании задания (входные и выходные форматы
функций, исходных и результирующих файлов, и т.д.), для чего используется макрос defjob.
Как обычно для MapReduce, функция my-map принимает на вход ключ (целое число) и строку, и
выдает список пар строка/целое число, где строка — выделенный участок слова. Функция
my-reduce очень проста — она просто суммирует количество вхождений каждого ключа.
(ns hadoop1 (:require [clojure-hadoop.wrap :as wrap] [clojure-hadoop.defjob :as defjob] [clojure-hadoop.imports :as imp]) (:use clojure.contrib.seq-utils) (:require [clojure.contrib.str-utils2 :as str2]) (:import (java.util StringTokenizer))) (imp/import-io) (imp/import-mapred) (def delimiters "0123456789[ \t\n\r!\"#$%&'()*+,./:;<=>?@\\^`{|}~-]+") (defn gen-n-grams [#^String s #^Integer n] (when (> (.length s) 0) (let [str (str " " s (String. ) (str2/repeat " " (- n 1)))] (reduce (fn [val elem] (conj val (.substring str elem (+ elem n)))) [] (range 0 (+ 1 (.length s))))))) (defn my-map [key #^String value] (map (fn [token] [token 1]) (flatten (map #(gen-n-grams %1 3) (enumeration-seq (StringTokenizer. value delimiters)))))) (defn my-reduce [key values-fn] [ [key (reduce + (values-fn))] ]) (defn string-long-writer [#^OutputCollector output #^String key value] (.collect output (Text. key) (LongWritable. value))) (defn string-long-reduce-reader [#^Text key wvalues] [(.toString key) (fn [] (map (fn [#^LongWritable v] (.get v)) (iterator-seq wvalues)))]) (defjob/defjob job :map my-map :map-reader wrap/int-string-map-reader :map-writer string-long-writer :reduce my-reduce :reduce-reader string-long-reduce-reader :reduce-writer string-long-writer :output-key Text :output-value LongWritable :input-format :text :output-format :text :compress-output false)
Данный код основан на примере wordcount5 из поставки пакета clojure-hadoop. Для сборки кода используется следующий проект Leiningen:
(defproject hadoop1 "1.0" :description "hadoop1" :dependencies [[org.clojure/clojure "1.1.0"] [org.clojure/clojure-contrib "1.1.0"] [com.stuartsierra/clojure-hadoop "1.2.0-SNAPSHOT"]] )
Для запуска проекта необходимо собрать все библиотеки в один архив, поэтому для сборки
должна использоваться команда lein uberjar, которая упакует код на Clojure, вместе со
всеми необходимыми библиотеками1. После сборки вы можете запустить задачу на выполнение
либо в автономном режиме — без запущенного Hadoop, и работающего с локальными файлами2,
либо в кластерном режиме.
Запуск в автономном режиме производится следующей командой:
java -cp hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input FILE -output out
в качестве аргумента FILE подставьте нужный текстовый файл, и после выполнения программы,
в каталоге out будет создан файл, содержащий список N-Gram и их количество в тексте.
Данный файл можно затем использовать в качестве базы данных для задач определения языков и
т.п.
Для запуска вашего кода в кластере Hadoop вы должны поместить анализируемые файлы на HDFS
(в каталог input, в нашем примере), и запустить задачу на выполнение с помощью следующей
команды:
hadoop jar hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input input -output output
После окончания выполнения задания, данные будут помещены в каталог output3 на HDFS, и
вы можете получить доступ к ним используя стандартные команды работы с HDFS.
Данная статья является лишь введением в использование Clojure для программирования на базе Hadoop. Дополнительные примеры применения clojure-hadoop вы можете найти в следующих материалах:
clojure-hadoop.Дополнительную информацию о программировании Hadoop и Map/Reduce вы можете найти в следующих материалах:
Также стоит упомянуть проект Cascalog, который реализует DSL, позволяющий производить выборки данных из Hadoop, используя Clojure.
1. Вы также можете использовать для сборки команду lein hadoop, реализованную плагином
lein-hadoop, которая создает архив, соответствующий правилам упаковки заданий Hadoop.
2. Запуск в автономном режиме удобен для отладки вашего кода, поскольку он выполняется в отдельном инстансе JVM, и работает с локальными файлами.
3. Каталог для результатов не должен существовать, задание просто не запустится если
каталог уже существует. Если вам не нужны данные в существующем каталоге, то вы
можете использовать опцию командной строки -replace true.
Last change: 05.03.2013 16:54