Разработка распределенной вычислительной системы на основе MapReduce и Kubernetes | Digital Wing

Разработка распределенной вычислительной системы на основе MapReduce и Kubernetes

High-Load, BigData, DevOps10 января 2019
Проблема необходимости обработки больших данных и работы с высокими нагрузками становится все более актуальной для IT-проектов. Но практических примеров как это сделать в интернете очень мало. Поэтому цель данной статьи - заполнить этот пробел и показать пример реализации распределенной вычислительной системы, которая решает вышеуказанную проблему.

Разбираемая архитектура распределенной вычислительной системы используется командой Digital Wing в своих проектах, но в данной статье немного упрощена в целях лучшего понимания ее сути.

Весь код из данной статьи мы собрали в отдельный проект и опубликовали его на GitHub. Поэтому вы можете попробовать запустить эту систему у себя. Ссылку на этот репозиторий GitHub мы оставили в конце статьи.

План статьи следующий:
  1. Теория
  2. Проектирование системы
  3. Разработка системы
  4. Развертывание системы

В качестве языка программирования будем использовать Go, как наиболее подходящий для написания микросервисов и высоконагруженного бэкенда.

Теория

MapReduce

MapReduce  - это модель распределенных вычислений, представленная компанией Google. Она применяется, когда исходных данных так много, что мы не можем произвести вычисления на одной машине (так как это будет слишком долго), и распределяем эти вычисления между несколькими серверами, объединенными в кластер.

Кластер представляет собой совокупность компьютеров, объединённых высокоскоростными каналами связи.

Вычислительные узлы в распределенной системе можно разделить на две категории по назначению:
  • Главный узел (master node)  - управляющий сервер, который координирует работу других узлов. Обычно такой узел всего один в распределенной системе.
  • Рабочие узлы (worker nodes)  - сервера, которые непосредственно выполняют какую-либо полезную вычислительную работу. Они запускаются главным узлом и подчиняются ему. Рабочих узлов может быть неограниченное количество в распределенной системе.

Модель MapReduce состоит из следующих последовательных шагов:

1. Splitting
Все входные данные, которые наша система должна обработать, разделяем по какому-либо алгоритму на приблизительно равные порции. Количество таких порций обычно ограничивается количеством доступных в кластере рабочих узлов, которые предназначены для обработки этих данных.

Такие наборы с данными, которые подлежат обработке, в дальнейшем будем называть заданиями (tasks).

2. Mapping
На этом шаге мы применяем функцию Map ко всем заданиям, сформированным на предыдущем этапе. Каждое задание обрабатывается отдельным узлом и все вычисления на этих узлах происходят параллельно друг другу. Заканчивается этот этап как только последний узел завершил выполнение функции Map. Всеми вычислениями занимаются рабочие узлы, а распределяет между ними задачи и принимает от них результат главный узел.

Функция Map является абстрактной. Она представляет основное действие, которое распределенная вычислительная система должна произвести над исходными данными, и ее реализация напрямую зависит от задачи, стоящей перед этой системой.

3. Shuffling
Преобразование данных, полученных на выходе функции Map в результате предыдущего этапа, в задания для следующего шага. Данный этап не является обязательным и может отсутствовать, если данные не требует каких-либо преобразований. Также если подразумеваемые на этом этапе вычисления являются трудоемкими, то их можно аналогично предыдущему пункту распределить между рабочими узлами.

4. Reducing
Применение функции Reduce ко всем заданиям, полученным в результате предыдущего этапа. Вычисления происходят параллельно на нескольких рабочих узлах аналогично этапу Mapping. Результатом выполнения функции Reduce на всех машинах является решение задачи, которая изначально стояла перед распределенной вычислительной системой. Итоговый ответ аккумулируется в главном узле.

Функция Reduce является абстрактной. Можно сказать, что она преобразует набор данных к единственному атомарному значению. В следующих разделах статьи будет более понятно, что из себя представляет функция Reduce.

Kubernetes (K8s)

Kubernetes  - это ПО для автоматизации развёртывания, масштабирования и управления контейнерами с приложениями на кластере машин. Более подробную информацию про Kubernetes вы можете получить на официальном сайте, изучив документацию. В этой статье мы рассмотрим часть его функционала применительно к нашей распределенной вычислительной системе и изучим возможности Headless Service, примеры использования которого довольно сложно найти в интернете.

