Blockstructured Adaptive Mesh Refinement in object-oriented C++
00001 00006 #if __GNUC__ >= 3 00007 #else 00008 extern "C" { 00009 void exit(int); 00010 } 00011 #endif 00012 00013 #include "CommServer.h" 00014 00015 // --------------------------------------------------------------------- 00016 00017 int dim_cube( const int N ) 00018 { int i; for ( i = 0 ; ( 1 << i ) < N ; i++ ); return i ; } 00019 00020 // --------------------------------------------------------------------- 00021 00022 ostream & operator << ( ostream & OS , const MPI_Status & MS ) 00023 { 00024 return 00025 OS << "{" << dec << MS.MPI_SOURCE << " , " << MS.MPI_TAG << dec << "}"; 00026 //OS << "{ " << hex << MS.MPI_SOURCE << " , " << MS.MPI_TAG << dec << "}"; 00027 } 00028 00029 // --------------------------------------------------------------------- 00030 00031 // --------------------------------------------------------------------- 00032 00033 void comm_service::error_msg( const char who[] , const char what[] , int R ) 00034 { 00035 00036 #ifdef DAGH_NO_MPI 00037 #else 00038 00039 char message[ MPI_MAX_ERROR_STRING ]; 00040 int me ; 00041 int len = MPI_MAX_ERROR_STRING ; 00042 00043 MPI_Comm_rank( CommWorld , &me ); 00044 00045 MPI_Error_string( R , message , &len ); 00046 00047 for (int i=0; i<1000; i++) 00048 cerr << "P" << me << " : " << who << " " << what << " => " << R << endl ; 00049 00050 #endif 00051 } 00052 void comm_service::error_die( const char who[] , const char what[] , int R ) 00053 { 00054 error_msg( who , what , R ); 00055 exit(-1); 00056 } 00057 00058 // --------------------------------------------------------------------- 00059 00060 class comm_barrier : public comm_service { 00061 public: 00062 const int cycle ; 00063 const int who ; 00064 int flag ; 00065 MPI_Comm comm; 00066 00067 comm_barrier( const int Cycle , const int Who, MPI_Comm C, unsigned const T ); 00068 inline ~comm_barrier() {} 00069 00070 void callrecv( const MPI_Status & ); 00071 00072 const char * name(void) const ; 00073 }; 00074 00075 // --------------------------------------------------------------------- 00076 00077 #ifndef CommIncrement 00078 #define CommIncrement (128) 00079 #endif 00080 00081 // --------------------------------------------------------------------- 00082 00083 ofstream comm_service::oflog ; 00084 ostream* comm_service::olog = (ostream *) 0; 00085 int comm_service::init_flag = 0 ; 00086 int comm_service::dce_flag = 0 ; 00087 00088 int comm_service::PMe = -1 ; 00089 00090 int comm_service::NList = 0 ; 00091 comm_service ** comm_service::SList = 0 ; 00092 comm_barrier ** comm_service::Barrier = 0 ; 00093 comm_barrier ** comm_service::BarrierWorld = 0 ; 00094 MPI_Request * comm_service::RList = 0 ; 00095 00096 int comm_service::PNum = -1 ; 00097 MPI_Comm comm_service::Comm = 0 ; 00098 MPI_Group comm_service::Grp = 0 ; 00099 00100 int comm_service::io_flag = 0; 00101 int comm_service::PIO = -1; 00102 MPI_Comm comm_service::CommIO = 0; 00103 00104 int comm_service::PWorld = 0; 00105 MPI_Comm comm_service::CommWorld = 0; 00106 MPI_Group comm_service::GrpWorld = 0 ; 00107 00108 int comm_service::NumCommArray = 0; 00109 MPI_Comm * comm_service::CommArray = 0; 00110 00111 double comm_service::Idle_Time = 0.0 ; 00112 double comm_service::Srvc_Time = 0.0 ; 00113 00114 // --------------------------------------------------------------------- 00115 00116 int comm_service::findTag( const int Id , const int Tag, const int Src ) 00117 { 00118 const int NL = NList ; 00119 comm_service ** const SL = SList ; 00120 for ( register int i = 1 ; i < NL ; i++ ) 00121 if ( SL[i] && SL[i]->Id == Id && 00122 SL[i]->Tag == Tag && SL[i]->Src == Src ) return i ; 00123 return -1 ; 00124 } 00125 00126 int comm_service::findService( comm_service * const CS ) 00127 { 00128 const int NL = NList ; 00129 comm_service ** const SL = SList ; 00130 for ( register int i = 1 ; i < NL ; i++ ) if ( SL[i] == CS ) return i ; 00131 return -1 ; 00132 } 00133 00134 int comm_service::findRequest( const MPI_Request req_in ) 00135 { 00136 const int NL = NList ; 00137 const MPI_Request * const RL = RList ; 00138 for ( register int i = 1 ; i < NL ; i++ ) if ( RL[i] == req_in ) return i ; 00139 return -1 ; 00140 } 00141 00142 void comm_service::setreq( int const I , int const T , int const S) 00143 { 00144 int J; 00145 if ( (J = findTag(I,T,S)) >= 0) 00146 Req = RList + J; 00147 else 00148 Req = 0; 00149 } 00150 00151 // --------------------------------------------------------------------- 00152 // Initialize 00153 // --------------------------------------------------------------------- 00154 int comm_service::init( MPI_Comm c ) 00155 { 00156 if (init_flag == 1) return MPI_SUCCESS; 00157 00158 int flag; 00159 #ifdef DAGH_NO_MPI 00160 flag = 0; 00161 #else 00162 MPI_Initialized(&flag); 00163 #endif 00164 00165 if (flag == 0) { 00166 PMe = 0; 00167 PNum = 1; 00168 dce_flag = 0; 00169 CommWorld = Comm = c ; 00170 if (io_flag == 1) { 00171 CommIO = c; 00172 PIO = 0; 00173 } 00174 PWorld = 1; 00175 00176 #ifdef DEBUG_PRINT 00177 { 00178 char buf[64]; 00179 ostrstream obuf(buf,sizeof(buf),ios::out); 00180 obuf << "P" << PMe << ".log" << ((char)0) ; 00181 oflog.open( buf , ios::out ); 00182 if (olog) delete olog; 00183 olog = new ostream(oflog.rdbuf()); 00184 } 00185 #endif 00186 00187 init_flag = 1; 00188 return MPI_SUCCESS; 00189 } 00190 00191 #ifdef DAGH_NO_MPI 00192 #else 00193 00194 if ( SList ) return MPI_ERR_COMM ; 00195 00196 // All MPI_Errhandler_set() added for correct error-handling --- RD 00197 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); 00198 00199 //if ( 0 == c ) c = MPI_COMM_WORLD ; 00200 if ( 0 == c ) MPI_Comm_dup(MPI_COMM_WORLD, &c) ; 00201 MPI_Errhandler_set(c, MPI_ERRORS_RETURN); 00202 00203 dce_flag = 1 ; 00204 00205 { 00206 int R , Me , Num ; 00207 if ( MPI_SUCCESS != ( R = MPI_Comm_rank( c , &Me ) ) ) return R ; 00208 if ( MPI_SUCCESS != ( R = MPI_Comm_size( c , &Num ) ) ) return R ; 00209 00210 if (io_flag == 1 && Num > 1) { 00211 PMe = Me ; 00212 PIO = Num-1; 00213 PNum = Num-1 ; 00214 PWorld = Num; 00215 CommWorld = CommIO = c; 00216 00217 // Define the compute node communicator 00218 int ranks[1]; ranks[0] = Num-1; 00219 MPI_Comm_group(CommWorld, &GrpWorld); 00220 MPI_Group_excl(GrpWorld, 1, ranks, &Grp); 00221 MPI_Comm_create(CommWorld, Grp, &Comm); 00222 MPI_Errhandler_set(Comm, MPI_ERRORS_RETURN); 00223 } 00224 else if (io_flag == 1 && Num == 1) { 00225 PMe = Me ; 00226 PIO = Me; 00227 PNum = Num ; 00228 PWorld = Num; 00229 CommWorld = CommIO = Comm = c; 00230 MPI_Comm_group(Comm, &Grp); 00231 GrpWorld = Grp; 00232 } 00233 else { 00234 PMe = Me ; 00235 PWorld = PNum = Num ; 00236 CommWorld = Comm = c ; 00237 MPI_Comm_group(Comm, &Grp); 00238 GrpWorld = Grp; 00239 } 00240 } 00241 00242 #ifdef DEBUG_PRINT 00243 { 00244 char buf[64]; 00245 ostrstream obuf(buf,sizeof(buf),ios::out); 00246 obuf << "P" << PMe << ".log" << ((char)0) ; 00247 oflog.open( buf , ios::out ); 00248 if (olog) delete olog; 00249 olog = new ostream(oflog.rdbuf()); 00250 } 00251 #endif 00252 00253 { 00254 comm_service ** const SL = SList = new comm_service*[ CommIncrement ]; 00255 MPI_Request * const RL = RList = new MPI_Request[ CommIncrement ]; 00256 register int i ; 00257 for ( i = 0 ; i < CommIncrement ; i++ ) SL[i] = 0 ; 00258 for ( i = 0 ; i < CommIncrement ; i++ ) RL[i] = MPI_REQUEST_NULL ; 00259 } 00260 NList = 1 ; 00261 00262 init_flag = 1; 00263 00264 // Post compute barrier receives 00265 if (PMe < PNum) { 00266 const int me = PMe ; 00267 const int num = PNum ; 00268 00269 const int dcube = dim_cube( num ); 00270 if (dcube > 0) { 00271 comm_barrier ** const B = Barrier = new comm_barrier * [ dcube ]; 00272 00273 for ( int cycle = 0 ; cycle < dcube ; cycle++ ) { 00274 B[cycle] = 0 ; 00275 00276 const int upper = 1 << ( cycle + 1 ); 00277 const int lower = 1 << cycle ; 00278 const int who = me ^ lower ; 00279 00280 if ( who < num ) 00281 if ( ( lower <= who && who < upper ) // Fan-in receive 00282 || ( lower <= me && me < upper ) ) // Fan-out receive 00283 B[cycle] = new comm_barrier( cycle , who, Comm, 00284 (comm_service_tag|comm_service_comp_tag) ); 00285 } 00286 } 00287 } 00288 00289 // Post world barrier receives 00290 { 00291 const int me = PMe ; 00292 const int num = PWorld ; 00293 00294 const int dcube = dim_cube( num ); 00295 if (dcube > 0) { 00296 comm_barrier ** const B = BarrierWorld = new comm_barrier * [ dcube ]; 00297 00298 for ( int cycle = 0 ; cycle < dcube ; cycle++ ) { 00299 00300 B[cycle] = 0 ; 00301 00302 const int upper = 1 << ( cycle + 1 ); 00303 const int lower = 1 << cycle ; 00304 const int who = me ^ lower ; 00305 00306 if ( who < num ) 00307 if ( ( lower <= who && who < upper ) // Fan-in receive 00308 || ( lower <= me && me < upper ) ) // Fan-out receive 00309 B[cycle] = new comm_barrier( cycle , who, CommWorld, 00310 (comm_service_tag|comm_service_world_tag) ); 00311 } 00312 } 00313 } 00314 00315 #endif 00316 00317 return MPI_SUCCESS ; 00318 } 00319 00320 // --------------------------------------------------------------------- 00321 // Clean & Kill 00322 // --------------------------------------------------------------------- 00323 void comm_service::clean(void) 00324 00325 { 00326 #ifdef DAGH_NO_MPI 00327 #else 00328 00329 if (!dce_flag) return; 00330 00331 #ifdef DEBUG_PRINT_COMM 00332 ( oflog << "comm_service::clean Cleaning Up !" << endl ).flush(); 00333 #endif 00334 00335 if ( Barrier ) { 00336 00337 #ifdef DEBUG_PRINT_COMM 00338 ( oflog << "comm_service::clean Deleting Barrier" << endl ).flush(); 00339 #endif 00340 00341 const int dcube = dim_cube( PNum ); 00342 for ( int i = 0 ; i < dcube ; i++ ) 00343 if ( Barrier[i] ) delete Barrier[i] ; 00344 delete[] Barrier ; 00345 Barrier = 0 ; 00346 } 00347 if ( BarrierWorld ) { 00348 #ifdef DEBUG_PRINT_COMM 00349 ( oflog << "comm_service::clean Deleting BarrierWorld" << endl ).flush(); 00350 #endif 00351 00352 const int dcube = dim_cube( PWorld ); 00353 for ( int i = 0 ; i < dcube ; i++ ) 00354 if ( BarrierWorld[i] ) delete BarrierWorld[i] ; 00355 delete[] BarrierWorld ; 00356 BarrierWorld = 0 ; 00357 } 00358 00359 { 00360 comm_service ** const SL = SList ; 00361 MPI_Request * const RL = RList ; 00362 const int N = NList ; 00363 00364 for ( int i = 0 ; i < N ; i++ ) { 00365 if (RL[i] != MPI_REQUEST_NULL) { 00366 if (comm_service::proc_me() == comm_service::proc_io()) { 00367 while (RL[i] != MPI_REQUEST_NULL) serve(RL[i]); 00368 } 00369 else { 00370 serve(); 00371 // #ifndef MPICH 00372 // MPI_Status MS ; 00373 // MPI_Cancel( RL + i ); 00374 // MPI_Wait( RL + i, &MS ); 00375 // #else 00376 // MPI_Request_free( RL + i ); 00377 // #endif 00378 #ifndef MPI_SCORE2 00379 MPI_Status MS ; 00380 MPI_Cancel( RL + i ); 00381 MPI_Wait( RL + i, &MS ); 00382 #else 00383 MPI_Request_free( RL + i ); 00384 #endif 00385 } 00386 } 00387 if ( SL[i] ) { 00388 SL[i]->Req = 0; 00389 SL[i] = 0; 00390 } 00391 } 00392 00393 delete [] SL; 00394 delete [] RL; 00395 } 00396 00397 int commcomp1 = MPI_IDENT; 00398 int commcomp2 = MPI_IDENT; 00399 00400 if (Comm == (MPI_Comm)-1) Comm = 0; // Only on the IO proc. 00401 00402 /* I addded the if's because on the SGI's MPICH 00403 gives and error if the args are not valid communicators */ 00404 if (Comm != 0 && Comm != MPI_COMM_NULL) 00405 MPI_Comm_compare(Comm,CommWorld,&commcomp1); 00406 if (CommIO != 0 && CommIO != MPI_COMM_NULL) 00407 MPI_Comm_compare(CommWorld,CommIO,&commcomp2); 00408 00409 if (commcomp1 == MPI_IDENT && commcomp2 == MPI_IDENT) { 00410 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 00411 MPI_Comm_free(&CommWorld); 00412 } 00413 else if (commcomp1 == MPI_IDENT) { 00414 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 00415 MPI_Comm_free(&CommWorld); 00416 if (CommIO != 0 && CommIO != MPI_COMM_NULL && CommIO != MPI_COMM_WORLD) 00417 MPI_Comm_free(&CommIO); 00418 } 00419 else if (commcomp2 == MPI_IDENT) { 00420 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 00421 MPI_Comm_free(&CommWorld); 00422 if (Comm != 0 && Comm != MPI_COMM_NULL && Comm != MPI_COMM_WORLD) 00423 MPI_Comm_free(&Comm); 00424 } 00425 else { 00426 if (CommIO != 0 && CommIO != MPI_COMM_NULL && CommIO != MPI_COMM_WORLD) 00427 MPI_Comm_free(&CommIO); 00428 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD) 00429 MPI_Comm_free(&CommWorld); 00430 if (Comm != 0 && Comm != MPI_COMM_NULL && Comm != MPI_COMM_WORLD) 00431 MPI_Comm_free(&Comm); 00432 } 00433 00434 if (CommArray) delete [] CommArray; 00435 00436 dce_flag = 0; 00437 00438 #ifdef DEBUG_PRINT_COMM 00439 ( oflog << "comm_service::clean Done Cleaning Up !" << endl ).flush(); 00440 #endif 00441 00442 return ; 00443 #endif 00444 } 00445 00446 void comm_service::kill(void) 00447 { 00448 00449 if ( dce_flag ) clean() ; 00450 00451 #ifdef DEBUG_PRINT_COMM 00452 ( oflog << "comm_service::kill Communications shut-down !" << endl ).flush(); 00453 #endif 00454 00455 #ifdef DEBUG_PRINT 00456 oflog.close(); 00457 if (olog) delete olog; 00458 #endif 00459 00460 return ; 00461 } 00462 00463 // --------------------------------------------------------------------- 00464 // Add a service 00465 // --------------------------------------------------------------------- 00466 comm_service::comm_service( const int I , const int T , const int S ) 00467 : Req(0), Id(I), Tag(T), Src(S) 00468 { 00469 if ( !init_flag ) comm_service::init(); // Initialize static parameters 00470 00471 #ifdef DAGH_NO_MPI 00472 #else 00473 00474 if ( 0 <= findTag( I , T , S ) ) { 00475 Req = 0 ; 00476 oflog << "comm_service::comm_service" 00477 << "(Id:" << I << ")" 00478 << "(Tag:" << T << ")" 00479 << "(Src:" << S << ")" 00480 << "DUPLICATE TAG: Service not constructed." 00481 << endl ; 00482 oflog.flush(); 00483 return ; 00484 } 00485 00486 comm_service ** const SL = SList ; 00487 MPI_Request * const RL = RList ; 00488 const int N = NList ; 00489 00490 // Enlarge array as necessary 00491 00492 if ( 0 == ( N % CommIncrement ) ) { 00493 MPI_Request * const R = RList = new MPI_Request[ N + CommIncrement ]; 00494 comm_service ** const S = SList = new comm_service *[ N + CommIncrement ]; 00495 register int i ; 00496 if ( SL ) { 00497 for ( i = 0 ; i < N ; i++ ) { 00498 if (RL[i]) { 00499 R[i] = RL[i] ; 00500 RL[i] = MPI_REQUEST_NULL; 00501 } 00502 else R[i] = MPI_REQUEST_NULL; 00503 00504 if (SL[i]) { 00505 S[i] = SL[i]; 00506 S[i]->Req = (R + i) ; 00507 SL[i] = 0; 00508 } 00509 else S[i] = 0 ; 00510 } 00511 delete[] SL ; 00512 delete[] RL ; 00513 } 00514 for ( i = N ; i < N + CommIncrement ; i++ ) { 00515 S[i] = 0 ; 00516 R[i] = MPI_REQUEST_NULL ; 00517 } 00518 } 00519 00520 // Add to list 00521 SList[N] = this; 00522 SList[N]->Req = RList + N ; 00523 RList[N] = MPI_REQUEST_NULL ; 00524 00525 #ifdef DEBUG_PRINT_COMM 00526 ( oflog << "comm_service::comm_service" 00527 << "(Id:" << Id << ")" 00528 << "(Tag:" << Tag << ")" 00529 << "(Src:" << Src << ")" 00530 << "(NList:" << NList << ")" 00531 << "(N:" << N << ")" 00532 << "(*Req:" << *Req << ")" 00533 << "(RList[N]:" << RList[N] << ")" 00534 << endl ).flush(); 00535 #endif 00536 00537 NList++ ; 00538 00539 #endif 00540 } 00541 00542 // --------------------------------------------------------------------- 00543 // Delete a service 00544 // --------------------------------------------------------------------- 00545 comm_service::~comm_service() 00546 00547 { 00548 00549 #ifdef DAGH_NO_MPI 00550 #else 00551 00552 if ( !Req ) return ; // Never registered 00553 00554 const int I = findService( this ); 00555 00556 if ( I < 0 ) { 00557 oflog << "comm_service::~comm_service() UNREGISTERED OBJECT ?" << endl ; 00558 oflog.flush(); 00559 return ; 00560 } 00561 00562 #ifdef DEBUG_PRINT_COMM 00563 ( oflog << "comm_service::~comm_service" 00564 << "(Id:" << Id << ")" 00565 << "(Tag:" << Tag << ")" 00566 << "(Src:" << Src << ")" 00567 << "(NList:" << NList << ")" 00568 << "(I:" << I << ")" 00569 << "(*Req:" << *Req << ")" 00570 << "(RList[I]:" << RList[I] << ")" 00571 << endl ).flush(); 00572 #endif 00573 00574 // Remove service from "registry" 00575 if ( MPI_REQUEST_NULL != *Req ) { 00576 // #ifndef MPICH 00577 // MPI_Status MS ; 00578 // MPI_Cancel( Req ); 00579 // MPI_Wait( Req , &MS ); 00580 // #else 00581 // MPI_Request_free( Req ); 00582 // #endif 00583 #ifndef MPI_SCORE2 00584 MPI_Status MS ; 00585 MPI_Cancel( Req ); 00586 MPI_Wait( Req , &MS ); 00587 #else 00588 MPI_Request_free( Req ); 00589 #endif 00590 } 00591 00592 comm_service ** const SL = SList ; 00593 MPI_Request * const RL = RList ; 00594 00595 const int N = NList - 1 ; 00596 00597 { 00598 for ( register int i = I ; i < N; i++ ) { 00599 RL[i] = RL[i+1]; 00600 SL[i] = SL[i+1]; 00601 SL[i]->Req = (RL + i); 00602 } 00603 } 00604 00605 SL[N] = 0 ; 00606 RL[N] = MPI_REQUEST_NULL ; 00607 00608 if ( 0 == ( N % CommIncrement ) ) { 00609 if ( N > 0 ) { 00610 MPI_Request * const R = RList = new MPI_Request[ N ]; 00611 comm_service ** const S = SList = new comm_service*[ N ]; 00612 register int i ; 00613 for ( i = 0 ; i < N ; i++ ) { 00614 if (RL[i]) { 00615 R[i] = RL[i] ; 00616 RL[i] = MPI_REQUEST_NULL; 00617 } 00618 else R[i] = MPI_REQUEST_NULL; 00619 00620 if (SL[i]) { 00621 S[i] = SL[i]; 00622 S[i]->Req = R + i ; 00623 SL[i] = 0; 00624 } 00625 else S[i] = 0 ; 00626 } 00627 } 00628 else { 00629 SList = 0 ; 00630 RList = 0 ; 00631 } 00632 delete[] SL ; 00633 delete[] RL ; 00634 } 00635 00636 NList-- ; 00637 00638 #endif 00639 00640 return ; 00641 } 00642 00643 // --------------------------------------------------------------------- 00644 00645 void comm_service::callrecv( const MPI_Status & MS ) 00646 { 00647 oflog << "comm_service::callrecv(" 00648 << MS << ") DEFAULT CALLBACK!" << endl ; 00649 oflog.flush(); 00650 } 00651 00652 const char * comm_service::name( void ) const 00653 { 00654 static const char Name[] = "comm_service" ; 00655 return Name ; 00656 } 00657 00658 // --------------------------------------------------------------------- 00659 // --------------------------------------------------------------------- 00660 // Perform services 00661 00662 int comm_service::serve( void ) 00663 { 00664 00665 #ifdef DAGH_NO_MPI 00666 #else 00667 00668 if ( NList <= 1 ) { 00669 #ifdef DEBUG_PRINT_COMM 00670 ( oflog << "comm_service::serve" << NList << " <= 1" << endl ).flush(); 00671 #endif 00672 return MPI_SUCCESS ; 00673 } 00674 00675 // Handle stockpiled requests 00676 00677 int NumR ; 00678 int * const IndR = new int[ NList ]; 00679 MPI_Status * const MSR = new MPI_Status[ NList ]; 00680 00681 MSR[0].MPI_SOURCE = MSR[0].MPI_TAG = 0 ; 00682 00683 double next_time ; 00684 double start_time = MPI_Wtime(); 00685 00686 const int R = MPI_Testsome(NList,RList,&NumR,IndR,MSR); 00687 00688 if ( MPI_SUCCESS == R ) { 00689 for ( int i = 0 ; i < NumR ; i++ ) { 00690 const int I = IndR[i]; 00691 00692 // MPI Spec says to use MSR[i], but 'chimp' uses MSR[I] 00693 00694 //const MPI_Status & MS = MSR[I] ; 00695 const MPI_Status & MS = MSR[i] ; 00696 comm_service * const CS = SList[I]; 00697 00698 #ifdef DEBUG_PRINT_COMM 00699 if ( CS ) 00700 ( oflog << "MPI_Testsome #" << i << " (" << I << ")" << " => " << MS 00701 << " => " << CS->name() << endl ).flush(); 00702 else 00703 ( oflog << "MPI_Testsome #" << i << " => " << MS << endl ).flush(); 00704 #endif 00705 00706 if ( CS ) { 00707 Idle_Time += ( next_time = MPI_Wtime() ) - start_time ; 00708 CS->callrecv( MS ); 00709 Srvc_Time += ( start_time = MPI_Wtime() ) - next_time ; 00710 } 00711 } 00712 } 00713 00714 delete [] IndR ; 00715 delete [] MSR ; 00716 00717 Idle_Time += MPI_Wtime() - start_time ; 00718 00719 #endif 00720 00721 return MPI_SUCCESS ; 00722 } 00723 00724 // --------------------------------------------------------------------- 00725 00726 int comm_service::serve( MPI_Request req_in ) 00727 { 00728 00729 #ifdef DAGH_NO_MPI 00730 #else 00731 00732 double start_time = MPI_Wtime(); 00733 double next_time ; 00734 00735 // No services, just wait 00736 00737 if ( 0 == NList ) { 00738 MPI_Status MS ; 00739 const int R = MPI_Wait( &req_in , &MS ); 00740 Idle_Time += MPI_Wtime() - start_time ; 00741 return R ; 00742 } 00743 00744 // Wait for the desired request 00745 00746 int J = findRequest( req_in ); 00747 if ( J < 0 ) RList[ J = 0 ] = req_in ; 00748 00749 int NumR = 0; 00750 int * const IndR = new int[ NList ]; 00751 MPI_Status * const MSR = new MPI_Status[ NList ]; 00752 00753 assert(IndR != 0); 00754 assert(MSR != 0); 00755 00756 MSR[0].MPI_SOURCE = MSR[0].MPI_TAG = 0 ; 00757 00758 do { 00759 00760 #ifdef DEBUG_PRINT_COMM 00761 ( oflog << "MPI_Waitsome " << J << "th" << endl ).flush(); 00762 #endif 00763 00764 const int R = MPI_Waitsome(NList,RList,&NumR,IndR,MSR); 00765 00766 if ( MPI_SUCCESS != R ) { 00767 Idle_Time += MPI_Wtime() - start_time ; 00768 delete [] IndR ; 00769 delete [] MSR ; 00770 error_die( "comm_service::serve" , "MPI_Waitsome" , R ); 00771 return R ; 00772 } 00773 00774 #ifdef DEBUG_PRINT_COMM 00775 ( oflog << "MPI_Waitsome " << NumR << " requests served" << endl ).flush(); 00776 #endif 00777 00778 for ( int i = 0 ; i < NumR ; i++ ) { 00779 const int I = IndR[i]; 00780 00781 // MPI Spec says to use MSR[i], but 'chimp' uses MSR[I] 00782 //const MPI_Status & MS = MSR[I] ; 00783 const MPI_Status & MS = MSR[i] ; 00784 00785 comm_service * const CS = SList[I]; 00786 00787 #ifdef DEBUG_PRINT_COMM 00788 if ( CS ) 00789 ( oflog << "MPI_Waitsome #" << i << "(" << I << ")" << " => " << MS 00790 << " => " << CS->name() << endl ).flush(); 00791 else 00792 ( oflog << "MPI_Waitsome #" << i << "(" << I << ")" << " => " << MS << endl ).flush(); 00793 #endif 00794 00795 if ( CS ) { // A service requests 00796 Idle_Time += ( next_time = MPI_Wtime() ) - start_time ; 00797 CS->callrecv( MS ); 00798 Srvc_Time += ( start_time = MPI_Wtime() ) - next_time ; 00799 } 00800 00801 if ( J == I ) J = -1 ; 00802 } 00803 00804 } while ( 0 <= J ); 00805 00806 if (IndR) delete [] IndR ; 00807 if (MSR) delete [] MSR ; 00808 00809 RList[0] = MPI_REQUEST_NULL ; 00810 00811 Idle_Time += MPI_Wtime() - start_time ; 00812 00813 #endif 00814 00815 return MPI_SUCCESS ; 00816 } 00817 00818 // --------------------------------------------------------------------- 00819 // Add/Delete a communicator 00820 // --------------------------------------------------------------------- 00821 00822 void comm_service::add_comm(int const id) 00823 { 00824 #ifdef DAGH_NO_MPI 00825 #else 00826 MPI_Comm_dup(Comm, &CommArray[id]) ; 00827 MPI_Errhandler_set(CommArray[id], MPI_ERRORS_RETURN); 00828 NumCommArray++ ; 00829 #endif 00830 } 00831 00832 void comm_service::delete_comm(int const id) 00833 { 00834 #ifdef DAGH_NO_MPI 00835 #else 00836 MPI_Comm_free(&CommArray[id]) ; 00837 NumCommArray-- ; 00838 #endif 00839 } 00840 00841 void comm_service::reset_comm(int const id) 00842 { 00843 #ifdef DAGH_NO_MPI 00844 #else 00845 // Reseting the communication is necessary only in specific situations. 00846 // See the usage of this function in GridHierarchy! 00847 // The function must be used with care, because most MPI-implementations 00848 // allow only 32767 communicator duplications during a single run. If this 00849 // number is reached, MPI usually blocks and waits an infinite time. 00850 00851 if (CommArray[id] != MPI_COMM_NULL) { 00852 MPI_Comm_free(&CommArray[id]) ; 00853 MPI_Comm_dup(Comm, &CommArray[id]) ; 00854 MPI_Errhandler_set(CommArray[id], MPI_ERRORS_RETURN); 00855 } 00856 #endif 00857 } 00858 00859 void comm_service::inc_commarray(int const inc) 00860 { 00861 #ifdef DAGH_NO_MPI 00862 #else 00863 MPI_Comm * const CA = CommArray ; 00864 const int N = NumCommArray ; 00865 00866 MPI_Comm * const C = CommArray = new MPI_Comm[ N + inc ]; 00867 register int i ; 00868 if ( CA ) { 00869 for ( i = 0 ; i < N ; i++ ) { 00870 if (CA[i] != MPI_COMM_NULL) { 00871 C[i] = CA[i] ; 00872 CA[i] = MPI_COMM_NULL; 00873 } 00874 else C[i] = MPI_COMM_NULL; 00875 } 00876 delete[] CA ; 00877 } 00878 for ( i = N ; i < N + inc ; i++ ) { 00879 C[i] = MPI_COMM_NULL ; 00880 } 00881 #endif 00882 } 00883 // --------------------------------------------------------------------- 00884 00885 // --------------------------------------------------------------------- 00886 // Barrier sync.... 00887 // --------------------------------------------------------------------- 00888 void comm_service::barrier(int const type) 00889 { 00890 #ifdef DAGH_NO_MPI 00891 #else 00892 int R ; 00893 const int me = PMe ; 00894 int num = 0; 00895 unsigned tag_prefix = 0; 00896 comm_barrier **B = 0; 00897 MPI_Comm C = 0; 00898 00899 if (type == comm_service_comp) { 00900 num = PNum ; 00901 tag_prefix = (comm_service_tag | comm_service_comp_tag); 00902 B = Barrier; 00903 C = Comm; 00904 } 00905 /*******************/ 00906 //else if (type == comm_sevice_io) { 00907 // num = (PWorld > 1) ? PNum+1 : PNum ; 00908 // tag_prefix = (comm_service_tag | comm_service_io_tag); 00909 // B = BarrierIO; 00910 // C = CommIO; 00911 //} 00912 /*******************/ 00913 else if (type == comm_service_world) { 00914 num = PWorld ; 00915 tag_prefix = (comm_service_tag | comm_service_world_tag); 00916 B = BarrierWorld; 00917 C = CommWorld; 00918 } 00919 else { 00920 num = PNum ; 00921 tag_prefix = (comm_service_tag | comm_service_comp_tag); 00922 B = Barrier; 00923 C = Comm; 00924 } 00925 00926 const int dcube = dim_cube( num ); 00927 00928 #ifdef DEBUG_PRINT_COMM 00929 oflog << "comm_service::barrier" << " DO " << dcube << endl ; 00930 oflog.flush(); 00931 #endif 00932 00933 // Fan-in 00934 00935 int cycle; 00936 for ( cycle = dcube - 1 ; 0 <= cycle ; cycle-- ) { 00937 const int upper = 1 << ( cycle + 1 ); 00938 const int lower = 1 << cycle ; 00939 const int who = me ^ lower ; 00940 const int tag = tag_prefix | ( cycle + 1 ) ; 00941 00942 if ( who < num ) { 00943 00944 if ( lower <= who && who < upper ) { 00945 00946 #ifdef DEBUG_PRINT_COMM 00947 ( oflog << "comm_service::barrier" << " <= " << who << "..." << endl ).flush(); 00948 #endif 00949 00950 if ( ! B[cycle]->flag ) { 00951 const int I = findTag( comm_service_null_id, tag, MPI_ANY_SOURCE ); 00952 if ( I < 0 ) error_die( "comm_service::barrier" , "comm_service::find" , 0 ); 00953 R = serve( RList[I] ); 00954 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R ); 00955 } 00956 B[cycle]->flag = 0 ; 00957 00958 #ifdef DEBUG_PRINT_COMM 00959 ( oflog << "comm_service::barrier" << " <= " << who << " DONE" << endl ).flush(); 00960 #endif 00961 00962 } 00963 else if ( lower <= me && me < upper ) { 00964 00965 #ifdef DEBUG_PRINT_COMM 00966 ( oflog << "comm_service::barrier" << " => " << who ).flush(); 00967 #endif 00968 00969 MPI_Request req ; 00970 R = MPI_Isend((void*)0, 0, MPI_BYTE, who, tag, C , &req ); 00971 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Isend" , R ); 00972 00973 #ifdef DEBUG_PRINT_COMM 00974 ( oflog << " Isend" << endl ).flush(); 00975 #endif 00976 00977 R = serve( req ); 00978 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R ); 00979 00980 #ifdef DEBUG_PRINT_COMM 00981 ( oflog << "comm_service::barrier" << " => " << who << " Done" << endl ).flush(); 00982 #endif 00983 00984 } 00985 } 00986 } 00987 00988 // Fan-out 00989 00990 for ( cycle = 0 ; cycle < dcube ; cycle++ ) { 00991 const int upper = 1 << ( cycle + 1 ); 00992 const int lower = 1 << cycle ; 00993 const int who = me ^ lower ; 00994 const int tag = tag_prefix | ( cycle + 1 ) ; 00995 00996 if ( who < num ) { 00997 00998 if ( lower <= who && who < upper ) { 00999 01000 #ifdef DEBUG_PRINT_COMM 01001 ( oflog << "comm_service::barrier" << " => " << who ).flush(); 01002 #endif 01003 01004 MPI_Request req ; 01005 R = MPI_Isend((void*)0, 0, MPI_BYTE, who, tag, C , &req ); 01006 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Isend" , R ); 01007 01008 #ifdef DEBUG_PRINT_COMM 01009 ( oflog << " Isend" << endl ).flush(); 01010 #endif 01011 01012 R = serve( req ); 01013 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R ); 01014 01015 #ifdef DEBUG_PRINT_COMM 01016 ( oflog << "comm_service::barrier" << " => " << who << " Done" << endl ).flush(); 01017 #endif 01018 01019 } 01020 else if ( lower <= me && me < upper ) { 01021 01022 #ifdef DEBUG_PRINT_COMM 01023 ( oflog << "comm_service::barrier" << " <= " << who << "..." << endl ).flush(); 01024 #endif 01025 01026 if ( ! B[cycle]->flag ) { 01027 const int I = findTag( comm_service_null_id , tag, MPI_ANY_SOURCE ); 01028 if ( I < 0 ) error_die( "comm_service::barrier" , "comm_service::find" , 0 ); 01029 R = serve( RList[I] ); 01030 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R ); 01031 } 01032 01033 #ifdef DEBUG_PRINT_COMM 01034 ( oflog << "comm_service::barrier" << " <= " << who << endl ).flush(); 01035 #endif 01036 01037 B[cycle]->flag = 0 ; 01038 } 01039 } 01040 } 01041 01042 #ifdef DEBUG_PRINT_COMM 01043 ( oflog << "comm_service::barrier" << " DONE" << endl ).flush(); 01044 #endif 01045 01046 #endif 01047 return ; // All sync'd 01048 } 01049 // --------------------------------------------------------------------- 01050 01051 // --------------------------------------------------------------------- 01052 // Receive / log barrier messages 01053 // --------------------------------------------------------------------- 01054 comm_barrier::comm_barrier( const int Cycle , const int Who, MPI_Comm C, unsigned const T ) 01055 : comm_service( comm_service_null_id, T | (Cycle+1) ) 01056 , cycle( Cycle ) , who( Who ), comm(C) 01057 { 01058 01059 flag = 0 ; 01060 01061 #ifdef DAGH_NO_MPI 01062 #else 01063 01064 #ifdef DEBUG_PRINT_COMM 01065 ( comm_service::log() << "comm_service::barrier " << Tag 01066 << " (" << *req() << ")" 01067 << " (" << cycle << "," << who << ")" 01068 << endl ).flush(); 01069 #endif 01070 01071 int R = MPI_Irecv((void*)0,0,MPI_BYTE,who,Tag,comm,req()); 01072 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Irecv" , R ); 01073 01074 #endif 01075 } 01076 01077 // --------------------------------------------------------------------- 01078 01079 void comm_barrier::callrecv( const MPI_Status & MS ) 01080 { 01081 01082 #ifdef DAGH_NO_MPI 01083 #else 01084 01085 #ifdef DEBUG_PRINT_COMM 01086 (comm_service::log() << "comm_barrier::callrecv()" << MS << endl ).flush(); 01087 #endif 01088 01089 if ( flag ) { 01090 comm_service::log() << "comm_barrier::callrecv()" << " ERROR: flag != 0" << endl ; 01091 comm_service::log().flush(); 01092 } 01093 01094 int R = MPI_Irecv((void*)0,0,MPI_BYTE,who,Tag,comm,req()); 01095 01096 if ( MPI_SUCCESS != R ) error_die( "comm_barrier::callrecv()" , "MPI_Irecv" , R ); 01097 01098 flag = 1 ; 01099 01100 #endif 01101 01102 return ; 01103 } 01104 01105 // --------------------------------------------------------------------- 01106 const char * comm_barrier::name( void ) const 01107 { 01108 static const char Name[] = "comm_barrier" ; 01109 return Name ; 01110 } 01111 // --------------------------------------------------------------------- 01112 01113 // --------------------------------------------------------------------- 01114 // All to "proc" comm ! 01115 // --------------------------------------------------------------------- 01116 // Will do later... :-) 01117 // --------------------------------------------------------------------- 01118
Quickstart Users Guide Programmers Reference Installation Examples Download
AMROC Main Home Contactlast update: 06/01/04