三(N)进程流水线的原理比较简单,使用异步传输,这样可以使计算与通信重叠,隐藏一部分传输延迟。课本和ppt中给出了流水线的伪代码
while (Not_Done){
MPI_Irevc(NextX, … );
MPI_Isend(PreviousY, … );
CurrentY=Q(CurrentX);
}
计算任务选用AES加密计算,每个进程加密一轮发给下一个进程,因而总共加密三轮。
对于一定数量的任务,将上面伪代码中while(Not_Done)
改为for()
收发使用双缓冲区,分奇偶运算,比如0缓冲区接受数据同时在1缓冲区计算前一批接收到的数据,算完后再切换到0缓冲区计算,1缓冲区再接收下一批数据……
需要注意的是使用MPI的Isend是异步的,而计算代码一般是同步的,所以要实现计算和通信重叠要先写Isend,比如
// 计算和通信重叠
ISend();
CAL();
ISendWait();
// 先计算后通信,没有重叠
CAL();
ISend();
ISendWait();
各个进程的收发,计算代码有较多相同部分,使用c++宏简化代码。
流水线的核心代码如下(具体代码见附件)
if (rank == 0) {
CALCULATE(0);
PIPELINE_SEND(0);
for (int i = 1; i < PACKAGE_NUM; ++i) {
CALCULATE(i);
PIPELINE_SEND_WAIT();
PIPELINE_SEND(i);
}
PIPELINE_SEND_WAIT();
} else if (rank == 1) {
PIPELINE_RECV(0);
PIPELINE_RECV_WAIT();
CALCULATE(0);
PIPELINE_SEND(0);
PIPELINE_RECV(1);
for (int i = 1; i < PACKAGE_NUM - 1; ++i) {
PIPELINE_RECV_WAIT();
PIPELINE_RECV(i + 1);
CALCULATE(i);
PIPELINE_SEND_WAIT();
PIPELINE_SEND(i);
}
PIPELINE_RECV_WAIT();
CALCULATE(PACKAGE_NUM - 1);
PIPELINE_SEND_WAIT();
PIPELINE_SEND(PACKAGE_NUM - 1);
PIPELINE_SEND_WAIT();
} else if (rank == 2) {
PIPELINE_RECV(0);
for (int i = 1; i < PACKAGE_NUM; ++i) {
PIPELINE_RECV_WAIT();
PIPELINE_RECV(i);
CALCULATE(i - 1)
}
PIPELINE_RECV_WAIT();
CALCULATE(PACKAGE_NUM - 1);
}
// pipeline macros
#define PIPELINE_SEND(idx) \
MPI_Isend(send_buffer[(idx) % 2], BUFFER_SIZE, MPI_UINT8_T, rank + 1, idx, \
MPI_COMM_WORLD, &send_request);
#define PIPELINE_SEND_WAIT() MPI_Wait(&send_request, MPI_STATUS_IGNORE);
#define PIPELINE_RECV(idx) \
MPI_Irecv(recv_buffer[(idx) % 2], BUFFER_SIZE, MPI_UINT8_T, rank - 1, idx, \
MPI_COMM_WORLD, &recv_request);
#define PIPELINE_RECV_WAIT() \
MPI_Wait(&recv_request, MPI_STATUS_IGNORE); \
#define CALCULATE(idx) \
for (int cali = 0; cali < BUFFER_SIZE; cali += AES_ONCE) { \
aes_cipher(&recv_buffer[(idx) % 2][cali], &send_buffer[(idx) % 2][cali], \
w); \
}
// end pipeline macros
使用google/glog来打印各个进程的运行时间,这里绘制出3个进程的运算部分时间图,可以看出3个进程的运算部分基本重叠,流水线运行符合预期。
MPI_Alltoall是MPI的全局交换函数,在所有进程间交换,使用MPI_Send的MPI_Recv一个一个收发也可以达到相同效果。
每个进程都运行0-进程数的循环,循环到i时,进程i向其他进程发送,其他进程接收;进程本地的数据直接复制,不用mpi函数收发。
代码如下:
void my_MPI_Alltoall(int *sendbuf, int *recvbuf, int count, MPI_Comm comm,
int rank) {
for (int i = 0; i < count; ++i) {
recvbuf[rank] = sendbuf[rank];
if (i == rank) {
// recv from other proc
for (int j = 0; j < count; ++j) {
if (j != rank) {
MPI_Recv(recvbuf + j, 1, MPI_INT, j, j, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}
} else {
// send to one proc
MPI_Send(sendbuf + i, 1, MPI_INT, i, rank, MPI_COMM_WORLD);
}
}
}
使用标准的MPI_Alltoall作为对比,验证正确性。
// use my_MPI_Alltoall
my_MPI_Alltoall(send_buffer, recv_buffer_test, size, MPI_COMM_WORLD, rank);
// use MPI_Alltoall
MPI_Alltoall(send_buffer, BUFFER_SIZE, MPI_INT, recv_buffer_val, BUFFER_SIZE,
MPI_INT, MPI_COMM_WORLD);
// validate data
bool foundErr = false;
for (int i = 0; i < size; ++i) {
if (recv_buffer_test[i] != recv_buffer_val[i]) {
std::cout << "proc" << rank << ": "
<< "test[" << i << "]:" << recv_buffer_test[i] << ", val[" << i
<< "]:" << recv_buffer_val[i] << std::endl;
foundErr = true;
}
}
if (!foundErr) {
std::cout <<"proc" << rank << ": no err found" << std::endl;
}
cyh@cyh-GE5S:~/Desktop/pp_exp1/all2all$ mpicxx main.cc
cyh@cyh-GE5S:~/Desktop/pp_exp1/all2all$ mpirun -np 4 a.out
proc0: no err found
proc1: no err found
proc2: no err found
proc3: no err found
将每个MPI进程按照所在节点名称建立node通信子域分组;
再将各个node子通信域的0号进程再次组成一个名为head的通信域;
在进行广播时,首先由root进程将消息在head通信子域内广播,
然后,再由head子域内各进程在其所在的node子域内进行广播。
使用MPI函数获取节点的名称(字符串),而MPI建立通信域需要相同的color(整形),虽然节点最后一位是数字,但考虑鲁棒性,使用c++的stl的hash,获得从字符串到整形的映射。
其他操作都比较简单,直接使用MPI函数建立(拆分)通信域,进行广播即可。
广播内容为hello from root,每个进程初始的字符串为default string,只有正确广播后,才会变成hello from root,最后打印出每个进程在各个域的信息,和字符串。
// node comm
MPI_Get_processor_name(proc_name, &proc_name_len);
std::string proc_name_str(proc_name, proc_name_len);
int proc_name_hash = std::hash<std::string>{}(proc_name_str);
// std::cout << proc_name_str << ", " << proc_name_hash << std::endl;
MPI_Comm_split(MPI_COMM_WORLD, proc_name_hash, -1, &node);
int node_rank, node_size;
MPI_Comm_rank(node, &node_rank);
MPI_Comm_size(node, &node_size);
// head comm
int head_rank, head_size;
int headornot = node_rank == 0;
MPI_Comm_split(MPI_COMM_WORLD, headornot, -1, &head);
MPI_Comm_rank(head, &head_rank);
MPI_Comm_size(head, &head_size);
//std::cout << "rank:" << rank << ", head_rank:" << head_rank << ", head_size:" << head_size << std::endl;
char message[2048] = "default string";
printf("rank:%d, is_head:%d, node:%s, node_rank:%d, msg:%s\n", rank, headornot, proc_name, node_rank, message);
// bcast in head
if (head_rank == 0 && headornot) {
strcpy(message, "hello from root");
}
if (headornot) {
MPI_Bcast((void*) message, 16, MPI_CHAR, 0, head);
}
// bcast in node
MPI_Bcast((void*) message, 16, MPI_CHAR, 0, node);
printf("rank:%d, is_head:%d, node:%s, node_rank:%d, msg:%s\n", rank, headornot, proc_name, node_rank, message);
#### 原理
在 LU 分解的过程中,主要的计算是利用主行 i 对其余各行 j,(j>i)作初等行变换,各行计算之间没有数据相关关系,因此可以对矩阵 A 按行划分来实现并行计算。但本实验要求改写为行连续划分。
比如矩阵为12*12,进程数为3时,按行交叉划分,每个进程分到的矩阵行为1,4,7,10;2,5,8,11;3,6,9,12;而按照行连续划分为1,2,3,4;5,6,7,8;9,10,11,12
要修改的部分为
另外,原来代码有个bug,在A行变换后才打印A,应该移到开始
修改如下
59,66d58
< printf("Input of file \"dataIn.txt\"\n");
< printf("%d\t %d\n",M, N);
< for(i=0;i<M;i++)
< {
< for(j=0;j<N;j++)
< printf("%f\t",A(i,j));
< printf("\n");
< }
87c79
< /*0号进程采用行连续划分将矩阵A划分为大小m*M的p块子矩阵,依次发送给1至p-1号进程*/
---
> /*0号进程采用行交叉划分将矩阵A划分为大小m*M的p块子矩阵,依次发送给1至p-1号进程*/
94,95c86,88
< a(i,j)=A(i,j);
< for(i=1;i<p;i++)
---
> a(i,j)=A((i*p),j);
> for(i=0;i<M;i++)
> if ((i%p)!=0)
97c90,92
< MPI_Send(&A(i*m,0),m*M,MPI_FLOAT,i,i,MPI_COMM_WORLD);
---
> i1=i%p;
> i2=i/p+1;
> MPI_Send(&A(i,0),M,MPI_FLOAT,i1,i2,MPI_COMM_WORLD);
102c97,98
< MPI_Recv(&a(0,0),m*M,MPI_FLOAT,0,my_rank,MPI_COMM_WORLD,&status);
---
> for(i=0;i<m;i++)
> MPI_Recv(&a(i,0),M,MPI_FLOAT,0,i+1,MPI_COMM_WORLD,&status);
105,107c101,102
<
< for(j=0;j<p;j++)
< for(i=0;i<m;i++)
---
> for(i=0;i<m;i++)
> for(j=0;j<p;j++)
112c107
< v=j*m+i;
---
> v=i*p+j;
120c115
< v=j*m+i;
---
> v=i*p+j;
124,125c119,120
< /*编号等于my_rank的进程(包括my_rank本身)利用主行对其第i+1,…,m-1行数据做行变换*/
< if (my_rank==j)
---
> /*编号小于my_rank的进程(包括my_rank本身)利用主行对其第i+1,…,m-1行数据做行变换*/
> if (my_rank<=j)
133c128
< /*编号大于my_rank的进程利用主行对其第0,i,…,m-1行数据做行变换*/
---
> /*编号大于my_rank的进程利用主行对其第i,…,m-1行数据做行变换*/
135c130
< for(k=0;k<m;k++)
---
> for(k=i;k<m;k++)
148c143
< A(i,j)=a(i,j);
---
> A(i*p,j)=a(i,j);
152c147,148
< MPI_Send(&a(0,0),m*M,MPI_FLOAT,0,my_rank,MPI_COMM_WORLD);
---
> for(i=0;i<m;i++)
> MPI_Send(&a(i,0),M,MPI_FLOAT,0,i,MPI_COMM_WORLD);
156a153
> for(j=0;j<m;j++)
158c155,157
< MPI_Recv(&A(i*m,0),m*M,MPI_FLOAT,i,i,MPI_COMM_WORLD,&status);
---
> MPI_Recv(&a(j,0),M,MPI_FLOAT,i,j,MPI_COMM_WORLD,&status);
> for(k=0;k<M;k++)
> A((j*p+i),k)=a(j,k);
179,186c178,185
< //printf("Input of file \"dataIn.txt\"\n");
< //printf("%d\t %d\n",M, N);
< //for(i=0;i<M;i++)
< //{
< // for(j=0;j<N;j++)
< // printf("%f\t",A(i,j));
< // printf("\n");
< //}
---
> printf("Input of file \"dataIn.txt\"\n");
> printf("%d\t %d\n",M, N);
> for(i=0;i<M;i++)
> {
> for(j=0;j<N;j++)
> printf("%f\t",A(i,j));
> printf("\n");
> }
Input of file "dataIn.txt"
3 3
2.000000 1.000000 2.000000
0.500000 1.500000 2.000000
2.000000 -0.666667 -0.666667
Output of LU operation
Matrix L:
1.000000 0.000000 0.000000
0.250000 1.000000 0.000000
1.000000 -1.333334 1.000000
Matrix U:
2.000000 1.000000 2.000000
0.000000 1.250000 1.500000
0.000000 0.000000 -0.666667