Если вы хотите запустить систему на своей машине, вам необходимо предварительно установить и настроить кластер Kubernetes. Для локальной разработки и тестирования отлично подходит утилита minikube. Инструкцию по ее установке можно найти на официальном сайте.

Проектирование системы

В качестве примера в данной статье будем рассматривать распределенную вычислительную систему, которая будет высчитывать частоту появления слов в тексте.

На изображение ниже представлен этот процесс в соответствии с моделью MapReduce:
Map-Reduce DiagramДалее более подробно будем разбирать каждый этап по отдельности. Но прежде условимся, что наша система будет состоять из пяти рабочих узлов и одного главного. Обмен данными между узлами будет происходить через HTTP протокол (для простоты понимания и реализации). Помимо этого, рабочие узлы поделим на две логических группы:
  • Мапперы (Mappers) -  рабочие узлы, которые будут выполнять функцию Map. Всего их будет три (127.0.0.1, 127.0.0.2, 127.0.0.3);
  • Редьюсеры (Reducers) - рабочие узлы, которые будут выполнять функцию Reduce. Всего их будет два (127.0.0.4, 127.0.0.5).
Тем не менее можно реализовать систему таким образом, чтобы один рабочий узел выполнял обе функции Map и Reduce. Но нами было решено разделить их на отдельные машины, чтобы продемонстрировать возможности Headless Service K8s. Подробнее о нем поговорим в разделе "Развертывание".

Схематично наша система будет выглядеть следующим образом:
Cluster diagram

1. Splitting

На вход системе подается следующая строка: ‘hello hello hello hello world world world’.

Сначала управление программой принадлежит главному узлу, который запрашивает доступные Мапперы с их IP-адресами. На основании количества полученных Мапперов мы делим исходный текст на приблизительно равные части. По окончанию этапа получаем следующий результат:
Каждому Мапперу по его IP назначается задание.

2. Mapping

Главный узел отправляет задания Мапперам и дожидается ответа от них. Мапперы выполняют вычисления параллельно друг другу. По окончанию этапа мы имеем следующий результат:
Каждый Маппер посчитал частоту повторений слов в своем задании.

3. Shuffling

Данные, полученные от Мапперов на предыдущем этапе, необходимо преобразовать в задания для Редьюсеров. Мы будем делать это на главном узле, чтобы не усложнять систему без необходимости. Тем не менее, этот процесс также можно распределить между рабочими узлами. Стоит отметить, что главный узел перед преобразованием данных запрашивает доступные Редьюсеры с их IP-адресами. По итогу этого этапа имеем следующий результат:
Каждому Редьюсеру по его IP назначается задание.

4. Reducing

Главный узел отправляет задания всем Редьюсерам и дожидается ответа от них. Редьюсеры выполняют вычисления параллельно друг другу. По итогу этого этапа мы имеем следующий результат, который является финальным ответом нашей системы:
Каждый Редьюсер подсчитал итоговое количество повторений переданных ему слов.
Описанную выше модель можно улучшить различными способами:
  • не передавать все данные между узлами напрямую в теле HTTP-запроса, а передавать, например, ссылки на хранилища, откуда можно будет извлечь необходимые данные;
  • реализация потоковой передачи данных между шагами;
  • обмениваться данными между узлами более легкими протоколами, например, gRPC;
  • передавать задания для рабочих узлов в виде lua/js кода (чтобы была возможность не только подсчитывать слова);
  • реализовать распределенный Shuffling и т. д.

Возможно, какие-то из этих улучшений разберем в будущих статьях. Пишите в комментариях, что вам будет интересно.

Разработка системы

Map

Начнем с разработки функции Map. На вход она принимает строку из слов, разделенных пробелом. На выход эта функция отдает сколько раз каждое слово встретилось во входящей строке. Результат возвращается в виде объекта "ключ-значение". Ключом является слово, а значением является количество повторений этого слова. Ответ кодируется в байт-код с помощью утилиты gob.

Пример входных данных:

Пример выходных данных:

Исходный код Маппера:

Сама функция Мар описывается в строках 6-35 и представляет собой функцию-обработчик, которая вызывается при GET запросе на роуте "/map". Функция "mapper()" всего лишь инициализирует HTTP-сервер с функцией Map.

Reduce

Дальше перейдем к разработке функции Reduce. На вход она должна принимать объект "ключ-значение" следующего формата:

Ключами являются слова, а значениями являются массивы содержащие количества повторений этих слов в разных частях исходного текста.

