## Prototyping Parallel Programs on OpenShift

In the previous posts, we were introduced to parallel computing and saw how, and by what factor, it can speed up computation (depending on the algorithm). We also introduced Chapel, a language for parallel computation developed by Cray. Using Chapel, we first developed a very simple parallel algorithm to compute the sum of a list of numbers. Then we learned how to implement parallel matrix multiplication. In this post, we will talk about how to run our parallel programs using a cluster of machines. The source code referenced here can be found on GitHub.

To run our parallel program, we will need to set up a bare-metal server or a virtual machine (VM), install Chapel on it, and replicate it to N instances. A shared filesystem will be mounted on a pre-defined directory on each instance. This filesystem will allow all instances to load data from the same location. Any output from each instance will also be written to this filesystem (although we must ensure they write to different files or directories to avoid data corruption).

Here is how our setup will look:

We will also need to set up the following:

• SSH server – this will run as an ordinary user on port 2222. The Chapel launcher will connect to port 2222 on each machine.

To enable parallel computation across different machines, we will need to export the following environment variables. The purpose of those variables can be found here. The variable GASNET_SSH_SERVERS is a space-separated list of IP addresses or hostnames of our nodes. In our setup, we will use the following:

export CHPL_COMM=gasnet
export CHPL_LAUNCHER=amudprun
export GASNET_SSH_SERVERS="node00 node01 node10 node11"


where node00, node01, node10, node11 are hostnames of our machines/nodes.

## How to Run a Parallel Program

We will run our parallel matrix multiplication program on our cluster. To launch it, we log in to one of the nodes, change directory to /home/chapel (which contains the mat_mul binary), and issue the following command:

./mat_mul -nl 4


## Running in OpenShift

If provisioning a bare-metal or VM environment for parallel computing is costly, we can turn to containers for a much cheaper and faster alternative. To make our lives easier, we will use the OpenShift Origin platform to run our parallel computing environment.

In the succeeding sections, we assume that we have a working installation of OpenShift. See the instructions here on how to set up OpenShift on VirtualBox or here to install a single-node OpenShift in AWS.

### Containerizing the “Node”

We will have to build a container image of the bare-metal server or VM we mentioned above. The format we will use is Docker. First, we clone the project from GitHub:

git clone https://github.com/corpbob/chapel-openshift-demo.git


Change directory to chapel-openshift-demo and build the image:

docker build -t chapel .


Create the chapel project on OpenShift. We use the admin user in this example:

oc login -u admin
oc new-project chapel


We need to know the IP address of the docker-registry service in OpenShift. We execute the following command:

[root@openshift chapel-openshift-demo]# oc describe svc docker-registry -n default
Name:			docker-registry
Namespace:		default
Labels:			docker-registry=default
Selector:		docker-registry=default
Type:			ClusterIP
IP:			172.30.1.1
Port:			5000-tcp	5000/TCP
Endpoints:		172.17.0.7:5000
Session Affinity:	ClientIP
No events.


The IP: line of the output gives us the IP address of the docker-registry service. We will use this to push the Chapel image we just created to the registry. First we log in to the docker registry:

docker login -u admin -p $(oc whoami -t) 172.30.1.1:5000


Tag the chapel image using the following format:

<registry_ip>:<port>/<project_name>/<image_name>:<version>


For our example, we use the following command to tag the image:

docker tag chapel 172.30.1.1:5000/chapel/chapel:v0.1


We can now push the image to the internal docker registry:

docker push 172.30.1.1:5000/chapel/chapel


### Importing the Chapel Template

In the previous section, we built the container image of Chapel and pushed it to the private docker registry. In this section, we will import a template that will do the following:

• Set up the Chapel containers inside OpenShift

• Create a file that dynamically generates the variable GASNET_SSH_SERVERS containing the IP addresses of the Chapel pods that will be used in the parallel computation.

The name of the template is chapel.yml. Import the template using the command:

oc create -f chapel.yml


We need to give the default service account the view role so that it can read the IP addresses of the pods associated with chapel. To do this, execute the command:

oc policy add-role-to-user view system:serviceaccount:chapel:default


After this we can now create the chapel application:

oc new-app chapel


This will automatically trigger a deployment of 4 chapel instances with a shared volume mounted at /home/chapel/mnt. The “hook-post” pod is a separate instance of the chapel image that executes the following commands:

echo "export GASNET_MASTERIP=\$MY_NODE_IP" > /home/chapel/mnt/exports && \
echo "export GASNET_SSH_OPTIONS=\"-p 2222\"" >> /home/chapel/mnt/exports && \
for pod in $(oc get pods -l app=chapel | grep chapel | awk '{print $1}'); \
do \
  oc describe pod $pod | grep ^IP: | awk '{print $2}'; \
done | \
awk 'BEGIN { x="" } \
{ x = x $1 " " } \
END { print "export GASNET_SSH_SERVERS=\"" x "\"" }' >> \
/home/chapel/mnt/exports


The output of the above commands is a file named exports containing the GASNET_MASTERIP, GASNET_SSH_OPTIONS, and GASNET_SSH_SERVERS settings for the pods.

### Running the Sample Program

We now go to the web console -> Applications -> Pods. Select any of the pods and click Terminal. In the /home/chapel directory there is a file named run-test.sh, which contains the following commands:

export GASNET_SPAWNFN=S
source /home/chapel/mnt/exports

./hello6-taskpar-dist -nl $*


The commands above execute the pre-compiled Chapel binary hello6-taskpar-dist, which was compiled when we built the container image earlier. Executing this file gives us:

# the parameter 4 tells chapel to use 4 pods to execute the command.
sh-4.2$ ./run-test.sh 4
Warning: Permanently added '[172.17.0.16]:2222' (ECDSA) to the list of known hosts.
Warning: Permanently added '[172.17.0.18]:2222' (ECDSA) to the list of known hosts.
Warning: Permanently added '[172.17.0.15]:2222' (ECDSA) to the list of known hosts.
Warning: Permanently added '[172.17.0.17]:2222' (ECDSA) to the list of known hosts.
Hello, world! (from locale 0 of 4 named chapel-1-fgg4b)
Hello, world! (from locale 2 of 4 named chapel-1-pt3v9)
Hello, world! (from locale 1 of 4 named chapel-1-rg668)
Hello, world! (from locale 3 of 4 named chapel-1-rw2rc)


### Running the Parallel Matrix Multiplication

Copy the file mat_mul.chpl to the pod and compile it:

chpl mat_mul.chpl -o mnt/mat_mul


The command above places the resulting binary inside the directory /home/chapel/mnt, which is accessible from all pods.

Finally, execute the parallel matrix multiplication:

./run.sh mnt/mat_mul 4


## Conclusion

I have not tried this in a real production environment or even on a bare-metal installation of OpenShift. This looks like a promising use of OpenShift, but I have yet to verify it.

## Implementing Parallel Algorithms Part 2

In the previous post, we implemented a very simple parallel program to add a set of numbers. In this post, we will implement parallel matrix multiplication.

We have shown a parallel algorithm to multiply 2 big matrices using message passing. The algorithm involves passing block sub-matrices from node to node and multiplying them within a node until the answer is found.

There will be 2 square matrices, A and B. In our example, the dimension of both A and B is 4×4. We will distribute the matrices evenly across 4 nodes.

$A = \left[ \begin{array}{cc|cc} a_{00} & a_{01} & a_{02} & a_{03}\\ a_{10} & a_{11} & a_{12} & a_{13}\\ \hline a_{20} & a_{21} & a_{22} & a_{23}\\ a_{30} & a_{31} & a_{32} & a_{33} \end{array} \right], B = \left[ \begin{array}{cc|cc} b_{00} & b_{01} & b_{02} & b_{03}\\ b_{10} & b_{11} & b_{12} & b_{13}\\ \hline b_{20} & b_{21} & b_{22} & b_{23}\\ b_{30} & b_{31} & b_{32} & b_{33} \end{array} \right]$

In this example, node 00 will have the following matrices:

$A_{00}=\begin{bmatrix} a_{00} & a_{01}\\ a_{10} & a_{11} \end{bmatrix}, B_{00}= \begin{bmatrix} b_{00} & b_{01}\\ b_{10} & b_{11} \end{bmatrix}$

const n = 4;
var vec = 1..n;
var blockSize = 2;

var A: [vec, vec] real;
var B: [vec, vec] real;
var C: [vec, vec] real;

coforall loc in Locales {
  on loc {
    var i = loc.id / 2;
    var j = loc.id % 2;
    var istart = i * blockSize;
    var iend = istart + blockSize;
    var jstart = j * blockSize;
    var jend = jstart + blockSize;

    for (r, s) in {istart + 1 .. iend, jstart + 1 .. jend} {
      B(r, s) = r + s;
      A(r, s) = 2*r + s;
    }
  }
}


Each node has limited memory that is physically exclusive to itself. For one node to access the contents of another node's memory, the owning node must pass the data over. Fortunately, Chapel can use a library called GASNet that gives each node a global view of the memory of all nodes participating in the computation.

In the code above, each node loads its own data. However, the GASNet library allows each node to access the matrix elements loaded by the other nodes. Consequently, we are able to reference the sub-matrix held by each node without doing fancy message passing. The algorithm is then a straightforward implementation of

$\displaystyle \mathbf{C}_{ij}=\sum_{k=0}^{1} \mathbf{A}_{ik} \mathbf{B}_{kj}$

where $\mathbf{A_{ij}}$, $\mathbf{B_{ij}}$ and $\mathbf{C_{ij}}$ are submatrices of $\mathbf{A}$, $\mathbf{B}$, and $\mathbf{C}$, respectively.

Below is the straightforward implementation of parallel block multiplication:

coforall loc in Locales {
  on loc {
    var i = loc.id / 2;
    var j = loc.id % 2;
    var istart = i * blockSize;
    var iend = istart + blockSize;
    var jstart = j * blockSize;
    var jend = jstart + blockSize;
    var r = { istart + 1 .. iend, jstart + 1 .. jend };
    ref W = C[r].reindex({ 1..2, 1..2 });

    // accumulate over k serially to avoid a data race on W
    for k in 0..1 {
      var U = get_block_matrix(A[vec, vec], i, k, blockSize);
      var V = get_block_matrix(B[vec, vec], k, j, blockSize);
      var P = mat_mul(U, V);
      coforall (s, t) in { 1..2, 1..2 } {
        W(s, t) += P(s, t);
      }
    }
  }
}


The procedure get_block_matrix will return the sub-matrix given the (i,j)th index and the block size.

proc get_block_matrix(A: [?D], i: int, j: int, blockSize: int) {
  var r = { i*blockSize + 1 .. i*blockSize + blockSize,
            j*blockSize + 1 .. j*blockSize + blockSize };
  return A[r];
}


The procedure mat_mul will return the matrix product of two sub-matrices:

proc mat_mul(A: [?D1], B: [?D2]) {
  var D3 = { 1..2, 1..2 };
  var C: [D3] real;
  var AA = A.reindex({ 1..2, 1..2 });
  var BB = B.reindex({ 1..2, 1..2 });

  for row in 1..2 {
    for col in 1..2 {
      var sum: real = 0;
      for k in 1..2 {
        sum += AA(row, k) * BB(k, col);
      }
      C(row, col) = sum;
    }
  }
  return C;
}

writeln(C[vec,vec]);


To run this code, we need to set the following environment variables:

source \$CHPL_HOME/util/setchplenv.bash

export CHPL_COMM=gasnet
export CHPL_LAUNCHER=amudprun

export GASNET_SSH_SERVERS="127.0.0.1 127.0.0.1 127.0.0.1 127.0.0.1"


Compiling and running this program gives the output:

# Compile
chpl mat_mul.chpl -o mat_mul

# Run using 4 nodes
./mat_mul -nl 4

A=
3.0 4.0 5.0 6.0
5.0 6.0 7.0 8.0
7.0 8.0 9.0 10.0
9.0 10.0 11.0 12.0
B=
2.0 3.0 4.0 5.0
3.0 4.0 5.0 6.0
4.0 5.0 6.0 7.0
5.0 6.0 7.0 8.0
C=
68.0 86.0 104.0 122.0
96.0 122.0 148.0 174.0
124.0 158.0 192.0 226.0
152.0 194.0 236.0 278.0
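As a sanity check, the same matrices can be rebuilt and multiplied in plain Python, using the A(r,s) = 2r + s and B(r,s) = r + s initialisation from the first code listing (this sketch is mine, not part of the Chapel program):

```python
n = 4
# Rebuild A and B exactly as the Chapel code initialises them (indices 1..n).
A = [[2 * r + s for s in range(1, n + 1)] for r in range(1, n + 1)]
B = [[r + s for s in range(1, n + 1)] for r in range(1, n + 1)]

# Plain triple-loop matrix product.
C = [[sum(A[i][k] * B[k][j] for k in range(n)) for j in range(n)]
     for i in range(n)]

for row in C:
    print(row)   # first row: [68, 86, 104, 122]
```

The rows printed here match the C matrix in the program output above.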


## Implementing Parallel Algorithms Part 1

Now we know that parallel algorithms allow us to make our programs run faster. So how do we implement them?

I have used mpich before, but that was more than a decade ago. Recently, I found myself looking for new ways of doing parallel programming. I discovered a very nice parallel programming language called Chapel. This is what we’ll use to implement parallel algorithms.

## Algorithm 1: Parallel sum of consecutive numbers from 1 to N

To get the sum of the numbers from 1 to N, where N is some integer, is easy. There is a formula for that:

$\displaystyle \sum_{i=1}^N i = \frac{N(N+1)}{2}$

However, for the sake of illustration, we are going to compute the sum of 1 to N using a cluster of machines. Here is the Chapel code to accomplish this, in the file add_parallel.chpl:

config var N: int = 1000;
var D: domain(1) = {0..numLocales - 1};
var s: [D] int;
var sum: int = 0;
var bs = N / numLocales;
coforall loc in Locales {
  on loc {
    var i = loc.id;
    var start = i*bs + 1;
    var end = start + bs - 1;
    var _sum: int = 0;
    for j in start..end {
      _sum += j;
    }
    writeln("i= " + i + ", start= " + start + ", end=" + end + ", sum = " + _sum);
    s[i] = _sum;
  }
}

sum = + reduce s;
writeln("sum: " + sum);


This program is compiled using the command:

chpl add_parallel.chpl -o add_parallel


where add_parallel.chpl is the filename of the program and -o add_parallel specifies the filename of the binary produced after compilation.

On line 1, we have defined the default value of N to be 1000. This can be overridden on the command line by specifying the --N parameter. The number of machines we are going to use is also specified on the command line, using the -nl parameter. A sample invocation of this program is the following:

./add_parallel -nl 3 --N=120


The above command means that we want to run the add_parallel binary using 3 machines with the value of N=120.

Executing the above command will give us:

./add_parallel -nl 3 --N=120
i= 0, start= 1, end=40, sum = 820
i= 1, start= 41, end=80, sum = 2420
i= 2, start= 81, end=120, sum = 4020
sum: 7260
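The per-locale arithmetic can be checked with a short Python sketch of the same partitioning (the variable names mirror the Chapel code; the sketch is mine):

```python
N, num_locales = 120, 3
bs = N // num_locales           # block size, as computed in the Chapel code

partials = []
for i in range(num_locales):    # one iteration per locale id
    start = i * bs + 1
    end = start + bs - 1
    partials.append(sum(range(start, end + 1)))

print(partials, sum(partials))  # [820, 2420, 4020] 7260
```

The partial sums and the total agree with the program output above.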


## How the program works

The program partitions N into 3 blocks. This is done on line 5, where we divide N by numLocales to get the block size. The numLocales variable contains the value of the -nl parameter, which in this example is 3.

The code on line 6 tells Chapel to execute a parallel for-loop, running the code inside the on loc block on each Locale. A Locale has an id starting from 0. Each Locale determines its id, computes the starting and ending numbers to sum, and stores the partial sum in the variable _sum.

coforall loc in Locales {
  on loc {
    var i = loc.id;
    var start = i*bs + 1;
    var end = start + bs - 1;
    var _sum: int = 0;
    for j in start..end {
      _sum += j;
    }
    writeln("i= " + i + ", start= " + start + ", end=" + end + ", sum = " + _sum);
    s[i] = _sum;
  }
}


This _sum is stored in the array s. We then take the sum of the entries of s using the reduce keyword, specifying + as the reduction operator. Finally, we print the total sum across all machines.

sum = + reduce s;
writeln("sum: " + sum);


## Conclusion

We have seen that we can implement parallel programs using the Chapel programming language. In part 2, we will show how to do parallel matrix multiplication using Chapel.

## A Very Brief Introduction to Parallel Computing

Imagine you were given a hundred 3-digit numbers to add. How much time would it take you to get the answer? If it takes you 30 seconds to add ten numbers (using a calculator), then it would take you about 300 seconds (or 5 minutes) to add 100 numbers.

Now imagine there are a hundred people and each person has a number. How would a hundred people compute the sum of all numbers? Seems like a recipe for disaster. However, we can do this:

1. Group the people by two; if there is an extra person with no group, that person can join the nearest group (making a group of 3 people).
2. Each group will add the numbers that they have to get the sum S.
3. Each group will nominate a representative that will carry this new number S. The remaining members can sit down.
4. Repeat step 1 until there is only one person remaining.
5. The last person remaining will have the sum of the 100 numbers.

At the beginning, we have 100 people in groups of two. It takes 3 seconds for each group to add its numbers. In the next iteration, only 50 people remain, who then add their numbers in pairs. Continuing this way, the number of remaining people is halved until, in the 7th iteration, we get our answer. So if each iteration takes 3 seconds, a hundred people can compute the sum of 100 numbers in 21 seconds!
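The halving procedure can be simulated in a few lines of Python. This is my own sketch: instead of an odd person joining the nearest group, a leftover value is simply carried forward to the next round, which preserves both the sum and the iteration count:

```python
def tree_sum(numbers):
    """Repeatedly pair up adjacent values and add each pair,
    as the groups of people do, until one value remains."""
    values = list(numbers)
    iterations = 0
    while len(values) > 1:
        # Each pair adds its two numbers; a leftover value passes through.
        values = [sum(values[i:i + 2]) for i in range(0, len(values), 2)]
        iterations += 1
    return values[0], iterations

total, rounds = tree_sum(range(1, 101))
print(total, rounds)   # 5050 in 7 rounds, matching ceil(log2(100)) = 7
```

Running it on the numbers 1 to 100 gives the sum 5050 after exactly 7 rounds.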

I mentioned that in the 7th iteration, we are able to get the answer. There is a formula to get the number of iterations and it is

$\displaystyle \lceil\log_2(n)\rceil$

where n is the number of people to start with and the symbol $\lceil\rceil$ denotes the ceiling function, which rounds the result of $\log_2(n)$ up to the nearest integer. If $n=100$, the number of iterations is

$\displaystyle \lceil\log_2 100\rceil = 7$

Having a hundred people to compute the sum of 100 numbers might be practically infeasible. We might not have the space to accommodate them. However, if we only have say 10 people, we can still have a faster computation.

## Map Reduce

Given a hundred numbers, we can group them by 10 and distribute the groups to 10 people. Each person then adds the numbers they have and combines their sum with those of the other participants, giving the sum of the 100 numbers in roughly 1/10th of the time.

Grouping the numbers into smaller subsets and distributing them to each person is called mapping. Combining the sums computed by each person into the total sum is called reduction. Together these form map-reduce, and the example above is a simple instance of it.
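The two phases can be sketched in Python (the function name and the round-robin chunking are my own choices; the workers add their chunks sequentially here, but conceptually each chunk is handled in parallel):

```python
from functools import reduce

def map_reduce_sum(numbers, workers=10):
    # Map: split the numbers into one chunk per worker (round-robin).
    chunks = [numbers[i::workers] for i in range(workers)]
    # Each worker adds its own chunk.
    partial_sums = [sum(chunk) for chunk in chunks]
    # Reduce: combine the partial sums into the final answer.
    return reduce(lambda a, b: a + b, partial_sums)

print(map_reduce_sum(list(range(1, 101))))  # 5050
```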

## A More Complex Example

Let $\mathbf A$ and $\mathbf B$ be two matrices. The product of matrices $\mathbf A$ and $\mathbf B$ is the matrix $\mathbf C$

$\displaystyle \begin{bmatrix} c_{00} & c_{01} & \cdots & c_{0n}\\ c_{10} & c_{11} & \cdots & c_{1n}\\ \cdots & \cdots & \cdots & \cdots\\ c_{n0} & \cdots & \cdots & c_{nn} \end{bmatrix} = \begin{bmatrix} a_{00} & a_{01} & \cdots & a_{0n}\\ a_{10} & a_{11} & \cdots & a_{1n}\\ \cdots & \cdots & \cdots & \cdots\\ a_{n0} & \cdots & \cdots & a_{nn} \end{bmatrix} \begin{bmatrix} b_{00} & b_{01} & \cdots & b_{0n}\\ b_{10} & b_{11} & \cdots & b_{1n}\\ \cdots & \cdots & \cdots & \cdots\\ b_{n0} & \cdots & \cdots & b_{nn} \end{bmatrix}$

such that

$\displaystyle c_{ij} = \sum_{k=0}^{n-1} a_{ik}b_{kj}$

where $c_{ij}$ is the entry of the matrix C on row i and column j.

Here is a sequential algorithm to compute the matrix C:

//a and b are nxn matrices
//c[i][j] is initialized to 0 for all i,j
for (var i = 0; i < n; i++) {
  for (var j = 0; j < n; j++) {
    for (var k = 0; k < n; k++) {
      c[i][j] += a[i][k] * b[k][j]
    }
  }
}


There are 3 loops in the above algorithm. The innermost loop executes n times to compute the sum of the element-wise products of row i and column j. In the diagram below, the inner loop multiplies each element inside the box of matrix A with the corresponding element inside the box of matrix B and takes the sum. Since there are n such products, the number of addition operations is n.

Then you have to do this n times for each column of matrix B and n times for each row of matrix A, as shown below, for a total of $n^3$ operations.

So if you multiply matrices with n=100, you will need to execute $100^3$ (or 1 million) operations!
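To confirm the $n^3$ count, here is the pseudocode above transcribed into Python with an operation counter added (the function name and counter are mine):

```python
def count_matmul_ops(n):
    """Run the triple-loop matrix product on all-ones matrices,
    counting one multiply-add per innermost iteration."""
    a = [[1.0] * n for _ in range(n)]
    b = [[1.0] * n for _ in range(n)]
    c = [[0.0] * n for _ in range(n)]
    ops = 0
    for i in range(n):
        for j in range(n):
            for k in range(n):
                c[i][j] += a[i][k] * b[k][j]
                ops += 1
    return ops

print(count_matmul_ops(10))  # 1000, i.e. 10**3
```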

Parallel Computing can help us here.

## Parallel Matrix Multiplication

The good thing about matrix multiplication is that we can multiply by blocks. For the sake of simplicity, suppose we have 2 square matrices of dimension 100. We can divide the matrices into small square sub-matrices of dimension 25:

$A= \left[\begin{array}{c|c|c|c} \mathbf A_{00} & \mathbf A_{01} & \mathbf A_{02} & \mathbf A_{03}\\ \hline \mathbf A_{10} & \mathbf A_{11} & \mathbf A_{12} & \mathbf A_{13}\\ \hline \mathbf A_{20} & \mathbf A_{21} & \mathbf A_{22} & \mathbf A_{23}\\ \hline \mathbf A_{30} & \mathbf A_{31} & \mathbf A_{32} & \mathbf A_{33} \end{array} \right] , B= \left[\begin{array}{c|c|c|c} \mathbf B_{00} & \mathbf B_{01} & \mathbf B_{02} & \mathbf B_{03}\\ \hline \mathbf B_{10} & \mathbf B_{11} & \mathbf B_{12} & \mathbf B_{13}\\ \hline \mathbf B_{20} & \mathbf B_{21} & \mathbf B_{22} & \mathbf B_{23}\\ \hline \mathbf B_{30} & \mathbf B_{31} & \mathbf B_{32} & \mathbf B_{33} \end{array} \right]$

where each $\mathbf A_{ij}$ and $\mathbf B_{ij}$ are square matrices of dimension 25.

We can then compute the resulting sub-matrix of C using the usual formula:

$\displaystyle \mathbf C_{ij} = \sum_{k=0}^{p-1} \mathbf A_{ik}\mathbf B_{kj}$

where $p=100/25=4$.

For example, to compute $\mathbf C_{00}$ we have

$\mathbf C_{00} = \mathbf A_{00} \mathbf B_{00} + \mathbf A_{01} \mathbf B_{10} + \mathbf A_{02} \mathbf B_{20} + \mathbf A_{03} \mathbf B_{30}$
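The block formula can be checked numerically with pure Python; the helper names, matrix sizes, and test values below are my own (an 8×8 matrix split into 2×2 blocks, so p = 4 as in the example):

```python
def matmul(A, B):
    n = len(A)
    return [[sum(A[i][k] * B[k][j] for k in range(n)) for j in range(n)]
            for i in range(n)]

def block(M, i, j, bs):
    """Extract the (i, j)-th bs-by-bs sub-matrix of M."""
    return [row[j * bs:(j + 1) * bs] for row in M[i * bs:(i + 1) * bs]]

def matadd(A, B):
    return [[x + y for x, y in zip(ra, rb)] for ra, rb in zip(A, B)]

n, bs = 8, 2
p = n // bs
A = [[i * n + j for j in range(n)] for i in range(n)]
B = [[(i + j) % 7 for j in range(n)] for i in range(n)]

# C_00 assembled from the block formula: the sum over k of A_0k * B_k0.
C00 = [[0] * bs for _ in range(bs)]
for k in range(p):
    C00 = matadd(C00, matmul(block(A, 0, k, bs), block(B, k, 0, bs)))

# The assembled block matches the top-left block of the full product.
assert C00 == block(matmul(A, B), 0, 0, bs)
```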

We then use this mechanism to distribute our sub-matrices to different computers (or, in modern parlance, “compute nodes”). For this example, we need 4×4=16 compute nodes. Each compute node will contain 2 sub-matrices, one for A and one for B. For easy visualization, we can make a drawing of our compute nodes arranged in a square of side 4 and labelled as shown below:

Now that we have evenly distributed our matrix data to each node, the next question is how do we compute? Notice that not one node contains all the data. Each node has a very limited subset of the data.

### The Trick

We can get a clue by looking at the end result of the computation for Node 00. At the end of the computation, Node 00 should have the following result:

$\mathbf C_{00} = \mathbf A_{00} \mathbf B_{00} + \mathbf A_{01} \mathbf B_{10} + \mathbf A_{02} \mathbf B_{20} + \mathbf A_{03} \mathbf B_{30}$

Looking at the above formula, Node 00 only has the following data

$\mathbf A_{00} \text{ and } \mathbf B_{00}$

The rest of the sub-matrices are not in its memory. Therefore, Node 00 can only compute

$\mathbf A_{00} \times \mathbf B_{00}$

We are missing the following products:

$\mathbf A_{01}\mathbf B_{10}, \mathbf A_{02}\mathbf B_{20}, \text{ and } \mathbf A_{03}\mathbf B_{30}$

The matrix $\mathbf A_{01}$ is with Node 01 and the matrix $\mathbf B_{10}$ is with Node 10. So if Node 01 can send matrix $\mathbf A_{01}$ to Node 00 and Node 10 can send matrix $\mathbf B_{10}$ to Node 00, we can then get the product $\mathbf A_{01}\mathbf B_{10}$. In fact, if we do a slight rearrangement like the below, we can use the following algorithm to compute matrix $\mathbf C$:

1. Each node will send the current $A$ sub-matrix to the node on the left and receive a new $A$ sub-matrix from the node on the right. If there is no node on the left, it will send the sub-matrix to the last node on its row.
2. Each node will send the $B$ sub-matrix to the node on top and receive a new $B$ sub-matrix from the node below it. If there is no node on top, it will send the sub-matrix to the last node on its column.
3. Multiply the new sub-matrices and add the result to the current value of the sub-matrix of C that it keeps in memory.
4. Repeat steps 1–3 until the number of iterations equals N, where $N^2$ is the number of nodes.

The figure below describes this algorithm for Node 00.

Doing this for all nodes, we can visualize the parallel algorithm at work on all nodes as shown in the animation below:
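This shifting scheme can be simulated in Python. In the sketch below (mine, with single numbers standing in for sub-matrix "blocks" so each node holds one entry), the initial skew step corresponds to the "slight rearrangement" mentioned earlier:

```python
def shift_multiply(A, B):
    """Simulate the shift-and-multiply scheme on a p-by-p grid of nodes.

    Each 'node' (i, j) holds one entry of A and one of B; real
    implementations hold sub-matrices, but the data movement is identical.
    """
    p = len(A)
    # Initial skew: shift row i of A left by i, and column j of B up by j,
    # so that node (i, j) starts with a matching pair of factors.
    a = [[A[i][(j + i) % p] for j in range(p)] for i in range(p)]
    b = [[B[(i + j) % p][j] for j in range(p)] for i in range(p)]
    C = [[0] * p for _ in range(p)]
    for _ in range(p):
        # Every node multiplies what it currently holds and accumulates.
        for i in range(p):
            for j in range(p):
                C[i][j] += a[i][j] * b[i][j]
        # Shift A one node to the left and B one node up, with wrap-around.
        a = [[a[i][(j + 1) % p] for j in range(p)] for i in range(p)]
        b = [[b[(i + 1) % p][j] for j in range(p)] for i in range(p)]
    return C

A = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
B = [[9, 8, 7], [6, 5, 4], [3, 2, 1]]
print(shift_multiply(A, B))  # [[30, 24, 18], [84, 69, 54], [138, 114, 90]]
```

After p iterations every node has accumulated its entry of C, which matches the ordinary matrix product.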

This algorithm is one of my favorite algorithms, since it looks like the pumping of blood into the heart.

### How fast is this algorithm?

Given two square matrices of dimension N, the number of sequential computations is $O(N^3)$. Using the parallel matrix multiplication above, we divide the matrices into sub-matrices of dimension $n$, so the resulting block matrix is of dimension $N/n=p$. Each node computes its products in parallel using $O(n^3)$ operations per sub-matrix multiplication. Since there are $p$ multiplications per node, the number of operations per node is $O(pn^3)$. The ratio of these two quantities tells us how fast the parallel algorithm is:

$\displaystyle \frac{N^3}{pn^3} = \frac{1}{p}\frac{N^3}{n^3} = \frac{1}{p}\cdot p^3 = p^2$

For our particular example, the theoretical speedup is $p^2 = 4^2 = 16$; that is, our parallel algorithm can compute the product 16 times faster than the sequential algorithm. If we increase p, we increase the speedup. However, we can only increase p up to a certain point, after which the network becomes the bottleneck and makes things slower than the sequential algorithm.

In the next 3 posts, we will see how to program simple parallel algorithms and parallel matrix multiplication. We will also show how you can use OpenShift to prototype your parallel programs.