Меню:


Данная статья является небольшим введением в программирование Hadoop с помощью Clojure.

Введение

Проект Hadoop является свободной реализацией инфраструктуры для распределенных, масштабируемых вычислений. Он начался как реализация идей MapReduce и GFS, введенных Google, но со временем, в рамках проекта были реализованы и другие проекты. Hadoop активно используется во многих проектах, включая коммерческие — Yahoo, LinkedIn, и т.д., и позволяет обрабатывать огромные объемы данных используя "стандартное" оборудование.

Hadoop написан на языке Java, что позволяет достаточно просто интегрировать его с Clojure. Для упрощения программирования на Clojure для Hadoop, Stuart Sierra разработал достаточно простой, но мощный пакет clojure-hadoop, о котором и пойдет речь в данной статье.

Установка и настройка

Установка Hadoop в минимальной конфигурации, необходимой для наших экспериментов, достаточно проста и описана в документации. Настройка Hadoop для работы в кластере немного сложнее, но тоже описана достаточно подробно.

Хочется отметить, что компания Cloudera предоставляет готовые пакеты для разных версий Linux, так что вы можете установить Hadoop и другие пакеты используя ваш любимый пакетный менеджер. Кроме того, они распространяют готовую к работе инсталляцию в виде образа для VMWare, так что вы можете загрузить нужный архив и получить готовую рабочую среду.

Исходные тексты clojure-hadoop можно взять с github — она работает с Hadoop версии 0.19 и Clojure 1.1.0. Если вам нужен Hadoop 0.20 и старше и/или Clojure 1.2.0, то вы можете взять версию с моими изменениями. Сборка и установка производится с помощью Maven — для этого надо всего лишь выполнить команду mvn install.

Из чего состоит clojure-hadoop

clojure-hadoop состоит из нескольких уровней абстракции, которые реализуются с помощью макросов. Каждый из уровней находится в отдельном пространстве имен, перечисленных в порядке увеличения абстракции.

clojure-hadoop.gen
реализует набор макросов, которые генерируют классы, необходимые для определения задания MapReduce. Функции map и reduce имеют точно такой же набор параметров, как и функции Java и имеют фиксированные имена — mapper-map и reduce-reducer. А инициализация задания, указание входных и выходных файлов и т.п., должно производиться функцией tool-run.
clojure-hadoop.wrap
реализует функции-обертки, которые упрощают конвертацию входных и выходных данных, позволяя писать функции map и reduce в более натуральном для Clojure стиле. Все остальное реализуется также как и в предыдущем случае.
clojure-hadoop.job
реализует все необходимые функции (mapper-map, reduce-reducer и tool-run) и позволяя указывать произвольные функции map и reduce (написанные на Clojure), входные и выходные данные и т.п., используя опции командной строки.
clojure-hadoop.defjob
определяет макрос defjob, который позволяет определять задания MapReduce используя Clojure. При этом сохраняется возможность указания части данных (обычно это входные и выходные данные) через опции командной строки. А с помощью макроса указываются функции map и reduce, а также функции для преобразования входных и выходных данных.

Кроме того, имеется пространство имен clojure-hadoop.imports, в котором определяются функции для импорта определений классов и интерфейсов Hadoop, что делает жизнь разработчика легче — эти функции используются во всех программах (все или только некоторые), независимо от выбранного уровня абстракции. В данный момент определены следующие функции:

Как программировать с помощью clojure-hadoop?

В зависимости от выбранного вами уровня абстракции, вам необходимо использовать разные подходы к программированию. В большинстве случаев достаточно использовать defjob, который прячет большую часть деталей за своим интерфейсом, так что программисту достаточно реализовать логику в виде двух функций. Однако, в некоторых случаях может понадобиться использовать низкоуровневые интерфейсы, поэтому тут приводится описание всех уровней. В каждом из разделов имеются ссылки на примеры, которые можно найти в поставке clojure-hadoop.

Использование clojure-hadoop.gen

В пространстве имен clojure-hadoop.gen определено всего два объекта: макрос gen-job-classes и функция gen-main-method.

Макрос gen-job-classes создает три класса, которые реализуют интерфейсы org.apache.hadoop.mapred.Mapper, org.apache.hadoop.mapred.Reducer и org.apache.hadoop.util.Tool, соответственно. При этом пользователь обязан определить функции mapper-map, reducer-reduce и tool-run, поскольку эти функции являются реализациями методов в соответствующих интерфейсах. Функция mapper-map реализует отображение входных данных в промежуточные данные (часть map в схеме MapReduce), функция reducer-reduce реализует обработку промежуточных данных и превращение их в окончательный результат (часть reduce в схеме MapReduce), а функция tool-run определяет опции задания Hadoop и подготавливает входные и выходные данные. Все параметры функций напрямую соответствуют параметрам функций в интерфейсах, так что для детальной информации стоит обратиться к документации на соответствующие интерфейсы.

Функция gen-main-method создает стандартный метод main, который запускает функцию tool-run используя метод run класса org.apache.hadoop.util.ToolRunner.

Пример реализации всех этих функций можно увидеть в примере wordcount1.

