processMessages partial refactoring, message const refactoring

omp-walther
StefanoPetrilli 2 years ago
parent d19443052d
commit 64c23f93f8

@@ -65,6 +65,10 @@
using namespace std;
#define NUM_THREAD 4
const MilanLongInt REQUEST = 1;
const MilanLongInt SUCCESS = 2;
const MilanLongInt FAILURE = 3;
const MilanLongInt SIZEINFO = 4;
// MPI type map
template <typename T>
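The message-type codes REQUEST, SUCCESS, FAILURE and SIZEINFO move to a single definition here, so processExposedVertex.cpp, processMatchedVertices.cpp and the new processMessages.cpp stop redefining them locally. A minimal, self-contained sketch (not part of the commit; MilanLongInt simplified to long long, output strings invented) of the dispatch these codes drive:

#include <iostream>
typedef long long MilanLongInt; // simplified stand-in for the Milan typedef
const MilanLongInt REQUEST = 1;
const MilanLongInt SUCCESS = 2;
const MilanLongInt FAILURE = 3;
const MilanLongInt SIZEINFO = 4;

// Illustrative dispatch over the shared message-type codes.
void describeMessage(MilanLongInt message_type)
{
    if (message_type == REQUEST)
        std::cout << "REQUEST" << std::endl;
    else if (message_type == SUCCESS)
        std::cout << "SUCCESS" << std::endl;
    else if (message_type == FAILURE)
        std::cout << "FAILURE" << std::endl;
    else if (message_type == SIZEINFO)
        std::cout << "SIZEINFO: a larger bundle follows" << std::endl;
}

int main()
{
    describeMessage(SIZEINFO); // prints: SIZEINFO: a larger bundle follows
    return 0;
}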

@@ -9,7 +9,8 @@
#include "processExposedVertex.cpp"
#include "processMatchedVertices.cpp"
#include "sendBundledMessages.cpp"
//#include "processCrossEdge.cpp"
#include "processMessages.cpp"
// ***********************************************************************
//
@@ -155,10 +156,6 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
// Data structures for sending and receiving messages:
vector<MilanLongInt> Message; // [ u, v, message_type ]
Message.resize(3, -1);
const MilanLongInt REQUEST = 1;
const MilanLongInt SUCCESS = 2;
const MilanLongInt FAILURE = 3;
const MilanLongInt SIZEINFO = 4;
MilanLongInt message_type = 0;
// Data structures for Message Bundling:
// Although up to two messages can be sent along any cross edge,
@@ -186,7 +183,6 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
MilanLongInt k = -1, adj1 = -1, adj2 = -1;
MilanLongInt k1 = -1, adj11 = -1, adj12 = -1;
MilanLongInt myCard = 0;
MilanInt Sender = 0; // This is the rank of the sending nodes, it has to be an integer! Fabio
// Build the Ghost Vertex Set: Vg
map<MilanLongInt, MilanLongInt> Ghost2LocalMap; // Map each ghost vertex to a local vertex
@@ -614,90 +610,19 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
/////////////////////////// PROCESS MESSAGES //////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << "=========================************===============================" << endl;
fflush(stdout);
fflush(stdout);
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")About to begin Message processing phase ... S=" << S << endl;
fflush(stdout);
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << "=========================************===============================" << endl;
fflush(stdout);
fflush(stdout);
#endif
// BLOCKING RECEIVE:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << " Waiting for blocking receive..." << endl;
fflush(stdout);
fflush(stdout);
#endif
error_codeC = MPI_Recv(&Message[0], 3, TypeMap<MilanLongInt>(), MPI_ANY_SOURCE, ComputeTag, comm, &computeStatus);
if (error_codeC != MPI_SUCCESS)
{
MPI_Error_string(error_codeC, error_message, &message_length);
cout << "\n*Error in call to MPI_Receive on Slave: " << error_message << "\n";
fflush(stdout);
}
Sender = computeStatus.MPI_SOURCE;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received message from Process " << Sender << " Type= " << Message[2] << endl;
fflush(stdout);
#endif
// If the Message Type is a size indicator, then receive the bigger message.
if (Message[2] == SIZEINFO)
{
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received bundled message from Process " << Sender << " Size= " << Message[0] << endl;
fflush(stdout);
#endif
bundleSize = Message[0]; //#of integers in the message
// Build the Message Buffer:
if (!ReceiveBuffer.empty())
ReceiveBuffer.clear(); // Empty it out first
ReceiveBuffer.resize(bundleSize, -1); // Initialize
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message Bundle Before: " << endl;
for (i = 0; i < bundleSize; i++)
cout << ReceiveBuffer[i] << ",";
cout << endl;
fflush(stdout);
#endif
// Receive the message
error_codeC = MPI_Recv(&ReceiveBuffer[0], bundleSize, TypeMap<MilanLongInt>(), Sender, BundleTag, comm, &computeStatus);
if (error_codeC != MPI_SUCCESS)
{
MPI_Error_string(error_codeC, error_message, &message_length);
cout << "\n*Error in call to MPI_Receive on processor " << myRank << " Error: " << error_message << "\n";
fflush(stdout);
}
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message Bundle After: " << endl;
for (i = 0; i < bundleSize; i++)
cout << ReceiveBuffer[i] << ",";
cout << endl;
fflush(stdout);
#endif
}
else
{ // Just a single message:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received regular message from Process " << Sender << " u= " << Message[0] << " v= " << Message[1] << endl;
fflush(stdout);
#endif
// Add the current message to Queue:
bundleSize = 3; //#of integers in the message
// Build the Message Buffer:
if (!ReceiveBuffer.empty())
ReceiveBuffer.clear(); // Empty it out first
ReceiveBuffer.resize(bundleSize, -1); // Initialize
ReceiveBuffer[0] = Message[0]; // u
ReceiveBuffer[1] = Message[1]; // v
ReceiveBuffer[2] = Message[2]; // message_type
}
processMessages(error_codeC,
numProcs,
myRank,
ComputeTag,
BundleTag,
comm,
Message,
error_message,
message_length,
ReceiveBuffer,
&bundleSize);
bundleCounter = 0;
while (bundleCounter < bundleSize)
{
@@ -707,17 +632,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
bundleCounter++;
message_type = ReceiveBuffer[bundleCounter]; // TYPE
bundleCounter++;
#ifdef DEBUG_GHOST_
if ((v < StartIndex) || (v > EndIndex))
{
cout << "\n(" << myRank << ") From ReceiveBuffer: This should not happen: u= " << u << " v= " << v << " Type= " << message_type << " StartIndex " << StartIndex << " EndIndex " << EndIndex << endl;
fflush(stdout);
}
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Processing message: u= " << u << " v= " << v << " Type= " << message_type << endl;
fflush(stdout);
#endif
// CASE I: REQUEST
if (message_type == REQUEST)
{
@@ -774,33 +689,8 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
if (candidateMate[v - StartIndex] == u)
{
// Start: PARALLEL_PROCESS_EXPOSED_VERTEX_B(v)
// Start: PARALLEL_COMPUTE_CANDIDATE_MATE_B(v)
w = computeCandidateMate(verLocPtr[v - StartIndex], verLocPtr[v - StartIndex + 1], edgeLocWeight, k, verLocInd, StartIndex, EndIndex, GMate, Mate, Ghost2LocalMap);
adj11 = verLocPtr[v - StartIndex];
adj12 = verLocPtr[v - StartIndex + 1];
w = -1;
heaviestEdgeWt = MilanRealMin; // Assign the smallest Value possible first LDBL_MIN
for (k1 = adj11; k1 < adj12; k1++)
{
if ((verLocInd[k1] < StartIndex) || (verLocInd[k1] > EndIndex))
{ // Is it a ghost vertex?
if (GMate[Ghost2LocalMap[verLocInd[k1]]] >= 0) // Already matched
continue;
}
else
{ // A local vertex
if (Mate[verLocInd[k1] - StartIndex] >= 0) // Already matched
continue;
}
if ((edgeLocWeight[k1] > heaviestEdgeWt) ||
((edgeLocWeight[k1] == heaviestEdgeWt) && (w < verLocInd[k1])))
{
heaviestEdgeWt = edgeLocWeight[k1];
w = verLocInd[k1];
}
} // End of for loop
candidateMate[v - StartIndex] = w;
// End: PARALLEL_COMPUTE_CANDIDATE_MATE_B(v)
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")" << v << " Points to: " << w << endl;
fflush(stdout);
@@ -830,7 +720,6 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
{
Mate[v - StartIndex] = w; // v is local
GMate[Ghost2LocalMap[w]] = v; // w is ghost
// Q.push_back(u);
U.push_back(v);
U.push_back(w);
myCard++;
@@ -878,7 +767,6 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
cout << "\n(" << myRank << ")Ghost is " << w << " Owner is: " << findOwnerOfGhost(w, verDistance, myRank, numProcs) << endl;
fflush(stdout);
#endif
// MPI_Bsend(&Message[0], 3, MilanMpiLongInt, findOwnerOfGhost(w, verDistance, myRank, numProcs),
ghostOwner = findOwnerOfGhost(w, verDistance, myRank, numProcs);
assert(ghostOwner != -1);
assert(ghostOwner != myRank);

@@ -11,21 +11,23 @@
*/
inline MilanLongInt firstComputeCandidateMate(MilanLongInt adj1,
MilanLongInt adj2,
MilanLongInt *verLocInd,
MilanReal *edgeLocWeight)
{
MilanInt w = -1;
MilanReal heaviestEdgeWt = MilanRealMin; // Assign the smallest Value possible first LDBL_MIN
int finalK;
for (int k = adj1; k < adj2; k++)
{
if ((edgeLocWeight[k] > heaviestEdgeWt) ||
((edgeLocWeight[k] == heaviestEdgeWt) && (w < verLocInd[k])))
{
heaviestEdgeWt = edgeLocWeight[k];
w = verLocInd[k];
finalK = k;
}
} // End of for loop
return finalK;
}
@@ -45,25 +47,32 @@ inline MilanLongInt firstComputeCandidateMate(MilanLongInt adj1,
*/
inline MilanLongInt computeCandidateMate(MilanLongInt adj1,
MilanLongInt adj2,
MilanReal *edgeLocWeight,
MilanLongInt k,
MilanLongInt *verLocInd,
MilanLongInt StartIndex,
MilanLongInt EndIndex,
vector<MilanLongInt> &GMate,
MilanLongInt *Mate,
map<MilanLongInt, MilanLongInt> &Ghost2LocalMap)
{
// Start: PARALLEL_COMPUTE_CANDIDATE_MATE_B(v)
MilanInt w = -1;
MilanReal heaviestEdgeWt = MilanRealMin; // Assign the smallest Value possible first LDBL_MIN
for (k = adj1; k < adj2; k++)
{
if (isAlreadyMatched(verLocInd[k], StartIndex, EndIndex, GMate, Mate, Ghost2LocalMap))
continue;
if ((edgeLocWeight[k] > heaviestEdgeWt) ||
((edgeLocWeight[k] == heaviestEdgeWt) && (w < verLocInd[k])))
{
heaviestEdgeWt = edgeLocWeight[k];
w = verLocInd[k];
}
} // End of for loop
// End: PARALLEL_COMPUTE_CANDIDATE_MATE_B(v)
return w;
}
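The helper above replaces the inline candidate-mate loop that the REQUEST-handling hunk removed; the call site now reduces to the single computeCandidateMate(...) line shown there. A self-contained sketch (not part of the commit; the Milan types and the isAlreadyMatched/Mate/GMate bookkeeping are replaced by toy stand-ins) of the selection rule the helper applies -- heaviest unmatched neighbour, ties broken by the larger vertex index:

#include <iostream>
#include <vector>
typedef long long MilanLongInt; // simplified stand-ins for the Milan types
typedef double MilanReal;

int main()
{
    std::vector<MilanLongInt> verLocInd = {4, 7, 9};        // neighbour indices
    std::vector<MilanReal> edgeLocWeight = {2.5, 2.5, 1.0}; // corresponding edge weights
    std::vector<bool> matched = {false, false, false};      // stands in for the Mate/GMate lookups

    MilanLongInt w = -1;
    MilanReal heaviestEdgeWt = -1e300; // stands in for MilanRealMin
    for (size_t k = 0; k < verLocInd.size(); k++)
    {
        if (matched[k]) // isAlreadyMatched(...) in the real code
            continue;
        if ((edgeLocWeight[k] > heaviestEdgeWt) ||
            ((edgeLocWeight[k] == heaviestEdgeWt) && (w < verLocInd[k])))
        {
            heaviestEdgeWt = edgeLocWeight[k];
            w = verLocInd[k];
        }
    }
    std::cout << "candidate mate: " << w << std::endl; // prints 7 (tie broken by larger index)
    return 0;
}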

@@ -18,7 +18,7 @@ inline void PARALLEL_COMPUTE_CANDIDATE_MATE_B(MilanLongInt NLVer,
MilanLongInt v = -1;
#pragma omp parallel private(v) default(shared) num_threads(4)
#pragma omp parallel private(v) default(shared) num_threads(NUM_THREAD)
{
#pragma omp for schedule(static)
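The hard-coded num_threads(4) clauses are replaced by the NUM_THREAD macro (#define NUM_THREAD 4 in MatchBoxPC.h), so the thread count is configured in one place. A trivial standalone sketch of the pattern, assuming OpenMP is enabled when compiling:

#include <omp.h>
#include <cstdio>
#define NUM_THREAD 4 // same macro as in MatchBoxPC.h

int main()
{
    // Each parallel region picks up the shared macro instead of a literal 4.
    #pragma omp parallel num_threads(NUM_THREAD)
    {
        #pragma omp single
        printf("running with %d threads\n", omp_get_num_threads());
    }
    return 0;
}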

@@ -41,17 +41,12 @@ inline void PARALLEL_PROCESS_EXPOSED_VERTEX_B(MilanLongInt NLVer,
staticQueue &privateQOwner)
{
//TODO define all the constants in a single place!
const MilanLongInt REQUEST = 1;
const MilanLongInt SUCCESS = 2;
const MilanLongInt FAILURE = 3;
const MilanLongInt SIZEINFO = 4;
MilanLongInt v = -1, k = -1, w = -1, adj11 = 0, adj12 = 0, k1 = 0, S = *SPtr;
MilanLongInt myCard = 0, msgInd = 0;
MilanLongInt NumMessagesBundled = 0;
MilanInt ghostOwner = 0;
#pragma omp parallel private(k, w, v, k1, adj11, adj12, ghostOwner) firstprivate(privateU, StartIndex, EndIndex, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner) default(shared) num_threads(4)
#pragma omp parallel private(k, w, v, k1, adj11, adj12, ghostOwner) firstprivate(privateU, StartIndex, EndIndex, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner) default(shared) num_threads(NUM_THREAD)
{
#pragma omp for reduction(+ \
: msgInd, NumMessagesBundled, myCard, PCounter[:numProcs]) schedule(static)

@@ -43,11 +43,6 @@ inline void processMatchedVertices(
staticQueue &privateQOwner)
{
// TODO define all the constants in a single place!
const MilanLongInt REQUEST = 1;
const MilanLongInt SUCCESS = 2;
const MilanLongInt FAILURE = 3;
const MilanLongInt SIZEINFO = 4;
MilanLongInt adj1, adj2, adj11, adj12, k, k1, v = -1, w = -1, ghostOwner;
MilanLongInt myCard = *myCardPtr, msgInd = *msgIndPtr, NumMessagesBundled = *NumMessagesBundledPtr, S = *SPtr, privateMyCard = 0;

@@ -0,0 +1,130 @@
#include "MatchBoxPC.h"
#include <stdio.h>
#include <iostream>
#include <map>
#include <vector>
#include "primitiveDataTypeDefinitions.h"
#include "dataStrStaticQueue.h"
#include "omp.h"
inline void processMessages(int error_codeC,
MilanInt numProcs,
MilanInt myRank,
int ComputeTag,
int BundleTag,
MPI_Comm comm,
vector<MilanLongInt> &Message,
char *error_message,
int message_length,
vector<MilanLongInt> &ReceiveBuffer,
MilanLongInt *BundleSizePtr)
{
MilanInt Sender;
MPI_Status computeStatus;
MilanLongInt bundleSize = *BundleSizePtr;
#ifdef PRINT_DEBUG_INFO_
cout
<< "\n(" << myRank << "=========================************===============================" << endl;
fflush(stdout);
fflush(stdout);
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")About to begin Message processing phase ... S=" << S << endl;
fflush(stdout);
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << "=========================************===============================" << endl;
fflush(stdout);
fflush(stdout);
#endif
// BLOCKING RECEIVE:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << " Waiting for blocking receive..." << endl;
fflush(stdout);
fflush(stdout);
#endif
error_codeC = MPI_Recv(&Message[0], 3, TypeMap<MilanLongInt>(), MPI_ANY_SOURCE, ComputeTag, comm, &computeStatus);
if (error_codeC != MPI_SUCCESS)
{
MPI_Error_string(error_codeC, error_message, &message_length);
cout << "\n*Error in call to MPI_Receive on Slave: " << error_message << "\n";
fflush(stdout);
}
Sender = computeStatus.MPI_SOURCE;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received message from Process " << Sender << " Type= " << Message[2] << endl;
fflush(stdout);
#endif
if (Message[2] == SIZEINFO)
{
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received bundled message from Process " << Sender << " Size= " << Message[0] << endl;
fflush(stdout);
#endif
bundleSize = Message[0]; //#of integers in the message
// Build the Message Buffer:
if (!ReceiveBuffer.empty())
ReceiveBuffer.clear(); // Empty it out first
ReceiveBuffer.resize(bundleSize, -1); // Initialize
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message Bundle Before: " << endl;
for (MilanLongInt i = 0; i < bundleSize; i++)
cout << ReceiveBuffer[i] << ",";
cout << endl;
fflush(stdout);
#endif
// Receive the message
error_codeC = MPI_Recv(&ReceiveBuffer[0], bundleSize, TypeMap<MilanLongInt>(), Sender, BundleTag, comm, &computeStatus);
if (error_codeC != MPI_SUCCESS)
{
MPI_Error_string(error_codeC, error_message, &message_length);
cout << "\n*Error in call to MPI_Receive on processor " << myRank << " Error: " << error_message << "\n";
fflush(stdout);
}
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message Bundle After: " << endl;
for (MilanLongInt i = 0; i < bundleSize; i++)
cout << ReceiveBuffer[i] << ",";
cout << endl;
fflush(stdout);
#endif
}
else
{ // Just a single message:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received regular message from Process " << Sender << " u= " << Message[0] << " v= " << Message[1] << endl;
fflush(stdout);
#endif
// Add the current message to Queue:
bundleSize = 3; //#of integers in the message
// Build the Message Buffer:
if (!ReceiveBuffer.empty())
ReceiveBuffer.clear(); // Empty it out first
ReceiveBuffer.resize(bundleSize, -1); // Initialize
ReceiveBuffer[0] = Message[0]; // u
ReceiveBuffer[1] = Message[1]; // v
ReceiveBuffer[2] = Message[2]; // message_type
}
#ifdef DEBUG_GHOST_
if ((v < StartIndex) || (v > EndIndex))
{
cout << "\n(" << myRank << ") From ReceiveBuffer: This should not happen: u= " << u << " v= " << v << " Type= " << message_type << " StartIndex " << StartIndex << " EndIndex " << EndIndex << endl;
fflush(stdout);
}
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Processing message: u= " << u << " v= " << v << " Type= " << message_type << endl;
fflush(stdout);
#endif
*BundleSizePtr = bundleSize;
return;
}
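processMessages() now owns the blocking receive and the SIZEINFO handshake that the caller previously did inline: a fixed 3-integer header [u, v, message_type] is received first, and only when its type field equals SIZEINFO is a second, variable-length bundle received from the same sender. A self-contained sketch (not part of the commit; the tag values, MPI_LONG_LONG datatype and toy payload are assumptions) of that two-step protocol between two ranks:

// Build/run (assumption): mpic++ sizeinfo_demo.cpp && mpirun -np 2 ./a.out
#include <mpi.h>
#include <iostream>
#include <vector>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    int myRank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
    const long long SIZEINFO = 4;            // same code as in MatchBoxPC.h
    const int ComputeTag = 7, BundleTag = 9; // illustrative tag values

    if (myRank == 0)
    {
        std::vector<long long> bundle = {10, 20, 1, 30, 40, 2}; // [u, v, type, ...] triples
        long long header[3] = {(long long)bundle.size(), -1, SIZEINFO};
        MPI_Send(header, 3, MPI_LONG_LONG, 1, ComputeTag, MPI_COMM_WORLD);
        MPI_Send(bundle.data(), (int)bundle.size(), MPI_LONG_LONG, 1, BundleTag, MPI_COMM_WORLD);
    }
    else if (myRank == 1)
    {
        long long Message[3];
        MPI_Status status;
        MPI_Recv(Message, 3, MPI_LONG_LONG, MPI_ANY_SOURCE, ComputeTag, MPI_COMM_WORLD, &status);
        std::vector<long long> ReceiveBuffer;
        if (Message[2] == SIZEINFO) // size indicator: receive the bigger message
        {
            ReceiveBuffer.resize(Message[0], -1);
            MPI_Recv(ReceiveBuffer.data(), (int)ReceiveBuffer.size(), MPI_LONG_LONG,
                     status.MPI_SOURCE, BundleTag, MPI_COMM_WORLD, &status);
        }
        else // just a single [u, v, type] message
        {
            ReceiveBuffer.assign(Message, Message + 3);
        }
        std::cout << "received " << ReceiveBuffer.size() << " integers" << std::endl;
    }
    MPI_Finalize();
    return 0;
}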