sendBundledMessages refactoring

omp-walther
StefanoPetrilli 2 years ago
parent 9b13aef1ce
commit 3de1e607eb

@@ -66,6 +66,18 @@
using namespace std;
#define NUM_THREAD 4
// MPI type map
template <typename T>
MPI_Datatype TypeMap();
template <>
inline MPI_Datatype TypeMap<int64_t>() { return MPI_LONG_LONG; }
template <>
inline MPI_Datatype TypeMap<int>() { return MPI_INT; }
template <>
inline MPI_Datatype TypeMap<double>() { return MPI_DOUBLE; }
template <>
inline MPI_Datatype TypeMap<float>() { return MPI_FLOAT; }
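// Usage sketch, assuming MilanLongInt resolves to one of the specialized types
// (e.g. int64_t): pass TypeMap<MilanLongInt>() wherever an MPI_Datatype is expected,
//   MPI_Isend(buf, count, TypeMap<MilanLongInt>(), dest, tag, comm, &request);
// so the MPI datatype stays consistent with the C++ element type.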
#ifdef __cplusplus
extern "C"
{
@@ -150,6 +162,8 @@ extern "C"
#define MilanRealMin MINUS_INFINITY
#endif
// Function to find the owner of a ghost vertex using binary search:
inline MilanInt findOwnerOfGhost(MilanLongInt vtxIndex, MilanLongInt *mVerDistance,
MilanInt myRank, MilanInt numProcs);

@@ -8,7 +8,7 @@
#include "parallelComputeCandidateMateB.cpp"
#include "processExposedVertex.cpp"
#include "processMatchedVertices.cpp"
//#include "extractUChunk.cpp"
#include "sendBundledMessages.cpp"
// ***********************************************************************
//
@@ -85,17 +85,6 @@
#ifdef SERIAL_MPI
#else
// MPI type map
template <typename T>
MPI_Datatype TypeMap();
template <>
inline MPI_Datatype TypeMap<int64_t>() { return MPI_LONG_LONG; }
template <>
inline MPI_Datatype TypeMap<int>() { return MPI_INT; }
template <>
inline MPI_Datatype TypeMap<double>() { return MPI_DOUBLE; }
template <>
inline MPI_Datatype TypeMap<float>() { return MPI_FLOAT; }
// DOUBLE PRECISION VERSION
// WARNING: The vertex block on a given rank is contiguous
@@ -177,6 +166,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
vector<MilanLongInt> QLocalVtx, QGhostVtx, QMsgType;
vector<MilanInt> QOwner; // Changed by Fabio to be an integer, addresses need to be integers!
// TODO move this inside the initialization function
MilanLongInt *PCounter = new MilanLongInt[numProcs];
for (int i = 0; i < numProcs; i++)
PCounter[i] = 0;
@@ -220,13 +210,10 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
MilanLongInt S;
MilanLongInt privateMyCard = 0;
staticQueue U, privateU, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner;
MilanLongInt myIndex = 0;
vector<MilanLongInt> PCumulative, PMessageBundle, PSizeInfoMessages;
vector<MPI_Request> SRequest; // Requests that are used for each send message
vector<MPI_Status> SStatus; // Status of sent messages, used in MPI_Wait
MilanLongInt MessageIndex = 0; // Pointer for current message
MilanInt OneMessageSize = 0;
MilanLongInt numMessagesToSend;
MilanInt BufferSize;
MilanLongInt *Buffer;
@@ -318,8 +305,6 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
///////////////////////////////////////////////////////////////////////////////////
/////////////////////////// PROCESS MATCHED VERTICES //////////////////////////////
///////////////////////////////////////////////////////////////////////////////////
//#define debug
#ifndef debug
vector<MilanLongInt> UChunkBeingProcessed;
UChunkBeingProcessed.reserve(UCHUNK);
@@ -354,166 +339,32 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
privateQMsgType,
privateQOwner);
#endif
#pragma omp parallel private(k, u, w, v, k1, adj1, adj2, adj11, adj12, heaviestEdgeWt, ghostOwner, privateMyCard) firstprivate(privateU, StartIndex, EndIndex, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner) default(shared) num_threads(4)
{
#ifdef DEBUG_HANG_
if (myRank == 0)
cout << "\n(" << myRank << ") Send Bundles" << endl;
fflush(stdout);
#endif
/////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////// SEND BUNDLED MESSAGES /////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////
#pragma omp barrier // TODO check if necessary
#pragma omp master
{
// Data structures for Bundled Messages:
try
{
PMessageBundle.reserve(NumMessagesBundled * 3); // Three integers per message
PCumulative.reserve(numProcs + 1); // Similar to Row Pointer vector in CSR data structure
PSizeInfoMessages.reserve(numProcs * 3); // Buffer to hold the Size info message packets
}
catch (length_error)
{
cout << "Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
cout << "Not enough memory to allocate the internal variables \n";
exit(1);
}
PMessageBundle.resize(NumMessagesBundled * 3, -1); // Initialize
PCumulative.resize(numProcs + 1, 0); // Only initialize the counter variable
PSizeInfoMessages.resize(numProcs * 3, 0);
for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses need to be integers!
PCumulative[i + 1] = PCumulative[i] + PCounter[i];
// OMP not worth parallelizing
// Reuse PCounter to keep track of how many messages were inserted:
for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses need to be integers!
PCounter[i] = 0;
// Build the Message Bundle packet:
// OMP Not parallelizable
for (MilanInt i = 0; i < NumMessagesBundled; i++)
{ // Changed by Fabio to be an integer, addresses need to be integers!
myIndex = (PCumulative[QOwner[i]] + PCounter[QOwner[i]]) * 3;
PMessageBundle[myIndex + 0] = QLocalVtx[i];
PMessageBundle[myIndex + 1] = QGhostVtx[i];
PMessageBundle[myIndex + 2] = QMsgType[i];
PCounter[QOwner[i]]++;
}
// Send the Bundled Messages: Use ISend
try
{
SRequest.reserve(numProcs * 2); // At most two messages per processor
SStatus.reserve(numProcs * 2); // At most two messages per processor
}
catch (length_error)
{
cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearchImmediateSend: \n";
cout << "Not enough memory to allocate the internal variables \n";
exit(1);
}
MPI_Request myReq; // A sample request
SRequest.resize(numProcs * 2, myReq);
MPI_Status myStat; // A sample status
SStatus.resize(numProcs * 2, myStat);
// Send the Messages
for (MilanInt i = 0; i < numProcs; i++)
{ // Changed by Fabio to be an integer, addresses need to be integers!
if (i == myRank) // Do not send anything to yourself
continue;
// Send the Message with information about the size of next message:
// Build the Message Packet:
PSizeInfoMessages[i * 3 + 0] = (PCumulative[i + 1] - PCumulative[i]) * 3; // # of integers in the next message
PSizeInfoMessages[i * 3 + 1] = -1; // Dummy packet
PSizeInfoMessages[i * 3 + 2] = SIZEINFO; // TYPE
// Send a Request (Asynchronous)
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Sending bundled message to process " << i << " size: " << PSizeInfoMessages[i * 3 + 0] << endl;
fflush(stdout);
#endif
if (PSizeInfoMessages[i * 3 + 0] > 0)
{ // Send only if it is a nonempty packet
MPI_Isend(&PSizeInfoMessages[i * 3 + 0], 3, TypeMap<MilanLongInt>(), i, ComputeTag, comm,
&SRequest[MessageIndex]);
msgActual++;
MessageIndex++;
// Now Send the message with the data packet:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Sending Bundle to : " << i << endl;
for (k = (PCumulative[i] * 3); k < (PCumulative[i] * 3 + PSizeInfoMessages[i * 3 + 0]); k++)
cout << PMessageBundle[k] << ",";
cout << endl;
fflush(stdout);
#endif
MPI_Isend(&PMessageBundle[PCumulative[i] * 3], PSizeInfoMessages[i * 3 + 0],
TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[MessageIndex]);
MessageIndex++;
} // End of if size > 0
}
// Free up temporary memory:
PCumulative.clear();
QLocalVtx.clear();
QGhostVtx.clear();
QMsgType.clear();
QOwner.clear();
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
cout << "\n(" << myRank << ")Total number of potential message X 2 = " << numGhostEdges * 2;
cout << "\n(" << myRank << ")Number messages already sent in bundles = " << NumMessagesBundled;
if (numGhostEdges > 0)
{
cout << "\n(" << myRank << ")Percentage of total = " << ((double)NumMessagesBundled / (double)(numGhostEdges * 2)) * 100.0 << "% \n";
}
fflush(stdout);
#endif
// Allocate memory for MPI Send messages:
/* WILL COME BACK HERE - NO NEED TO STORE ALL THIS MEMORY !! */
OneMessageSize = 0;
MPI_Pack_size(3, TypeMap<MilanLongInt>(), comm, &OneMessageSize); // Size of one message packet
// How many messages to send?
// Potentially three kinds of messages will be sent/received:
// Request, Success, Failure.
// But only two will be sent from a given processor.
// Subtract the number of messages that have already been sent as bundled messages:
numMessagesToSend = numGhostEdges * 2 - NumMessagesBundled;
BufferSize = (OneMessageSize + MPI_BSEND_OVERHEAD) * numMessagesToSend;
Buffer = 0;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Size of One Message from PACK= " << OneMessageSize;
cout << "\n(" << myRank << ")Size of Message overhead = " << MPI_BSEND_OVERHEAD;
cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
cout << "\n(" << myRank << ")Number of remaining message = " << numMessagesToSend;
cout << "\n(" << myRank << ")BufferSize = " << BufferSize;
cout << "\n(" << myRank << ")Attaching Buffer on.. ";
fflush(stdout);
#endif
if (BufferSize > 0)
{
Buffer = (MilanLongInt *)malloc(BufferSize); // Allocate memory
if (Buffer == 0)
{
cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearch: \n";
cout << "Not enough memory to allocate for send buffer on process " << myRank << "\n";
exit(1);
}
MPI_Buffer_attach(Buffer, BufferSize); // Attach the Buffer
}
} // End of master
sendBundledMessages(&numGhostEdges,
&BufferSize,
Buffer,
PCumulative,
PMessageBundle,
PSizeInfoMessages,
PCounter,
NumMessagesBundled,
&msgActual,
&MessageIndex,
numProcs,
myRank,
ComputeTag,
BundleTag,
comm,
QLocalVtx,
QGhostVtx,
QMsgType,
QOwner,
SRequest,
SStatus);
} // end of parallel region
///////////////////////// END OF SEND BUNDLED MESSAGES //////////////////////////////////
finishTime = MPI_Wtime();
@@ -773,10 +624,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
///////////////////////////////////////////////////////////////////////////////////
/////////////////////////// PROCESS MESSAGES //////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////
/*
RECEIVE message ( u, v, message_type );
// u is a GHOST vertex ... v is a LOCAL vertex
*/
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << "=========================************===============================" << endl;
fflush(stdout);

@@ -215,6 +215,7 @@ inline void PARALLEL_PROCESS_EXPOSED_VERTEX_B(MilanLongInt NLVer,
privateQMsgType,
privateQOwner);
//TODO move this outside of the parallel region!!
#pragma omp master
{
*myCardPtr = myCard;

@@ -0,0 +1,225 @@
#include "MatchBoxPC.h"
#include <stdio.h>
#include <iostream>
#include <map>
#include <vector>
#include "primitiveDataTypeDefinitions.h"
#include "dataStrStaticQueue.h"
#include "omp.h"
inline void sendBundledMessages(MilanLongInt *numGhostEdgesPtr,
MilanInt *BufferSizePtr,
MilanLongInt *Buffer,
vector<MilanLongInt> &PCumulative,
vector<MilanLongInt> &PMessageBundle,
vector<MilanLongInt> &PSizeInfoMessages,
MilanLongInt *PCounter,
MilanLongInt NumMessagesBundled,
MilanLongInt *msgActualPtr,
MilanLongInt *MessageIndexPtr,
MilanInt numProcs,
MilanInt myRank,
int ComputeTag,
int BundleTag,
MPI_Comm comm,
vector<MilanLongInt> &QLocalVtx,
vector<MilanLongInt> &QGhostVtx,
vector<MilanLongInt> &QMsgType,
vector<MilanInt> &QOwner,
vector<MPI_Request> &SRequest,
vector<MPI_Status> &SStatus)
{
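// Packs the queued (localVtx, ghostVtx, msgType) triples into per-owner bundles,
// posts one size-info MPI_Isend plus one data MPI_Isend per destination rank with
// a non-empty bundle, and sizes/attaches the buffered-send buffer for the messages
// still to be sent individually. The work is split into OpenMP tasks created by
// the master thread below; their depend clauses order access to the shared vectors.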
MilanLongInt myIndex = 0, msgActual = *msgActualPtr, MessageIndex = *MessageIndexPtr, numGhostEdges = *numGhostEdgesPtr, numMessagesToSend;
const MilanLongInt SIZEINFO = 4;
MilanInt i = 0, OneMessageSize = 0, BufferSize = *BufferSizePtr;
#ifdef DEBUG_HANG_
if (myRank == 0)
cout << "\n(" << myRank << ") Send Bundles" << endl;
fflush(stdout);
#endif
#pragma omp parallel private(i) default(shared) num_threads(NUM_THREAD)
{
#pragma omp master
{
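// Only the master thread creates the tasks; the depend clauses express the
// producer/consumer ordering (allocate, prefix-sum, pack, send, clean up,
// size/attach buffer) while letting independent tasks run on the other threads.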
// Data structures for Bundled Messages:
#pragma omp task depend(inout \
: PCumulative, PMessageBundle, PSizeInfoMessages) depend(in \
: NumMessagesBundled, numProcs)
{
try
{
PMessageBundle.reserve(NumMessagesBundled * 3); // Three integers per message
PCumulative.reserve(numProcs + 1); // Similar to Row Pointer vector in CSR data structure
PSizeInfoMessages.reserve(numProcs * 3); // Buffer to hold the Size info message packets
}
catch (length_error)
{
cout << "Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
cout << "Not enough memory to allocate the internal variables \n";
exit(1);
}
PMessageBundle.resize(NumMessagesBundled * 3, -1); // Initialize
PCumulative.resize(numProcs + 1, 0); // Only initialize the counter variable
PSizeInfoMessages.resize(numProcs * 3, 0);
}
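// Prefix sum over PCounter: PCumulative[i] becomes the starting offset (in
// messages) of rank i's slice of PMessageBundle, CSR row-pointer style.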
#pragma omp task depend(inout \
: PCumulative) depend(in \
: PCounter)
{
for (i = 0; i < numProcs; i++)
PCumulative[i + 1] = PCumulative[i] + PCounter[i];
}
#pragma omp task depend(inout \
: PCounter)
{
// Reuse PCounter to keep track of how many messages were inserted:
for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses need to be integers!
PCounter[i] = 0;
}
// Build the Message Bundle packet:
#pragma omp task depend(in \
: PCounter, QLocalVtx, QGhostVtx, QMsgType, QOwner, PMessageBundle, PCumulative) depend(out \
: myIndex, PMessageBundle, PCounter)
{
for (i = 0; i < NumMessagesBundled; i++)
{
myIndex = (PCumulative[QOwner[i]] + PCounter[QOwner[i]]) * 3;
PMessageBundle[myIndex + 0] = QLocalVtx[i];
PMessageBundle[myIndex + 1] = QGhostVtx[i];
PMessageBundle[myIndex + 2] = QMsgType[i];
PCounter[QOwner[i]]++;
}
}
// Send the Bundled Messages: Use ISend
#pragma omp task depend(out \
: SRequest, SStatus)
{
try
{
SRequest.reserve(numProcs * 2); // At most two messages per processor
SStatus.reserve(numProcs * 2); // At most two messages per processor
}
catch (length_error)
{
cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearchImmediateSend: \n";
cout << "Not enough memory to allocate the internal variables \n";
exit(1);
}
}
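// For every other rank with a non-empty bundle, post two asynchronous sends:
// a 3-integer size-info packet (ComputeTag) followed by the bundle data (BundleTag).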
// Send the Messages
#pragma omp task depend(inout \
: SRequest, PSizeInfoMessages, PCumulative) depend(out \
: msgActual, MessageIndex)
{
for (i = 0; i < numProcs; i++)
{ // Changed by Fabio to be an integer, addresses need to be integers!
if (i == myRank) // Do not send anything to yourself
continue;
// Send the Message with information about the size of next message:
// Build the Message Packet:
PSizeInfoMessages[i * 3 + 0] = (PCumulative[i + 1] - PCumulative[i]) * 3; // # of integers in the next message
PSizeInfoMessages[i * 3 + 1] = -1; // Dummy packet
PSizeInfoMessages[i * 3 + 2] = SIZEINFO; // TYPE
// Send a Request (Asynchronous)
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Sending bundled message to process " << i << " size: " << PSizeInfoMessages[i * 3 + 0] << endl;
fflush(stdout);
#endif
if (PSizeInfoMessages[i * 3 + 0] > 0)
{ // Send only if it is a nonempty packet
MPI_Isend(&PSizeInfoMessages[i * 3 + 0], 3, TypeMap<MilanLongInt>(), i, ComputeTag, comm,
&SRequest[MessageIndex]);
msgActual++;
MessageIndex++;
// Now Send the message with the data packet:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")SendiFFng Bundle to : " << i << endl;
for (k = (PCumulative[i] * 3); k < (PCumulative[i] * 3 + PSizeInfoMessages[i * 3 + 0]); k++)
cout << PMessageBundle[k] << ",";
cout << endl;
fflush(stdout);
#endif
MPI_Isend(&PMessageBundle[PCumulative[i] * 3], PSizeInfoMessages[i * 3 + 0],
TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[MessageIndex]);
MessageIndex++;
} // End of if size > 0
}
}
#pragma omp task depend(inout \
: PCumulative, QLocalVtx, QGhostVtx, QMsgType, QOwner)
{
// Free up temporary memory:
PCumulative.clear();
QLocalVtx.clear();
QGhostVtx.clear();
QMsgType.clear();
QOwner.clear();
}
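// Size the buffered-send buffer for the point-to-point messages still to come:
// one 3-integer packet plus MPI_BSEND_OVERHEAD per remaining message.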
#pragma omp task depend(inout : OneMessageSize, BufferSize) depend(out : numMessagesToSend) depend(in : numGhostEdges)
{
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
cout << "\n(" << myRank << ")Total number of potential message X 2 = " << numGhostEdges * 2;
cout << "\n(" << myRank << ")Number messages already sent in bundles = " << NumMessagesBundled;
if (numGhostEdges > 0)
{
cout << "\n(" << myRank << ")Percentage of total = " << ((double)NumMessagesBundled / (double)(numGhostEdges * 2)) * 100.0 << "% \n";
}
fflush(stdout);
#endif
// Allocate memory for MPI Send messages:
/* WILL COME BACK HERE - NO NEED TO STORE ALL THIS MEMORY !! */
OneMessageSize = 0;
MPI_Pack_size(3, TypeMap<MilanLongInt>(), comm, &OneMessageSize); // Size of one message packet
// How many messages to send?
// Potentially three kinds of messages will be sent/received:
// Request, Success, Failure.
// But only two will be sent from a given processor.
// Subtract the number of messages that have already been sent as bundled messages:
numMessagesToSend = numGhostEdges * 2 - NumMessagesBundled;
BufferSize = (OneMessageSize + MPI_BSEND_OVERHEAD) * numMessagesToSend;
}
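// Allocate and attach the MPI_Bsend buffer only if there is something left to send.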
#pragma omp task depend(out : Buffer) depend(in : BufferSize)
{
Buffer = 0;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Size of One Message from PACK= " << OneMessageSize;
cout << "\n(" << myRank << ")Size of Message overhead = " << MPI_BSEND_OVERHEAD;
cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
cout << "\n(" << myRank << ")Number of remaining message = " << numMessagesToSend;
cout << "\n(" << myRank << ")BufferSize = " << BufferSize;
cout << "\n(" << myRank << ")Attaching Buffer on.. ";
fflush(stdout);
#endif
if (BufferSize > 0)
{
Buffer = (MilanLongInt *)malloc(BufferSize); // Allocate memory
if (Buffer == 0)
{
cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearch: \n";
cout << "Not enough memory to allocate for send buffer on process " << myRank << "\n";
exit(1);
}
MPI_Buffer_attach(Buffer, BufferSize); // Attach the Buffer
}
}
}
}
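// Copy the scalars updated inside the tasks back to the caller through the
// output pointers; the vectors were passed by reference and need no copy-back.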
*MessageIndexPtr = MessageIndex;
*msgActualPtr = msgActual;
*numGhostEdgesPtr = numGhostEdges;
*BufferSizePtr = BufferSize;
}