## Implementing Parallel Algorithms Part 2

In the previous post, we implemented a very simple parallel program to add a set of numbers. In this post, we will implement parallel matrix multiplication.

We have shown a parallel algorithm to multiply 2 big matrices using message passing. The algorithm involved block sub-matrices to be passed from node to node and multiplied within a node until the answer is found.

There will be 2 square matrices A and B. In our example, the dimension of both A and B is 4×4. We will distribute the matrices evenly to 4 nodes.

$A = \left[ \begin{array}{cc|cc} a_{00} & a_{01} & a_{02} & a_{03}\\ a_{10} & a_{11} & a_{12} & a_{13}\\ \hline a_{20} & a_{21} & a_{22} & a_{23}\\ a_{30} & a_{31} & a_{32} & a_{33} \end{array} \right], B = \left[ \begin{array}{cc|cc} b_{00} & b_{01} & b_{02} & b_{03}\\ b_{10} & b_{11} & b_{12} & b_{13}\\ \hline b_{20} & b_{21} & b_{22} & b_{23}\\ b_{30} & b_{31} & b_{32} & b_{33} \end{array} \right]$

In this example, node 00 will have the following matrices:

$A_{00}=\begin{bmatrix} a_{00} & a_{01}\\ a_{10} & a_{11} \end{bmatrix}, B_{00}= \begin{bmatrix} b_{00} & b_{01}\\ b_{10} & b_{11} \end{bmatrix}$

const n = 4;
var vec = 1..n;
var blockSize = 2;

var A: [vec, vec] real;
var B: [vec, vec] real;
var C: [vec, vec] real;

coforall loc in Locales {
on loc {
var i = loc.id/2;
var j = loc.id%2;
var istart = i*blockSize;
var iend = istart + blockSize;
var jstart = j*blockSize;
var jend = jstart + blockSize;

for (r,s) in {istart + 1..iend, jstart + 1..jend} {
B(r,s) = r+s;
A(r,s) = 2*r + s;
}
}
}


Each node has limited memory physically exclusive to itself. In order for node A to have access to the contents of the memory of another node B, node B should pass the data to node A. Fortunately, Chapel can use a library called GASNet that allows each node to have a global view of the memory of all nodes participating in the computation.

In the code above, each node loads its own data. However, the GASNet library allows each node to access the matrix elements loaded by the other nodes. Consequently, we are able to reference the sub-matrix held by each node without doing fancy message passing. The algorithm is then a straightforward implementation of

$\displaystyle \mathbf{C}_{ij}=\sum_{k=0}^{2} \mathbf{A}_{ik} \mathbf{B}_{kj}$

where $\mathbf{A_{ij}}$, $\mathbf{B_{ij}}$ and $\mathbf{C_{ij}}$ are submatrices of $\mathbf{A}$, $\mathbf{B}$, and $\mathbf{C}$, respectively.

Below is the straightforward implementation of parallel block multiplication:

coforall loc in Locales {
on loc {
var i = loc.id/2;
var j= loc.id%2;
var istart = i*blockSize;
var iend = istart + blockSize;
var jstart = j*blockSize;
var jend = jstart + blockSize;
var r = { istart + 1..iend, jstart + 1..jend };
ref W = C[r].reindex( { 1..2,1..2 });

coforall k in 0..1 {
var U=get_block_matrix(A[vec,vec],i,k,blockSize);
var V=get_block_matrix(B[vec,vec],k,j,blockSize);
var P = mat_mul(U,V);
coforall (s,t) in { 1..2,1..2 } {
W(s,t) += P(s,t);
}
}
}
}


The procedure get_block_matrix will return the sub-matrix given the (i,j)th index and the block size.

proc get_block_matrix(A: [?D], i:int, j:int , blockSize:int) {
var r = { i*blockSize+1 .. i*blockSize
+  blockSize, j*blockSize
+ 1 .. j*blockSize + blockSize };
return A[r];
}


The procedure mat_mul will return the matrix product of two sub-matrices:

proc mat_mul(A: [?D1], B: [?D2]) {
var D3 = { 1..2, 1..2 };
var C: [D3] real;
var AA = A.reindex({1..2,1..2});
var BB = B.reindex({1..2,1..2});

for row in 1..2 {
for col in 1..2 {
var sum:real = 0;
for k in 1..2 {
sum += AA(row,k) * BB(k,col);
}
C(row,col) = sum;
}
}
return C;
}

