Development of a distributed computing system based on MapReduce and Kubernetes
January 10, 2019
The need to process big data and handle high loads is becoming increasingly important for IT projects, yet there are few guides and practical examples on the Internet of how to do this. The goal of this article is to fill this gap by showing how to develop a distributed computing system that addresses this need.
The Digital Wing team uses this distributed computing system architecture in its production projects. For this article we have simplified it a bit to make its essence easier to understand.
We collected all code from this article into a separate project and published it on GitHub. Therefore, you can try to run this system on your PC. We left the link to this GitHub repository at the end of the article.
The plan of the article is as follows:
- Theoretical part
- System design
- System development
- System deployment
We will use Go as the programming language, as it is well suited to writing microservices and high-load backends.
MapReduce
MapReduce is a distributed computing model created by Google. It is used when there is so much source data that the computation would take too long on a single server, so we distribute it among several machines united in a cluster.
A cluster is a group of computers connected by high-speed communication channels.
Computational nodes in a distributed system can be divided into two categories by purpose:
- Master node - a controlling server that coordinates the work of other nodes. Usually there is only one such node in a distributed system.
- Worker nodes - servers that do some useful computing work. They are started by a Master node and controlled by it. The number of working nodes can be unlimited in the distributed system.
The MapReduce model consists of the following successive steps:
In the Splitting step, all input data that our system has to process is divided by some algorithm into approximately equal portions. The number of such portions is usually limited by the number of worker nodes available in the cluster to process this data.
From here on, we will refer to each such portion of data as a task.
In the Mapping step, we apply the Map function to all the tasks created in the previous step. Each task is processed by a separate node, and all computations on these nodes run in parallel. This step ends as soon as the last node has finished executing the Map function. The worker nodes perform all these computations; the master node just sends the tasks to the worker nodes and receives responses from them.
The Map function is abstract. It represents the main action that a distributed computing system has to perform on the source data, and its implementation directly depends on the purpose of this system.
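To make the "all tasks in parallel, finish when the last one is done" behaviour concrete, here is a minimal in-process sketch. It is only an illustration: goroutines stand in for worker nodes, and the names `runParallel` and `countWords` are hypothetical, not part of the system described in this article.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runParallel applies fn to every task in its own goroutine and returns the
// results in task order. It returns only after the last task has finished,
// which mirrors how the Mapping step ends.
func runParallel[T, R any](tasks []T, fn func(T) R) []R {
	results := make([]R, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task T) {
			defer wg.Done()
			results[i] = fn(task) // each goroutine writes only to its own slot
		}(i, task)
	}
	wg.Wait()
	return results
}

// countWords is one possible Map function: it counts the words in a task.
func countWords(task string) int {
	return len(strings.Fields(task))
}

func main() {
	tasks := []string{"hello hello hello", "hello world", "world world"}
	fmt.Println(runParallel(tasks, countWords)) // prints [3 2 2]
}
```

Because the Map function is passed in as a parameter, the same coordinator code can be reused for any computation, which is what "abstract" means here.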
In the Shuffling step, we convert the data produced by the Map function in the previous step into tasks for the next step. This step is optional and may be omitted if the data does not require any conversion. If the computations at this step are expensive, they can be distributed between worker nodes in the same way as in the previous step.
In the Reducing step, we apply the Reduce function to all tasks obtained in the previous step. Calculations occur in parallel on several worker nodes, as in the Mapping step. The combined result of the Reduce function on all worker nodes is the final answer we expect from the distributed computing system. This result is accumulated on the master node.
The Reduce function is abstract. It can be said that it converts a data set to a single atomic value. What the Reduce function does will become clearer in the following parts of the article.
Kubernetes (K8s)
Kubernetes is software for automating the deployment, scaling, and management of application containers on a cluster of nodes. For more information about Kubernetes, you can visit the official site and study the documentation. In this article, we will look at some of its functionality as applied to our distributed computing system and explore the capabilities of the Headless Service, usage examples of which are rather hard to find online.
If you want to run the system on your own computer, you first need to install and configure a Kubernetes cluster. The minikube utility is great for local development and testing. You can find installation instructions on the official website.
As an example, in this article we will consider a distributed computing system, which will calculate the frequency of occurrence of all words in the text.
The image below shows this process in accordance with the MapReduce model:
Next, we will look at each stage separately in more detail. But first, let's agree that our cluster will consist of five worker nodes and a single master node. Data exchange between nodes will occur via HTTP (for easier understanding and implementation). We will also divide the worker nodes into two groups:
- Mappers are the worker nodes that will only perform the Map function. There will be three of them (127.0.0.1, 127.0.0.2, 127.0.0.3);
- Reducers are the worker nodes that will only perform the Reduce function. There will be two of them (127.0.0.4, 127.0.0.5).
We could instead build the system so that each worker node performs both the Map and Reduce functions. But we decided to divide them into separate worker nodes in order to demonstrate the capabilities of the Kubernetes Headless Service. We will talk more about this in the "Deployment" section.
Schematically, our system will look like this:
1. Splitting
The following string is input into the system: ‘hello hello hello hello world world world’.
Initially, control belongs to the master node, which requests the list of available Mappers with their IP addresses. Based on the number of Mappers received, we divide the input text into approximately equal parts. At the output of this stage we get the following result:
Each Mapper is assigned a task by its IP.
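The splitting described above can be sketched as follows. This is only one possible approach, under the assumption that the text is cut on word boundaries into contiguous parts whose word counts differ by at most one; the actual algorithm and the function names `splitText` and `assignTasks` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// splitText divides text into n contiguous parts whose word counts differ
// by at most one. Parts are empty when there are fewer words than n.
func splitText(text string, n int) []string {
	if n <= 0 {
		return nil
	}
	words := strings.Fields(text)
	parts := make([]string, n)
	start := 0
	for i := 0; i < n; i++ {
		size := len(words) / n
		if i < len(words)%n {
			size++ // the first len(words)%n parts take one extra word
		}
		parts[i] = strings.Join(words[start:start+size], " ")
		start += size
	}
	return parts
}

// assignTasks pairs each Mapper IP address with its part of the text.
func assignTasks(text string, mappers []string) map[string]string {
	tasks := make(map[string]string, len(mappers))
	for i, part := range splitText(text, len(mappers)) {
		tasks[mappers[i]] = part
	}
	return tasks
}

func main() {
	mappers := []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}
	tasks := assignTasks("hello hello hello hello world world world", mappers)
	for _, ip := range mappers {
		fmt.Printf("%s: %q\n", ip, tasks[ip])
	}
}
```

With seven words and three Mappers, this sketch produces parts of three, two, and two words.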
2. Mapping
The master node sends tasks to all available Mappers and receives responses from them. Mappers perform computations in parallel. At the end of this stage, we have the following result:
Each Mapper has counted how many times each word is repeated in its task.
3. Shuffling
The data received from the Mappers in the previous step should be converted into tasks for the Reducers. We will do this on the master node in order not to complicate the system needlessly. However, this process can also be distributed among the worker nodes. It is worth noting that before converting the data, the master node requests the available Reducers with their IP addresses. At the end of this stage we have the following result:
Each Reducer is assigned a task by its IP.
4. Reducing
The master node sends tasks to all available Reducers and receives responses from them. Reducers perform computations in parallel. At the end of this stage, we have the following result, which is the final response of our system:
Each Reducer has counted the total number of repetitions of the words transmitted to it.
The above-described model can be improved in various ways:
- not transferring all data between nodes directly in the body of the HTTP requests, but transmitting, for example, links to the data storages, where each node can retrieve the necessary data;
- implementing streaming of data between steps;
- exchanging data between nodes over faster protocols, for example, gRPC;
- transmitting tasks to worker nodes as Lua/JS code (so that the system can do more than count words);
- implementing distributed Shuffling, etc.
Perhaps some of these improvements will be discussed in future articles. Let us know in the comments below which of them interest you.
Map
Let's start with the development of the Map function. At the input, it receives a string of words separated by spaces. At the output, it returns how many times each word was repeated in this string. The result is returned as a key-value data structure. The key is the word, and the value is the number of repetitions of this word. The answer is encoded in bytes using the gob package.
Example of input data:
Example of output data:
Source code of the Mapper:
The Map function is described in lines 6-32. It is a handler function called when a GET request is made on the route "/map". The "mapper()" function only initializes an HTTP server with the Map function.
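As a rough illustration of such a handler, here is a minimal sketch. It is not the code from the repository: reading the text from a `str` query parameter, the port 8080, and the helper names `countWords` and `encodeCounts` are all assumptions made for this example.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
	"net/http"
	"strings"
)

// countWords returns how many times each word occurs in text.
func countWords(text string) map[string]int {
	counts := make(map[string]int)
	for _, word := range strings.Fields(text) {
		counts[word]++
	}
	return counts
}

// encodeCounts serializes the result with encoding/gob.
func encodeCounts(counts map[string]int) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(counts); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// mapHandler serves GET /map. Taking the input from the "str" query
// parameter is an assumption of this sketch.
func mapHandler(w http.ResponseWriter, r *http.Request) {
	body, err := encodeCounts(countWords(r.URL.Query().Get("str")))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Write(body)
}

// mapper only initializes an HTTP server with the Map handler.
func mapper() {
	http.HandleFunc("/map", mapHandler)
	log.Fatal(http.ListenAndServe(":8080", nil)) // the port is an assumption
}

func main() {
	// Demonstrates the pure counting logic without starting a server.
	fmt.Println(countWords("hello hello world")) // prints map[hello:2 world:1]
}
```

Keeping the counting logic in a separate function from the HTTP handler makes it easy to test without a running server.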
Reduce
We now proceed with the development of the Reduce function. At the input, it should receive a key-value object of the following format:
The keys are words, and the values are arrays containing the number of repetitions of these words in different parts of the source text.
The Reduce function should output a key-value object with the same keys, but now the values are integers showing the total number of repetitions of each word in the entire source text. That is, all the values in the input array for each word are summed. Example of output data:
Input and output data are encoded in bytes.
Source code of the Reducer:
The Reduce function is described in lines 6-38. It is a handler function called when a GET request is made on the route "/reduce". The "reducer()" function only initializes an HTTP server with the Reduce function.
Master
We now proceed to the most difficult part: the development of the master function, which should control the Reducers and Mappers.
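A minimal sketch of such a handler might look like the following. It is not the repository code: it assumes that the task arrives gob-encoded in the request body and that the server listens on port 8080, and the helper name `sumCounts` is hypothetical.

```go
package main

import (
	"encoding/gob"
	"fmt"
	"log"
	"net/http"
)

// sumCounts adds up the partial counts collected for every word.
// The output has the same keys as the input.
func sumCounts(partial map[string][]int) map[string]int {
	totals := make(map[string]int, len(partial))
	for word, counts := range partial {
		sum := 0
		for _, c := range counts {
			sum += c
		}
		totals[word] = sum
	}
	return totals
}

// reduceHandler serves /reduce. Reading a gob-encoded task from the
// request body is an assumption of this sketch.
func reduceHandler(w http.ResponseWriter, r *http.Request) {
	var partial map[string][]int
	if err := gob.NewDecoder(r.Body).Decode(&partial); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if err := gob.NewEncoder(w).Encode(sumCounts(partial)); err != nil {
		log.Println("encode response:", err)
	}
}

// reducer only initializes an HTTP server with the Reduce handler.
func reducer() {
	http.HandleFunc("/reduce", reduceHandler)
	log.Fatal(http.ListenAndServe(":8080", nil)) // the port is an assumption
}

func main() {
	// Demonstrates the pure summing logic without starting a server.
	fmt.Println(sumCounts(map[string][]int{"hello": {3, 1}, "world": {1, 2}}))
	// prints map[hello:4 world:3]
}
```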
The algorithm of the master function can be divided into the following stages:
- Splitting the source text into parts, determining the available Mappers in the system and assigning a task to each Mapper (Splitting);
- Distributed execution of tasks by the Mappers (Mapping);
- Identifying available Reducers and converting the data received from the Mappers into tasks for Reducers (Shuffling);
- Distributed execution of tasks by the Reducers (Reducing).
Source code of the master node:
The main function is described in lines 6-23 and called when a GET request is made on the route "/compute". The "master()" function just initializes the HTTP server with the main function.
Since the source code of the master function is too large, we have divided it into parts. Next, we will analyze each part in detail.
We convert the data received from the Mappers into tasks for the Reducers:
An example of the data at points 3 and 4 of the code:
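The conversion of Mapper results into Reducer tasks can be sketched as follows. This is an illustration under stated assumptions, not the article's code: it assumes each word is assigned to a Reducer by hashing the word (a common partitioning approach), and the names `partition` and `shuffle` are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition picks a Reducer index for a word by hashing it, so the same
// word always goes to the same Reducer. n must be greater than zero.
func partition(word string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(word))
	return int(h.Sum32() % uint32(n))
}

// shuffle groups the per-Mapper counts by word and assigns every word to
// exactly one Reducer. The result maps Reducer IP -> word -> partial counts.
func shuffle(mapped []map[string]int, reducers []string) map[string]map[string][]int {
	tasks := make(map[string]map[string][]int, len(reducers))
	if len(reducers) == 0 {
		return tasks
	}
	for _, ip := range reducers {
		tasks[ip] = make(map[string][]int)
	}
	for _, counts := range mapped {
		for word, c := range counts {
			ip := reducers[partition(word, len(reducers))]
			tasks[ip][word] = append(tasks[ip][word], c)
		}
	}
	return tasks
}

func main() {
	// Sample Mapper results for 'hello hello hello hello world world world'
	// split into three parts.
	mapped := []map[string]int{
		{"hello": 3},
		{"hello": 1, "world": 1},
		{"world": 2},
	}
	reducers := []string{"127.0.0.4", "127.0.0.5"}
	for ip, task := range shuffle(mapped, reducers) {
		fmt.Println(ip, task)
	}
}
```

Which Reducer receives which word depends on the hash, but all partial counts for a given word always end up in the same task, which is what the Reduce function needs.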
Final application
We have developed the Map, Reduce and Master functions. Now we have to combine them into one application. For ease of implementation, let's do everything in a single file "main.go":
In lines 18-29, we describe the "main()" function, which starts the execution of the application. In line 19 we read the "TYPE" value from the environment. This value determines the role executed by this application instance: the Mapper, the Reducer, or the Master. Depending on this, the required function is called in lines 21-28.
We will examine in detail in the next section how to run several instances of the same application and how to transmit the value of "TYPE" to each of them.
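The role selection described above can be sketched like this. The concrete values of "TYPE" ("MASTER", "MAPPER", "REDUCER") and the helper `roleFor` are assumptions for illustration, and the three role functions are placeholders that only print a message.

```go
package main

import (
	"fmt"
	"os"
)

// Placeholders for the role functions; in the real application each of
// them would start an HTTP server.
func master()  { fmt.Println("starting Master") }
func mapper()  { fmt.Println("starting Mapper") }
func reducer() { fmt.Println("starting Reducer") }

// roleFor returns the function to run for a given TYPE value.
// The accepted values are assumptions of this sketch.
func roleFor(appType string) (func(), error) {
	switch appType {
	case "MASTER":
		return master, nil
	case "MAPPER":
		return mapper, nil
	case "REDUCER":
		return reducer, nil
	default:
		return nil, fmt.Errorf("unknown TYPE %q", appType)
	}
}

func main() {
	run, err := roleFor(os.Getenv("TYPE"))
	if err != nil {
		fmt.Println("nothing to start:", err)
		return
	}
	run()
}
```

For example, running the binary with `TYPE=MAPPER` in the environment would start the Mapper role in this sketch.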
Theory
First of all, we will describe the theoretical minimum necessary for a general understanding of everything that happens in the deployment process.
We will deploy the entire system on a Kubernetes cluster. Kubernetes is represented by a number of abstractions called Kubernetes Objects. Each such Object describes some part of the state of your system: deployed containerized applications, network and disk resources, workloads, etc. Next, we will take a brief look only at those Kubernetes Objects that we will use to deploy our system and at their relations with each other. For more information, please refer to the Kubernetes documentation.
A Pod is a Kubernetes abstraction that represents a group of one or more application containers (such as Docker or rkt), and some shared resources for those containers. It is the smallest structural unit in the Kubernetes Objects.
Source of image: Kubernetes.io
A Pod always runs on a Node, which can be either a virtual or a physical machine, depending on the cluster.
Source of image: Kubernetes.io
We will use each such Pod to store one application instance containerized by Docker. Consequently, we will have three Pods with Mappers, two Pods with Reducers and a Pod with the Master.
A Service in Kubernetes is an abstraction which defines a logical set of Pods and an access policy for them.
Source of image: Kubernetes.io
In our system there will be three Services: the first one will contain Pods with Mappers, the second one will contain Pods with Reducers, and the third one will contain a Pod with the Master.
A StatefulSet is a higher-level Kubernetes abstraction (also called a Controller), which manages the deployment and scaling of a set of Pods.
We will have two StatefulSets: the first one will manage a group of Pods with Mappers and the second one will manage a group of Pods with Reducers. For the Master, StatefulSet is not needed, as it will be represented by only one Pod.
To set up each such Kubernetes Object, we need to describe its configuration in a specific format (YAML or JSON) and pass this configuration to Kubernetes. We will use the YAML format, as it is more conventional in DevOps.
Headless Service
It is also important to explain what a Headless Service is. A Headless Service is a variation of the usual Service. We need it to obtain the IP addresses of all Pods within each Service and send tasks to the Pods directly. In the standard Service, all the Pods are united by a single IP address of the Service, which distributes requests between its Pods using a load balancer and other tools.
We use this Headless Service feature in these code lines when we want to receive the IP addresses of all the Reducers' and Mappers' Pods:
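A minimal sketch of such a lookup is shown below. It relies on the fact that a DNS query for a Headless Service name returns the IP addresses of the individual Pods. The Service name "mappers", the port 8080, and the function name `workerURLs` are assumptions for illustration; the lookup function is passed in as a parameter so the logic can be exercised without a cluster.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// lookupFunc has the same signature as net.LookupHost, so the real
// resolver can be swapped for a fake one in tests.
type lookupFunc func(host string) ([]string, error)

// workerURLs resolves a Headless Service name to the IP addresses of its
// Pods and builds one URL per Pod.
func workerURLs(lookup lookupFunc, service string, port int, route string) ([]string, error) {
	ips, err := lookup(service)
	if err != nil {
		return nil, fmt.Errorf("resolve %s: %w", service, err)
	}
	urls := make([]string, 0, len(ips))
	for _, ip := range ips {
		hostPort := net.JoinHostPort(ip, strconv.Itoa(port))
		urls = append(urls, "http://"+hostPort+route)
	}
	return urls, nil
}

func main() {
	// "mappers" is a hypothetical Service name; outside the cluster this
	// lookup is expected to fail, which the sketch reports and ignores.
	urls, err := workerURLs(net.LookupHost, "mappers", 8080, "/map")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println(urls)
}
```

With the list of per-Pod URLs in hand, the master can decide which task goes to which Pod instead of leaving that choice to the Service's load balancing.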
Let's now look at the difference between how a regular Service and a Headless Service work. To do this, we will send queries to the cluster DNS using the Kubernetes CLI.
Here we see that in the case of the Headless Service, a separate IP address is returned for each Pod. When we use the usual Service, all Pods are "united" by the single IP address of the Service to which they belong.
This is very useful when implementing distributed computing systems: we can decide ourselves which data is sent to which Pod.
Configuration of Master
In our system, there is only one Master. It creates and distributes tasks to Mappers and Reducers and implements the basic business logic. We do not need several replicas of this application, so one Pod config and one Service config are enough.
In lines 14-24, we define env values that will be provided to the application instance when it starts.
Configuration of Reducer
The Reducers are deployed using the previously reviewed Headless Service, since we want to send requests directly to each of their replicas. But instead of a Pod config, we have to create a StatefulSet config, since we will have several Pods with Reducers. A StatefulSet, in turn, requires a Headless Service to provide the network identity of its Pods.
Note that spec.selector.app == "reducers". Using this selector we will receive the IP addresses of the Pods with the Reducers.
Configuration of Mapper
Mappers are configured in exactly the same way as Reducers; only the values in the YAML file differ.
Launch of the system
Now that we have created all the application configurations, all that remains is to launch the system. This can be done with the following three commands using the Kubernetes CLI:
We pass Kubernetes all three of the YAML files created above. Based on these configurations, Kubernetes runs the applications the way we want.
Next, we activate the proxy in order to be able to make HTTP requests through the specified port, and not directly through the cluster port:
We make a test HTTP request to our system:
The response should be the following:
We deliberately left the words grouped by IP address in the response to demonstrate the distribution of computations.
Above, we have designed and developed a small distributed computing system. This system is still far from production-ready, but the goal of this article is to demonstrate that distributing computations across a cluster of machines is not difficult, especially with a tool as useful as Kubernetes, thanks to which you don't need to be a professional system administrator.
We have published the project with the complete system in a GitHub repository: https://github.com/DigWing/simple-mapreduce. Thank you for your attention! Leave your questions and suggestions for topics of future articles in the comments below.