Extendend parallel region after SEND PACKET BUNDLE

Nothing parallelizable founded
omp-walther
StefanoPetrilli 3 years ago
parent b079d71f30
commit 532701031e

@ -213,7 +213,15 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
MilanLongInt S;
MilanLongInt privateMyCard = 0;
staticQueue U, privateU, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner;
MilanLongInt myIndex = 0;
vector <MilanLongInt> PCumulative, PMessageBundle, PSizeInfoMessages;
vector <MPI_Request> SRequest; //Requests that are used for each send message
vector <MPI_Status> SStatus; //Status of sent messages, used in MPI_Wait
MilanLongInt MessageIndex = 0; //Pointer for current message
MilanInt OneMessageSize = 0;
MilanLongInt numMessagesToSend;
MilanInt BufferSize;
MilanLongInt *Buffer;
bool isEmpty;
#ifdef TIME_TRACKER
double Ghost2LocalInitialization = MPI_Wtime();
@ -868,7 +876,6 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
omp_get_thread_num(),
myRank);
#endif
} // end of parallel region
///////////////////////// END OF PROCESS MATCHED VERTICES /////////////////////////
@ -878,28 +885,34 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
/////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////// SEND BUNDLED MESSAGES /////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////
//Data structures for Bundled Messages:
vector<MilanLongInt> PCumulative, PMessageBundle, PSizeInfoMessages;
MilanLongInt myIndex=0;
try {
PMessageBundle.reserve(NumMessagesBundled*3); //Three integers per message
PCumulative.reserve(numProcs+1); //Similar to Row Pointer vector in CSR data structure
PSizeInfoMessages.reserve(numProcs*3); //Buffer to hold the Size info message packets
} catch ( length_error ) {
cout<<"Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
cout<<"Not enough memory to allocate the internal variables \n";
exit(1);
}
PMessageBundle.resize(NumMessagesBundled*3, -1);//Initialize
PCumulative.resize(numProcs+1, 0); //Only initialize the counter variable
PSizeInfoMessages.resize(numProcs*3, 0);
for (MilanInt i=0; i<numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
PCumulative[i+1]=PCumulative[i]+PCounter[i];
//Reuse PCounter to keep track of how many messages were inserted:
for (MilanInt i=0; i<numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
PCounter[i]=0;
//Build the Message Bundle packet:
#pragma omp barrier
#pragma omp master
{
//Data structures for Bundled Messages:
try {
PMessageBundle.reserve(NumMessagesBundled * 3); //Three integers per message
PCumulative.reserve(numProcs + 1); //Similar to Row Pointer vector in CSR data structure
PSizeInfoMessages.reserve(numProcs * 3); //Buffer to hold the Size info message packets
} catch (length_error) {
cout << "Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
cout << "Not enough memory to allocate the internal variables \n";
exit(1);
}
PMessageBundle.resize(NumMessagesBundled * 3, -1);//Initialize
PCumulative.resize(numProcs + 1, 0); //Only initialize the counter variable
PSizeInfoMessages.resize(numProcs * 3, 0);
for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
PCumulative[i + 1] = PCumulative[i] + PCounter[i];
//OMP not worth parallelizing
//Reuse PCounter to keep track of how many messages were inserted:
for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
PCounter[i] = 0;
//Build the Message Bundle packet:
//OMP Not parallelizable
for (MilanInt i=0; i<NumMessagesBundled; i++) { // Changed by Fabio to be an integer, addresses needs to be integers!
myIndex = ( PCumulative[QOwner[i]] + PCounter[QOwner[i]] )*3;
PMessageBundle[myIndex+0] = QLocalVtx[i];
@ -907,58 +920,62 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
PMessageBundle[myIndex+2] = QMsgType[i];
PCounter[QOwner[i]]++;
}
//Send the Bundled Messages: Use ISend
vector<MPI_Request> SRequest; //Requests that are used for each send message
vector<MPI_Status> SStatus; //Status of sent messages, used in MPI_Wait
MilanLongInt MessageIndex=0; //Pointer for current message
try {
SRequest.reserve(numProcs*2); //At most two messages per processor
SStatus.reserve(numProcs*2);//At most two messages per processor
} catch ( length_error ) {
cout<<"Error in function algoDistEdgeApproxDominatingEdgesLinearSearchImmediateSend: \n";
cout<<"Not enough memory to allocate the internal variables \n";
exit(1);
}
MPI_Request myReq; //A sample request
SRequest.resize(numProcs*2,myReq);
MPI_Status myStat; //A sample status
SStatus.resize(numProcs*2,myStat);
//Send the Messages
for (MilanInt i=0; i<numProcs; i++) { // Changed by Fabio to be an integer, addresses needs to be integers!
if (i==myRank) //Do not send anything to yourself
continue;
//Send the Message with information about the size of next message:
//Build the Message Packet:
PSizeInfoMessages[i*3+0] = (PCumulative[i+1]-PCumulative[i])*3; // # of integers in the next message
PSizeInfoMessages[i*3+1] = -1; //Dummy packet
PSizeInfoMessages[i*3+2] = SIZEINFO; //TYPE
//Send a Request (Asynchronous)
#ifdef PRINT_DEBUG_INFO_
cout<<"\n("<<myRank<<")Sending bundled message to process "<<i<<" size: "<<PSizeInfoMessages[i*3+0]<<endl;
fflush(stdout);
try {
SRequest.reserve(numProcs * 2); //At most two messages per processor
SStatus.reserve(numProcs * 2);//At most two messages per processor
} catch (length_error) {
cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearchImmediateSend: \n";
cout << "Not enough memory to allocate the internal variables \n";
exit(1);
}
MPI_Request myReq; //A sample request
SRequest.resize(numProcs * 2, myReq);
MPI_Status myStat; //A sample status
SStatus.resize(numProcs * 2, myStat);
//Send the Messages
for (MilanInt i = 0; i < numProcs; i++) { // Changed by Fabio to be an integer, addresses needs to be integers!
if (i == myRank) //Do not send anything to yourself
continue;
//Send the Message with information about the size of next message:
//Build the Message Packet:
PSizeInfoMessages[i * 3 + 0] = (PCumulative[i + 1] - PCumulative[i]) * 3; // # of integers in the next message
PSizeInfoMessages[i * 3 + 1] = -1; //Dummy packet
PSizeInfoMessages[i * 3 + 2] = SIZEINFO; //TYPE
//Send a Request (Asynchronous)
#ifdef PRINT_DEBUG_INFO_
cout<<"\n("<<myRank<<")Sending bundled message to process "<<i<<" size: "<<PSizeInfoMessages[i*3+0]<<endl;
fflush(stdout);
#endif
if ( PSizeInfoMessages[i*3+0] > 0 ) { //Send only if it is a nonempty packet
MPI_Isend(&PSizeInfoMessages[i*3+0], 3, TypeMap<MilanLongInt>(), i, ComputeTag, comm, &SRequest[MessageIndex]);
msgActual++;
MessageIndex++;
//Now Send the message with the data packet:
#ifdef PRINT_DEBUG_INFO_
cout<<"\n("<<myRank<<")Sending Bundle to : "<<i<<endl;
for (k=(PCumulative[i]*3); k< (PCumulative[i]*3+PSizeInfoMessages[i*3+0]); k++)
cout<<PMessageBundle[k]<<",";
cout<<endl;
fflush(stdout);
if (PSizeInfoMessages[i * 3 + 0] > 0) { //Send only if it is a nonempty packet
MPI_Isend(&PSizeInfoMessages[i * 3 + 0], 3, TypeMap<MilanLongInt>(), i, ComputeTag, comm,
&SRequest[MessageIndex]);
msgActual++;
MessageIndex++;
//Now Send the message with the data packet:
#ifdef PRINT_DEBUG_INFO_
cout<<"\n("<<myRank<<")Sending Bundle to : "<<i<<endl;
for (k=(PCumulative[i]*3); k< (PCumulative[i]*3+PSizeInfoMessages[i*3+0]); k++)
cout<<PMessageBundle[k]<<",";
cout<<endl;
fflush(stdout);
#endif
MPI_Isend(&PMessageBundle[PCumulative[i]*3], PSizeInfoMessages[i*3+0], TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[MessageIndex]);
MessageIndex++;
} //End of if size > 0
}
//Free up temporary memory:
PCumulative.clear();
QLocalVtx.clear();
QGhostVtx.clear();
QMsgType.clear();
QOwner.clear();
MPI_Isend(&PMessageBundle[PCumulative[i] * 3], PSizeInfoMessages[i * 3 + 0],
TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[MessageIndex]);
MessageIndex++;
} //End of if size > 0
}
//Free up temporary memory:
PCumulative.clear();
QLocalVtx.clear();
QGhostVtx.clear();
QMsgType.clear();
QOwner.clear();
#ifdef PRINT_DEBUG_INFO_
cout<<"\n("<<myRank<<")Number of Ghost edges = "<<numGhostEdges;
cout<<"\n("<<myRank<<")Total number of potential message X 2 = "<<numGhostEdges*2;
@ -971,17 +988,17 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
//Allocate memory for MPI Send messages:
/* WILL COME BACK HERE - NO NEED TO STORE ALL THIS MEMORY !! */
MilanInt OneMessageSize=0;
OneMessageSize=0;
MPI_Pack_size(3, TypeMap<MilanLongInt>(), comm, &OneMessageSize); //Size of one message packet
//How many messages to send?
//Potentially three kinds of messages will be sent/received:
//Request, Success, Failure.
//But only two will be sent from a given processor.
//Substract the number of messages that have already been sent as bundled messages:
MilanLongInt numMessagesToSend = numGhostEdges*2 - NumMessagesBundled;
MilanInt BufferSize = (OneMessageSize+MPI_BSEND_OVERHEAD)*numMessagesToSend;
numMessagesToSend = numGhostEdges*2 - NumMessagesBundled;
BufferSize = (OneMessageSize+MPI_BSEND_OVERHEAD)*numMessagesToSend;
MilanLongInt *Buffer=0;
Buffer=0;
#ifdef PRINT_DEBUG_INFO_
cout<<"\n("<<myRank<<")Size of One Message from PACK= "<<OneMessageSize;
cout<<"\n("<<myRank<<")Size of Message overhead = "<<MPI_BSEND_OVERHEAD;
@ -1000,11 +1017,14 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
}
MPI_Buffer_attach(Buffer, BufferSize); //Attach the Buffer
}
} //End of master
} // end of parallel region
///////////////////////// END OF SEND BUNDLED MESSAGES //////////////////////////////////
finishTime = MPI_Wtime();
*ph1_time = finishTime-startTime; //Time taken for Phase-1
*ph1_card = myCard ; //Cardinality at the end of Phase-1
*ph1_card = myCard; //Cardinality at the end of Phase-1
startTime = MPI_Wtime();
/////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////// MAIN LOOP //////////////////////////////////////

@ -1,6 +1,6 @@
%%%%%%%%%%% General arguments % Lines starting with % are ignored.
CSR ! Storage format CSR COO JAD
00123 ! IDIM; domain size. Linear system size is IDIM**3
0123 ! IDIM; domain size. Linear system size is IDIM**3
CONST ! PDECOEFF: CONST, EXP, GAUSS Coefficients of the PDE
BICGSTAB ! Iterative method: BiCGSTAB BiCGSTABL BiCG CG CGS FCG GCR RGMRES
2 ! ISTOPC

Loading…
Cancel
Save