writeln(C[vec,vec]);


To run this code, we need to set the following environment variables:

This means the probability of that all molecules are on the left side of the balloon is $\displaystyle \frac{1}{\displaystyle 2^{6.022 \times 10^{23}}} < 1/1024^{10^{22}} < 1/(10^3)^{10^{22}}=10^{-30,000,000,000,000,000,000,000}$ This is a very small number such that it contains 30 million trillion zeros to the right of the decimal point. When you write this zeros on a piece of paper with a thickness of 0.05 millimeters, you would need to stack it up to a height 74 times the distance of earth to pluto in kilometers! ## An Interview Question: Using Integer Programming We can solve the Interview Question using a mathematical technique called Integer Programming. Let $d_1, d_2, \ldots, d_N$ be the variables representing diskette 1, diskette 2, diskette 3, etc. The values of the $d_k$ variables can only be 0 or 1. A 0 means the diskette is not used while a 1 means that it is used. Each file is saved to a certain diskette. We want to know to what diskette $d_i$ a given file $f_j$ is assigned. To represent this, we assign the variable $a_{ij}$ a value of 1 if file $f_j$ is assigned to diskette $d_i$. We will normalize the file sizes so that if $s_i$ is the size of $f_i$, the $s_i \le 1$. We do this by simply dividing all file sizes by the size of the diskette. For a given diskette $d_i$, the following constraint should be satisfied: $d_i - s_1a_{i1} - s_2a_{i2} - \ldots - s_N a_{iN} \ge 0$ for diskette $i = 1, 2, \ldots, N$ and $s_i$ are the normalized file sizes of file $f_i$ for $i=1,2,\ldots,N$. Since each file $f_j$ can only be assigned to one diskette, we have the following constraint: $a_{1j} + a_{2j} + \ldots + a_{Nj} = 1$ where $a_{1j}$ is the variable representing the “file $f_j$ is in diskette $d_1$“, etc. Finally, we have to constrain the value of $d_i$ to be either 0 or 1, that is, $d_i \le 1$ for all $i=1,2,\ldots,N$. ## Integer Programming Formulation Given the above information, we can formulate the Integer Programming problem as Minimize: $d_1 + d_2 + d_3 + \ldots + d_N$ subject to $\begin{array}{rl} d_1 - s_1a_{11} - s_2a_{12} - s_3a_{13} - \ldots - s_Na_{1N} &\ge 0\\ d_2 - s_1a_{21} - s_2a_{22} - s_3a_{23} - \ldots - s_Na_{2N} &\ge 0\\ :\\ d_N - s_1a_{N1} - s_2a_{N2} - s_3a_{N3} - \ldots - s_Na_{NN} &\ge 0\\ a_{11} + a_{21} + a_{31} + \ldots + a_{N1} &= 1\\ a_{12} + a_{22} + a_{32} + \ldots + a_{N2} &= 1\\ :\\ a_{1N} + a_{2N} + a_{3N} + \ldots + a_{NN} &= 1\\ d_1 &\le 1\\ d_2 &\le 1\\ :\\ d_n &\le 1 \end{array}$ ## Solving the Problem We will use R to solve this Integer Programming Formulation. Please see code below: library("lpSolve") NUMFILES=4 # Generate random file sizes between 1 and 10 FileSizes=ceiling(10*runif(NUMFILES)) x = -1*FileSizes/10 l=length(x) # Each files can be in any of the diskettes. Suppose there are N files, # to determine if a file j is in diskette i, the value of variable x_ij will # 1 if file j is in diskette i, and 0 otherwise. # Here we construct the coefficients of variables x_ij which are the # sizes of the files (normalized to 1) zz=c() for(i in 1:(l-1)){ zz=c(zz,x,rep(0,l*l)) } zz=c(zz,x) # Construct the coefficients of the indicator variables representing the # diskettes d_i zzmatrix=matrix(zz,ncol=l*l,byrow=T) CoefficientsOfDiskettes=c(); for(i in 1:l){ ttt=rep(0,l) ttt[i] = 1 CoefficientsOfDiskettes= c(CoefficientsOfDiskettes,ttt,zzmatrix[i,]) } # Construct the coefficients of x_ij for constant j. These variables # satisfy the equation \sum_{i=1}^N x_{ij} SumOfFileAcrossDiskettes=c() for(i in 1:l){ ttt=rep(0,l) ttt[i]=1 SumOfFileAcrossDiskettes=c(SumOfFileAcrossDiskettes,rep(ttt,l)) } # Prepend Coefficients of variables d_i. The value of these coefficients is 0. SumOfFileAcrossDiskettesMatrix=matrix(SumOfFileAcrossDiskettes,ncol=l*l,byrow=T) PrependCoefficientsOfDiskettes=c() for(i in 1:l){ PrependCoefficientsOfDiskettes=c(PrependCoefficientsOfDiskettes,c(rep(0,l),SumOfFileAcrossDiskettesMatrix[i,])) } # Construct coefficients of d_i to construct constraint d_i <= 1 DisketteConstraints=c() for(i in 1:l){ ttt=rep(0,l) ttt[i]=1 DisketteConstraints=c(DisketteConstraints,ttt,rep(0,l*l)) } # Construct matrix input of lpSolve const.mat=matrix(c(CoefficientsOfDiskettes,PrependCoefficientsOfDiskettes,DisketteConstraints),ncol=l*(l+1),byrow=T) print("Matrix Coefficients:") print(const.mat) # Construct inequalities/equalities const.dir=c(rep(">=",l),rep("=",l),rep("<=",l)) # Construct Right-Hand side const.rhs=c(rep(0,l),rep(1,l),rep(1,l)) # Construct Objective Function objective.in=c(rep(1,l),rep(0,l*l)) # Invoke lpSolve mylp=lp(direction="min",objective.in=objective.in,const.mat=const.mat,const.dir=const.dir,const.rhs=const.rhs,all.int=T) # Print Results print(paste("Number of Diskettes: ", sum(mylp$solution[1:l])))
tz=matrix(mylp\$solution,ncol=l,byrow=T)
print("File Sizes: ")
print(FileSizes)
for(i in 2:(l+1)){
files = which(tz[i,] == 1)
if(length(files) > 0){
print(paste("Files in diskette ", i-1))
print(files)
}
}