На выход функция Reduce должна отдавать объект "ключ-значение" с такими же ключами, но теперь их значениями являются целые числа, показывающие общее количество повторений слов во всем исходном тексте. То есть суммируются все значения во входных массивах для каждого слова. Пример выходных данных:

Входные и выходные данные функции Reduce закодированы в байт-код.

Исходный код Редьюсера:
Сама функция Reduce описывается в строках 6-38 и представляет собой функцию-обработчик, которая вызывается при GET запросе на роуте "/reduce". Функция "reducer()" всего лишь инициализирует HTTP-сервер с функцией Reduce.

Master

Сейчас переходим к самой сложной части - реализации функции главного узла, которая должна управлять Редьюсерами и Мапперами.

Алгоритм работы главного узла можно разделить на следующие этапы:
  1. Разбиение исходного текста на части, определение доступных Мапперов в системе и назначение задания для каждого Маппера (Splitting);
  2. Распределенное выполнение заданий Мапперами (Mapping);
  3. Определение доступных Редьюсеров, преобразование полученных от Мапперов данных в задания для Редьюсеров (Shuffling);
  4. Распределенное выполнение заданий Редьюсерами (Reducing).

Исходный код функции главного узла:

Основная функция описывается в строках 6-23 и вызывается при GET запросе на роуте "/compute". Функция "master()" всего лишь инициализирует HTTP-сервер с основной функцией.

Так как исходный код главного узла получается слишком большим, то мы его разбили на части в соответствии с этапами. Далее подробно разберем каждую часть.

1. SplittingНа 14-ой строчке кода мы с помощью утилиты net делаем запрос к DNS (который предоставляет нам Kubernetes) для получения IP-адресов всех доступных Мапперов. Благодаря этому мы можем не останавливать главный узел, когда будем увеличивать количество репликаций (копий) Мапперов. Это реализуется при помощи Kubernetes Headless Service, который мы будем рассматривать далее, на этапе развертывания.

2. Mapping

3. Shuffling

Мы преобразовываем данные, полученные от Мапперов, в задания для Редьюсеров:
4. Reducing

Пример данных, которые могут участвовать в коде 3 и 4 пунктов:

Конечное приложение

Мы разработали функции Map, Reduce и главного узла. Теперь мы должны объединить их в одно приложение. Для простоты реализации все сделаем в одном файле "main.go":

В строках 18-29 мы определяем функцию "main()", с которой начинается выполнение приложения. Внутри нее, в строке 19 мы достаем из env-конфига значение "TYPE". Это значение как раз и определяет то, какую роль исполняет данный экземпляр приложения - роль Маппера, роль Редьюсера или роль главного узла. Взависимости от этого в строках 21-28 вызывается нужная функция.

Конкретно как запускать несколько экземляров одного приложения на разных узлах и как передовать в каждый из них значение "TYPE" мы детально рассмотрим в следующем разделе.

Развертывание системы

Теория

В первую очередь опишем теоретический минимум, необходимый для общего понимания всего, что будет происходить далее в процессе развертывания.

Развертывать всю систему мы будем на кластере Kubernetes. Kubernetes представляется набором абстракций, которые называются Kubernetes Objects. Каждый такой Объект описывает какую-либо часть состояния вашей системы: развернутые контейнеризованные приложения, сетевые и жесткие диски, workloads и т.д. Далее кратко посмотрим только те Kubernetes Objects, которые будем использовать для развертывания нашей системы и их отношения между собой. Для более подробной информации, пожалуйста, обращайтесь к  документации Kubernetes.

1. Pod
Абстракция Kubernetes, представляющая группу из одного или нескольких контейнеров приложений (таких, как Docker или rkt) и некоторые общие ресурсы для этих контейнеров. Является самой маленькой структурной единицей в Kubernetes Objects.
Pods image
Источник изображения: Kubernetes.io
Pod'ы запускаются всегда на узле (Node), который может быть либо виртуальной, либо физической машиной в зависимости от кластера.
Nodes image
Источник изображения: Kubernetes.io
Каждый такой Pod мы будем использовать для хранения одного экземпляра приложения, завернутого в Docker. Следовательно, у нас будет три Pod'а с Маперрами, два Pod'а с Редьюсерами и один Pod с главным узлом.

2. Service
Абстракция Kubernetes, которая определяет логический набор Pod'ов и политику доступа к ним.
Services image
Источник изображения: Kubernetes.io
В нашей системе будут три Service'а: первый содержит Pod'ы с Мапперами, второй содержит Pod'ы с Редьюсерами, третий содержит Pod с главным узлом.

