AMROC Main     Blockstructured Adaptive Mesh Refinement in object-oriented C++


Main Page   Class Hierarchy   Compound List   File List  

CommServer.C

Go to the documentation of this file.
00001 
00006 #if  __GNUC__ >= 3
00007 #else
00008 extern "C" {
00009 void exit(int);
00010 }
00011 #endif
00012 
00013 #include "CommServer.h"
00014 
00015 // ---------------------------------------------------------------------
00016 
// Return the dimension of the smallest hypercube that can hold N nodes,
// i.e. the smallest i >= 0 with (1 << i) >= N.  Any N <= 1 yields 0.
//
// The comparison is done in unsigned arithmetic: the original signed
// expression (1 << i) overflowed (undefined behaviour, potentially an
// endless loop) as soon as N exceeded 2^30.
int dim_cube( const int N )
{
  if ( N <= 1 ) return 0 ;

  const unsigned target = static_cast<unsigned>( N );
  int i = 0 ;
  // target <= INT_MAX, so the loop stops no later than the top bit.
  while ( ( 1u << i ) < target ) ++i ;
  return i ;
}
00019 
00020 // ---------------------------------------------------------------------
00021 
00022 ostream & operator << ( ostream & OS , const MPI_Status & MS )
00023 {
00024   return
00025     OS << "{" << dec << MS.MPI_SOURCE << " , " << MS.MPI_TAG << dec << "}";
00026     //OS << "{ " << hex << MS.MPI_SOURCE << " , " << MS.MPI_TAG << dec << "}";
00027 }
00028 
00029 // ---------------------------------------------------------------------
00030 
00031 // ---------------------------------------------------------------------
00032 
00033 void comm_service::error_msg( const char who[] , const char what[] , int R )
00034 {
00035 
00036 #ifdef DAGH_NO_MPI
00037 #else
00038 
00039   char message[ MPI_MAX_ERROR_STRING ];
00040   int me ;
00041   int len = MPI_MAX_ERROR_STRING ;
00042 
00043   MPI_Comm_rank( CommWorld , &me );
00044 
00045   MPI_Error_string( R , message , &len );
00046 
00047   for (int i=0; i<1000; i++)
00048     cerr << "P" << me << " : " << who << " " << what << " => " << R << endl ;
00049 
00050 #endif
00051 }
00052 void comm_service::error_die( const char who[] , const char what[] , int R )
00053 {
00054   error_msg( who , what , R );
00055   exit(-1);
00056 }
00057 
00058 // ---------------------------------------------------------------------
00059 
00060 class comm_barrier : public comm_service {
00061 public:
00062   const int cycle ;
00063   const int who ;
00064   int flag ;
00065   MPI_Comm comm;
00066 
00067   comm_barrier( const int Cycle , const int Who, MPI_Comm C, unsigned const T );
00068   inline ~comm_barrier() {}
00069 
00070   void callrecv(  const MPI_Status & );
00071 
00072   const char * name(void) const ;
00073 };
00074 
00075 // ---------------------------------------------------------------------
00076 
// Number of slots by which the service / request tables are grown.
// May be predefined on the compiler command line.
#if !defined( CommIncrement )
#  define CommIncrement (128)
#endif
00080 
00081 // ---------------------------------------------------------------------
00082 
00083 ofstream        comm_service::oflog ; 
00084 ostream*       comm_service::olog = (ostream *) 0; 
00085 int             comm_service::init_flag = 0 ;
00086 int             comm_service::dce_flag = 0 ;
00087 
00088 int             comm_service::PMe   = -1 ;
00089 
00090 int             comm_service::NList =  0 ;
00091 comm_service ** comm_service::SList =  0 ;
00092 comm_barrier ** comm_service::Barrier = 0 ;
00093 comm_barrier ** comm_service::BarrierWorld = 0 ;
00094 MPI_Request   * comm_service::RList =  0 ;
00095 
00096 int             comm_service::PNum  = -1 ;
00097 MPI_Comm        comm_service::Comm  =  0 ;
00098 MPI_Group       comm_service::Grp = 0 ;
00099 
00100 int             comm_service::io_flag = 0;
00101 int             comm_service::PIO = -1;
00102 MPI_Comm        comm_service::CommIO = 0;
00103 
00104 int             comm_service::PWorld = 0;
00105 MPI_Comm        comm_service::CommWorld = 0;
00106 MPI_Group       comm_service::GrpWorld = 0 ;
00107 
00108 int             comm_service::NumCommArray = 0;
00109 MPI_Comm      * comm_service::CommArray = 0;
00110 
00111 double          comm_service::Idle_Time = 0.0 ;
00112 double          comm_service::Srvc_Time = 0.0 ;
00113 
00114 // ---------------------------------------------------------------------
00115 
00116 int comm_service::findTag( const int Id , const int Tag,  const int Src )
00117 {
00118   const int NL = NList ;
00119   comm_service ** const SL = SList ;
00120   for ( register int i = 1 ; i < NL ; i++ )
00121      if ( SL[i] && SL[i]->Id == Id && 
00122           SL[i]->Tag == Tag && SL[i]->Src == Src ) return i ;
00123   return -1 ;
00124 }
00125 
00126 int comm_service::findService( comm_service * const CS )
00127 {
00128   const int NL = NList ;
00129   comm_service ** const SL = SList ;
00130   for ( register int i = 1 ; i < NL ; i++ ) if ( SL[i] == CS ) return i ;
00131   return -1 ;
00132 }
00133 
00134 int comm_service::findRequest( const MPI_Request req_in )
00135 {
00136   const int NL = NList ;
00137   const MPI_Request * const RL = RList ;
00138   for ( register int i = 1 ; i < NL ; i++ ) if ( RL[i] == req_in ) return i ;
00139   return -1 ;
00140 }
00141 
00142 void comm_service::setreq( int const I , int const T , int const S)
00143 {
00144   int J;
00145   if ( (J = findTag(I,T,S)) >= 0)
00146     Req = RList + J;
00147   else 
00148     Req = 0;
00149 }
00150 
00151 // ---------------------------------------------------------------------
00152 // Initialize
00153 // ---------------------------------------------------------------------
00154 int comm_service::init( MPI_Comm c )
00155 {
00156   if (init_flag == 1) return MPI_SUCCESS;
00157 
00158   int flag;
00159 #ifdef DAGH_NO_MPI
00160   flag = 0;
00161 #else
00162   MPI_Initialized(&flag);
00163 #endif
00164 
00165   if (flag == 0) {
00166     PMe = 0;
00167     PNum = 1;
00168     dce_flag = 0;
00169     CommWorld = Comm = c ;
00170     if (io_flag == 1) {
00171       CommIO = c;
00172       PIO = 0;
00173     }
00174     PWorld = 1;
00175 
00176 #ifdef DEBUG_PRINT
00177     {
00178      char buf[64];
00179      ostrstream obuf(buf,sizeof(buf),ios::out);
00180      obuf << "P" << PMe << ".log" << ((char)0) ;
00181      oflog.open( buf , ios::out );     
00182      if (olog) delete olog;
00183      olog = new ostream(oflog.rdbuf());
00184     }
00185 #endif
00186 
00187     init_flag = 1;
00188     return MPI_SUCCESS;
00189   }
00190 
00191 #ifdef DAGH_NO_MPI
00192 #else
00193 
00194   if ( SList ) return MPI_ERR_COMM ;
00195 
00196   // All MPI_Errhandler_set() added for correct error-handling --- RD
00197   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
00198 
00199   //if ( 0 == c ) c = MPI_COMM_WORLD ;
00200   if ( 0 == c ) MPI_Comm_dup(MPI_COMM_WORLD, &c) ;
00201   MPI_Errhandler_set(c, MPI_ERRORS_RETURN);
00202 
00203   dce_flag = 1 ;
00204 
00205   {
00206     int R , Me , Num ;
00207     if ( MPI_SUCCESS != ( R = MPI_Comm_rank( c , &Me  ) ) ) return R ;
00208     if ( MPI_SUCCESS != ( R = MPI_Comm_size( c , &Num ) ) ) return R ;
00209 
00210     if (io_flag == 1 && Num > 1) {
00211       PMe = Me ;
00212       PIO = Num-1;
00213       PNum = Num-1 ;
00214       PWorld = Num;
00215       CommWorld = CommIO = c;    
00216 
00217       // Define the compute node communicator 
00218       int ranks[1]; ranks[0] = Num-1;
00219       MPI_Comm_group(CommWorld, &GrpWorld);
00220       MPI_Group_excl(GrpWorld, 1, ranks, &Grp);
00221       MPI_Comm_create(CommWorld, Grp, &Comm);
00222       MPI_Errhandler_set(Comm, MPI_ERRORS_RETURN);
00223     }
00224     else if (io_flag == 1 && Num == 1) {
00225       PMe = Me ;
00226       PIO = Me;
00227       PNum = Num ;
00228       PWorld = Num;
00229       CommWorld = CommIO = Comm = c;    
00230       MPI_Comm_group(Comm, &Grp);
00231       GrpWorld = Grp;
00232     }
00233     else {
00234       PMe  = Me ;
00235       PWorld = PNum = Num ;
00236       CommWorld = Comm = c ;
00237       MPI_Comm_group(Comm, &Grp);
00238       GrpWorld = Grp;
00239     }
00240   }
00241 
00242 #ifdef DEBUG_PRINT
00243     {
00244      char buf[64];
00245      ostrstream obuf(buf,sizeof(buf),ios::out);
00246      obuf << "P" << PMe << ".log" << ((char)0) ;
00247      oflog.open( buf , ios::out );
00248      if (olog) delete olog;
00249      olog = new ostream(oflog.rdbuf());
00250     }
00251 #endif
00252 
00253   {
00254     comm_service ** const SL = SList = new comm_service*[ CommIncrement ];
00255     MPI_Request   * const RL = RList = new MPI_Request[ CommIncrement ];
00256     register int i ;
00257     for ( i = 0 ; i < CommIncrement ; i++ ) SL[i] = 0 ;
00258     for ( i = 0 ; i < CommIncrement ; i++ ) RL[i] = MPI_REQUEST_NULL ;
00259   }
00260   NList = 1 ;
00261 
00262   init_flag = 1;
00263 
00264   // Post compute barrier receives
00265   if (PMe < PNum) {
00266     const int me  = PMe ;
00267     const int num = PNum ;
00268 
00269     const int dcube = dim_cube( num );
00270     if (dcube > 0) {
00271       comm_barrier ** const B = Barrier = new comm_barrier * [ dcube ];
00272 
00273       for ( int cycle = 0 ; cycle < dcube ; cycle++ ) {
00274         B[cycle] = 0 ;
00275   
00276         const int upper = 1 << ( cycle + 1 );
00277         const int lower = 1 << cycle ;
00278         const int who   = me ^ lower ;
00279   
00280         if ( who < num )
00281           if (  ( lower <= who && who < upper )   // Fan-in  receive
00282              || ( lower <= me  &&  me < upper ) ) // Fan-out receive
00283             B[cycle] = new comm_barrier( cycle , who, Comm, 
00284                            (comm_service_tag|comm_service_comp_tag) );
00285       }
00286     }
00287   }
00288 
00289   // Post world barrier receives
00290   {
00291     const int me  = PMe ;
00292     const int num = PWorld ;
00293 
00294     const int dcube = dim_cube( num );
00295     if (dcube > 0) {
00296       comm_barrier ** const B = BarrierWorld = new comm_barrier * [ dcube ];
00297 
00298       for ( int cycle = 0 ; cycle < dcube ; cycle++ ) {
00299   
00300         B[cycle] = 0 ;
00301 
00302         const int upper = 1 << ( cycle + 1 );
00303         const int lower = 1 << cycle ;
00304         const int who   = me ^ lower ;
00305   
00306         if ( who < num )
00307           if (  ( lower <= who && who < upper )   // Fan-in  receive
00308              || ( lower <= me  &&  me < upper ) ) // Fan-out receive
00309             B[cycle] = new comm_barrier( cycle , who, CommWorld,
00310                            (comm_service_tag|comm_service_world_tag) );
00311       }
00312     }
00313   }
00314 
00315 #endif
00316 
00317   return MPI_SUCCESS ;
00318 }
00319 
00320 // ---------------------------------------------------------------------
00321 // Clean & Kill
00322 // ---------------------------------------------------------------------
00323 void comm_service::clean(void)
00324 
00325 {
00326 #ifdef DAGH_NO_MPI
00327 #else
00328 
00329   if (!dce_flag) return;  
00330 
00331 #ifdef DEBUG_PRINT_COMM
00332      ( oflog << "comm_service::clean Cleaning Up !" << endl ).flush();
00333 #endif
00334 
00335   if ( Barrier ) {
00336 
00337 #ifdef DEBUG_PRINT_COMM
00338      ( oflog << "comm_service::clean Deleting Barrier" << endl ).flush();
00339 #endif 
00340 
00341     const int dcube = dim_cube( PNum );
00342     for ( int i = 0 ; i < dcube ; i++ )
00343       if ( Barrier[i] ) delete Barrier[i] ;
00344     delete[] Barrier ;
00345     Barrier = 0 ;
00346   }
00347   if ( BarrierWorld ) {
00348 #ifdef DEBUG_PRINT_COMM
00349      ( oflog << "comm_service::clean Deleting BarrierWorld" << endl ).flush();
00350 #endif
00351 
00352     const int dcube = dim_cube( PWorld );
00353     for ( int i = 0 ; i < dcube ; i++ )
00354       if ( BarrierWorld[i] ) delete BarrierWorld[i] ;
00355     delete[] BarrierWorld ;
00356     BarrierWorld = 0 ;
00357   }
00358 
00359   {
00360     comm_service ** const SL = SList ;
00361     MPI_Request   * const RL = RList ;
00362     const int N = NList ;
00363 
00364     for ( int i = 0 ; i < N ; i++ ) {
00365       if (RL[i] != MPI_REQUEST_NULL) {
00366         if (comm_service::proc_me() == comm_service::proc_io()) {
00367           while (RL[i] != MPI_REQUEST_NULL) serve(RL[i]);
00368         }
00369         else {
00370           serve();
00371 // #ifndef MPICH
00372 //        MPI_Status MS ;
00373 //        MPI_Cancel( RL + i );
00374 //        MPI_Wait( RL + i, &MS );
00375 // #else
00376 //        MPI_Request_free( RL + i );
00377 // #endif
00378 #ifndef MPI_SCORE2
00379           MPI_Status MS ;
00380           MPI_Cancel( RL + i );
00381           MPI_Wait( RL + i, &MS );
00382 #else
00383           MPI_Request_free( RL + i );
00384 #endif
00385         }
00386       }
00387       if ( SL[i] ) {
00388         SL[i]->Req = 0;
00389         SL[i] = 0;
00390       }
00391     }
00392 
00393     delete [] SL;
00394     delete [] RL;
00395   }
00396 
00397   int commcomp1 = MPI_IDENT;
00398   int commcomp2 = MPI_IDENT;
00399 
00400   if (Comm == (MPI_Comm)-1) Comm = 0; // Only on the IO proc.
00401 
00402   /* I addded the if's because on the SGI's MPICH
00403      gives and error if the args are not valid communicators */
00404   if (Comm != 0 && Comm != MPI_COMM_NULL)
00405     MPI_Comm_compare(Comm,CommWorld,&commcomp1);
00406   if (CommIO != 0 && CommIO != MPI_COMM_NULL)
00407     MPI_Comm_compare(CommWorld,CommIO,&commcomp2);
00408 
00409   if (commcomp1 == MPI_IDENT && commcomp2 == MPI_IDENT) {
00410     if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 
00411       MPI_Comm_free(&CommWorld);
00412   }
00413   else if (commcomp1 == MPI_IDENT) {
00414     if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 
00415       MPI_Comm_free(&CommWorld);
00416     if (CommIO != 0 && CommIO != MPI_COMM_NULL && CommIO != MPI_COMM_WORLD) 
00417       MPI_Comm_free(&CommIO);
00418   }
00419   else if (commcomp2 == MPI_IDENT) {
00420     if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 
00421       MPI_Comm_free(&CommWorld);
00422     if (Comm != 0 && Comm != MPI_COMM_NULL && Comm != MPI_COMM_WORLD) 
00423       MPI_Comm_free(&Comm);
00424   }
00425   else {
00426     if (CommIO != 0 && CommIO != MPI_COMM_NULL && CommIO != MPI_COMM_WORLD) 
00427       MPI_Comm_free(&CommIO);
00428     if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 
00429       MPI_Comm_free(&CommWorld);
00430     if (Comm != 0 && Comm != MPI_COMM_NULL && Comm != MPI_COMM_WORLD) 
00431       MPI_Comm_free(&Comm);
00432   } 
00433   
00434   if (CommArray) delete [] CommArray;
00435 
00436   dce_flag = 0;
00437 
00438 #ifdef DEBUG_PRINT_COMM
00439      ( oflog << "comm_service::clean Done Cleaning Up !" << endl ).flush();
00440 #endif
00441 
00442   return ;
00443 #endif
00444 }
00445 
00446 void comm_service::kill(void)
00447 { 
00448 
00449  if ( dce_flag ) clean() ;
00450 
00451 #ifdef DEBUG_PRINT_COMM
00452      ( oflog << "comm_service::kill Communications shut-down !" << endl ).flush();
00453 #endif
00454 
00455 #ifdef DEBUG_PRINT
00456   oflog.close();
00457   if (olog) delete olog;
00458 #endif
00459 
00460   return ;
00461 }
00462 
00463 // ---------------------------------------------------------------------
00464 // Add a service
00465 // ---------------------------------------------------------------------
00466 comm_service::comm_service( const int I , const int T , const int S ) 
00467     : Req(0), Id(I), Tag(T), Src(S)
00468 {
00469   if ( !init_flag ) comm_service::init(); // Initialize static parameters
00470 
00471 #ifdef DAGH_NO_MPI
00472 #else
00473 
00474   if ( 0 <= findTag( I , T , S ) ) {
00475     Req = 0 ;
00476     oflog << "comm_service::comm_service" 
00477           << "(Id:" << I << ")" 
00478           << "(Tag:" << T << ")"
00479           << "(Src:" << S << ")"
00480           << "DUPLICATE TAG: Service not constructed."
00481           << endl ;
00482     oflog.flush();
00483     return ;
00484   }
00485 
00486   comm_service ** const SL = SList ;
00487   MPI_Request   * const RL = RList ;
00488   const int N = NList ;
00489 
00490   // Enlarge array as necessary
00491 
00492   if ( 0 == ( N % CommIncrement ) ) {
00493     MPI_Request   * const R = RList = new MPI_Request[ N + CommIncrement ];
00494     comm_service ** const S = SList = new comm_service *[ N + CommIncrement ];
00495     register int i ;
00496     if ( SL ) {
00497       for ( i = 0 ; i < N ; i++ ) { 
00498         if (RL[i]) {
00499           R[i] = RL[i] ;
00500           RL[i] = MPI_REQUEST_NULL;
00501         }
00502         else R[i] = MPI_REQUEST_NULL;
00503 
00504         if (SL[i]) {
00505           S[i] = SL[i];
00506           S[i]->Req = (R + i) ;
00507           SL[i] = 0;
00508         }
00509         else S[i] = 0 ;
00510       }
00511       delete[] SL ;
00512       delete[] RL ;
00513     }
00514     for ( i = N ; i < N + CommIncrement ; i++ ) {
00515        S[i] = 0 ;
00516        R[i] = MPI_REQUEST_NULL ;
00517     }
00518   }
00519 
00520   // Add to list
00521   SList[N] = this;
00522   SList[N]->Req = RList + N ;
00523   RList[N] = MPI_REQUEST_NULL ;
00524 
00525 #ifdef DEBUG_PRINT_COMM
00526      ( oflog << "comm_service::comm_service"
00527              << "(Id:" << Id << ")" 
00528              << "(Tag:" << Tag << ")"
00529              << "(Src:" << Src << ")"
00530              << "(NList:" << NList << ")"
00531              << "(N:" << N << ")"
00532              << "(*Req:" << *Req << ")"
00533              << "(RList[N]:" << RList[N] << ")"
00534              << endl ).flush();
00535 #endif
00536 
00537   NList++ ;
00538 
00539 #endif
00540 }
00541 
00542 // ---------------------------------------------------------------------
00543 // Delete a service
00544 // ---------------------------------------------------------------------
00545 comm_service::~comm_service()
00546 
00547 {
00548 
00549 #ifdef DAGH_NO_MPI
00550 #else
00551 
00552   if ( !Req ) return ; // Never registered
00553 
00554   const int I = findService( this );
00555 
00556   if ( I < 0 ) {
00557     oflog << "comm_service::~comm_service() UNREGISTERED OBJECT ?" << endl ;
00558     oflog.flush();
00559     return ;
00560   }
00561 
00562 #ifdef DEBUG_PRINT_COMM
00563      ( oflog << "comm_service::~comm_service"
00564              << "(Id:" << Id << ")" 
00565              << "(Tag:" << Tag << ")"
00566              << "(Src:" << Src << ")"
00567              << "(NList:" << NList << ")"
00568              << "(I:" << I << ")"
00569              << "(*Req:" << *Req << ")"
00570              << "(RList[I]:" << RList[I] << ")"
00571              << endl ).flush();
00572 #endif
00573 
00574   // Remove service from "registry"
00575   if ( MPI_REQUEST_NULL != *Req ) {
00576 // #ifndef MPICH
00577 //        MPI_Status MS ;
00578 //        MPI_Cancel( Req );
00579 //        MPI_Wait( Req , &MS );
00580 // #else
00581 //        MPI_Request_free( Req );
00582 // #endif
00583 #ifndef MPI_SCORE2
00584     MPI_Status MS ;
00585     MPI_Cancel( Req );
00586     MPI_Wait( Req , &MS );
00587 #else
00588     MPI_Request_free( Req );
00589 #endif
00590   }
00591 
00592   comm_service ** const SL = SList ;
00593   MPI_Request   * const RL = RList ;
00594 
00595   const int N = NList - 1 ;
00596 
00597   {
00598     for ( register int i = I ; i < N; i++ ) {
00599       RL[i] = RL[i+1];
00600       SL[i] = SL[i+1];
00601       SL[i]->Req = (RL + i);
00602     }
00603   }
00604 
00605   SL[N] = 0 ;
00606   RL[N] = MPI_REQUEST_NULL ;
00607 
00608   if ( 0 == ( N % CommIncrement ) ) {
00609     if ( N > 0 ) {
00610       MPI_Request   * const R = RList = new MPI_Request[ N ];
00611       comm_service ** const S = SList = new comm_service*[ N ];
00612       register int i ;
00613       for ( i = 0 ; i < N ; i++ ) {
00614         if (RL[i]) {
00615           R[i] = RL[i] ;
00616           RL[i] = MPI_REQUEST_NULL;
00617         }
00618         else R[i] = MPI_REQUEST_NULL;
00619 
00620         if (SL[i]) { 
00621           S[i] = SL[i];
00622           S[i]->Req = R + i ;
00623           SL[i] = 0;
00624         }
00625         else S[i] = 0 ;
00626       }
00627     }
00628     else {
00629       SList = 0 ;
00630       RList = 0 ;
00631     }
00632     delete[] SL ;
00633     delete[] RL ;
00634   }
00635 
00636   NList-- ;
00637 
00638 #endif
00639 
00640   return ;
00641 }
00642 
00643 // ---------------------------------------------------------------------
00644 
00645 void comm_service::callrecv( const MPI_Status & MS )
00646 {
00647   oflog << "comm_service::callrecv("
00648         << MS << ") DEFAULT CALLBACK!" << endl ;
00649   oflog.flush();
00650 }
00651 
00652 const char * comm_service::name( void ) const
00653 {
00654   static const char Name[] = "comm_service" ;
00655   return Name ;
00656 }
00657 
00658 // ---------------------------------------------------------------------
00659 // ---------------------------------------------------------------------
00660 // Perform services
00661 
00662 int comm_service::serve( void )
00663 {
00664 
00665 #ifdef DAGH_NO_MPI
00666 #else
00667 
00668   if ( NList <= 1 ) {
00669 #ifdef DEBUG_PRINT_COMM
00670      ( oflog << "comm_service::serve" << NList << " <= 1" << endl ).flush();
00671 #endif
00672     return MPI_SUCCESS ;
00673   }
00674 
00675   // Handle stockpiled requests
00676 
00677   int NumR ;
00678   int        * const IndR = new int[ NList ];
00679   MPI_Status * const MSR  = new MPI_Status[ NList ];
00680 
00681   MSR[0].MPI_SOURCE = MSR[0].MPI_TAG = 0 ;
00682 
00683   double next_time ;
00684   double start_time = MPI_Wtime();
00685 
00686   const int R = MPI_Testsome(NList,RList,&NumR,IndR,MSR);
00687 
00688   if ( MPI_SUCCESS == R ) {
00689     for ( int i = 0 ; i < NumR ; i++ ) {
00690       const int I = IndR[i];
00691 
00692       // MPI Spec says to use MSR[i], but 'chimp' uses MSR[I]
00693 
00694       //const MPI_Status &   MS = MSR[I] ;
00695       const MPI_Status &   MS = MSR[i] ;
00696       comm_service * const CS = SList[I];
00697 
00698 #ifdef DEBUG_PRINT_COMM
00699       if ( CS )
00700         ( oflog << "MPI_Testsome #" << i << " (" << I << ")" << " => " << MS 
00701                 << " => " << CS->name() << endl ).flush();
00702       else
00703         ( oflog << "MPI_Testsome #" << i << " => " << MS << endl ).flush();
00704 #endif
00705 
00706       if ( CS ) {
00707         Idle_Time += ( next_time = MPI_Wtime() ) - start_time ;
00708         CS->callrecv( MS );
00709         Srvc_Time += ( start_time = MPI_Wtime() ) - next_time ;
00710       }
00711     }
00712   }
00713 
00714   delete [] IndR ;
00715   delete [] MSR ;
00716 
00717   Idle_Time += MPI_Wtime() - start_time ;
00718 
00719 #endif
00720 
00721   return MPI_SUCCESS ;
00722 }
00723 
00724 // ---------------------------------------------------------------------
00725 
00726 int comm_service::serve( MPI_Request req_in )
00727 {
00728 
00729 #ifdef DAGH_NO_MPI
00730 #else
00731 
00732   double start_time = MPI_Wtime();
00733   double next_time ;
00734 
00735   // No services, just wait
00736 
00737   if ( 0 == NList ) {
00738     MPI_Status MS ;
00739     const int R = MPI_Wait( &req_in , &MS );
00740     Idle_Time += MPI_Wtime() - start_time ;
00741     return R ;
00742   }
00743 
00744   // Wait for the desired request
00745 
00746   int J = findRequest( req_in );
00747   if ( J < 0 ) RList[ J = 0 ] = req_in ;
00748 
00749   int NumR = 0;
00750   int        * const IndR = new int[ NList ];
00751   MPI_Status * const MSR  = new MPI_Status[ NList ];
00752   
00753   assert(IndR != 0);
00754   assert(MSR != 0);
00755 
00756   MSR[0].MPI_SOURCE = MSR[0].MPI_TAG = 0 ;
00757 
00758   do {
00759 
00760 #ifdef DEBUG_PRINT_COMM
00761   ( oflog << "MPI_Waitsome " << J << "th" << endl ).flush();
00762 #endif 
00763 
00764     const int R = MPI_Waitsome(NList,RList,&NumR,IndR,MSR);
00765 
00766     if ( MPI_SUCCESS != R ) {
00767       Idle_Time += MPI_Wtime() - start_time ;
00768       delete [] IndR ;
00769       delete [] MSR ;
00770       error_die( "comm_service::serve" , "MPI_Waitsome" , R );
00771       return R ;
00772     }
00773 
00774 #ifdef DEBUG_PRINT_COMM
00775   ( oflog << "MPI_Waitsome " << NumR << " requests served" << endl ).flush();
00776 #endif
00777 
00778     for ( int i = 0 ; i < NumR ; i++ ) {
00779       const int I = IndR[i];
00780 
00781       // MPI Spec says to use MSR[i], but 'chimp' uses MSR[I]
00782       //const MPI_Status & MS = MSR[I] ;
00783       const MPI_Status & MS = MSR[i] ;
00784 
00785       comm_service * const CS = SList[I];
00786 
00787 #ifdef DEBUG_PRINT_COMM
00788   if ( CS )
00789     ( oflog << "MPI_Waitsome #" << i << "(" << I << ")" << " => " << MS
00790             << " => " << CS->name() << endl ).flush();
00791   else
00792     ( oflog << "MPI_Waitsome #" << i << "(" << I << ")" << " => " << MS << endl ).flush();
00793 #endif
00794 
00795       if ( CS ) { // A service requests
00796         Idle_Time += ( next_time = MPI_Wtime() ) - start_time ;
00797         CS->callrecv( MS );
00798         Srvc_Time += ( start_time = MPI_Wtime() ) - next_time ;
00799       }
00800 
00801       if ( J == I ) J = -1 ;
00802     }
00803 
00804   } while ( 0 <= J );
00805 
00806   if (IndR) delete [] IndR ;
00807   if (MSR)  delete [] MSR ;
00808 
00809   RList[0] = MPI_REQUEST_NULL ;
00810 
00811   Idle_Time += MPI_Wtime() - start_time ;
00812   
00813 #endif
00814 
00815   return MPI_SUCCESS ;
00816 }
00817 
00818 // ---------------------------------------------------------------------
00819 // Add/Delete a communicator
00820 // ---------------------------------------------------------------------
00821 
00822 void comm_service::add_comm(int const id)
00823 {
00824 #ifdef DAGH_NO_MPI
00825 #else
00826   MPI_Comm_dup(Comm, &CommArray[id]) ;
00827   MPI_Errhandler_set(CommArray[id], MPI_ERRORS_RETURN);
00828   NumCommArray++ ;
00829 #endif
00830 }
00831 
00832 void comm_service::delete_comm(int const id)
00833 {
00834 #ifdef DAGH_NO_MPI
00835 #else
00836   MPI_Comm_free(&CommArray[id]) ;
00837   NumCommArray-- ;
00838 #endif
00839 }
00840 
00841 void comm_service::reset_comm(int const id)
00842 {
00843 #ifdef DAGH_NO_MPI
00844 #else
00845   // Reseting the communication is necessary only in specific situations.
00846   // See the usage of this function in GridHierarchy!
00847   // The function must be used with care, because most MPI-implementations 
00848   // allow only 32767 communicator duplications during a single run.  If this 
00849   // number is reached, MPI usually blocks and waits an infinite time. 
00850   
00851   if (CommArray[id] != MPI_COMM_NULL) {
00852     MPI_Comm_free(&CommArray[id]) ;
00853     MPI_Comm_dup(Comm, &CommArray[id]) ;
00854     MPI_Errhandler_set(CommArray[id], MPI_ERRORS_RETURN);
00855   }
00856 #endif
00857 }
00858 
00859 void comm_service::inc_commarray(int const inc)
00860 {
00861 #ifdef DAGH_NO_MPI
00862 #else
00863   MPI_Comm   * const CA = CommArray ;
00864   const int N = NumCommArray ;
00865 
00866   MPI_Comm * const C = CommArray = new MPI_Comm[ N + inc ];
00867   register int i ;
00868   if ( CA ) {
00869     for ( i = 0 ; i < N ; i++ ) { 
00870       if (CA[i] != MPI_COMM_NULL) {
00871         C[i] = CA[i] ;
00872         CA[i] = MPI_COMM_NULL;
00873       }
00874       else C[i] = MPI_COMM_NULL;
00875     }
00876     delete[] CA ;
00877   }
00878   for ( i = N ; i < N + inc ; i++ ) {
00879     C[i] = MPI_COMM_NULL ;
00880   }
00881 #endif
00882 }
00883 // ---------------------------------------------------------------------
00884 
00885 // ---------------------------------------------------------------------
00886 // Barrier sync....
00887 // ---------------------------------------------------------------------
00888 void comm_service::barrier(int const type)
00889 { 
00890 #ifdef DAGH_NO_MPI
00891 #else
00892   int R ;
00893   const int me    = PMe ;
00894   int num   = 0;
00895   unsigned tag_prefix = 0;
00896   comm_barrier **B = 0;
00897   MPI_Comm C = 0;
00898 
00899   if (type == comm_service_comp) {
00900      num   = PNum ;
00901      tag_prefix = (comm_service_tag | comm_service_comp_tag);
00902      B = Barrier;
00903      C = Comm;
00904   }
00905   /*******************/
00906   //else if (type == comm_sevice_io) {
00907   //   num   = (PWorld > 1) ? PNum+1 : PNum ;
00908   //   tag_prefix = (comm_service_tag | comm_service_io_tag);
00909   //   B = BarrierIO;
00910   //   C = CommIO;
00911   //}
00912   /*******************/
00913   else if (type == comm_service_world) {
00914      num = PWorld ;
00915      tag_prefix = (comm_service_tag | comm_service_world_tag);
00916      B = BarrierWorld;
00917      C = CommWorld;
00918   }
00919   else  {
00920      num   = PNum ;
00921      tag_prefix = (comm_service_tag | comm_service_comp_tag);
00922      B = Barrier;
00923      C = Comm;
00924   }
00925  
00926   const int dcube = dim_cube( num );
00927 
00928 #ifdef DEBUG_PRINT_COMM
00929   oflog << "comm_service::barrier" << " DO " << dcube << endl ;
00930   oflog.flush();
00931 #endif
00932 
00933   // Fan-in
00934 
00935   int cycle;
00936   for ( cycle = dcube - 1 ; 0 <= cycle ; cycle-- ) {
00937     const int upper = 1 << ( cycle + 1 );
00938     const int lower = 1 << cycle ;
00939     const int who   = me ^ lower ;
00940     const int tag   = tag_prefix | ( cycle + 1 ) ;
00941 
00942     if ( who < num ) {
00943 
00944       if ( lower <= who && who < upper ) {
00945 
00946 #ifdef DEBUG_PRINT_COMM
00947   ( oflog << "comm_service::barrier" << " <= " << who << "..." << endl ).flush();
00948 #endif
00949 
00950         if ( ! B[cycle]->flag ) {
00951           const int I = findTag( comm_service_null_id, tag, MPI_ANY_SOURCE );
00952           if ( I < 0 ) error_die( "comm_service::barrier" , "comm_service::find" , 0 );
00953           R = serve( RList[I] );
00954           if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
00955         }
00956         B[cycle]->flag = 0 ;
00957 
00958 #ifdef DEBUG_PRINT_COMM
00959   ( oflog << "comm_service::barrier" << " <= " << who << " DONE" << endl ).flush();
00960 #endif
00961 
00962       }
00963       else if ( lower <= me && me < upper ) {
00964 
00965 #ifdef DEBUG_PRINT_COMM
00966   ( oflog <<  "comm_service::barrier" << " => " << who ).flush();
00967 #endif
00968 
00969         MPI_Request req ;
00970         R = MPI_Isend((void*)0, 0, MPI_BYTE, who, tag, C , &req );
00971         if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Isend" , R );
00972 
00973 #ifdef DEBUG_PRINT_COMM
00974   ( oflog <<  " Isend" << endl ).flush();
00975 #endif
00976 
00977         R = serve( req );
00978         if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
00979 
00980 #ifdef DEBUG_PRINT_COMM
00981   ( oflog <<  "comm_service::barrier" << " => " << who << " Done" << endl ).flush();
00982 #endif
00983 
00984       }
00985     }
00986   }
00987 
00988   // Fan-out
00989 
00990   for ( cycle = 0 ; cycle < dcube ; cycle++ ) {
00991     const int upper = 1 << ( cycle + 1 );
00992     const int lower = 1 << cycle ;
00993     const int who   = me ^ lower ;
00994     const int tag   = tag_prefix | ( cycle + 1 ) ;
00995 
00996     if ( who < num ) {
00997 
00998       if ( lower <= who && who < upper ) {
00999 
01000 #ifdef DEBUG_PRINT_COMM
01001   ( oflog << "comm_service::barrier" << " => " << who ).flush();
01002 #endif
01003 
01004         MPI_Request req ;
01005         R = MPI_Isend((void*)0, 0, MPI_BYTE, who, tag, C , &req );
01006         if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Isend" , R );
01007 
01008 #ifdef DEBUG_PRINT_COMM
01009   ( oflog <<  " Isend" << endl ).flush();
01010 #endif
01011 
01012         R = serve( req );
01013         if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
01014 
01015 #ifdef DEBUG_PRINT_COMM
01016   ( oflog << "comm_service::barrier" << " => " << who << " Done" << endl ).flush();
01017 #endif
01018 
01019       }
01020       else if ( lower <= me && me < upper ) {
01021 
01022 #ifdef DEBUG_PRINT_COMM
01023   ( oflog << "comm_service::barrier" << " <= " << who << "..." << endl ).flush();
01024 #endif
01025 
01026         if ( ! B[cycle]->flag ) {
01027           const int I = findTag( comm_service_null_id , tag, MPI_ANY_SOURCE );
01028           if ( I < 0 ) error_die( "comm_service::barrier" , "comm_service::find" , 0 );
01029           R = serve( RList[I] );
01030           if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
01031         }
01032 
01033 #ifdef DEBUG_PRINT_COMM
01034   ( oflog << "comm_service::barrier" << " <= " << who << endl ).flush();
01035 #endif
01036 
01037         B[cycle]->flag = 0 ;
01038       }
01039     }
01040   }
01041 
01042 #ifdef DEBUG_PRINT_COMM
01043   ( oflog << "comm_service::barrier" << " DONE" << endl ).flush();
01044 #endif
01045 
01046 #endif
01047   return ; // All sync'd
01048 }
01049 // ---------------------------------------------------------------------
01050 
01051 // ---------------------------------------------------------------------
01052 // Receive / log barrier messages
01053 // ---------------------------------------------------------------------
01054 comm_barrier::comm_barrier( const int Cycle , const int Who, MPI_Comm C, unsigned const T )
01055   : comm_service( comm_service_null_id, T | (Cycle+1) )
01056   , cycle( Cycle ) , who( Who ), comm(C)
01057 {
01058 
01059   flag = 0 ;
01060 
01061 #ifdef DAGH_NO_MPI
01062 #else
01063 
01064 #ifdef DEBUG_PRINT_COMM
01065      ( comm_service::log() << "comm_service::barrier " << Tag
01066              << " (" << *req() << ")"
01067              << " (" << cycle << "," << who << ")"
01068              << endl ).flush();
01069 #endif
01070 
01071   int R = MPI_Irecv((void*)0,0,MPI_BYTE,who,Tag,comm,req());
01072   if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Irecv" , R );
01073 
01074 #endif
01075 }
01076 
01077 // ---------------------------------------------------------------------
01078 
01079 void comm_barrier::callrecv( const MPI_Status & MS )
01080 {
01081 
01082 #ifdef DAGH_NO_MPI
01083 #else
01084 
01085 #ifdef DEBUG_PRINT_COMM
01086   (comm_service::log() << "comm_barrier::callrecv()" << MS << endl ).flush(); 
01087 #endif
01088 
01089   if ( flag ) {
01090     comm_service::log() << "comm_barrier::callrecv()" << " ERROR: flag != 0" << endl ;
01091     comm_service::log().flush();
01092   }
01093 
01094   int R = MPI_Irecv((void*)0,0,MPI_BYTE,who,Tag,comm,req());
01095 
01096   if ( MPI_SUCCESS != R ) error_die( "comm_barrier::callrecv()" , "MPI_Irecv" , R );
01097 
01098   flag = 1 ;
01099 
01100 #endif
01101 
01102   return ;
01103 }
01104 
01105 // ---------------------------------------------------------------------
01106 const char * comm_barrier::name( void ) const
01107 {
01108   static const char Name[] = "comm_barrier" ;
01109   return Name ;
01110 }
01111 // ---------------------------------------------------------------------
01112 
01113 // ---------------------------------------------------------------------
01114 // All to "proc" comm !
01115 // ---------------------------------------------------------------------
01116 // Will do later... :-)
01117 // ---------------------------------------------------------------------
01118 


Quickstart     Users Guide     Programmers Reference     Installation      Examples     Download



AMROC Main      Home      Contact
last update: 06/01/04