Blockstructured Adaptive Mesh Refinement in object-oriented C++
00001
00006 #if __GNUC__ >= 3
00007 #else
00008 extern "C" {
00009 void exit(int);
00010 }
00011 #endif
00012
00013 #include "CommServer.h"
00014
00015 // ---------------------------------------------------------------------
00016
// Dimension of the smallest hypercube that can hold N nodes, i.e. the
// smallest exponent i with 2^i >= N.  Returns 0 for N <= 1.
//
// The comparison is carried out in unsigned long so that the power of two
// never overflows: N <= INT_MAX < 2^(bits of long - 1), hence the loop always
// stops at a shift count that is valid for unsigned long.
int dim_cube( const int N )
{
  if ( N <= 1 ) return 0 ;

  const unsigned long target = static_cast<unsigned long>( N );

  int dim = 0 ;
  while ( ( 1UL << dim ) < target ) ++dim ;
  return dim ;
}
00019
00020 // ---------------------------------------------------------------------
00021
00022 ostream & operator << ( ostream & OS , const MPI_Status & MS )
00023 {
00024 return
00025 OS << "{" << dec << MS.MPI_SOURCE << " , " << MS.MPI_TAG << dec << "}";
00026 //OS << "{ " << hex << MS.MPI_SOURCE << " , " << MS.MPI_TAG << dec << "}";
00027 }
00028
00029 // ---------------------------------------------------------------------
00030
00031 // ---------------------------------------------------------------------
00032
00033 void comm_service::error_msg( const char who[] , const char what[] , int R )
00034 {
00035
00036 #ifdef DAGH_NO_MPI
00037 #else
00038
00039 char message[ MPI_MAX_ERROR_STRING ];
00040 int me ;
00041 int len = MPI_MAX_ERROR_STRING ;
00042
00043 MPI_Comm_rank( CommWorld , &me );
00044
00045 MPI_Error_string( R , message , &len );
00046
00047 for (int i=0; i<1000; i++)
00048 cerr << "P" << me << " : " << who << " " << what << " => " << R << endl ;
00049
00050 #endif
00051 }
00052 void comm_service::error_die( const char who[] , const char what[] , int R )
00053 {
00054 error_msg( who , what , R );
00055 exit(-1);
00056 }
00057
00058 // ---------------------------------------------------------------------
00059
00060 class comm_barrier : public comm_service {
00061 public:
00062 const int cycle ;
00063 const int who ;
00064 int flag ;
00065 MPI_Comm comm;
00066
00067 comm_barrier( const int Cycle , const int Who, MPI_Comm C, unsigned const T );
00068 inline ~comm_barrier() {}
00069
00070 void callrecv( const MPI_Status & );
00071
00072 const char * name(void) const ;
00073 };
00074
00075 // ---------------------------------------------------------------------
00076
// Number of slots by which the service/request tables grow; may be
// overridden on the compiler command line.
#if !defined( CommIncrement )
#  define CommIncrement (128)
#endif
00080
00081 // ---------------------------------------------------------------------
00082
00083 ofstream comm_service::oflog ;
00084 ostream* comm_service::olog = (ostream *) 0;
00085 int comm_service::init_flag = 0 ;
00086 int comm_service::dce_flag = 0 ;
00087
00088 int comm_service::PMe = -1 ;
00089
00090 int comm_service::NList = 0 ;
00091 comm_service ** comm_service::SList = 0 ;
00092 comm_barrier ** comm_service::Barrier = 0 ;
00093 comm_barrier ** comm_service::BarrierWorld = 0 ;
00094 MPI_Request * comm_service::RList = 0 ;
00095
00096 int comm_service::PNum = -1 ;
00097 MPI_Comm comm_service::Comm = 0 ;
00098 MPI_Group comm_service::Grp = 0 ;
00099
00100 int comm_service::io_flag = 0;
00101 int comm_service::PIO = -1;
00102 MPI_Comm comm_service::CommIO = 0;
00103
00104 int comm_service::PWorld = 0;
00105 MPI_Comm comm_service::CommWorld = 0;
00106 MPI_Group comm_service::GrpWorld = 0 ;
00107
00108 int comm_service::NumCommArray = 0;
00109 MPI_Comm * comm_service::CommArray = 0;
00110
00111 double comm_service::Idle_Time = 0.0 ;
00112 double comm_service::Srvc_Time = 0.0 ;
00113
00114 // ---------------------------------------------------------------------
00115
00116 int comm_service::findTag( const int Id , const int Tag, const int Src )
00117 {
00118 const int NL = NList ;
00119 comm_service ** const SL = SList ;
00120 for ( register int i = 1 ; i < NL ; i++ )
00121 if ( SL[i] && SL[i]->Id == Id &&
00122 SL[i]->Tag == Tag && SL[i]->Src == Src ) return i ;
00123 return -1 ;
00124 }
00125
00126 int comm_service::findService( comm_service * const CS )
00127 {
00128 const int NL = NList ;
00129 comm_service ** const SL = SList ;
00130 for ( register int i = 1 ; i < NL ; i++ ) if ( SL[i] == CS ) return i ;
00131 return -1 ;
00132 }
00133
00134 int comm_service::findRequest( const MPI_Request req_in )
00135 {
00136 const int NL = NList ;
00137 const MPI_Request * const RL = RList ;
00138 for ( register int i = 1 ; i < NL ; i++ ) if ( RL[i] == req_in ) return i ;
00139 return -1 ;
00140 }
00141
00142 void comm_service::setreq( int const I , int const T , int const S)
00143 {
00144 int J;
00145 if ( (J = findTag(I,T,S)) >= 0)
00146 Req = RList + J;
00147 else
00148 Req = 0;
00149 }
00150
00151 // ---------------------------------------------------------------------
00152 // Initialize
00153 // ---------------------------------------------------------------------
00154 int comm_service::init( MPI_Comm c )
00155 {
00156 if (init_flag == 1) return MPI_SUCCESS;
00157
00158 int flag;
00159 #ifdef DAGH_NO_MPI
00160 flag = 0;
00161 #else
00162 MPI_Initialized(&flag);
00163 #endif
00164
00165 if (flag == 0) {
00166 PMe = 0;
00167 PNum = 1;
00168 dce_flag = 0;
00169 CommWorld = Comm = c ;
00170 if (io_flag == 1) {
00171 CommIO = c;
00172 PIO = 0;
00173 }
00174 PWorld = 1;
00175
00176 #ifdef DEBUG_PRINT
00177 {
00178 char buf[64];
00179 ostrstream obuf(buf,sizeof(buf),ios::out);
00180 obuf << "P" << PMe << ".log" << ((char)0) ;
00181 oflog.open( buf , ios::out );
00182 if (olog) delete olog;
00183 olog = new ostream(oflog.rdbuf());
00184 }
00185 #endif
00186
00187 init_flag = 1;
00188 return MPI_SUCCESS;
00189 }
00190
00191 #ifdef DAGH_NO_MPI
00192 #else
00193
00194 if ( SList ) return MPI_ERR_COMM ;
00195
00196 // All MPI_Errhandler_set() added for correct error-handling --- RD
00197 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
00198
00199 //if ( 0 == c ) c = MPI_COMM_WORLD ;
00200 if ( 0 == c ) MPI_Comm_dup(MPI_COMM_WORLD, &c) ;
00201 MPI_Errhandler_set(c, MPI_ERRORS_RETURN);
00202
00203 dce_flag = 1 ;
00204
00205 {
00206 int R , Me , Num ;
00207 if ( MPI_SUCCESS != ( R = MPI_Comm_rank( c , &Me ) ) ) return R ;
00208 if ( MPI_SUCCESS != ( R = MPI_Comm_size( c , &Num ) ) ) return R ;
00209
00210 if (io_flag == 1 && Num > 1) {
00211 PMe = Me ;
00212 PIO = Num-1;
00213 PNum = Num-1 ;
00214 PWorld = Num;
00215 CommWorld = CommIO = c;
00216
00217 // Define the compute node communicator
00218 int ranks[1]; ranks[0] = Num-1;
00219 MPI_Comm_group(CommWorld, &GrpWorld);
00220 MPI_Group_excl(GrpWorld, 1, ranks, &Grp);
00221 MPI_Comm_create(CommWorld, Grp, &Comm);
00222 MPI_Errhandler_set(Comm, MPI_ERRORS_RETURN);
00223 }
00224 else if (io_flag == 1 && Num == 1) {
00225 PMe = Me ;
00226 PIO = Me;
00227 PNum = Num ;
00228 PWorld = Num;
00229 CommWorld = CommIO = Comm = c;
00230 MPI_Comm_group(Comm, &Grp);
00231 GrpWorld = Grp;
00232 }
00233 else {
00234 PMe = Me ;
00235 PWorld = PNum = Num ;
00236 CommWorld = Comm = c ;
00237 MPI_Comm_group(Comm, &Grp);
00238 GrpWorld = Grp;
00239 }
00240 }
00241
00242 #ifdef DEBUG_PRINT
00243 {
00244 char buf[64];
00245 ostrstream obuf(buf,sizeof(buf),ios::out);
00246 obuf << "P" << PMe << ".log" << ((char)0) ;
00247 oflog.open( buf , ios::out );
00248 if (olog) delete olog;
00249 olog = new ostream(oflog.rdbuf());
00250 }
00251 #endif
00252
00253 {
00254 comm_service ** const SL = SList = new comm_service*[ CommIncrement ];
00255 MPI_Request * const RL = RList = new MPI_Request[ CommIncrement ];
00256 register int i ;
00257 for ( i = 0 ; i < CommIncrement ; i++ ) SL[i] = 0 ;
00258 for ( i = 0 ; i < CommIncrement ; i++ ) RL[i] = MPI_REQUEST_NULL ;
00259 }
00260 NList = 1 ;
00261
00262 init_flag = 1;
00263
00264 // Post compute barrier receives
00265 if (PMe < PNum) {
00266 const int me = PMe ;
00267 const int num = PNum ;
00268
00269 const int dcube = dim_cube( num );
00270 if (dcube > 0) {
00271 comm_barrier ** const B = Barrier = new comm_barrier * [ dcube ];
00272
00273 for ( int cycle = 0 ; cycle < dcube ; cycle++ ) {
00274 B[cycle] = 0 ;
00275
00276 const int upper = 1 << ( cycle + 1 );
00277 const int lower = 1 << cycle ;
00278 const int who = me ^ lower ;
00279
00280 if ( who < num )
00281 if ( ( lower <= who && who < upper ) // Fan-in receive
00282 || ( lower <= me && me < upper ) ) // Fan-out receive
00283 B[cycle] = new comm_barrier( cycle , who, Comm,
00284 (comm_service_tag|comm_service_comp_tag) );
00285 }
00286 }
00287 }
00288
00289 // Post world barrier receives
00290 {
00291 const int me = PMe ;
00292 const int num = PWorld ;
00293
00294 const int dcube = dim_cube( num );
00295 if (dcube > 0) {
00296 comm_barrier ** const B = BarrierWorld = new comm_barrier * [ dcube ];
00297
00298 for ( int cycle = 0 ; cycle < dcube ; cycle++ ) {
00299
00300 B[cycle] = 0 ;
00301
00302 const int upper = 1 << ( cycle + 1 );
00303 const int lower = 1 << cycle ;
00304 const int who = me ^ lower ;
00305
00306 if ( who < num )
00307 if ( ( lower <= who && who < upper ) // Fan-in receive
00308 || ( lower <= me && me < upper ) ) // Fan-out receive
00309 B[cycle] = new comm_barrier( cycle , who, CommWorld,
00310 (comm_service_tag|comm_service_world_tag) );
00311 }
00312 }
00313 }
00314
00315 #endif
00316
00317 return MPI_SUCCESS ;
00318 }
00319
00320 // ---------------------------------------------------------------------
00321 // Clean & Kill
00322 // ---------------------------------------------------------------------
00323 void comm_service::clean(void)
00324
00325 {
00326 #ifdef DAGH_NO_MPI
00327 #else
00328
00329 if (!dce_flag) return;
00330
00331 #ifdef DEBUG_PRINT_COMM
00332 ( oflog << "comm_service::clean Cleaning Up !" << endl ).flush();
00333 #endif
00334
00335 if ( Barrier ) {
00336
00337 #ifdef DEBUG_PRINT_COMM
00338 ( oflog << "comm_service::clean Deleting Barrier" << endl ).flush();
00339 #endif
00340
00341 const int dcube = dim_cube( PNum );
00342 for ( int i = 0 ; i < dcube ; i++ )
00343 if ( Barrier[i] ) delete Barrier[i] ;
00344 delete[] Barrier ;
00345 Barrier = 0 ;
00346 }
00347 if ( BarrierWorld ) {
00348 #ifdef DEBUG_PRINT_COMM
00349 ( oflog << "comm_service::clean Deleting BarrierWorld" << endl ).flush();
00350 #endif
00351
00352 const int dcube = dim_cube( PWorld );
00353 for ( int i = 0 ; i < dcube ; i++ )
00354 if ( BarrierWorld[i] ) delete BarrierWorld[i] ;
00355 delete[] BarrierWorld ;
00356 BarrierWorld = 0 ;
00357 }
00358
00359 {
00360 comm_service ** const SL = SList ;
00361 MPI_Request * const RL = RList ;
00362 const int N = NList ;
00363
00364 for ( int i = 0 ; i < N ; i++ ) {
00365 if (RL[i] != MPI_REQUEST_NULL) {
00366 if (comm_service::proc_me() == comm_service::proc_io()) {
00367 while (RL[i] != MPI_REQUEST_NULL) serve(RL[i]);
00368 }
00369 else {
00370 serve();
00371 // #ifndef MPICH
00372 // MPI_Status MS ;
00373 // MPI_Cancel( RL + i );
00374 // MPI_Wait( RL + i, &MS );
00375 // #else
00376 // MPI_Request_free( RL + i );
00377 // #endif
00378 #ifndef MPI_SCORE2
00379 MPI_Status MS ;
00380 MPI_Cancel( RL + i );
00381 MPI_Wait( RL + i, &MS );
00382 #else
00383 MPI_Request_free( RL + i );
00384 #endif
00385 }
00386 }
00387 if ( SL[i] ) {
00388 SL[i]->Req = 0;
00389 SL[i] = 0;
00390 }
00391 }
00392
00393 delete [] SL;
00394 delete [] RL;
00395 }
00396
00397 int commcomp1 = MPI_IDENT;
00398 int commcomp2 = MPI_IDENT;
00399
00400 if (Comm == (MPI_Comm)-1) Comm = 0; // Only on the IO proc.
00401
00402 /* I addded the if's because on the SGI's MPICH
00403 gives and error if the args are not valid communicators */
00404 if (Comm != 0 && Comm != MPI_COMM_NULL)
00405 MPI_Comm_compare(Comm,CommWorld,&commcomp1);
00406 if (CommIO != 0 && CommIO != MPI_COMM_NULL)
00407 MPI_Comm_compare(CommWorld,CommIO,&commcomp2);
00408
00409 if (commcomp1 == MPI_IDENT && commcomp2 == MPI_IDENT) {
00410 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD)
00411 MPI_Comm_free(&CommWorld);
00412 }
00413 else if (commcomp1 == MPI_IDENT) {
00414 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD)
00415 MPI_Comm_free(&CommWorld);
00416 if (CommIO != 0 && CommIO != MPI_COMM_NULL && CommIO != MPI_COMM_WORLD)
00417 MPI_Comm_free(&CommIO);
00418 }
00419 else if (commcomp2 == MPI_IDENT) {
00420 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD)
00421 MPI_Comm_free(&CommWorld);
00422 if (Comm != 0 && Comm != MPI_COMM_NULL && Comm != MPI_COMM_WORLD)
00423 MPI_Comm_free(&Comm);
00424 }
00425 else {
00426 if (CommIO != 0 && CommIO != MPI_COMM_NULL && CommIO != MPI_COMM_WORLD)
00427 MPI_Comm_free(&CommIO);
00428 if (CommWorld != 0 && CommWorld != MPI_COMM_NULL && CommWorld != MPI_COMM_WORLD)
00429 MPI_Comm_free(&CommWorld);
00430 if (Comm != 0 && Comm != MPI_COMM_NULL && Comm != MPI_COMM_WORLD)
00431 MPI_Comm_free(&Comm);
00432 }
00433
00434 if (CommArray) delete [] CommArray;
00435
00436 dce_flag = 0;
00437
00438 #ifdef DEBUG_PRINT_COMM
00439 ( oflog << "comm_service::clean Done Cleaning Up !" << endl ).flush();
00440 #endif
00441
00442 return ;
00443 #endif
00444 }
00445
00446 void comm_service::kill(void)
00447 {
00448
00449 if ( dce_flag ) clean() ;
00450
00451 #ifdef DEBUG_PRINT_COMM
00452 ( oflog << "comm_service::kill Communications shut-down !" << endl ).flush();
00453 #endif
00454
00455 #ifdef DEBUG_PRINT
00456 oflog.close();
00457 if (olog) delete olog;
00458 #endif
00459
00460 return ;
00461 }
00462
00463 // ---------------------------------------------------------------------
00464 // Add a service
00465 // ---------------------------------------------------------------------
00466 comm_service::comm_service( const int I , const int T , const int S )
00467 : Req(0), Id(I), Tag(T), Src(S)
00468 {
00469 if ( !init_flag ) comm_service::init(); // Initialize static parameters
00470
00471 #ifdef DAGH_NO_MPI
00472 #else
00473
00474 if ( 0 <= findTag( I , T , S ) ) {
00475 Req = 0 ;
00476 oflog << "comm_service::comm_service"
00477 << "(Id:" << I << ")"
00478 << "(Tag:" << T << ")"
00479 << "(Src:" << S << ")"
00480 << "DUPLICATE TAG: Service not constructed."
00481 << endl ;
00482 oflog.flush();
00483 return ;
00484 }
00485
00486 comm_service ** const SL = SList ;
00487 MPI_Request * const RL = RList ;
00488 const int N = NList ;
00489
00490 // Enlarge array as necessary
00491
00492 if ( 0 == ( N % CommIncrement ) ) {
00493 MPI_Request * const R = RList = new MPI_Request[ N + CommIncrement ];
00494 comm_service ** const S = SList = new comm_service *[ N + CommIncrement ];
00495 register int i ;
00496 if ( SL ) {
00497 for ( i = 0 ; i < N ; i++ ) {
00498 if (RL[i]) {
00499 R[i] = RL[i] ;
00500 RL[i] = MPI_REQUEST_NULL;
00501 }
00502 else R[i] = MPI_REQUEST_NULL;
00503
00504 if (SL[i]) {
00505 S[i] = SL[i];
00506 S[i]->Req = (R + i) ;
00507 SL[i] = 0;
00508 }
00509 else S[i] = 0 ;
00510 }
00511 delete[] SL ;
00512 delete[] RL ;
00513 }
00514 for ( i = N ; i < N + CommIncrement ; i++ ) {
00515 S[i] = 0 ;
00516 R[i] = MPI_REQUEST_NULL ;
00517 }
00518 }
00519
00520 // Add to list
00521 SList[N] = this;
00522 SList[N]->Req = RList + N ;
00523 RList[N] = MPI_REQUEST_NULL ;
00524
00525 #ifdef DEBUG_PRINT_COMM
00526 ( oflog << "comm_service::comm_service"
00527 << "(Id:" << Id << ")"
00528 << "(Tag:" << Tag << ")"
00529 << "(Src:" << Src << ")"
00530 << "(NList:" << NList << ")"
00531 << "(N:" << N << ")"
00532 << "(*Req:" << *Req << ")"
00533 << "(RList[N]:" << RList[N] << ")"
00534 << endl ).flush();
00535 #endif
00536
00537 NList++ ;
00538
00539 #endif
00540 }
00541
00542 // ---------------------------------------------------------------------
00543 // Delete a service
00544 // ---------------------------------------------------------------------
00545 comm_service::~comm_service()
00546
00547 {
00548
00549 #ifdef DAGH_NO_MPI
00550 #else
00551
00552 if ( !Req ) return ; // Never registered
00553
00554 const int I = findService( this );
00555
00556 if ( I < 0 ) {
00557 oflog << "comm_service::~comm_service() UNREGISTERED OBJECT ?" << endl ;
00558 oflog.flush();
00559 return ;
00560 }
00561
00562 #ifdef DEBUG_PRINT_COMM
00563 ( oflog << "comm_service::~comm_service"
00564 << "(Id:" << Id << ")"
00565 << "(Tag:" << Tag << ")"
00566 << "(Src:" << Src << ")"
00567 << "(NList:" << NList << ")"
00568 << "(I:" << I << ")"
00569 << "(*Req:" << *Req << ")"
00570 << "(RList[I]:" << RList[I] << ")"
00571 << endl ).flush();
00572 #endif
00573
00574 // Remove service from "registry"
00575 if ( MPI_REQUEST_NULL != *Req ) {
00576 // #ifndef MPICH
00577 // MPI_Status MS ;
00578 // MPI_Cancel( Req );
00579 // MPI_Wait( Req , &MS );
00580 // #else
00581 // MPI_Request_free( Req );
00582 // #endif
00583 #ifndef MPI_SCORE2
00584 MPI_Status MS ;
00585 MPI_Cancel( Req );
00586 MPI_Wait( Req , &MS );
00587 #else
00588 MPI_Request_free( Req );
00589 #endif
00590 }
00591
00592 comm_service ** const SL = SList ;
00593 MPI_Request * const RL = RList ;
00594
00595 const int N = NList - 1 ;
00596
00597 {
00598 for ( register int i = I ; i < N; i++ ) {
00599 RL[i] = RL[i+1];
00600 SL[i] = SL[i+1];
00601 SL[i]->Req = (RL + i);
00602 }
00603 }
00604
00605 SL[N] = 0 ;
00606 RL[N] = MPI_REQUEST_NULL ;
00607
00608 if ( 0 == ( N % CommIncrement ) ) {
00609 if ( N > 0 ) {
00610 MPI_Request * const R = RList = new MPI_Request[ N ];
00611 comm_service ** const S = SList = new comm_service*[ N ];
00612 register int i ;
00613 for ( i = 0 ; i < N ; i++ ) {
00614 if (RL[i]) {
00615 R[i] = RL[i] ;
00616 RL[i] = MPI_REQUEST_NULL;
00617 }
00618 else R[i] = MPI_REQUEST_NULL;
00619
00620 if (SL[i]) {
00621 S[i] = SL[i];
00622 S[i]->Req = R + i ;
00623 SL[i] = 0;
00624 }
00625 else S[i] = 0 ;
00626 }
00627 }
00628 else {
00629 SList = 0 ;
00630 RList = 0 ;
00631 }
00632 delete[] SL ;
00633 delete[] RL ;
00634 }
00635
00636 NList-- ;
00637
00638 #endif
00639
00640 return ;
00641 }
00642
00643 // ---------------------------------------------------------------------
00644
00645 void comm_service::callrecv( const MPI_Status & MS )
00646 {
00647 oflog << "comm_service::callrecv("
00648 << MS << ") DEFAULT CALLBACK!" << endl ;
00649 oflog.flush();
00650 }
00651
00652 const char * comm_service::name( void ) const
00653 {
00654 static const char Name[] = "comm_service" ;
00655 return Name ;
00656 }
00657
00658 // ---------------------------------------------------------------------
00659 // ---------------------------------------------------------------------
00660 // Perform services
00661
00662 int comm_service::serve( void )
00663 {
00664
00665 #ifdef DAGH_NO_MPI
00666 #else
00667
00668 if ( NList <= 1 ) {
00669 #ifdef DEBUG_PRINT_COMM
00670 ( oflog << "comm_service::serve" << NList << " <= 1" << endl ).flush();
00671 #endif
00672 return MPI_SUCCESS ;
00673 }
00674
00675 // Handle stockpiled requests
00676
00677 int NumR ;
00678 int * const IndR = new int[ NList ];
00679 MPI_Status * const MSR = new MPI_Status[ NList ];
00680
00681 MSR[0].MPI_SOURCE = MSR[0].MPI_TAG = 0 ;
00682
00683 double next_time ;
00684 double start_time = MPI_Wtime();
00685
00686 const int R = MPI_Testsome(NList,RList,&NumR,IndR,MSR);
00687
00688 if ( MPI_SUCCESS == R ) {
00689 for ( int i = 0 ; i < NumR ; i++ ) {
00690 const int I = IndR[i];
00691
00692 // MPI Spec says to use MSR[i], but 'chimp' uses MSR[I]
00693
00694 //const MPI_Status & MS = MSR[I] ;
00695 const MPI_Status & MS = MSR[i] ;
00696 comm_service * const CS = SList[I];
00697
00698 #ifdef DEBUG_PRINT_COMM
00699 if ( CS )
00700 ( oflog << "MPI_Testsome #" << i << " (" << I << ")" << " => " << MS
00701 << " => " << CS->name() << endl ).flush();
00702 else
00703 ( oflog << "MPI_Testsome #" << i << " => " << MS << endl ).flush();
00704 #endif
00705
00706 if ( CS ) {
00707 Idle_Time += ( next_time = MPI_Wtime() ) - start_time ;
00708 CS->callrecv( MS );
00709 Srvc_Time += ( start_time = MPI_Wtime() ) - next_time ;
00710 }
00711 }
00712 }
00713
00714 delete [] IndR ;
00715 delete [] MSR ;
00716
00717 Idle_Time += MPI_Wtime() - start_time ;
00718
00719 #endif
00720
00721 return MPI_SUCCESS ;
00722 }
00723
00724 // ---------------------------------------------------------------------
00725
00726 int comm_service::serve( MPI_Request req_in )
00727 {
00728
00729 #ifdef DAGH_NO_MPI
00730 #else
00731
00732 double start_time = MPI_Wtime();
00733 double next_time ;
00734
00735 // No services, just wait
00736
00737 if ( 0 == NList ) {
00738 MPI_Status MS ;
00739 const int R = MPI_Wait( &req_in , &MS );
00740 Idle_Time += MPI_Wtime() - start_time ;
00741 return R ;
00742 }
00743
00744 // Wait for the desired request
00745
00746 int J = findRequest( req_in );
00747 if ( J < 0 ) RList[ J = 0 ] = req_in ;
00748
00749 int NumR = 0;
00750 int * const IndR = new int[ NList ];
00751 MPI_Status * const MSR = new MPI_Status[ NList ];
00752
00753 assert(IndR != 0);
00754 assert(MSR != 0);
00755
00756 MSR[0].MPI_SOURCE = MSR[0].MPI_TAG = 0 ;
00757
00758 do {
00759
00760 #ifdef DEBUG_PRINT_COMM
00761 ( oflog << "MPI_Waitsome " << J << "th" << endl ).flush();
00762 #endif
00763
00764 const int R = MPI_Waitsome(NList,RList,&NumR,IndR,MSR);
00765
00766 if ( MPI_SUCCESS != R ) {
00767 Idle_Time += MPI_Wtime() - start_time ;
00768 delete [] IndR ;
00769 delete [] MSR ;
00770 error_die( "comm_service::serve" , "MPI_Waitsome" , R );
00771 return R ;
00772 }
00773
00774 #ifdef DEBUG_PRINT_COMM
00775 ( oflog << "MPI_Waitsome " << NumR << " requests served" << endl ).flush();
00776 #endif
00777
00778 for ( int i = 0 ; i < NumR ; i++ ) {
00779 const int I = IndR[i];
00780
00781 // MPI Spec says to use MSR[i], but 'chimp' uses MSR[I]
00782 //const MPI_Status & MS = MSR[I] ;
00783 const MPI_Status & MS = MSR[i] ;
00784
00785 comm_service * const CS = SList[I];
00786
00787 #ifdef DEBUG_PRINT_COMM
00788 if ( CS )
00789 ( oflog << "MPI_Waitsome #" << i << "(" << I << ")" << " => " << MS
00790 << " => " << CS->name() << endl ).flush();
00791 else
00792 ( oflog << "MPI_Waitsome #" << i << "(" << I << ")" << " => " << MS << endl ).flush();
00793 #endif
00794
00795 if ( CS ) { // A service requests
00796 Idle_Time += ( next_time = MPI_Wtime() ) - start_time ;
00797 CS->callrecv( MS );
00798 Srvc_Time += ( start_time = MPI_Wtime() ) - next_time ;
00799 }
00800
00801 if ( J == I ) J = -1 ;
00802 }
00803
00804 } while ( 0 <= J );
00805
00806 if (IndR) delete [] IndR ;
00807 if (MSR) delete [] MSR ;
00808
00809 RList[0] = MPI_REQUEST_NULL ;
00810
00811 Idle_Time += MPI_Wtime() - start_time ;
00812
00813 #endif
00814
00815 return MPI_SUCCESS ;
00816 }
00817
00818 // ---------------------------------------------------------------------
00819 // Add/Delete a communicator
00820 // ---------------------------------------------------------------------
00821
00822 void comm_service::add_comm(int const id)
00823 {
00824 #ifdef DAGH_NO_MPI
00825 #else
00826 MPI_Comm_dup(Comm, &CommArray[id]) ;
00827 MPI_Errhandler_set(CommArray[id], MPI_ERRORS_RETURN);
00828 NumCommArray++ ;
00829 #endif
00830 }
00831
00832 void comm_service::delete_comm(int const id)
00833 {
00834 #ifdef DAGH_NO_MPI
00835 #else
00836 MPI_Comm_free(&CommArray[id]) ;
00837 NumCommArray-- ;
00838 #endif
00839 }
00840
00841 void comm_service::reset_comm(int const id)
00842 {
00843 #ifdef DAGH_NO_MPI
00844 #else
00845 // Reseting the communication is necessary only in specific situations.
00846 // See the usage of this function in GridHierarchy!
00847 // The function must be used with care, because most MPI-implementations
00848 // allow only 32767 communicator duplications during a single run. If this
00849 // number is reached, MPI usually blocks and waits an infinite time.
00850
00851 if (CommArray[id] != MPI_COMM_NULL) {
00852 MPI_Comm_free(&CommArray[id]) ;
00853 MPI_Comm_dup(Comm, &CommArray[id]) ;
00854 MPI_Errhandler_set(CommArray[id], MPI_ERRORS_RETURN);
00855 }
00856 #endif
00857 }
00858
00859 void comm_service::inc_commarray(int const inc)
00860 {
00861 #ifdef DAGH_NO_MPI
00862 #else
00863 MPI_Comm * const CA = CommArray ;
00864 const int N = NumCommArray ;
00865
00866 MPI_Comm * const C = CommArray = new MPI_Comm[ N + inc ];
00867 register int i ;
00868 if ( CA ) {
00869 for ( i = 0 ; i < N ; i++ ) {
00870 if (CA[i] != MPI_COMM_NULL) {
00871 C[i] = CA[i] ;
00872 CA[i] = MPI_COMM_NULL;
00873 }
00874 else C[i] = MPI_COMM_NULL;
00875 }
00876 delete[] CA ;
00877 }
00878 for ( i = N ; i < N + inc ; i++ ) {
00879 C[i] = MPI_COMM_NULL ;
00880 }
00881 #endif
00882 }
00883 // ---------------------------------------------------------------------
00884
00885 // ---------------------------------------------------------------------
00886 // Barrier sync....
00887 // ---------------------------------------------------------------------
00888 void comm_service::barrier(int const type)
00889 {
00890 #ifdef DAGH_NO_MPI
00891 #else
00892 int R ;
00893 const int me = PMe ;
00894 int num = 0;
00895 unsigned tag_prefix = 0;
00896 comm_barrier **B = 0;
00897 MPI_Comm C = 0;
00898
00899 if (type == comm_service_comp) {
00900 num = PNum ;
00901 tag_prefix = (comm_service_tag | comm_service_comp_tag);
00902 B = Barrier;
00903 C = Comm;
00904 }
00905 /*******************/
00906 //else if (type == comm_sevice_io) {
00907 // num = (PWorld > 1) ? PNum+1 : PNum ;
00908 // tag_prefix = (comm_service_tag | comm_service_io_tag);
00909 // B = BarrierIO;
00910 // C = CommIO;
00911 //}
00912 /*******************/
00913 else if (type == comm_service_world) {
00914 num = PWorld ;
00915 tag_prefix = (comm_service_tag | comm_service_world_tag);
00916 B = BarrierWorld;
00917 C = CommWorld;
00918 }
00919 else {
00920 num = PNum ;
00921 tag_prefix = (comm_service_tag | comm_service_comp_tag);
00922 B = Barrier;
00923 C = Comm;
00924 }
00925
00926 const int dcube = dim_cube( num );
00927
00928 #ifdef DEBUG_PRINT_COMM
00929 oflog << "comm_service::barrier" << " DO " << dcube << endl ;
00930 oflog.flush();
00931 #endif
00932
00933 // Fan-in
00934
00935 int cycle;
00936 for ( cycle = dcube - 1 ; 0 <= cycle ; cycle-- ) {
00937 const int upper = 1 << ( cycle + 1 );
00938 const int lower = 1 << cycle ;
00939 const int who = me ^ lower ;
00940 const int tag = tag_prefix | ( cycle + 1 ) ;
00941
00942 if ( who < num ) {
00943
00944 if ( lower <= who && who < upper ) {
00945
00946 #ifdef DEBUG_PRINT_COMM
00947 ( oflog << "comm_service::barrier" << " <= " << who << "..." << endl ).flush();
00948 #endif
00949
00950 if ( ! B[cycle]->flag ) {
00951 const int I = findTag( comm_service_null_id, tag, MPI_ANY_SOURCE );
00952 if ( I < 0 ) error_die( "comm_service::barrier" , "comm_service::find" , 0 );
00953 R = serve( RList[I] );
00954 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
00955 }
00956 B[cycle]->flag = 0 ;
00957
00958 #ifdef DEBUG_PRINT_COMM
00959 ( oflog << "comm_service::barrier" << " <= " << who << " DONE" << endl ).flush();
00960 #endif
00961
00962 }
00963 else if ( lower <= me && me < upper ) {
00964
00965 #ifdef DEBUG_PRINT_COMM
00966 ( oflog << "comm_service::barrier" << " => " << who ).flush();
00967 #endif
00968
00969 MPI_Request req ;
00970 R = MPI_Isend((void*)0, 0, MPI_BYTE, who, tag, C , &req );
00971 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Isend" , R );
00972
00973 #ifdef DEBUG_PRINT_COMM
00974 ( oflog << " Isend" << endl ).flush();
00975 #endif
00976
00977 R = serve( req );
00978 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
00979
00980 #ifdef DEBUG_PRINT_COMM
00981 ( oflog << "comm_service::barrier" << " => " << who << " Done" << endl ).flush();
00982 #endif
00983
00984 }
00985 }
00986 }
00987
00988 // Fan-out
00989
00990 for ( cycle = 0 ; cycle < dcube ; cycle++ ) {
00991 const int upper = 1 << ( cycle + 1 );
00992 const int lower = 1 << cycle ;
00993 const int who = me ^ lower ;
00994 const int tag = tag_prefix | ( cycle + 1 ) ;
00995
00996 if ( who < num ) {
00997
00998 if ( lower <= who && who < upper ) {
00999
01000 #ifdef DEBUG_PRINT_COMM
01001 ( oflog << "comm_service::barrier" << " => " << who ).flush();
01002 #endif
01003
01004 MPI_Request req ;
01005 R = MPI_Isend((void*)0, 0, MPI_BYTE, who, tag, C , &req );
01006 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Isend" , R );
01007
01008 #ifdef DEBUG_PRINT_COMM
01009 ( oflog << " Isend" << endl ).flush();
01010 #endif
01011
01012 R = serve( req );
01013 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
01014
01015 #ifdef DEBUG_PRINT_COMM
01016 ( oflog << "comm_service::barrier" << " => " << who << " Done" << endl ).flush();
01017 #endif
01018
01019 }
01020 else if ( lower <= me && me < upper ) {
01021
01022 #ifdef DEBUG_PRINT_COMM
01023 ( oflog << "comm_service::barrier" << " <= " << who << "..." << endl ).flush();
01024 #endif
01025
01026 if ( ! B[cycle]->flag ) {
01027 const int I = findTag( comm_service_null_id , tag, MPI_ANY_SOURCE );
01028 if ( I < 0 ) error_die( "comm_service::barrier" , "comm_service::find" , 0 );
01029 R = serve( RList[I] );
01030 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "comm_service::serve" , R );
01031 }
01032
01033 #ifdef DEBUG_PRINT_COMM
01034 ( oflog << "comm_service::barrier" << " <= " << who << endl ).flush();
01035 #endif
01036
01037 B[cycle]->flag = 0 ;
01038 }
01039 }
01040 }
01041
01042 #ifdef DEBUG_PRINT_COMM
01043 ( oflog << "comm_service::barrier" << " DONE" << endl ).flush();
01044 #endif
01045
01046 #endif
01047 return ; // All sync'd
01048 }
01049 // ---------------------------------------------------------------------
01050
01051 // ---------------------------------------------------------------------
01052 // Receive / log barrier messages
01053 // ---------------------------------------------------------------------
01054 comm_barrier::comm_barrier( const int Cycle , const int Who, MPI_Comm C, unsigned const T )
01055 : comm_service( comm_service_null_id, T | (Cycle+1) )
01056 , cycle( Cycle ) , who( Who ), comm(C)
01057 {
01058
01059 flag = 0 ;
01060
01061 #ifdef DAGH_NO_MPI
01062 #else
01063
01064 #ifdef DEBUG_PRINT_COMM
01065 ( comm_service::log() << "comm_service::barrier " << Tag
01066 << " (" << *req() << ")"
01067 << " (" << cycle << "," << who << ")"
01068 << endl ).flush();
01069 #endif
01070
01071 int R = MPI_Irecv((void*)0,0,MPI_BYTE,who,Tag,comm,req());
01072 if ( MPI_SUCCESS != R ) error_die( "comm_service::barrier" , "MPI_Irecv" , R );
01073
01074 #endif
01075 }
01076
01077 // ---------------------------------------------------------------------
01078
01079 void comm_barrier::callrecv( const MPI_Status & MS )
01080 {
01081
01082 #ifdef DAGH_NO_MPI
01083 #else
01084
01085 #ifdef DEBUG_PRINT_COMM
01086 (comm_service::log() << "comm_barrier::callrecv()" << MS << endl ).flush();
01087 #endif
01088
01089 if ( flag ) {
01090 comm_service::log() << "comm_barrier::callrecv()" << " ERROR: flag != 0" << endl ;
01091 comm_service::log().flush();
01092 }
01093
01094 int R = MPI_Irecv((void*)0,0,MPI_BYTE,who,Tag,comm,req());
01095
01096 if ( MPI_SUCCESS != R ) error_die( "comm_barrier::callrecv()" , "MPI_Irecv" , R );
01097
01098 flag = 1 ;
01099
01100 #endif
01101
01102 return ;
01103 }
01104
01105 // ---------------------------------------------------------------------
01106 const char * comm_barrier::name( void ) const
01107 {
01108 static const char Name[] = "comm_barrier" ;
01109 return Name ;
01110 }
01111 // ---------------------------------------------------------------------
01112
01113 // ---------------------------------------------------------------------
01114 // All to "proc" comm !
01115 // ---------------------------------------------------------------------
01116 // Will do later... :-)
01117 // ---------------------------------------------------------------------
01118
Quickstart Users Guide Programmers Reference Installation Examples Download
AMROC Main Home Contact | last update: 06/01/04