3. StatefulSet
Абстракция высшего уровня Kubernetes (также Controller), которая управляет развертыванием и масштабированием набора Pod'ов.

У нас будут два StatefulSet'а: один будет упралять группой Pod'ов с Мапперами, второй будет управлять группой Pod'ов с Редьюсерами. Для Главного узла StatefulSet не нужен, так он будет представлен только одним Pod'ом.


Для настройки каждого такого Kubernetes Object необходимо описать его конфигурацию в определенном формате (YAML или JSON) и передать эту конфигурацию в Kubernetes. Мы будем использовать YAML, как более стандартный для DevOps.

Headless Service

Важно также отдельно объяснить, что такое Headless Service. Headless Service является разновидностью обычного Service. Он нам нужен, чтоб мы могли получать IP-адреса Pod'ов внутри каждого Service'а и отправлять к ним запросы напрямую. В стандартном же Service'е все Pod'ы объединены под одним IP-адресом Service'а, который уже самостоятельно распределяет запросы между своими Pod'ами с помощью балансировщика нагрузки и других инструментов.

Мы используем эту особенность Headless Service в этих строках кода, когда получаем IP-адреса всех Pod'ов Редьюсеров и всех Pod'ов Мапперов:
Давайте посмотрим на разницу работы Service и Headless Service. Для этого мы отправим запросы к DNS при каждом из них отдельно. Использовать мы будем Kubernetes CLI.Kubernetes CLI.

Service:
Headless Service:
Здесь мы видим, что в случае с Headless Service создаются открываются IP-адреса для каждого Pod'а, а при Service все Pod'ы “объединяются” одним IP-адресом Service'а, которому они принадлежат.

Это очень удобно при реализации распределенных вычислительных систем. Мы можем сами контролировать то, какие данные и на какой Pod будут отправлятся.

Настройка главного узла (Master node)

В нашей реализации главный узел только один. Он формирует и отправляет задания Мапперам и Редьюсерам, выполняя при этом основную бизнес-логику. Поэтому у нас нет необходимости в нескольких репликациях (копиях/экземплярах) этого приложения, исходя из этого нам будет достаточно сделать один Pod конфиг и один Service конфиг для него.


В строках 14-24 мы определяем env-значения, которые будут передаваться экземпляру приложения при его запуске.

Настройка Редьюсеров

Редьюсер развертывается с помощью рассмотренного ранее Headless Service, так как мы хотим работать с каждой его репликацией напрямую. Вместо Pod конфига мы должны описывать StatefulSet конфиг, так как у нас будет несколько Pod'ов. Также, наличие StatefulSet конфига обязательно при использовании Headless Service.


Обратим внимание, что spec.selector.app == "reducers". Именно по этому селектору мы будем получать IP-адреса Pod'ов с Редьюсерами.

Настройка Мапперов

Мапперы настраиваются полностью аналогично Редьюсерам, отличаются только сами значения в конфигах.

Запуск системы

После того как мы настроили все конфигурации приложений осталось только запустить их. Делается это тремя следующими командами с помощью Kubernetes CLI:Мы передаем в Kubernetes все три наших YAML файла, которые мы выше сделали. На основании этих конфигураций Kubernetes запускает приложения так, как мы хотим.

Далее мы активируем прокси, чтобы иметь возможность делать HTTP запросы через указанный нами порт, а не напрямую через порт кластера:Делаем тестовый запрос к нашей системе:Ответ должен быть следующий:Мы нарошно оставили разбиение слов по IP-адресам, чтоб продемострировать распределенность вычислений.

Резюме

Выше мы спроектировали и реализовали небольшую распределённую вычислительную систему. Этой системе еще далеко до продакшена, но целью данной статьи является демонстрация того, что распределение вычислений в кластере машин — это не сложно. Тем более, когда у нас есть такой полезный инструмент как Kubernetes, благодаря которому не нужно быть профессиональным системным администратором.

Полностью готовую систему мы выложили в отдельный репозиторий на Github по этой ссылке - https://github.com/DigWing/simple-mapreduce. Спасибо за внимание! Пишите свои вопросы и пожелания насчет тем для будущих статей в комметариях ниже.
We use cookies for analytics and marketing. To find out more about our use of cookies, please see our Cookie policy. By continuing to browse our website, you agree to our use of cookies.