/* This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/* PARAdiam computes the diameter of an undirected graph in parallel
* Copyright (C) 1997, Jean-Albert Ferrez, Jean-Albert.Ferrez@epfl.ch
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <limits.h>
#include <math.h>
#include <mpi.h>
#include "heap.h"
#include "diam.h"
#define ADD_WORK_TIME gettimeofday(&ti0, NULL); \
worktime += (ti0.tv_sec-ti1.tv_sec)*1000 + (ti0.tv_usec-ti1.tv_usec)/1000;
#define ADD_COMM_TIME gettimeofday(&ti1, NULL); \
commtime += (ti1.tv_sec-ti0.tv_sec)*1000 + (ti1.tv_usec-ti0.tv_usec)/1000;
/* MPI messages tags */
#define SEND_S_TAG 0
#define RECV_T_TAG 1
#define NAMELEN_TAG 10
#define NAME_TAG 11
#define WORK_TAG 20
#define COMM_TAG 21
int npes, my_pe;
typedef struct {
int s; /* source tested */
int t; /* (one of the) furthest vertex */
double min; /* diameter of the shortest-path tree rooted in s (upper bound) */
double max; /* length of a longest shortest path from s (lower bound) */
} TDijRes;
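/* buildMPItype creates an MPI derived datatype matching the TDijRes struct,
so that a whole record travels in a single message.  The displacement of
each member is computed relative to the address of the struct itself. */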
void buildMPItype(MPI_Datatype* mt)
{
TDijRes r;
MPI_Datatype t[4];
int bl[4];
MPI_Aint a[5];
MPI_Aint d[4];
t[0] = t[1] = MPI_INT;
t[2] = t[3] = MPI_DOUBLE;
bl[0] = bl[1] = bl[2] = bl[3] = 1;
MPI_Get_address(&(r), &a[0]);
MPI_Get_address(&(r.s), &a[1]);
MPI_Get_address(&(r.t), &a[2]);
MPI_Get_address(&(r.min), &a[3]);
MPI_Get_address(&(r.max), &a[4]);
d[0] = a[1] - a[0];
d[1] = a[2] - a[0];
d[2] = a[3] - a[0];
d[3] = a[4] - a[0];
MPI_Type_create_struct(4, bl, d, t, mt);
MPI_Type_commit(mt);
}
int main(int argc, char *argv[])
{
int N; /* number of vertices */
int E; /* number of edges */
edge** A; /* adjacency matrix */
int* D; /* degrees */
int* done; /* vertex has already been the root */
#ifdef WEIGHTED
BH h; /* binary heap */
BHelt* elt; /* heap elements */
int *pos; /* positions in heap */
double *dist; /* current shortest distance to s */
#else
int *Q; /* FIFO queue */
int *dist; /* current shortest distance to s */
#endif
int *pred; /* predecessor in shortest distance tree */
int i, j, s; /* various counters */
int finished = 0; /* flag */
int display = 0; /* display the results for the first 'display' sources
(command line option) */
double Max = -HUGE_VAL; /* lower bound on the diameter (largest eccentricity seen) */
double Min = HUGE_VAL; /* upper bound on the diameter (smallest tree diameter seen) */
int td = 0; /* number of sources actually tested */
int *ptd; /* number of sources actually tested per task */
int ness = 0; /* diameter found after ness sources tested */
int next; /* next source to test */
int nbslave = 0; /* number of slave tasks; on the master, later re-used to
count sources still outstanding */
char procname[MPI_MAX_PROCESSOR_NAME+1]; /* name of the processor running the MPI task */
time_t wctime;
struct timeval ti0, ti1;
long worktime = 0; /* accumulated time spent computing (milliseconds) */
long commtime = 0; /* accumulated time spent waiting for data (milliseconds) */
long Worktime = 0, Commtime = 0; /* totals over the slave tasks */
TDijRes buffer; /* buffer for MPI */
TDijRes b_in, b_out; /* buffers for mixing computation and communication */
MPI_Status status;
MPI_Request req_in;
MPI_Request req_out;
MPI_Datatype TDijResMPItype;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &npes);
MPI_Comm_rank(MPI_COMM_WORLD, &my_pe);
nbslave = npes-1;
buildMPItype(&TDijResMPItype);
if (npes < 2) {
if (my_pe == 0) {
printf("Error: %s needs at least two MPI tasks to run.\n", argv[0]);
}
MPI_Finalize();
exit(1);
}
if (argc < 2 || argc > 3 || (argc == 3 && atoi(argv[2]) < 0)) {
if (my_pe == 0) {
printf("usage: %s datafile [display]\n", argv[0]);
}
MPI_Finalize();
exit(1);
}
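/* Example invocation (program and data file names are illustrative only):
mpirun -np 5 ./paradiam graph.data 10
would run one master and four slave tasks and display the results for the
first 10 sources. */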
if (argc == 3) display = atoi(argv[2]);
if (my_pe == 0) {
#ifdef WEIGHTED
printf("Starting (weighted) %s on %d MPI threads.\n", argv[0], npes);
#else
printf("Starting %s on %d MPI threads.\n", argv[0], npes);
#endif
}
fileopen(argv[1], &N, &E, &A, &D, my_pe);
/* Test if N large enough for npes */
if (N < 2*nbslave) {
if (my_pe == 0) {
printf("Error: %s needs at least %d vertices to run on %d MPI tasks.\n",
argv[0], 2*nbslave, npes);
}
MPI_Finalize();
exit(1);
}
/***** master code ******************************************************/
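/* The master hands out source vertices and collects one TDijRes per tested
source.  Max, the largest eccentricity returned so far, is a lower bound on
the diameter; Min, the smallest tree diameter returned so far, is an upper
bound.  The search stops when the two bounds meet or when every vertex has
been used as a source; a slave is terminated by sending it a source index
greater than or equal to N. */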
if (my_pe == 0) {
ptd = (int*) malloc(npes*sizeof(int));
for (i=0; i<npes; i++) ptd[i] = 0;
done = (int*) malloc((N+1)*sizeof(int));
for (i=0; i<N; i++) done[i] = 0;
next = 2*nbslave;
if (display > 0) {
printf("#PE s t graph (best) tree (best)\n");
printf("#------------------------------------------------\n");
}
wctime = time(0);
gettimeofday(&ti1, NULL);
/* initial source distribution: each slave already knows which sources it
starts with, so the master only records that they are taken */
for (i=0; i<next; i++)
done[i] = 1;
ADD_WORK_TIME;
nbslave *= 2; /* number of sources remaining on the slaves */
while (nbslave > 0) {
MPI_Recv(&buffer, 1, TDijResMPItype, MPI_ANY_SOURCE, RECV_T_TAG,
MPI_COMM_WORLD, &status);
ADD_COMM_TIME;
td++;
ptd[status.MPI_SOURCE]++;
if (buffer.min < Min) Min = buffer.min;
if (buffer.max > Max) { Max = buffer.max; ness = td; };
if (td <= display) {
printf(" %2d %6d %6d %5.1f %5.1f %5.1f %5.1f\n",
status.MPI_SOURCE, buffer.s, buffer.t,
buffer.max, Max, buffer.min, Min);
}
/* selection of the next source: first try the freshly returned furthest
vertex; if it has already been used, scan the list for the next
untested source */
buffer.s = buffer.t;
if (done[buffer.s] == 1) {
for ( ; next < N && done[next] == 1; next++) { };
buffer.s = next;
}
done[buffer.s] = 1; /* done[] has size N+1 to accept this */
/* special value: once the bounds have met, send s = N so the slave terminates */
if (Max >= Min) buffer.s = N;
buffer.t = -1;
MPI_Send(&buffer, 1, TDijResMPItype, status.MPI_SOURCE,
SEND_S_TAG, MPI_COMM_WORLD);
if (buffer.s >= N) nbslave--;
ADD_WORK_TIME;
}
wctime = time(0) - wctime;
MPI_Barrier(MPI_COMM_WORLD);
MPI_Get_processor_name(procname, &i);
printf("Diameter = %.1f\n", Max);
printf("Upper Bound = %.1f\n", Min);
printf("task_no #nodes work_time comm_time total %%comm\n");
printf("----------------------------------------------------------\n");
printf("(master) %10.3f %10.3f %10.3f %6.2f%% %s\n",
worktime/1000.0, commtime/1000.0, (worktime + commtime)/1000.0,
100.0*commtime/(worktime+commtime), procname);
printf("----------------------------------------------------------\n");
/* Worktime = worktime; */
/* Commtime = commtime; */
/* Receive names, worktimes and commtimes */
for (i=1; i<npes; i++) {
MPI_Recv(&j, 1, MPI_INT, i, NAMELEN_TAG, MPI_COMM_WORLD, &status);
MPI_Recv(procname, j, MPI_CHAR, i, NAME_TAG, MPI_COMM_WORLD, &status);
procname[j] = 0;
MPI_Recv(&worktime, 1, MPI_LONG, i, WORK_TAG, MPI_COMM_WORLD, &status);
MPI_Recv(&commtime, 1, MPI_LONG, i, COMM_TAG, MPI_COMM_WORLD, &status);
printf(" PE %-2d %6d %10.3f %10.3f %10.3f %6.2f%% %s\n", i, ptd[i],
worktime/1000.0, commtime/1000.0, (worktime + commtime)/1000.0,
100.0*commtime/(worktime+commtime), procname);
Worktime += worktime;
Commtime += commtime;
}
printf("TOTAL %6d %10.3f %10.3f\n", td,
Worktime/1000.0, Commtime/1000.0);
printf("AVERAGE %8.1f %10.3f %10.3f %10.3f %6.2f%%\n", td/(npes-1.0),
Worktime/1000.0/(npes-1.0), Commtime/1000.0/(npes-1.0),
(Worktime + Commtime)/1000.0/(npes-1.0),
100.0*Commtime/(Worktime+Commtime));
printf("----------------------------------------------------------\n");
printf("Wall clock time : %d seconds.\n", wctime);
free(ptd);
free(done);
}
/***** slaves code ******************************************************/
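/* Each slave overlaps communication with computation: the result of the
previous source goes out with MPI_Isend and the next source is requested
with MPI_Irecv while Dijkstra runs on the current source.  b_out and b_in
act as double buffers so that buffer itself remains free for the solver. */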
else {
#ifdef WEIGHTED
elt = (BHelt*) malloc(N*sizeof(BHelt));
pos = (int*) malloc(N*sizeof(int));
BHcreate(&h, N, pos);
dist = (double*) malloc(N*sizeof(double));
#else
Q = (int*) malloc(N*sizeof(int));
dist = (int*) malloc(N*sizeof(int));
#endif
pred = (int*) malloc(N*sizeof(int));
gettimeofday(&ti1, NULL);
/* initial source: slave k starts with source k-1 without being told */
buffer.s = my_pe-1;
/* pre-assign the second source this slave will handle */
b_in.s = buffer.s + nbslave;
/* process the first source (same steps as inside the while loop below) */
for (i=0; i<N; i++) for (j=0; j<D[i]; j++) A[i][j].intree = 0;
#ifdef WEIGHTED
wdijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
h, elt, pos, dist, pred);
#else
dijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
Q, dist, pred);
#endif
buffer.min = treedist(N, A, D, buffer.t, -1);
b_out = buffer;
buffer.s = b_in.s;
ADD_WORK_TIME;
/* stopping condition (also if npes > N) */
while (buffer.s < N) {
/* send out the results ... */
MPI_Isend(&b_out, 1, TDijResMPItype, 0, RECV_T_TAG, MPI_COMM_WORLD,
&req_out);
/* ... and receive a new job (without waiting) */
MPI_Irecv(&b_in, 1, TDijResMPItype, 0, SEND_S_TAG, MPI_COMM_WORLD,
&req_in);
ADD_COMM_TIME;
for (i=0; i<N; i++) for (j=0; j<D[i]; j++) A[i][j].intree = 0;
#ifdef WEIGHTED
wdijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
h, elt, pos, dist, pred);
#else
dijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
Q, dist, pred);
#endif
buffer.min = treedist(N, A, D, buffer.t, -1);
ADD_WORK_TIME;
/* Check that previous result has gone */
MPI_Wait(&req_out, &status);
b_out = buffer;
/* Check that next source has arrived */
MPI_Wait(&req_in, &status);
buffer.s = b_in.s;
}
/* send out the result of the last source (b_out still holds it with the
correct source id; buffer.s was overwritten by the terminating value) */
MPI_Send(&b_out, 1, TDijResMPItype, 0, RECV_T_TAG, MPI_COMM_WORLD);
ADD_COMM_TIME;
MPI_Barrier(MPI_COMM_WORLD);
/* Send my name, worktime and commtime to master */
MPI_Get_processor_name(procname, &i);
MPI_Send(&i, 1, MPI_INT, 0, NAMELEN_TAG, MPI_COMM_WORLD);
MPI_Send(procname, i, MPI_CHAR, 0, NAME_TAG, MPI_COMM_WORLD);
MPI_Send(&worktime, 1, MPI_LONG, 0, WORK_TAG, MPI_COMM_WORLD);
MPI_Send(&commtime, 1, MPI_LONG, 0, COMM_TAG, MPI_COMM_WORLD);
#ifdef WEIGHTED
free(pos);
free(elt);
#else
free(Q);
#endif
free(pred);
free(dist);
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
return 0;
}