processMatchedVertices: add send messages with error

omp-walther
StefanoPetrilli 2 years ago
parent 22d9baf296
commit cdf92ea2b2

@@ -76,6 +76,8 @@ const MilanLongInt SIZEINFO = 4;
 const int ComputeTag = 7; // Predefined tag
 const int BundleTag = 9;  // Predefined tag
+static vector<MilanLongInt> DEFAULT_VECTOR;
 // MPI type map
 template <typename T>
 MPI_Datatype TypeMap();
@@ -320,7 +322,12 @@ extern "C"
                            staticQueue &privateQLocalVtx,
                            staticQueue &privateQGhostVtx,
                            staticQueue &privateQMsgType,
-                           staticQueue &privateQOwner);
+                           staticQueue &privateQOwner,
+                           bool sendMessages = false,
+                           MPI_Comm comm = NULL,
+                           MilanLongInt *msgActual = nullptr,
+                           MilanLongInt *msgInd = nullptr,
+                           vector<MilanLongInt> &Message = DEFAULT_VECTOR);

    void sendBundledMessages(MilanLongInt *numGhostEdgesPtr,
                             MilanInt *BufferSizePtr,
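Because the new trailing parameters are defaulted, existing call sites that only bundle messages keep compiling unchanged, while the immediate-send path passes explicit values; the non-const reference defaults to the namespace-scope DEFAULT_VECTOR declared above. A minimal standalone sketch of the same pattern, with illustrative names rather than the real processMatchedVertices signature:

// Illustrative analogue only: trailing defaulted parameters, including a
// reference defaulted to a shared placeholder object (DEFAULT_VECTOR).
#include <cstdint>
#include <iostream>
#include <vector>

using MilanLongInt = int64_t;
static std::vector<MilanLongInt> DEFAULT_VECTOR; // placeholder default target

void processExample(MilanLongInt u,
                    bool sendMessages = false,
                    MilanLongInt *msgActual = nullptr,
                    std::vector<MilanLongInt> &Message = DEFAULT_VECTOR)
{
    if (sendMessages) {
        Message.assign(3, u);          // build a 3-word packet
        if (msgActual) (*msgActual)++; // count the sent message
    }
}

int main()
{
    MilanLongInt msgActual = 0;
    std::vector<MilanLongInt> Message(3, -1);

    processExample(42);                            // old-style call: bundling mode
    processExample(42, true, &msgActual, Message); // new-style call: immediate send
    std::cout << "msgActual = " << msgActual << "\n";
}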

@@ -368,9 +368,51 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
 ///////////////////////////////////////////////////////////////////////////////////
 /////////////////////////// PROCESS MATCHED VERTICES //////////////////////////////
 ///////////////////////////////////////////////////////////////////////////////////
-    while (/*!Q.empty()*/ !U.empty())
+    ///*
+    //#define error
+#ifdef error
+    processMatchedVertices(NLVer,
+                           UChunkBeingProcessed,
+                           U,
+                           privateU,
+                           StartIndex,
+                           EndIndex,
+                           &myCard,
+                           &msgInd,
+                           &NumMessagesBundled,
+                           &S,
+                           verLocPtr,
+                           verLocInd,
+                           verDistance,
+                           PCounter,
+                           Counter,
+                           myRank,
+                           numProcs,
+                           candidateMate,
+                           GMate,
+                           Mate,
+                           Ghost2LocalMap,
+                           edgeLocWeight,
+                           QLocalVtx,
+                           QGhostVtx,
+                           QMsgType,
+                           QOwner,
+                           privateQLocalVtx,
+                           privateQGhostVtx,
+                           privateQMsgType,
+                           privateQOwner,
+                           true,
+                           comm,
+                           &msgActual,
+                           &msgInd,
+                           Message);
+#endif
+#ifndef error
+    while (!U.empty())
     {
-        // Q.pop_front();
         u = U.pop_front(); // Get an element from the queue
 #ifdef PRINT_DEBUG_INFO_
         cout << "\n(" << myRank << ")u: " << u;
@@ -386,7 +428,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
             v = verLocInd[k];
             if ((v >= StartIndex) && (v <= EndIndex))
             { // v is a Local Vertex:
-                //if (Mate[v - StartIndex] >= 0) // v is already matched
+                // if (Mate[v - StartIndex] >= 0) // v is already matched
                 //    continue;
 #ifdef PRINT_DEBUG_INFO_
                 cout << "\n(" << myRank << ")v: " << v << " c(v)= " << candidateMate[v - StartIndex] << " Mate[v]: " << Mate[v];
@@ -500,7 +542,7 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
                     } // End of Else: w == -1
                     // End: PARALLEL_PROCESS_EXPOSED_VERTEX_B(v)
                 } // End of If (candidateMate[v-StartIndex] == u)
-            }
+            } // if (Mate[v - StartIndex] < 0)
             } // End of if ( (v >= StartIndex) && (v <= EndIndex) ) //If Local Vertex:
             else
             { // Neighbor v is a ghost vertex
@@ -536,7 +578,9 @@ void dalgoDistEdgeApproxDomEdgesLinearSearchMesgBndlSmallMateCMP(
             } // End of Else //A Ghost Vertex
         } // End of For Loop adj(u)
     } // End of if ( (u >= StartIndex) && (u <= EndIndex) ) //Process Only If a Local Vertex
-} // End of while ( /*!Q.empty()*/ !U.empty() )
+} // End of while ( !U.empty() )
+#endif
 ///////////////////////// END OF PROCESS MATCHED VERTICES /////////////////////////
 //// BREAK IF NO MESSAGES EXPECTED /////////

@@ -32,7 +32,12 @@ void processMatchedVertices(
     staticQueue &privateQLocalVtx,
     staticQueue &privateQGhostVtx,
     staticQueue &privateQMsgType,
-    staticQueue &privateQOwner)
+    staticQueue &privateQOwner,
+    bool sendMessages,
+    MPI_Comm comm,
+    MilanLongInt *msgActual,
+    MilanLongInt *msgInd,
+    vector<MilanLongInt> &Message)
 {
     MilanLongInt adj1, adj2, adj11, adj12, k, k1, v = -1, w = -1, ghostOwner;
@@ -48,7 +53,7 @@ void processMatchedVertices(
 #ifdef COUNT_LOCAL_VERTEX
     MilanLongInt localVertices = 0;
 #endif
-#pragma omp parallel private(k, w, v, k1, adj1, adj2, adj11, adj12, ghostOwner, option) firstprivate(privateU, StartIndex, EndIndex, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner, UChunkBeingProcessed) default(shared) num_threads(NUM_THREAD)
+#pragma omp parallel private(k, w, v, k1, adj1, adj2, adj11, adj12, ghostOwner, option) firstprivate(Message, privateU, StartIndex, EndIndex, privateQLocalVtx, privateQGhostVtx, privateQMsgType, privateQOwner, UChunkBeingProcessed) default(shared) num_threads(NUM_THREAD)
     {
         while (!U.empty())
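Message is added to the firstprivate clause so that every thread fills and sends its own copy of the packet vector instead of racing on one shared buffer. A minimal self-contained sketch of what firstprivate does here, with illustrative values only:

// Illustrative sketch of firstprivate semantics: each thread receives a
// private copy of Message initialized from the shared original, so the
// per-thread writes below cannot race with each other.
#include <omp.h>
#include <cstdio>
#include <vector>

int main()
{
    std::vector<long> Message(3, -1); // shared original

#pragma omp parallel firstprivate(Message) num_threads(4)
    {
        int tid = omp_get_thread_num();
        Message[0] = tid;       // safe: writes go to the thread-local copy
        Message[1] = tid * 10;
        Message[2] = tid * 100;
#pragma omp critical
        printf("thread %d packet: %ld %ld %ld\n", tid, Message[0], Message[1], Message[2]);
    }
    // The original Message is untouched by the per-thread writes.
    printf("original: %ld %ld %ld\n", Message[0], Message[1], Message[2]);
}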
@@ -144,15 +149,13 @@ void processMatchedVertices(
 #endif
                         } // End of if(CandidateMate(w) = v
                     } // End of Else
                 } // End of if(w >=0)
                 else
                     option = 4; // End of Else: w == -1
                 // End: PARALLEL_PROCESS_EXPOSED_VERTEX_B(v)
-                }
-            } // End of task
             } // End of If (candidateMate[v-StartIndex] == u
+            } // End of task
+        } // mateval < 0
         } // End of if ( (v >= StartIndex) && (v <= EndIndex) ) //If Local Vertex:
         else
         { // Neighbor is a ghost vertex
@@ -184,16 +187,36 @@ void processMatchedVertices(
                     // Decrement the counter:
                     PROCESS_CROSS_EDGE(&Counter[Ghost2LocalMap[w]], SPtr);
                 case 2:
                     // Found a dominating edge, it is a ghost
                     ghostOwner = findOwnerOfGhost(w, verDistance, myRank, numProcs);
                     assert(ghostOwner != -1);
                     assert(ghostOwner != myRank);
+                    if (sendMessages)
+                    {
+                        // Build the Message Packet:
+                        Message[0] = v;       // LOCAL
+                        Message[1] = w;       // GHOST
+                        Message[2] = REQUEST; // TYPE
+                        // Send a Request (Asynchronous)
+                        //#pragma omp master
+                        // {
+                        MPI_Bsend(&Message[0], 3, TypeMap<MilanLongInt>(), ghostOwner, ComputeTag, comm);
+                        // }
 #pragma omp atomic
-                    PCounter[ghostOwner]++;
+                        (*msgActual)++;
+                    }
+                    else
+                    {
 #pragma omp atomic
-                    (*msgIndPtr)++;
+                        PCounter[ghostOwner]++;
 #pragma omp atomic
                     (*NumMessagesBundledPtr)++;
+                    }
+#pragma omp atomic
+                    (*msgIndPtr)++;
                     privateQLocalVtx.push_back(v);
                     privateQGhostVtx.push_back(w);
                     privateQMsgType.push_back(REQUEST);
@@ -224,12 +247,30 @@ void processMatchedVertices(
                     ghostOwner = findOwnerOfGhost(w, verDistance, myRank, numProcs);
                     assert(ghostOwner != -1);
                     assert(ghostOwner != myRank);
+                    if (sendMessages)
+                    {
+                        // Build the Message Packet:
+                        Message[0] = v;       // LOCAL
+                        Message[1] = w;       // GHOST
+                        Message[2] = FAILURE; // TYPE
+                        // Send a Request (Asynchronous)
+                        //#pragma omp master
+                        // {
+                        MPI_Bsend(&Message[0], 3, TypeMap<MilanLongInt>(), ghostOwner, ComputeTag, comm);
+                        // }
 #pragma omp atomic
-                    PCounter[ghostOwner]++;
+                        (*msgActual)++;
+                    }
+                    else
+                    {
 #pragma omp atomic
-                    (*msgIndPtr)++;
+                        PCounter[ghostOwner]++;
 #pragma omp atomic
                     (*NumMessagesBundledPtr)++;
+                    }
+#pragma omp atomic
+                    (*msgIndPtr)++;
                     privateQLocalVtx.push_back(v);
                     privateQGhostVtx.push_back(w);
@@ -239,6 +280,7 @@ void processMatchedVertices(
                 } // End of if(GHOST)
             } // End of for loop
             break;
+        case 5:
         default:
 #ifdef PRINT_DEBUG_INFO_
@@ -250,12 +292,32 @@ void processMatchedVertices(
             ghostOwner = findOwnerOfGhost(v, verDistance, myRank, numProcs);
             assert(ghostOwner != -1);
             assert(ghostOwner != myRank);
+            if (sendMessages)
+            {
+                // Build the Message Packet:
+                Message[0] = u;       // LOCAL
+                Message[1] = v;       // GHOST
+                Message[2] = SUCCESS; // TYPE
+                // Send a Request (Asynchronous)
+                //#pragma omp master
+                // {
+                MPI_Bsend(&Message[0], 3, TypeMap<MilanLongInt>(), ghostOwner, ComputeTag, comm);
+                // }
+#pragma omp atomic
+                (*msgActual)++;
+            }
+            else
+            {
+#pragma omp atomic
+                (*NumMessagesBundledPtr)++;
 #pragma omp atomic
             PCounter[ghostOwner]++;
+            }
 #pragma omp atomic
             (*msgIndPtr)++;
-#pragma omp atomic
-            (*NumMessagesBundledPtr)++;
             privateQLocalVtx.push_back(u);
             privateQGhostVtx.push_back(v);
             privateQMsgType.push_back(SUCCESS);
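Every Bsend issued above carries the same 3-word layout: Message[0] is the vertex local to the sender, Message[1] the ghost vertex owned by the receiver, and Message[2] the message type (REQUEST, SUCCESS, or FAILURE), tagged with ComputeTag. The receiving side lives in processMessages and is not part of this commit; the sketch below only illustrates how such a packet could be decoded, with assumed numeric type codes:

// Illustrative sketch of decoding the {local, ghost, type} packet on the
// receiving rank. The REQUEST/SUCCESS/FAILURE values and MPI_LONG_LONG are
// assumptions for this example; the real constants live in the library.
#include <mpi.h>
#include <cstdio>

enum MsgType : long long { REQUEST = 1, SUCCESS = 2, FAILURE = 3 }; // assumed values
const int ComputeTag = 7;

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    long long Message[3];
    if (size > 1 && rank == 0) {
        Message[0] = 10;      // u: vertex local to the sender
        Message[1] = 25;      // v: ghost vertex owned by the receiver
        Message[2] = SUCCESS; // TYPE
        MPI_Send(Message, 3, MPI_LONG_LONG, 1, ComputeTag, MPI_COMM_WORLD);
    } else if (size > 1 && rank == 1) {
        MPI_Status status;
        MPI_Recv(Message, 3, MPI_LONG_LONG, MPI_ANY_SOURCE, ComputeTag, MPI_COMM_WORLD, &status);
        switch (Message[2]) {
        case REQUEST: printf("rank %d requests edge (%lld,%lld)\n", status.MPI_SOURCE, Message[0], Message[1]); break;
        case SUCCESS: printf("rank %d matched %lld across the boundary\n", status.MPI_SOURCE, Message[0]); break;
        case FAILURE: printf("rank %d gave up on %lld\n", status.MPI_SOURCE, Message[0]); break;
        default:      printf("unexpected message type %lld\n", Message[2]); break;
        }
    }
    MPI_Finalize();
}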

@@ -104,7 +104,7 @@ void processMessages(
     ReceiveBuffer.resize(bundleSize, -1); // Initialize
 #ifdef PRINT_DEBUG_INFO_
     cout << "\n(" << myRank << ")Message Bundle Before: " << endl;
-    for (i = 0; i < bundleSize; i++)
+    for (int i = 0; i < bundleSize; i++)
         cout << ReceiveBuffer[i] << ",";
     cout << endl;
     fflush(stdout);
@@ -119,7 +119,7 @@ void processMessages(
     }
 #ifdef PRINT_DEBUG_INFO_
     cout << "\n(" << myRank << ")Message Bundle After: " << endl;
-    for (i = 0; i < bundleSize; i++)
+    for (int i = 0; i < bundleSize; i++)
         cout << ReceiveBuffer[i] << ",";
     cout << endl;
     fflush(stdout);

@@ -9,7 +9,7 @@ void sendBundledMessages(MilanLongInt *numGhostEdges,
                          MilanLongInt *PCounter,
                          MilanLongInt NumMessagesBundled,
                          MilanLongInt *msgActual,
-                         MilanLongInt *MessageIndex,
+                         MilanLongInt *msgInd,
                          MilanInt numProcs,
                          MilanInt myRank,
                          MPI_Comm comm,
@@ -105,7 +105,7 @@ PSizeInfoMessages.resize(numProcs * 3, 0);
     // Send the Messages
 #pragma omp task depend(inout                                                  \
                         : SRequest, PSizeInfoMessages, PCumulative) depend(out \
-                        : *msgActual, *MessageIndex)
+                        : *msgActual, *msgInd)
     {
         for (i = 0; i < numProcs; i++)
         { // Changed by Fabio to be an integer, addresses needs to be integers!
@@ -124,9 +124,9 @@ PSizeInfoMessages.resize(numProcs * 3, 0);
             if (PSizeInfoMessages[i * 3 + 0] > 0)
             { // Send only if it is a nonempty packet
                 MPI_Isend(&PSizeInfoMessages[i * 3 + 0], 3, TypeMap<MilanLongInt>(), i, ComputeTag, comm,
-                          &SRequest[(*MessageIndex)]);
+                          &SRequest[(*msgInd)]);
                 (*msgActual)++;
-                (*MessageIndex)++;
+                (*msgInd)++;
                 // Now Send the message with the data packet:
 #ifdef PRINT_DEBUG_INFO_
                 cout << "\n(" << myRank << ")SendiFFng Bundle to : " << i << endl;
@@ -136,8 +136,8 @@ PSizeInfoMessages.resize(numProcs * 3, 0);
                 fflush(stdout);
 #endif
                 MPI_Isend(&PMessageBundle[PCumulative[i] * 3], PSizeInfoMessages[i * 3 + 0],
-                          TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[(*MessageIndex)]);
-                (*MessageIndex)++;
+                          TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[(*msgInd)]);
+                (*msgInd)++;
             } // End of if size > 0
         }
     }
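With the rename, msgInd (previously MessageIndex) counts how many nonblocking sends have been queued into SRequest, which is exactly the count a later completion step needs. A small self-contained sketch of that pattern, reusing the ComputeTag/BundleTag values from the header but with assumed buffers and a self-send for illustration:

// Illustrative sketch: queue nonblocking sends, count them in msgInd, then
// complete exactly that many requests with MPI_Waitall.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    std::vector<MPI_Request> SRequest(2, MPI_REQUEST_NULL);
    std::vector<MPI_Status>  SStatus(2);
    long long msgInd = 0; // counts queued nonblocking sends

    long long sizeInfo[3] = {3, 0, 0};
    long long bundle[3]   = {1, 2, 3};
    long long recvA[3], recvB[3];

    // Queue two sends to ourselves, mirroring the size-info + data-bundle pair.
    MPI_Isend(sizeInfo, 3, MPI_LONG_LONG, rank, 7 /* ComputeTag */, MPI_COMM_WORLD, &SRequest[msgInd]); msgInd++;
    MPI_Isend(bundle,   3, MPI_LONG_LONG, rank, 9 /* BundleTag */,  MPI_COMM_WORLD, &SRequest[msgInd]); msgInd++;

    MPI_Recv(recvA, 3, MPI_LONG_LONG, rank, 7, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    MPI_Recv(recvB, 3, MPI_LONG_LONG, rank, 9, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    // Complete exactly msgInd outstanding requests.
    MPI_Waitall(static_cast<int>(msgInd), SRequest.data(), SStatus.data());
    printf("rank %d completed %lld sends\n", rank, msgInd);
    MPI_Finalize();
}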
@@ -207,9 +207,4 @@ PSizeInfoMessages.resize(numProcs * 3, 0);
         }
     }
 }
-//*MessageIndexPtr = MessageIndex;
-//*msgActualPtr = msgActual;
-//*numGhostEdgesPtr = numGhostEdges;
-//*BufferSizePtr = BufferSize;
 }

@@ -0,0 +1 @@
+!<arch>