From 3de1e607eb0a4d9b4e4a857125c791e1001fe614 Mon Sep 17 00:00:00 2001
From: StefanoPetrilli
Date: Sun, 10 Jul 2022 03:39:58 -0500
Subject: [PATCH] sendBundledMessages refactoring

---
 amgprec/impl/aggregator/MatchBoxPC.h          |  14 ++
 ...mEdgesLinearSearchMesgBndlSmallMateCMP.cpp | 210 +++-------------
 .../impl/aggregator/processExposedVertex.cpp  |   1 +
 .../impl/aggregator/sendBundledMessages.cpp   | 225 ++++++++++++++++++
 4 files changed, 269 insertions(+), 181 deletions(-)
 create mode 100644 amgprec/impl/aggregator/sendBundledMessages.cpp

diff --git a/amgprec/impl/aggregator/MatchBoxPC.h b/amgprec/impl/aggregator/MatchBoxPC.h
index 96630f9c..351dca98 100644
--- a/amgprec/impl/aggregator/MatchBoxPC.h
+++ b/amgprec/impl/aggregator/MatchBoxPC.h
@@ -66,6 +66,18 @@ using namespace std;
 
 #define NUM_THREAD 4
 
+// MPI type map
+template <class T>
+MPI_Datatype TypeMap();
+template <>
+inline MPI_Datatype TypeMap<int64_t>() { return MPI_LONG_LONG; }
+template <>
+inline MPI_Datatype TypeMap<int>() { return MPI_INT; }
+template <>
+inline MPI_Datatype TypeMap<double>() { return MPI_DOUBLE; }
+template <>
+inline MPI_Datatype TypeMap<float>() { return MPI_FLOAT; }
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -150,6 +162,8 @@ extern "C"
 #define MilanRealMin MINUS_INFINITY
 #endif
 
+
+
   // Function of find the owner of a ghost vertex using binary search:
   inline MilanInt findOwnerOfGhost(MilanLongInt vtxIndex, MilanLongInt *mVerDistance,
                                    MilanInt myRank, MilanInt numProcs);
diff --git a/amgprec/impl/aggregator/algoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP.cpp b/amgprec/impl/aggregator/algoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP.cpp
index b6ac6364..cfd6b927 100644
--- a/amgprec/impl/aggregator/algoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP.cpp
+++ b/amgprec/impl/aggregator/algoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP.cpp
@@ -8,7 +8,7 @@
 #include "parallelComputeCandidateMateB.cpp"
 #include "processExposedVertex.cpp"
 #include "processMatchedVertices.cpp"
-//#include "extractUChunk.cpp"
+#include "sendBundledMessages.cpp"
 
 // ***********************************************************************
 //
@@ -85,17 +85,6 @@
 
 #ifdef SERIAL_MPI
 #else
-// MPI type map
-template <class T>
-MPI_Datatype TypeMap();
-template <>
-inline MPI_Datatype TypeMap<int64_t>() { return MPI_LONG_LONG; }
-template <>
-inline MPI_Datatype TypeMap<int>() { return MPI_INT; }
-template <>
-inline MPI_Datatype TypeMap<double>() { return MPI_DOUBLE; }
-template <>
-inline MPI_Datatype TypeMap<float>() { return MPI_FLOAT; }
 
 // DOUBLE PRECISION VERSION
 // WARNING: The vertex block on a given rank is contiguous
@@ -177,6 +166,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
     vector<MilanLongInt> QLocalVtx, QGhostVtx, QMsgType;
     vector<MilanInt> QOwner; // Changed by Fabio to be an integer, addresses needs to be integers!
+    // TODO move this inside the initialization function
     MilanLongInt *PCounter = new MilanLongInt[numProcs];
     for (int i = 0; i < numProcs; i++)
         PCounter[i] = 0;
 
@@ -220,13 +210,10 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
     MilanLongInt S;
     MilanLongInt privateMyCard = 0;
     staticQueue U, privateU, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner;
-    MilanLongInt myIndex = 0;
     vector<MilanLongInt> PCumulative, PMessageBundle, PSizeInfoMessages;
     vector<MPI_Request> SRequest;  // Requests that are used for each send message
     vector<MPI_Status> SStatus;    // Status of sent messages, used in MPI_Wait
     MilanLongInt MessageIndex = 0; // Pointer for current message
-    MilanInt OneMessageSize = 0;
-    MilanLongInt numMessagesToSend;
     MilanInt BufferSize;
     MilanLongInt *Buffer;
 
@@ -318,9 +305,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
     ///////////////////////////////////////////////////////////////////////////////////
     /////////////////////////// PROCESS MATCHED VERTICES //////////////////////////////
     ///////////////////////////////////////////////////////////////////////////////////
-//#define debug
-#ifndef debug
-
+
     vector<MilanLongInt> UChunkBeingProcessed;
     UChunkBeingProcessed.reserve(UCHUNK);
     processMatchedVertices(NLVer,
@@ -336,7 +321,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
                            verLocPtr,
                            verLocInd,
                            verDistance,
-                           PCounter, 
+                           PCounter,
                            Counter,
                            myRank,
                            numProcs,
@@ -354,166 +339,32 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
                            privateQMsgType,
                            privateQOwner);
 
+    /////////////////////////////////////////////////////////////////////////////////////////
+    ///////////////////////////// SEND BUNDLED MESSAGES /////////////////////////////////////
+    /////////////////////////////////////////////////////////////////////////////////////////
 
-#endif
-
-#pragma omp parallel private(k, u, w, v, k1, adj1, adj2, adj11, adj12, heaviestEdgeWt, ghostOwner, privateMyCard) firstprivate(privateU, StartIndex, EndIndex, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner) default(shared) num_threads(4)
-    {
-
-#ifdef DEBUG_HANG_
-        if (myRank == 0)
-            cout << "\n(" << myRank << ") Send Bundles" << endl;
-        fflush(stdout);
-#endif
-        /////////////////////////////////////////////////////////////////////////////////////////
-        ///////////////////////////// SEND BUNDLED MESSAGES /////////////////////////////////////
-        /////////////////////////////////////////////////////////////////////////////////////////
-#pragma omp barrier // TODO check if necessary
-#pragma omp master
-        {
-            // Data structures for Bundled Messages:
-            try
-            {
-                PMessageBundle.reserve(NumMessagesBundled * 3); // Three integers per message
-                PCumulative.reserve(numProcs + 1);              // Similar to Row Pointer vector in CSR data structure
-                PSizeInfoMessages.reserve(numProcs * 3);        // Buffer to hold the Size info message packets
-            }
-            catch (length_error)
-            {
-                cout << "Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
-                cout << "Not enough memory to allocate the internal variables \n";
-                exit(1);
-            }
-            PMessageBundle.resize(NumMessagesBundled * 3, -1); // Initialize
-            PCumulative.resize(numProcs + 1, 0);               // Only initialize the counter variable
-            PSizeInfoMessages.resize(numProcs * 3, 0);
-
-            for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
-                PCumulative[i + 1] = PCumulative[i] + PCounter[i];
-
-            // OMP not worth parallelizing
-            // Reuse PCounter to keep track of how many messages were inserted:
-            for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
-                PCounter[i] = 0;
-            // Build the Message Bundle packet:
-
-            // OMP Not parallelizable
-            for (MilanInt i = 0; i < NumMessagesBundled; i++)
-            { // Changed by Fabio to be an integer, addresses needs to be integers!
-                myIndex = (PCumulative[QOwner[i]] + PCounter[QOwner[i]]) * 3;
-                PMessageBundle[myIndex + 0] = QLocalVtx[i];
-                PMessageBundle[myIndex + 1] = QGhostVtx[i];
-                PMessageBundle[myIndex + 2] = QMsgType[i];
-                PCounter[QOwner[i]]++;
-            }
-
-            // Send the Bundled Messages: Use ISend
-
-            try
-            {
-                SRequest.reserve(numProcs * 2); // At most two messages per processor
-                SStatus.reserve(numProcs * 2);  // At most two messages per processor
-            }
-            catch (length_error)
-            {
-                cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearchImmediateSend: \n";
-                cout << "Not enough memory to allocate the internal variables \n";
-                exit(1);
-            }
-            MPI_Request myReq; // A sample request
-            SRequest.resize(numProcs * 2, myReq);
-            MPI_Status myStat; // A sample status
-            SStatus.resize(numProcs * 2, myStat);
-
-            // Send the Messages
-            for (MilanInt i = 0; i < numProcs; i++)
-            { // Changed by Fabio to be an integer, addresses needs to be integers!
-                if (i == myRank) // Do not send anything to yourself
-                    continue;
-                // Send the Message with information about the size of next message:
-                // Build the Message Packet:
-                PSizeInfoMessages[i * 3 + 0] = (PCumulative[i + 1] - PCumulative[i]) * 3; // # of integers in the next message
-                PSizeInfoMessages[i * 3 + 1] = -1;       // Dummy packet
-                PSizeInfoMessages[i * 3 + 2] = SIZEINFO; // TYPE
-                // Send a Request (Asynchronous)
-#ifdef PRINT_DEBUG_INFO_
-                cout << "\n(" << myRank << ")Sending bundled message to process " << i << " size: " << PSizeInfoMessages[i * 3 + 0] << endl;
-                fflush(stdout);
-#endif
-                if (PSizeInfoMessages[i * 3 + 0] > 0)
-                { // Send only if it is a nonempty packet
-                    MPI_Isend(&PSizeInfoMessages[i * 3 + 0], 3, TypeMap<MilanLongInt>(), i, ComputeTag, comm,
-                              &SRequest[MessageIndex]);
-                    msgActual++;
-                    MessageIndex++;
-                    // Now Send the message with the data packet:
-#ifdef PRINT_DEBUG_INFO_
-                    cout << "\n(" << myRank << ")Sending Bundle to : " << i << endl;
-                    for (k = (PCumulative[i] * 3); k < (PCumulative[i] * 3 + PSizeInfoMessages[i * 3 + 0]); k++)
-                        cout << PMessageBundle[k] << ",";
-                    cout << endl;
-                    fflush(stdout);
-#endif
-                    MPI_Isend(&PMessageBundle[PCumulative[i] * 3], PSizeInfoMessages[i * 3 + 0],
-                              TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[MessageIndex]);
-                    MessageIndex++;
-                } // End of if size > 0
-            }
-            // Free up temporary memory:
-            PCumulative.clear();
-            QLocalVtx.clear();
-            QGhostVtx.clear();
-            QMsgType.clear();
-            QOwner.clear();
-
-#ifdef PRINT_DEBUG_INFO_
-            cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
-            cout << "\n(" << myRank << ")Total number of potential message X 2 = " << numGhostEdges * 2;
-            cout << "\n(" << myRank << ")Number messages already sent in bundles = " << NumMessagesBundled;
-            if (numGhostEdges > 0)
-            {
-                cout << "\n(" << myRank << ")Percentage of total = " << ((double)NumMessagesBundled / (double)(numGhostEdges * 2)) * 100.0 << "% \n";
-            }
-            fflush(stdout);
-#endif
-
-            // Allocate memory for MPI Send messages:
-            /* WILL COME BACK HERE - NO NEED TO STORE ALL THIS MEMORY !! */
-            OneMessageSize = 0;
-            MPI_Pack_size(3, TypeMap<MilanLongInt>(), comm, &OneMessageSize); // Size of one message packet
-            // How many messages to send?
-            // Potentially three kinds of messages will be sent/received:
-            // Request, Success, Failure.
-            // But only two will be sent from a given processor.
-            // Substract the number of messages that have already been sent as bundled messages:
-            numMessagesToSend = numGhostEdges * 2 - NumMessagesBundled;
-            BufferSize = (OneMessageSize + MPI_BSEND_OVERHEAD) * numMessagesToSend;
-
-            Buffer = 0;
-#ifdef PRINT_DEBUG_INFO_
-            cout << "\n(" << myRank << ")Size of One Message from PACK= " << OneMessageSize;
-            cout << "\n(" << myRank << ")Size of Message overhead = " << MPI_BSEND_OVERHEAD;
-            cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
-            cout << "\n(" << myRank << ")Number of remaining message = " << numMessagesToSend;
-            cout << "\n(" << myRank << ")BufferSize = " << BufferSize;
-            cout << "\n(" << myRank << ")Attaching Buffer on.. ";
-            fflush(stdout);
-#endif
-            if (BufferSize > 0)
-            {
-                Buffer = (MilanLongInt *)malloc(BufferSize); // Allocate memory
-                if (Buffer == 0)
-                {
-                    cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearch: \n";
-                    cout << "Not enough memory to allocate for send buffer on process " << myRank << "\n";
-                    exit(1);
-                }
-                MPI_Buffer_attach(Buffer, BufferSize); // Attach the Buffer
-            }
-        } // End of master
+    sendBundledMessages(&numGhostEdges,
+                        &BufferSize,
+                        Buffer,
+                        PCumulative,
+                        PMessageBundle,
+                        PSizeInfoMessages,
+                        PCounter,
+                        NumMessagesBundled,
+                        &msgActual,
+                        &MessageIndex,
+                        numProcs,
+                        myRank,
+                        ComputeTag,
+                        BundleTag,
+                        comm,
+                        QLocalVtx,
+                        QGhostVtx,
+                        QMsgType,
+                        QOwner,
+                        SRequest,
+                        SStatus);
 
-    } // end of parallel region
     ///////////////////////// END OF SEND BUNDLED MESSAGES //////////////////////////////////
 
     finishTime = MPI_Wtime();
@@ -773,10 +624,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
     ///////////////////////////////////////////////////////////////////////////////////
     /////////////////////////// PROCESS MESSAGES //////////////////////////////////////
     ///////////////////////////////////////////////////////////////////////////////////
-    /*
-    RECEIVE message ( u, v, message_type );
-    // u is a GHOST vertex ... v is a LOCAL vertex
-    */
+
 #ifdef PRINT_DEBUG_INFO_
     cout << "\n(" << myRank << "=========================************===============================" << endl;
     fflush(stdout);
diff --git a/amgprec/impl/aggregator/processExposedVertex.cpp b/amgprec/impl/aggregator/processExposedVertex.cpp
index 3847110a..dd9562d5 100644
--- a/amgprec/impl/aggregator/processExposedVertex.cpp
+++ b/amgprec/impl/aggregator/processExposedVertex.cpp
@@ -215,6 +215,7 @@ inline void PARALLEL_PROCESS_EXPOSED_VERTEX_B(MilanLongInt NLVer,
                                  privateQMsgType,
                                  privateQOwner);
 
+//TODO move this outside of the parallel region!!
 #pragma omp master
     {
         *myCardPtr = myCard;
diff --git a/amgprec/impl/aggregator/sendBundledMessages.cpp b/amgprec/impl/aggregator/sendBundledMessages.cpp
new file mode 100644
index 00000000..e16c5669
--- /dev/null
+++ b/amgprec/impl/aggregator/sendBundledMessages.cpp
@@ -0,0 +1,225 @@
+#include "MatchBoxPC.h"
+#include <stdio.h>
+#include <iostream>
+#include <assert.h>
+#include <map>
+#include "primitiveDataTypeDefinitions.h"
+#include "dataStrStaticQueue.h"
+#include "omp.h"
+
+inline void sendBundledMessages(MilanLongInt *numGhostEdgesPtr,
+                                MilanInt *BufferSizePtr,
+                                MilanLongInt *Buffer,
+                                vector<MilanLongInt> &PCumulative,
+                                vector<MilanLongInt> &PMessageBundle,
+                                vector<MilanLongInt> &PSizeInfoMessages,
+                                MilanLongInt *PCounter,
+                                MilanLongInt NumMessagesBundled,
+                                MilanLongInt *msgActualPtr,
+                                MilanLongInt *MessageIndexPtr,
+                                MilanInt numProcs,
+                                MilanInt myRank,
+                                int ComputeTag,
+                                int BundleTag,
+                                MPI_Comm comm,
+                                vector<MilanLongInt> &QLocalVtx,
+                                vector<MilanLongInt> &QGhostVtx,
+                                vector<MilanLongInt> &QMsgType,
+                                vector<MilanInt> &QOwner,
+                                vector<MPI_Request> &SRequest,
+                                vector<MPI_Status> &SStatus)
+{
+
+    MilanLongInt myIndex = 0, msgActual = *msgActualPtr, MessageIndex = *MessageIndexPtr, numGhostEdges = *numGhostEdgesPtr, numMessagesToSend;
+    const MilanLongInt SIZEINFO = 4;
+    MilanInt i = 0, OneMessageSize = 0, BufferSize = *BufferSizePtr;
+
+#ifdef DEBUG_HANG_
+    if (myRank == 0)
+        cout << "\n(" << myRank << ") Send Bundles" << endl;
+    fflush(stdout);
+#endif
+
+#pragma omp parallel private(i) default(shared) num_threads(NUM_THREAD)
+    {
+#pragma omp master
+        {
+// Data structures for Bundled Messages:
+#pragma omp task depend(inout : PCumulative, PMessageBundle, PSizeInfoMessages) depend(in : NumMessagesBundled, numProcs)
+            {
+                try
+                {
+                    PMessageBundle.reserve(NumMessagesBundled * 3); // Three integers per message
+                    PCumulative.reserve(numProcs + 1);              // Similar to Row Pointer vector in CSR data structure
+                    PSizeInfoMessages.reserve(numProcs * 3);        // Buffer to hold the Size info message packets
+                }
+                catch (length_error)
+                {
+                    cout << "Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
+                    cout << "Not enough memory to allocate the internal variables \n";
+                    exit(1);
+                }
+                PMessageBundle.resize(NumMessagesBundled * 3, -1); // Initialize
+                PCumulative.resize(numProcs + 1, 0);               // Only initialize the counter variable
+                PSizeInfoMessages.resize(numProcs * 3, 0);
+            }
+
+#pragma omp task depend(inout : PCumulative) depend(in : PCounter)
+            {
+                for (i = 0; i < numProcs; i++)
+                    PCumulative[i + 1] = PCumulative[i] + PCounter[i];
+            }
+
+#pragma omp task depend(inout : PCounter)
+            {
+                // Reuse PCounter to keep track of how many messages were inserted:
+                for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
+                    PCounter[i] = 0;
+            }
+
+// Build the Message Bundle packet:
+#pragma omp task depend(in : PCounter, QLocalVtx, QGhostVtx, QMsgType, QOwner, PMessageBundle, PCumulative) depend(out : myIndex, PMessageBundle, PCounter)
+            {
+                for (i = 0; i < NumMessagesBundled; i++)
+                {
+                    myIndex = (PCumulative[QOwner[i]] + PCounter[QOwner[i]]) * 3;
+                    PMessageBundle[myIndex + 0] = QLocalVtx[i];
+                    PMessageBundle[myIndex + 1] = QGhostVtx[i];
+                    PMessageBundle[myIndex + 2] = QMsgType[i];
+                    PCounter[QOwner[i]]++;
+                }
+            }
+
+// Send the Bundled Messages: Use ISend
+#pragma omp task depend(out : SRequest, SStatus)
+            {
+                try
+                {
+                    SRequest.reserve(numProcs * 2); // At most two messages per processor
+                    SStatus.reserve(numProcs * 2);  // At most two messages per processor
+                }
+                catch (length_error)
+                {
+                    cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearchImmediateSend: \n";
+                    cout << "Not enough memory to allocate the internal variables \n";
+                    exit(1);
+                }
+            }
+
+// Send the Messages
+#pragma omp task depend(inout : SRequest, PSizeInfoMessages, PCumulative) depend(out : msgActual, MessageIndex)
+            {
+                for (i = 0; i < numProcs; i++)
+                { // Changed by Fabio to be an integer, addresses needs to be integers!
+                    if (i == myRank) // Do not send anything to yourself
+                        continue;
+                    // Send the Message with information about the size of next message:
+                    // Build the Message Packet:
+                    PSizeInfoMessages[i * 3 + 0] = (PCumulative[i + 1] - PCumulative[i]) * 3; // # of integers in the next message
+                    PSizeInfoMessages[i * 3 + 1] = -1;       // Dummy packet
+                    PSizeInfoMessages[i * 3 + 2] = SIZEINFO; // TYPE
+                    // Send a Request (Asynchronous)
+#ifdef PRINT_DEBUG_INFO_
+                    cout << "\n(" << myRank << ")Sending bundled message to process " << i << " size: " << PSizeInfoMessages[i * 3 + 0] << endl;
+                    fflush(stdout);
+#endif
+                    if (PSizeInfoMessages[i * 3 + 0] > 0)
+                    { // Send only if it is a nonempty packet
+                        MPI_Isend(&PSizeInfoMessages[i * 3 + 0], 3, TypeMap<MilanLongInt>(), i, ComputeTag, comm,
+                                  &SRequest[MessageIndex]);
+                        msgActual++;
+                        MessageIndex++;
+                        // Now Send the message with the data packet:
+#ifdef PRINT_DEBUG_INFO_
+                        cout << "\n(" << myRank << ")Sending Bundle to : " << i << endl;
+                        for (k = (PCumulative[i] * 3); k < (PCumulative[i] * 3 + PSizeInfoMessages[i * 3 + 0]); k++)
+                            cout << PMessageBundle[k] << ",";
+                        cout << endl;
+                        fflush(stdout);
+#endif
+                        MPI_Isend(&PMessageBundle[PCumulative[i] * 3], PSizeInfoMessages[i * 3 + 0],
+                                  TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[MessageIndex]);
+                        MessageIndex++;
+                    } // End of if size > 0
+                }
+            }
+
+#pragma omp task depend(inout : PCumulative, QLocalVtx, QGhostVtx, QMsgType, QOwner)
+            {
+                // Free up temporary memory:
+                PCumulative.clear();
+                QLocalVtx.clear();
+                QGhostVtx.clear();
+                QMsgType.clear();
+                QOwner.clear();
+            }
+
+#pragma omp task depend(inout : OneMessageSize, BufferSize) depend(out : numMessagesToSend) depend(in : numGhostEdges)
+            {
+#ifdef PRINT_DEBUG_INFO_
+                cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
+                cout << "\n(" << myRank << ")Total number of potential message X 2 = " << numGhostEdges * 2;
+                cout << "\n(" << myRank << ")Number messages already sent in bundles = " << NumMessagesBundled;
+                if (numGhostEdges > 0)
+                {
+                    cout << "\n(" << myRank << ")Percentage of total = " << ((double)NumMessagesBundled / (double)(numGhostEdges * 2)) * 100.0 << "% \n";
+                }
+                fflush(stdout);
+#endif
+
+                // Allocate memory for MPI Send messages:
+                /* WILL COME BACK HERE - NO NEED TO STORE ALL THIS MEMORY !! */
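+                // One bundled message carries 3 integers (local vertex, ghost vertex,
+                // message type), so MPI_Pack_size(3, ...) gives the packed size of a
+                // single packet; each buffered send also needs MPI_BSEND_OVERHEAD bytes,
+                // and the buffer must cover every message not already sent in a bundle.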
+                OneMessageSize = 0;
+                MPI_Pack_size(3, TypeMap<MilanLongInt>(), comm, &OneMessageSize); // Size of one message packet
+                // How many messages to send?
+                // Potentially three kinds of messages will be sent/received:
+                // Request, Success, Failure.
+                // But only two will be sent from a given processor.
+                // Subtract the number of messages that have already been sent as bundled messages:
+                numMessagesToSend = numGhostEdges * 2 - NumMessagesBundled;
+                BufferSize = (OneMessageSize + MPI_BSEND_OVERHEAD) * numMessagesToSend;
+            }
+
+#pragma omp task depend(out : Buffer) depend(in : BufferSize)
+            {
+                Buffer = 0;
+#ifdef PRINT_DEBUG_INFO_
+                cout << "\n(" << myRank << ")Size of One Message from PACK= " << OneMessageSize;
+                cout << "\n(" << myRank << ")Size of Message overhead = " << MPI_BSEND_OVERHEAD;
+                cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
+                cout << "\n(" << myRank << ")Number of remaining message = " << numMessagesToSend;
+                cout << "\n(" << myRank << ")BufferSize = " << BufferSize;
+                cout << "\n(" << myRank << ")Attaching Buffer on.. ";
+                fflush(stdout);
+#endif
+                if (BufferSize > 0)
+                {
+                    Buffer = (MilanLongInt *)malloc(BufferSize); // Allocate memory
+                    if (Buffer == 0)
+                    {
+                        cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearch: \n";
+                        cout << "Not enough memory to allocate for send buffer on process " << myRank << "\n";
+                        exit(1);
+                    }
+                    MPI_Buffer_attach(Buffer, BufferSize); // Attach the Buffer
+                }
+            }
+        }
+    }
+
+    *MessageIndexPtr = MessageIndex;
+    *msgActualPtr = msgActual;
+    *numGhostEdgesPtr = numGhostEdges;
+    *BufferSizePtr = BufferSize;
+}
\ No newline at end of file
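
For reference, a minimal usage sketch of the TypeMap<T>() trait that this patch moves into MatchBoxPC.h: it selects the MPI datatype matching a Milan* typedef at compile time, so the MPI calls above stay consistent with the typedefs. The variable names and the use of MPI_COMM_WORLD below are illustrative assumptions, not part of the commit; the snippet presumes MPI has been initialized and MatchBoxPC.h is included.

    // Sum a per-rank count across all ranks; TypeMap<MilanLongInt>() resolves
    // to MPI_LONG_LONG, matching the MilanLongInt typedef.
    MilanLongInt localCount = 42, globalCount = 0;
    MPI_Allreduce(&localCount, &globalCount, 1, TypeMap<MilanLongInt>(), MPI_SUM, MPI_COMM_WORLD);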