MC logo

MPI Graph Diameter

  Parallel Programming

MPIdiam.c
/*  This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

/* PARAdiam computes the diameter of an undirected graph in parallel
 * Copyright (C) 1997, Jean-Albert Ferrez, Jean-Albert.Ferrez@epfl.ch
 */

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <limits.h>
#include <math.h>
#include <mpi.h>
#include "heap.h"
#include "diam.h"

#define ADD_WORK_TIME gettimeofday(&ti0); \
    worktime += (ti0.tv_sec-ti1.tv_sec)*1000 + (ti0.tv_usec-ti1.tv_usec)/1000;

#define ADD_COMM_TIME gettimeofday(&ti1); \
    commtime += (ti1.tv_sec-ti0.tv_sec)*1000 + (ti1.tv_usec-ti0.tv_usec)/1000;

/* MPI messages tags */
#define SEND_S_TAG 0
#define RECV_T_TAG 1
#define NAMELEN_TAG 10
#define NAME_TAG 11
#define WORK_TAG 20
#define COMM_TAG 21

int npes, my_pe;

typedef struct {
  int s;             /* source tested */
  int t;             /* (one of the) furthest vertex */
  double min;        /* length of a longest shortest path from s */
  double max;        /* diameter of the associated tree (rooted in s) */
} TDijRes;

void buildMPItype(MPI_Datatype* mt)
{
  TDijRes r;

  MPI_Datatype t[4];
  int bl[4];
  MPI_Aint a[5];
  MPI_Aint d[4];

  t[0] = t[1] = MPI_INT;
  t[2] = t[3] = MPI_DOUBLE;

  bl[0] = bl[1] = bl[2] = bl[3] = 1;

  MPI_Address(&(r), &a[0]);
  MPI_Address(&(r.s), &a[1]);
  MPI_Address(&(r.t), &a[2]);
  MPI_Address(&(r.min), &a[3]);
  MPI_Address(&(r.max), &a[4]);

  d[0] = a[1] - a[0];
  d[1] = a[2] - a[0];
  d[2] = a[3] - a[0];
  d[3] = a[4] - a[0];

  MPI_Type_struct(4, bl, d, t, mt);
  MPI_Type_commit(mt);
};