Использование clojure-hadoop.wrap

Использование функций из clojure-hadoop.wrap позволяют упростить написание функций map и reduce, сделать их более простыми за счет работы не с классами Hadoop, а используя типы Clojure для ввода и вывода данных. Функция wrap-map реализует обертку для map, а wrap-reduce — для reduce. Каждая из функций-оберток имеет по три варианта реализации — с одним, двумя или тремя аргументами. Первый, обязательный аргумент — обертываемая функция. Второй аргумент — функция, реализующая чтение данных. А третий аргумент — функция, реализующая запись данных. В составе библиотеки реализован набор функций чтения и записи, которые вы можете использовать в своем коде.

Функция, реализующая часть map, принимает на вход два параметра — ключ и значение, а функция, реализующая часть reduce также принимает на вход два параметра — ключ и список значений, сгенерированных функций map.

При этом, стоит отметить что функция tool-run должна быть реализована точно также как и в предыдущем случае, а результат применения функций-оберток должен иметь имена mapper-map и reducer-reduce, соответственно.

Пример реализации можно увидеть в примере wordcount2.

Использование clojure-hadoop.job

Код, реализованный в пространстве имен clojure-hadoop.job еще более упрощает реализацию функций map и reduce, реализуя остальные необходимые функции и позволяя пользователю указывать свои функции используя параметры командной строки. Все это производится автоматически, используя класс clojure_hadoop.job, который выполняет разбор командной строки и связывает переданные параметры с соответствующими настройками задания.

Опции командной строки указываются как пары -имя_опции значение, разделенные пробелом. При запуске задания следующие опции являются обязательными:

-input
список входных данных, в виде путей, разделенных запятой;
-output
название каталога для выходных данных;
-map
полное название функции map, в виде namespace/название или имя класса, реализующего интерфейс org.apache.hadoop.mapred.Mapper. В качестве функции можно указать identity, тогда данные будут переданы в функцию reduce без обработки;
-reduce
полное название функции reduce, в виде namespace/название или имя класса, реализующего интерфейс org.apache.hadoop.mapred.Reducer. В качестве функции можно указать identity или none, тогда данные будут выданы без обработки.

Существует также набор необязательных опций, которые могут указаны при запуске задания:

-input-format
определяет формат входных данных. Допустимые значения text, для текстовых данных, seq для SequenceFile или имя класса, реализующего нужный формат;
-output-format
определяет формат выходных данных. Допустимые значения те же, что и для -input-format;
-output-key
имя класса для ключей выходных данных (вывод функции reduce);
-output-value
имя класса для значений выходных данных (вывод функции reduce);
-map-output-key
имя класса для ключей промежуточных данных (вывод функции map);
-map-output-value
имя класса для значений промежуточных данных (вывод функции map);
-map-reader
название функции чтения входных данных (на входе функции map), в виде namespace/имя;
-map-writer
название функции записи промежуточных данных (на выходе функции map), в виде namespace/имя;
-reduce-reader
название функции чтения промежуточных данных (на входе функции reduce), в виде namespace/имя;
-reduce-writer
название функции записи выходных данных (на выходе функции reduce), в виде namespace/имя;
-combine
полное название функции combine, в виде namespace/название или имя класса, реализующего интерфейс org.apache.hadoop.mapred.Reducer. Эта функция работает также как reduce, но используется для объединения результатов, выданных map только на локальной ноде. Это позволяет уменьшить объем передаваемых по сети данных, и ускорить обработку данных. Функция объединения должна принимать данные того же типа, что и reduce, а выдавать результаты того же типа, что и map! (Эта функциональность имеется только в моей версии clojure-hadoop);
-name
устанавливает название задания, которое будет отображаться в административном интерфейсе Hadoop;
-replace
при установке значения true приводит к удалению выходного каталога;
-compress-output
при установке значения true производит сжатие выходных данных;
-output-compressor
вид сжатия — gzip, bzip2, default или имя класса;
-compression-type
тип сжатия для SequenceFileblock, record или none.

Пример использования данного подхода можно увидеть в примере wordcount3 — он состоит всего из двух функций, а все параметры указываются через командную строку.

Использование clojure-hadoop.defjob

Макрос defjob, определенный в одноименном пространстве имен, еще больше упрощает написание заданий для Hadoop. Этот макрос позволяет указать часть опций для clojure-hadoop.job в исходном коде, а потом лишь указать какое задание нужно использовать. Опции указываются как ключевые слова (keywords), их названия совпадают с названиями соответствующих опций командной строки, только начинаются с двоеточия, а не со знака минус. Единственным обязательным аргументом этого макроса является имя задания.

Например, вот такой код используется для определения задания и именем job:

(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :reduce my-reduce
  :input-format :text)

И после этого, можно запускать код на выполнение указывая название задания используя опцию -job (как полное имя, вместе с пространством имен) вместо опций -map и -reduce.

Использование макроса defjob демонстрируется в примерах wordcount4 и wordcount5. Отличие между ними заключается в том, что в wordcount5 в определении задания указываются функции чтения и записи. Полный пример с defjob можно увидеть ниже.