Most of the code above is setting up the matrix of coefficients. The line 70 then calls on lpSolve to compute the optimal values of the variables

## Program Output

Running this code we get the output

[1] "Matrix Coefficients:"
[,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9] [,10] [,11] [,12] [,13] [,14] [,15] [,16] [,17] [,18] [,19] [,20]
[1,]    1    0    0    0   -1 -0.2 -0.1 -0.1    0   0.0   0.0   0.0     0   0.0   0.0   0.0     0   0.0   0.0   0.0
[2,]    0    1    0    0    0  0.0  0.0  0.0   -1  -0.2  -0.1  -0.1     0   0.0   0.0   0.0     0   0.0   0.0   0.0
[3,]    0    0    1    0    0  0.0  0.0  0.0    0   0.0   0.0   0.0    -1  -0.2  -0.1  -0.1     0   0.0   0.0   0.0
[4,]    0    0    0    1    0  0.0  0.0  0.0    0   0.0   0.0   0.0     0   0.0   0.0   0.0    -1  -0.2  -0.1  -0.1
[5,]    0    0    0    0    1  0.0  0.0  0.0    1   0.0   0.0   0.0     1   0.0   0.0   0.0     1   0.0   0.0   0.0
[6,]    0    0    0    0    0  1.0  0.0  0.0    0   1.0   0.0   0.0     0   1.0   0.0   0.0     0   1.0   0.0   0.0
[7,]    0    0    0    0    0  0.0  1.0  0.0    0   0.0   1.0   0.0     0   0.0   1.0   0.0     0   0.0   1.0   0.0
[8,]    0    0    0    0    0  0.0  0.0  1.0    0   0.0   0.0   1.0     0   0.0   0.0   1.0     0   0.0   0.0   1.0
[9,]    1    0    0    0    0  0.0  0.0  0.0    0   0.0   0.0   0.0     0   0.0   0.0   0.0     0   0.0   0.0   0.0
[10,]    0    1    0    0    0  0.0  0.0  0.0    0   0.0   0.0   0.0     0   0.0   0.0   0.0     0   0.0   0.0   0.0
[11,]    0    0    1    0    0  0.0  0.0  0.0    0   0.0   0.0   0.0     0   0.0   0.0   0.0     0   0.0   0.0   0.0
[12,]    0    0    0    1    0  0.0  0.0  0.0    0   0.0   0.0   0.0     0   0.0   0.0   0.0     0   0.0   0.0   0.0
[1] "Number of Diskettes:  2"
[1] "File Sizes: "
[1] 10  2  1  1
[1] "Files in diskette  1"
[1] 2 3 4
[1] "Files in diskette  2"
[1] 1