void main(int argc, char *argv[])
{
  int N;      /* number of vertices */
  int E;      /* number of edges */
  edge** A;   /* adjacency matrix */
  int* D;     /* degrees */
  int* done;  /* vertex has already been the root */

#ifdef WEIGHTED
  BH h;          /* binary heap */
  BHelt* elt;    /* heap elements */
  int *pos;      /* positions in heap */
  double *dist;  /* current shortest distance to s */
#else
  int *Q;        /* FIFO queue */
  int *dist;     /* current shortest distance to s */
#endif
  int *pred;     /* predecessor in shortest distance tree */

  int i, j, s;        /* various counters */
  int finished = 0;   /* flag */
  int display = 0;    /* display the results for the first 10 sources
                         (command line option) */

  double Max = -HUGE_VAL;  /* upper bound */
  double Min = HUGE_VAL;   /* lower bound */

  int td = 0;       /* number of sources actually tested */
  int *ptd;         /* number of sources actually tested per thread */
  int ness = 0;     /* diameter found after ness sources tested */
  int next;         /* next source to test */
  int nbslave = 0;  /* number of threads busy at computing for some s */

  char procname[100];   /* name of the MPI task */
  time_t wctime;
  struct timeval ti0, ti1;
  long worktime = 0; /* accumulated time doing computation (miliseconds) */
  long commtime = 0; /* accumulated time waiting for data (miliseconds) */
  long Worktime = 0, Commtime = 0; /* Totals for all threads */

  TDijRes buffer;      /* buffer for MPI */
  TDijRes b_in, b_out;  /* buffers for mixing computation and communication */

  MPI_Status status;
  MPI_Request req_in;
  MPI_Request req_out;
  MPI_Datatype TDijResMPItype;

  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &npes);
  MPI_Comm_rank(MPI_COMM_WORLD, &my_pe);
  nbslave = npes-1;

  buildMPItype(&TDijResMPItype);

  if (npes < 2) {
    if (my_pe == 0) {
      printf("Error: %s needs at least two MPI tasks to run.\n", argv[0]);
    }
    MPI_Finalize();
    exit(1);
  }

  if (argc < 2 || argc > 3 || (argc == 3 && atoi(argv[2]) < 0)) {
    if (my_pe == 0) {
      printf("usage: %s datafile [display]\n", argv[0]);
    }
    MPI_Finalize();
    exit(1);
  }

  if (argc == 3) display = atoi(argv[2]);

  if (my_pe == 0) {
#ifdef WEIGHTED
    printf("Starting (weighted) %s on %d MPI threads.\n", argv[0], npes);
#else
    printf("Starting %s on %d MPI threads.\n", argv[0], npes);
#endif
  }

  fileopen(argv[1], &N, &E, &A, &D, my_pe);

  /* Test if N large enough for npes */
  if (N < 2*nbslave) {
    if (my_pe == 0) {
      printf("Error: %s needs at least %d vertices to run on %d MPI tasks.\n",
             argv[0], 2*nbslave, npes);
    }
    MPI_Finalize();
    exit(1);
  }

  /***** master code ******************************************************/

  if (my_pe == 0) {
    ptd = (int*) malloc(npes*sizeof(int));
    for (i=0; i<npes; i++) ptd[i] = 0;
    done = (int*) malloc((N+1)*sizeof(int));
    for (i=0; i<N; i++) done[i] = 0;
    next = 2*nbslave;

    if (display > 0) {
      printf("#PE     s        t     graph (best)   tree (best)\n");
      printf("#------------------------------------------------\n");
    }

    wctime = time(0);
    gettimeofday(&ti1);

    /* initial sources distribution : the slaves are clever enough to know
       where they have to start, the master just keeps a note of it. */
    for (i=0; i<next; i++)
      done[i] = 1;

    ADD_WORK_TIME;

    nbslave *= 2; /* number of sources remaining on the slaves */

    while (nbslave > 0) {
      MPI_Recv(&buffer, 1, TDijResMPItype, MPI_ANY_SOURCE, RECV_T_TAG,
               MPI_COMM_WORLD, &status);

      ADD_COMM_TIME;

      td++;
      ptd[status.MPI_SOURCE]++;
      if (buffer.min < Min) Min = buffer.min;
      if (buffer.max > Max) { Max = buffer.max; ness = td; };

      if (td <= display) {
        printf(" %2d  %6d   %6d   %5.1f %5.1f   %5.1f %5.1f\n",
               status.MPI_SOURCE, buffer.s, buffer.t,
               buffer.max, Max, buffer.min, Min);
      }

      /* some magic for the selection of the next source :
         try the freshly returned furthest vertex, and if already done,
         scan the list for undone sources */
      buffer.s = buffer.t;
      if (done[buffer.s] == 1) {
        for ( ; next < N && done[next] == 1; next++) { };
        buffer.s = next;
      }
      done[buffer.s] = 1;  /* done[] has size N+1 to accept this */

      /* special meaning : the bounds have met, terminate */
      if (Max >= Min) buffer.s = N;

      buffer.t = -1;
      MPI_Send(&buffer, 1, TDijResMPItype, status.MPI_SOURCE,
               SEND_S_TAG, MPI_COMM_WORLD);
      if (buffer.s >= N) nbslave--;

      ADD_WORK_TIME;
    }

    wctime = time(0) - wctime;

    MPI_Barrier(MPI_COMM_WORLD);

    MPI_Get_processor_name(procname, &i); 

    printf("Diameter          = %.1f\n", Max);
    printf("Upper Bound       = %.1f\n", Min);
    printf("task_no   #nodes   work_time  comm_time     total   %%comm\n");
    printf("----------------------------------------------------------\n");
    printf("(master)         %10.3f %10.3f %10.3f  %6.2f%%  %s\n",
           worktime/1000.0, commtime/1000.0, (worktime + commtime)/1000.0,
           100.0*commtime/(worktime+commtime), procname);
    printf("----------------------------------------------------------\n");

/*     Worktime = worktime; */
/*     Commtime = commtime; */

    /* Recieve names, worktimes and commtimes */
    for (i=1; i<npes; i++) {
      MPI_Recv(&j, 1, MPI_INT, i, NAMELEN_TAG, MPI_COMM_WORLD, &status);
      MPI_Recv(procname, j, MPI_CHAR, i, NAME_TAG, MPI_COMM_WORLD, &status);
      procname[j] = 0;
      MPI_Recv(&worktime, 1, MPI_LONG, i, WORK_TAG, MPI_COMM_WORLD, &status);
      MPI_Recv(&commtime, 1, MPI_LONG, i, COMM_TAG, MPI_COMM_WORLD, &status);
      printf(" PE %-2d  %6d   %10.3f %10.3f %10.3f  %6.2f%%  %s\n", i, ptd[i],
             worktime/1000.0, commtime/1000.0, (worktime + commtime)/1000.0,
             100.0*commtime/(worktime+commtime), procname);

      Worktime += worktime;
      Commtime += commtime;
    }
    printf("TOTAL   %6d   %10.3f %10.3f\n", td,
           Worktime/1000.0, Commtime/1000.0);
    printf("AVERAGE %8.1f %10.3f %10.3f %10.3f  %6.2f%%\n", td/(npes-1.0),
           Worktime/1000.0/(npes-1.0), Commtime/1000.0/(npes-1.0),
           (Worktime + Commtime)/1000.0/(npes-1.0),
           100.0*Commtime/(Worktime+Commtime));
    printf("----------------------------------------------------------\n");
    printf("Wall clock time : %d seconds.\n", wctime);

    free(ptd);
    free(done);

  }

  /***** slaves code ******************************************************/

  else {

#ifdef WEIGHTED
    elt = (BHelt*) malloc(N*sizeof(BHelt));
    pos = (int*) malloc(N*sizeof(int));
    BHcreate(&h, N, pos);
    dist = (double*) malloc(N*sizeof(double));
#else
    Q = (int*) malloc(N*sizeof(int));
    dist = (int*) malloc(N*sizeof(int));
#endif
    pred = (int*) malloc(N*sizeof(int));

    gettimeofday(&ti1);

    /* initial source */
    buffer.s = my_pe-1;

    /* preallocation of the next source */
    b_in.s = buffer.s + nbslave;

    /* compute first source (idem as inside the while) */
    for (i=0; i<N; i++) for (j=0; j<D[i]; j++) A[i][j].intree = 0;
#ifdef WEIGHTED
    wdijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
              h, elt, pos, dist, pred);