Полный пример

В качестве примера хочу показать небольшую программу, которая генерирует наборы N-Gram из заданных файлов — я использую сгенерированные базы для задач классификации документов. Исходный код доступен на github, каталог hadoop1. Данный пример использует defjob для объявления задания и пользовательских функций map и reduce.

Весь проект состоит из одного файла с исходным кодом, в котором реализуются функции my-map и my-reduce, которые затем указываются в описании задания (входные и выходные форматы функций, исходных и результирующих файлов, и т.д.), для чего используется макрос defjob. Как обычно для MapReduce, функция my-map принимает на вход ключ (целое число) и строку, и выдает список пар строка/целое число, где строка — выделенный участок слова. Функция my-reduce очень проста — она просто суммирует количество вхождений каждого ключа.

(ns hadoop1
    (:require [clojure-hadoop.wrap :as wrap]
              [clojure-hadoop.defjob :as defjob]
              [clojure-hadoop.imports :as imp])
    (:use clojure.contrib.seq-utils)
    (:require [clojure.contrib.str-utils2 :as str2])
    (:import (java.util StringTokenizer)))

(imp/import-io)
(imp/import-mapred)

(def delimiters "0123456789[ \t\n\r!\"#$%&'()*+,./:;<=>?@\\^`{|}~-]+")

(defn gen-n-grams [#^String s #^Integer n]
  (when (> (.length s) 0)
      (let [str (str " " s (String. ) (str2/repeat " " (- n 1)))]
        (reduce (fn [val elem]
                  (conj val (.substring str elem (+ elem n))))
                []
                (range 0 (+ 1 (.length s)))))))

(defn my-map [key #^String value]
  (map (fn [token] [token 1])
       (flatten (map #(gen-n-grams %1 3)
                     (enumeration-seq (StringTokenizer. value delimiters))))))

(defn my-reduce [key values-fn]
  [ [key (reduce + (values-fn))] ])

(defn string-long-writer [#^OutputCollector output
                          #^String key value]
  (.collect output (Text. key) (LongWritable. value)))

(defn string-long-reduce-reader [#^Text key wvalues]
  [(.toString key)
   (fn [] (map (fn [#^LongWritable v] (.get v))
               (iterator-seq wvalues)))])

(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :map-writer string-long-writer
  :reduce my-reduce
  :reduce-reader string-long-reduce-reader
  :reduce-writer string-long-writer
  :output-key Text
  :output-value LongWritable
  :input-format :text
  :output-format :text
  :compress-output false)

Данный код основан на примере wordcount5 из поставки пакета clojure-hadoop. Для сборки кода используется следующий проект Leiningen:

(defproject hadoop1 "1.0"
  :description "hadoop1"
  :dependencies [[org.clojure/clojure "1.1.0"]
                 [org.clojure/clojure-contrib "1.1.0"]
                 [com.stuartsierra/clojure-hadoop "1.2.0-SNAPSHOT"]]
  )

Для запуска проекта необходимо собрать все библиотеки в один архив, поэтому для сборки должна использоваться команда lein uberjar, которая упакует код на Clojure, вместе со всеми необходимыми библиотеками1. После сборки вы можете запустить задачу на выполнение либо в автономном режиме — без запущенного Hadoop, и работающего с локальными файлами2, либо в кластерном режиме.

Запуск в автономном режиме производится следующей командой:

java -cp hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input FILE -output out

в качестве аргумента FILE подставьте нужный текстовый файл, и после выполнения программы, в каталоге out будет создан файл, содержащий список N-Gram и их количество в тексте. Данный файл можно затем использовать в качестве базы данных для задач определения языков и т.п.

Для запуска вашего кода в кластере Hadoop вы должны поместить анализируемые файлы на HDFS (в каталог input, в нашем примере), и запустить задачу на выполнение с помощью следующей команды:

hadoop jar hadoop1-standalone.jar clojure_hadoop.job -job hadoop1/job -input input -output output

После окончания выполнения задания, данные будут помещены в каталог output3 на HDFS, и вы можете получить доступ к ним используя стандартные команды работы с HDFS.

Дополнительная информация

Данная статья является лишь введением в использование Clojure для программирования на базе Hadoop. Дополнительные примеры применения clojure-hadoop вы можете найти в следующих материалах:

Дополнительную информацию о программировании Hadoop и Map/Reduce вы можете найти в следующих материалах:

Также стоит упомянуть проект Cascalog, который реализует DSL, позволяющий производить выборки данных из Hadoop, используя Clojure.


1. Вы также можете использовать для сборки команду lein hadoop, реализованную плагином lein-hadoop, которая создает архив, соответствующий правилам упаковки заданий Hadoop.

2. Запуск в автономном режиме удобен для отладки вашего кода, поскольку он выполняется в отдельном инстансе JVM, и работает с локальными файлами.

3. Каталог для результатов не должен существовать, задание просто не запустится если каталог уже существует. Если вам не нужны данные в существующем каталоге, то вы можете использовать опцию командной строки -replace true.

Last change: 05.03.2013 16:54

blog comments powered by Disqus