## Interpreting the Result

Lines 2-14 of the output gives you the matrix of coefficients. Line 15 prints the number of diskettes needed to store the files. Line 17 prints the randomly generated file sizes from 1 to 10. Finally lines 18-21 prints which diskettes contain which files.

The space complexity of this solution is quite substantial. Given $N$ files, we need to specify $N^2 + N$ variables by $3\times N$ equations for a total of $(N^2 + N)\times 3N$ memory space for coefficients.

## An Interview Question

I was given this interview question and I’d like to share it to you. The setting is back in the days when the largest size of a hard disk was 1 GB and there were no CD writers yet and the only way to back up your data is through those 1.44 floppy disks. You want to back up your files but you want to minimize the number of floppy disks you need to use. Assume your most important files are in a single directory. How will you distribute the files across your disks in such a way that the number of disks you use is minimized ?

To make this simple, let’s assume the following:

– we will not take into account that every file you copy to the disk has a record of the metadata of the file and stored on the disk as well. This will eat up space as you put more files. For our purposes, we ignore this complexity.
– The size of each file is less than or equal to 1.44

First we need to have a list of those files including the size and sort the list according to size in descending order. If A is the list of files, we can apply this algorithm:

B := list of files to copy to current floppy disk
remaining_size := 1.44 MB
For file in A:
If remaining_size - file.size > 0:
A.remove(file)
Copy all files listed in B to disk
Empty B
Repeat process for remaining files in A


Although there are other better algorithms than the one above, this is the one I managed to to come up during the interview.

We now need to determine how fast our algorithm can run.

## Worst Case Complexity

How slow can this algorithm get ? If for any two files $F_i$ and $F_j$ in A we have $F_i + F_j > 1.44$, then all files will have their own diskette. If this is the case, for each file, our algorithm will execute step 4. For the first disk, it will execute the step $N$ times. For the second disk, it will execute the step $N-1$ times, for the third disk it will execute $N-2$ times, etc. the total number of times it executes step 4 is the total number of comparisons and is equal to the summation:

$\displaystyle \sum_{1=1}^{N} i$

which is equal to

$\displaystyle \frac{N(N+1)}{2}$

Therefore, in the worst case, the complexity is $O(N^2)$.

## Best Case Complexity

The best case is when all files fit in just one diskette. For this, the total number of comparisons is $N$

## Average Case Complexity

On the average, files have different sizes. We now compute the complexity on the assumption that the probability distribution is uniform.

If $k$ is the number of diskettes, the number of comparisons is a sequence of monotonic decreasing numbers $\{ a_1, a_2, a_3, \ldots, a_k \}$ taken at random from the set $\{ 1, 2, \ldots, N\}$. Each of the numbers $a_j$, $j\in \{1, 2, \ldots, k\}$ has a probability $1/N$ of being chosen. Let $X$ be a random variable such that

$\displaystyle Pr(X=a_j) = \frac{1}{N} \text{ for } j=1,2,\ldots,k$

then the number of comparisons $C$ is equal to

$\displaystyle C = \sum_{i=1}^k X = kX$

The expected value of $C$ is given by the

$E[C] = E[kX] = kE[X]$

However, the expected value of X is given by

$\displaystyle E[X] = \sum_{j=1}^N j\cdot Pr(X=j) = \frac{1}{N} \sum_{j=1}^N j = \frac{1}{N}\frac{N(N+1)}{2} = \frac{N+1}{2}$

Therefore,

$\displaystyle E[C] = k\frac{N+1}{2}$