#else
    dijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
             Q, dist, pred);
#endif
    buffer.min = treedist(N, A, D, buffer.t, -1);

    b_out = buffer;
    buffer.s = b_in.s;

    ADD_WORK_TIME;

    /* stopping condition (also if npes > N) */
    while (buffer.s < N) {

      /* send out the results ... */
      MPI_Isend(&b_out, 1, TDijResMPItype, 0, RECV_T_TAG, MPI_COMM_WORLD,
                &req_out);

      /* ... and recieve a new job (without waiting) */
      MPI_Irecv(&b_in, 1, TDijResMPItype, 0, SEND_S_TAG, MPI_COMM_WORLD,
                &req_in);

      ADD_COMM_TIME;

      for (i=0; i<N; i++) for (j=0; j<D[i]; j++) A[i][j].intree = 0;
#ifdef WEIGHTED
      wdijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
                h, elt, pos, dist, pred);
#else
      dijkstra(N, A, D, buffer.s, &(buffer.t), &(buffer.max),
               Q, dist, pred);
#endif
      buffer.min = treedist(N, A, D, buffer.t, -1);

      ADD_WORK_TIME;

      /* Check that previous result has gone */
      MPI_Wait(&req_out, &status);
      b_out = buffer;

      /* Check that next source has arrived */
      MPI_Wait(&req_in, &status);
      buffer.s = b_in.s;
    }

    /* send out the results of the last source */
    MPI_Send(&buffer, 1, TDijResMPItype, 0, RECV_T_TAG, MPI_COMM_WORLD);

    ADD_COMM_TIME;

    MPI_Barrier(MPI_COMM_WORLD);

    /* Send my name, worktime and commtime to master */
    MPI_Get_processor_name(procname, &i);
    MPI_Send(&i, 1, MPI_INT, 0, NAMELEN_TAG, MPI_COMM_WORLD);
    MPI_Send(procname, i, MPI_CHAR, 0, NAME_TAG, MPI_COMM_WORLD);
    MPI_Send(&worktime, 1, MPI_LONG, 0, WORK_TAG, MPI_COMM_WORLD);
    MPI_Send(&commtime, 1, MPI_LONG, 0, COMM_TAG, MPI_COMM_WORLD);

#ifdef WEIGHTED
    free(pos);
    free(elt);
#else
    free(Q);
#endif
    free(pred);
    free(dist);
  }
  
  MPI_Barrier(MPI_COMM_WORLD);

  MPI_Finalize();
  exit(0);
}