-
Notifications
You must be signed in to change notification settings - Fork 1
/
distribute_array.cc
139 lines (106 loc) · 4.18 KB
/
distribute_array.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# include <mpi.h>
# include <stdlib.h>
# include <iostream>
# include <unistd.h>
//# include <string>
//# include <fstream>
#define SERVER_NODE 0
using namespace std;
//Prototypes
int main ( int argc, char *argv[] );
long make_operation(int data[], int n_elements, int myid);
// Master/worker sum of an integer array: rank SERVER_NODE builds the array,
// scatters equal chunks to the workers, every rank sums its chunk (with some
// deliberate busy-work), and MPI_Reduce combines the partial sums, which the
// master verifies against the serially computed total.
// Usage: mpirun -np P ./distribute_array n_elements   (n_elements % P == 0)
int main ( int argc, char *argv[] ) {
  MPI_Status status;
  int numprocs, my_id;

  // Initialize MPI and discover this rank's place in the world.
  MPI_Init ( &argc, &argv );
  MPI_Comm_size ( MPI_COMM_WORLD, &numprocs );
  MPI_Comm_rank ( MPI_COMM_WORLD, &my_id );

  // Tags distinguishing the two messages sent to each worker.
  int tag_data = 1;
  int tag_chunksize = 2;

  if (my_id == SERVER_NODE) {
    // ---- Master: validate input, build the array, scatter the chunks ----
    if (argc != 2) {
      std::cerr << "Usage: " << argv[0] << " n_elements" << std::endl;
      MPI_Abort(MPI_COMM_WORLD, 1);
    }
    int array_size = atoi(argv[1]);
    // atoi returns 0 on a non-numeric argument; also reject negative sizes,
    // which would otherwise flow into new[] and the chunking arithmetic.
    if (array_size <= 0) {
      std::cerr << "Quitting. Array size must be a positive integer.\n";
      MPI_Abort(MPI_COMM_WORLD, 1);
    }
    if (array_size % numprocs != 0) {
      std::cout << "Quitting. Array size (" << array_size << ") must be divisible by numprocs (" << numprocs << ").\n";
      MPI_Abort(MPI_COMM_WORLD, 1);
    }

    int* data = new int[array_size];
    long expected_sum = 0;  // serial reference total used to check the reduction
    for (int i = 0; i < array_size; i++) {
      data[i] = i;
      expected_sum = expected_sum + data[i];
    }
    std::cout << "Initialized array sum = " << expected_sum << std::endl;

    // Send each worker a fraction of the array, keeping the first chunk
    // (offset 0) for the server node itself.
    int chunksize = (array_size / numprocs);
    int offset = chunksize;
    for (int destination_id = 1; destination_id < numprocs; destination_id++) {
      MPI_Send(&chunksize, 1, MPI_INT, destination_id, tag_chunksize, MPI_COMM_WORLD);
      MPI_Send(&data[offset], chunksize, MPI_INT, destination_id, tag_data, MPI_COMM_WORLD);
      std::cout << "Sent " << chunksize << " elements to task " << destination_id << " offset= " << offset << std::endl;
      offset = offset + chunksize;
    }

    // Master does its share of the work.
    std::cout << "Sent " << chunksize << " elements to task SERVER no offset" << std::endl;
    long mysum = make_operation(data, chunksize, my_id);

    // Combine the partial sums from every rank into `sum` on the master.
    long sum = -1;
    MPI_Reduce(&mysum, &sum, 1, MPI_LONG, MPI_SUM, SERVER_NODE, MPI_COMM_WORLD);

    // Print a small sample from the start of each chunk.
    // BUGFIX: clamp the sample width to chunksize — the original always read
    // 5 elements and ran past the array when chunksize < 5.
    std::cout << "Sample data:" << std::endl;
    int n_sample = (chunksize < 5) ? chunksize : 5;
    offset = 0;
    for (int i = 0; i < numprocs; i++) {
      for (int j = 0; j < n_sample; j++)
        std::cout << " " << data[offset + j];
      std::cout << std::endl;
      offset = offset + chunksize;
    }

    // If the reduced sum disagrees with the reference total ... panic and exit.
    if (sum != expected_sum) {
      std::cout << "Quitting. Final sum (" << sum << ") != expected_sum (" << expected_sum << ").\n";
      MPI_Abort(MPI_COMM_WORLD, 1);
    }
    delete [] data;
  }
  else {
    // ---- Worker: receive a chunk, sum it, contribute to the reduction ----
    int chunksize;
    MPI_Recv(&chunksize, 1, MPI_INT, SERVER_NODE, tag_chunksize, MPI_COMM_WORLD, &status);
    int* data = new int[chunksize];
    MPI_Recv(data, chunksize, MPI_INT, SERVER_NODE, tag_data, MPI_COMM_WORLD, &status);
    long mysum = make_operation(data, chunksize, my_id);
    /* Send my result back to the server task via the reduction;
       non-root ranks pass NULL for the receive buffer. */
    MPI_Reduce(&mysum, NULL, 1, MPI_LONG, MPI_SUM, SERVER_NODE, MPI_COMM_WORLD);
    delete [] data;
  }
  MPI_Finalize();
  return 0;
}
// Sum the n_elements ints in data[], doing a fixed amount of deliberate
// busy-work per element so each rank takes measurable (linear) time.
// Prints the partial sum tagged with the caller's rank and returns it.
//   data       — array of at least n_elements ints (read-only here)
//   n_elements — number of elements to sum
//   myid       — MPI rank, used only for the progress message
long make_operation(int data[], int n_elements, int myid) {
  long mysum = 0;
  // Scratch buffer for the time-wasting loop.
  // BUGFIX: value-initialize with `()` — the original buffer was read
  // before it was ever written (uninitialized memory).
  float* waster_array = new float[1000]();
  for (int i = 0; i < n_elements; i++) {
    mysum = mysum + data[i];
    // Do the same amount of work for each position of the array (keep it linear).
    for (int j = 0; j < 1000; j++) {
      for (int k = 0; k < 1000; k++) {
        // BUGFIX: the original read waster_array[i], which runs out of
        // bounds whenever n_elements > 1000; accumulate into slot j instead.
        waster_array[j] = waster_array[j] + data[i] + 1 * 1.0;
      }
    }
  }
  delete [] waster_array;
  std::cout << "proc [" << myid << "] sum = " << mysum << std::endl;
  return mysum;
}