What remains is to determine the average value of $k$, which is the number of diskettes. If $M=1.44$ is the maximum file size, the average file size is $M/2$. The average total file size is then $NM/2$. The average number of diskettes is equal to the average total size divided by size of diskette, that is

$k = \displaystyle \frac{NM}{2}\frac{1}{M} = \frac{N}{2}$

This means that

$\displaystyle E[C] = \frac{N}{2} \frac{N+1}{2} = O(N^2)$

which is the same as the worst case complexity.

There is another way to solve this problem using Integer Programming.

## Asynchronous Versus Synchronous: A Performance Perspective

I just had my coffee from my favorite coffee shop down the corner. I’m a regular at that coffee shop that the baristas know me and I’m able to sit down after I order and do something else while they make my coffee and serve to me when ready.  If I order from other branches of this coffee shop, I would have to wait for my coffee in a corner and not being able to do anything productive.  So while I was seated comfortable waiting for my coffee, a thought came to me. This is a great example of synchronous versus asynchronous service invocation.

A synchronous service invocation is just like buying your coffee and having to wait in the corner for the baristas to complete your coffee before you can even sit down and do something useful. Asynchronous service invocation is having to get seated and do something else while waiting for your coffee to be served.  My instinct tells me that asynchronous invocation is better than synchronous in terms of performance especially when it takes a long for the service to complete and the waiting time is much better used doing something else. But how can we show that this is the case?

We can model the synchronous/asynchronous invocation as a Markov Chain and compute a few metrics from our model. If you’re not familiar with this approach, you can refer to this article MODELING QUEUING SYSTEMS USING MARKOV CHAINS.

First, we model the asynchronous invocation. We can identify each state of our system by  2 slots each of which contains a number. The first slot is the number of responses waiting for the client to process and the second slot is the number of requests waiting for the server to process.  To simplify the modeling, we assume the following:

1. The maximum number of requests that a server can handle at any given time is 2. Which means that a client cannot anymore send a request if the second number is equal to 2.
2. When the server responds, the first number increments by 1. Once a client receives a response, it will stop whatever it is doing and process that response. When the client finishes processing the response, the first number goes down to zero. As a consequence, this assumption says that the maximum value of slot number 1 is 1 at any given time.

With these assumptions, we can draw the Markov Diagram of the asynchronous system to come up with the below diagram:

Explanation of the Diagram

The system initially starts at 00 where there are no requests and no responses. It will then transition to state 01 when the client will send a  request which increments the server’s pending requests to 1. From this state, the system can transition to either state 02 or state 10. State 02 occurs when the client sends another request without waiting for a response. State 10 is when the server is able to process the request and return a response, thereby decrementing it’s pending items and incrementing the client’s number of responses to process.

At state 02, the client cannot anymore send requests since the maximum requests that the server can handle is 2. At this state, it can only transition to state 11, where the server has processed one of the requests (thus decrementing the second slot from 2 to 1 and incrementing the first slot from 0 to 1).

State 11 can only transition to 01 since the client will stop whatever it is doing to process the single response it has received (thus decrementing the first slot from 1 to 0).

The numbers $a,b,c$ are the rates at which the transitions will occur. For example, the rate at which the system will transition from state 00 to state 01 is $b$.

In the remainder of this article, we assume that the client can request at the rate of 60 requests per unit time and can process the response at 60 per unit time. We also assume that the server can process requests at 30 per unit time. We therefore have the following:

$a=60, b=60, c=30$

Computing the Probabilities

At steady state, the flow going into each state is equal to the flow out of that state. For example, for state 00, the following is true:

$\displaystyle bP_{00} = aP_{10}$

Doing this for all states, we get the following balance equations:

$\begin{array}{rlr} \displaystyle bP_{00} &= aP_{10} & (1)\\ bP_{00} + a P_{11} &= bP_{01} + cP_{01} & (2)\\ bP_{01} &= cP_{02} & (3)\\ aP_{10} &= cP_{01} & (4)\\ aP_{11} &= cP_{02} & (5) \end{array}$

Since the sum of the probabilities should be equal to 1, we have our last equation:

$P_{00} + P_{01} + P_{02} + P_{10} + P_{11} = 1$

We have 6 equations in 5 unknowns, one of the equations above is actually redundant. In fact, you can show that equation 2 above is redundant.

We can form the matrix equation of the system of equations above and solve for the probabilities:

$\begin{bmatrix} -b & 0 & 0 & a & 0 \\ 0 & b & -c & 0 & 0 \\ 0 & -c & 0 & a & 0 \\ 0 & 0 & -c & 0 & a \\ 1 & 1 & 1 & 1 & 1 \end{bmatrix} \begin{bmatrix} P_{00}\\ P_{01}\\ P_{02}\\ P_{10}\\ P_{11} \end{bmatrix} = \begin{bmatrix} 0\\ 0\\ 0\\ 0\\ 1 \end{bmatrix}$

Solving this matrix equation when a=60, b=60 and c=30, we can find the probabilities:

$\begin{bmatrix} P_{00}\\ P_{01}\\ P_{02}\\ P_{10}\\ P_{11} \end{bmatrix} = \displaystyle \frac{1}{ab^2+abc+ac^2+b^2c+bc^2} \begin{bmatrix} ac^2\\ abc\\ ab^2\\ bc^2\\ b^2c \end{bmatrix} = \begin{bmatrix} 1/10 \\ 1/5 \\2/5\\ 1/10\\ 1/5 \end{bmatrix}$

Utilization and Throughput: Asynchronous Case

The client utilization is defined to be the probability that the client is busy. In the same way, the server utilization is the probability that the server is busy. Looking at the diagram, the client is busy sending requests at state 00 and state 01. It is busy processing responses at state 10 and 11.  On the other hand, the server is busy processing requests at state 01 and 02.

Therefore, the client utilization is  equal to

$\begin{array}{rl} U_{\text{client}} &= P_{00} + P_{01} + P_{10} + P_{11}\\ &= 1/10 + 1/5 + 1/10 + 1/5\\ &= 3/5\\ &= 60\% \end{array}$

The server utilization is equal to

$\begin{array}{rl} U_{\text{server}} &= P_{01} + P_{02}\\ &= 1/5 + 2/5 \\ &= 3/5 \\ &= 60\% \end{array}$

The system througput is the number of requests the client is able to submit and is equal to

$\begin{array}{rl} X &= 60P_{00} + 60 P_{01} \\ &= 60*1/10 + 60 * 1/5 \\ &= 18 \end{array}$

Comparison with Synchronous Invocation

For the synchronous invocation, the client will submit a request at state 00. The server will then receive this request and process it immediately at state 01. The client will get the response and process it at state 10 and do the loop again at state 00. Please see the Markov diagram below describing this process.

We can solve the probabitlies of this Markov Chain by solving the balance equation

$\begin{bmatrix} b & 0 & -a \\ 0 & c & -a \\ 1 & 1 & 1 \end{bmatrix} \begin{bmatrix} P_{00}\\ P_{01}\\ P_{10} \end{bmatrix} = \begin{bmatrix} 0\\0\\1 \end{bmatrix}$

Solving for the probabilities, we get

$\begin{bmatrix} P_{00}\\ P_{01}\\ P_{10} \end{bmatrix} = \displaystyle \frac{1}{ab + ac + bc} \begin{bmatrix} ac\\ ab\\ bc \end{bmatrix} = \begin{bmatrix} 1/4\\ 1/2\\ 1/4 \end{bmatrix}$

At state 00, the client is busy sending requests at the rate of 60 requests per unit time, therefore the system throughput is

$\begin{array}{rl} X &= 60 P_{00} \\ &= 60/4 \\ &= 15 \end{array}$

which is less than the Asynchronous case. In addition, the client utilization is

$\begin{array}{rl} U_{\text{client}} &= P_{00} + P_{10}\\ &= 1/4 + 1/4\\ &= 1/2 \end{array}$

This is lower than the Asynchronous case where the client utilization is 60%.

Therefore, the asynchronous service invocation is indeed better compared to the synchronous case. It has a much higher throughput and a much higher utilization compared to the synchronous case.

P.S. I used Mathics to solve the matrix equations. Mathics is an open source alternative to Mathematica. It features Mathematica-compatible syntax and functions so if you’re a user of Mathematica you can run basic commands in Mathics. If you’re a Mathics user, you can easily switch to Mathematica as well. Please visit http://mathics.github.io/ to know more about Mathics.