You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
amg4psblas/amgprec/impl/aggregator/processMessages.cpp

316 lines
12 KiB
C++

#include "MatchBoxPC.h"
//#define DEBUG_HANG_
void processMessages(
MilanLongInt NLVer,
MilanLongInt *Mate,
MilanLongInt *candidateMate,
map<MilanLongInt, MilanLongInt> &Ghost2LocalMap,
vector<MilanLongInt> &GMate,
vector<MilanLongInt> &Counter,
MilanLongInt StartIndex,
MilanLongInt EndIndex,
MilanLongInt *myCard,
MilanLongInt *msgInd,
MilanLongInt *msgActual,
MilanReal *edgeLocWeight,
MilanLongInt *verDistance,
MilanLongInt *verLocPtr,
MilanLongInt k,
MilanLongInt *verLocInd,
MilanInt numProcs,
MilanInt myRank,
MPI_Comm comm,
vector<MilanLongInt> &Message,
MilanLongInt numGhostEdges,
MilanLongInt u,
MilanLongInt v,
MilanLongInt *S,
vector<MilanLongInt> &U)
{
//#define PRINT_DEBUG_INFO_
MilanInt Sender;
MPI_Status computeStatus;
MilanLongInt bundleSize, w;
MilanLongInt adj11, adj12, k1;
MilanLongInt ghostOwner;
int error_codeC;
error_codeC = MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
char error_message[MPI_MAX_ERROR_STRING];
int message_length;
MilanLongInt message_type = 0;
// Buffer to receive bundled messages
// Maximum messages that can be received from any processor is
// twice the edge cut: REQUEST; REQUEST+(FAILURE/SUCCESS)
vector<MilanLongInt> ReceiveBuffer;
try
{
ReceiveBuffer.reserve(numGhostEdges * 2 * 3); // Three integers per cross edge
}
catch (length_error)
{
cout << "Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
cout << "Not enough memory to allocate the internal variables \n";
exit(1);
}
#ifdef PRINT_DEBUG_INFO_
cout
<< "\n(" << myRank << "=========================************===============================" << endl;
fflush(stdout);
fflush(stdout);
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")About to begin Message processing phase ... *S=" << *S << endl;
fflush(stdout);
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << "=========================************===============================" << endl;
fflush(stdout);
fflush(stdout);
#endif
// BLOCKING RECEIVE:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << " Waiting for blocking receive..." << endl;
fflush(stdout);
fflush(stdout);
#endif
//cout << myRank<<" Receiving ...";
error_codeC = MPI_Recv(&Message[0], 3, TypeMap<MilanLongInt>(), MPI_ANY_SOURCE, ComputeTag, comm, &computeStatus);
if (error_codeC != MPI_SUCCESS)
{
MPI_Error_string(error_codeC, error_message, &message_length);
cout << "\n*Error in call to MPI_Receive on Slave: " << error_message << "\n";
fflush(stdout);
}
Sender = computeStatus.MPI_SOURCE;
//cout << " ...from "<<Sender << endl;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received message from Process " << Sender << " Type= " << Message[2] << endl;
fflush(stdout);
#endif
if (Message[2] == SIZEINFO) {
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received bundled message from Process " << Sender << " Size= " << Message[0] << endl;
fflush(stdout);
#endif
bundleSize = Message[0]; //#of integers in the message
// Build the Message Buffer:
if (!ReceiveBuffer.empty())
ReceiveBuffer.clear(); // Empty it out first
ReceiveBuffer.resize(bundleSize, -1); // Initialize
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message Bundle Before: " << endl;
for (int i = 0; i < bundleSize; i++)
cout << ReceiveBuffer[i] << ",";
cout << endl;
fflush(stdout);
#endif
// Receive the message
//cout << myRank<<" Receiving from "<<Sender<<endl;
error_codeC = MPI_Recv(&ReceiveBuffer[0], bundleSize, TypeMap<MilanLongInt>(), Sender, BundleTag, comm, &computeStatus);
if (error_codeC != MPI_SUCCESS) {
MPI_Error_string(error_codeC, error_message, &message_length);
cout << "\n*Error in call to MPI_Receive on processor " << myRank << " Error: " << error_message << "\n";
fflush(stdout);
}
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message Bundle After: " << endl;
for (int i = 0; i < bundleSize; i++)
cout << ReceiveBuffer[i] << ",";
cout << endl;
fflush(stdout);
#endif
} else { // Just a single message:
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Received regular message from Process " << Sender << " u= " << Message[0] << " v= " << Message[1] << endl;
fflush(stdout);
#endif
// Add the current message to Queue:
bundleSize = 3; //#of integers in the message
// Build the Message Buffer:
if (!ReceiveBuffer.empty())
ReceiveBuffer.clear(); // Empty it out first
ReceiveBuffer.resize(bundleSize, -1); // Initialize
ReceiveBuffer[0] = Message[0]; // u
ReceiveBuffer[1] = Message[1]; // v
ReceiveBuffer[2] = Message[2]; // message_type
}
#ifdef DEBUG_GHOST_
if ((v < StartIndex) || (v > EndIndex)) {
cout << "\n(" << myRank << ") From ReceiveBuffer: This should not happen: u= " << u << " v= " << v << " Type= " << message_type << " StartIndex " << StartIndex << " EndIndex " << EndIndex << endl;
fflush(stdout);
}
#endif
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Processing message: u= " << u << " v= " << v << " Type= " << message_type << endl;
fflush(stdout);
#endif
// Most of the time bundleSize == 3, thus, it's not worth parallelizing thi loop
for (MilanLongInt bundleCounter = 3; bundleCounter < bundleSize + 3; bundleCounter += 3) {
u = ReceiveBuffer[bundleCounter - 3]; // GHOST
v = ReceiveBuffer[bundleCounter - 2]; // LOCAL
message_type = ReceiveBuffer[bundleCounter - 1]; // TYPE
// CASE I: REQUEST
if (message_type == REQUEST) {
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message type is REQUEST" << endl;
fflush(stdout);
#endif
#ifdef DEBUG_GHOST_
if ((v < 0) || (v < StartIndex) || ((v - StartIndex) > NLVer)) {
cout << "\n(" << myRank << ") case 1 Bad address " << v << " " << StartIndex << " " << v - StartIndex << " " << NLVer << endl;
fflush(stdout);
}
#endif
if (Mate[v - StartIndex] == -1) {
// Process only if not already matched (v is local)
candidateMate[NLVer + Ghost2LocalMap[u]] = v; // Set CandidateMate for the ghost
if (candidateMate[v - StartIndex] == u) {
GMate[Ghost2LocalMap[u]] = v; // u is ghost
Mate[v - StartIndex] = u; // v is local
U.push_back(v);
U.push_back(u);
(*myCard)++;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")MATCH: (" << v << "," << u << ") " << endl;
fflush(stdout);
#endif
PROCESS_CROSS_EDGE(&Counter[Ghost2LocalMap[u]], S);
} // End of if ( candidateMate[v-StartIndex] == u )e
} // End of if ( Mate[v] == -1 )
} // End of REQUEST
else { // CASE II: SUCCESS
if (message_type == SUCCESS) {
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message type is SUCCESS" << endl;
fflush(stdout);
#endif
GMate[Ghost2LocalMap[u]] = EndIndex + 1; // Set a Dummy Mate to make sure that we do not (u is a ghost) process it again
PROCESS_CROSS_EDGE(&Counter[Ghost2LocalMap[u]], S);
#ifdef DEBUG_GHOST_
if ((v < 0) || (v < StartIndex) || ((v - StartIndex) > NLVer)) {
cout << "\n(" << myRank << ") case 2 Bad address " << v << " " << StartIndex << " " << v - StartIndex << " " << NLVer << endl;
fflush(stdout);
}
#endif
if (Mate[v - StartIndex] == -1) {
// Process only if not already matched ( v is local)
if (candidateMate[v - StartIndex] == u) {
// Start: PARALLEL_PROCESS_EXPOSED_VERTEX_B(v)
w = computeCandidateMate(verLocPtr[v - StartIndex], verLocPtr[v - StartIndex + 1], edgeLocWeight, k,
verLocInd, StartIndex, EndIndex, GMate, Mate, Ghost2LocalMap);
candidateMate[v - StartIndex] = w;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")" << v << " Points to: " << w << endl;
fflush(stdout);
#endif
// If found a dominating edge:
if (w >= 0) {
if ((w < StartIndex) || (w > EndIndex)) {
// w is a ghost
// Build the Message Packet:
Message[0] = v; // LOCAL
Message[1] = w; // GHOST
Message[2] = REQUEST; // TYPE
// Send a Request (Asynchronous)
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Sending a request message: ";
cout << "\n(" << myRank << ")Ghost is " << w << " Owner is: " << findOwnerOfGhost(w, verDistance, myRank, numProcs) << endl;
fflush(stdout);
#endif
ghostOwner = findOwnerOfGhost(w, verDistance, myRank, numProcs);
//assert(ghostOwner != -1);
//assert(ghostOwner != myRank);
//cout << myRank<<" Sending to "<<ghostOwner<<endl;
MPI_Bsend(&Message[0], 3, TypeMap<MilanLongInt>(), ghostOwner, ComputeTag, comm);
(*msgInd)++;
(*msgActual)++;
if (candidateMate[NLVer + Ghost2LocalMap[w]] == v) {
Mate[v - StartIndex] = w; // v is local
GMate[Ghost2LocalMap[w]] = v; // w is ghost
U.push_back(v);
U.push_back(w);
(*myCard)++;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")MATCH: (" << v << "," << w << ") " << endl;
fflush(stdout);
#endif
PROCESS_CROSS_EDGE(&Counter[Ghost2LocalMap[w]], S);
} // End of if CandidateMate[w] = v
} // End of if a Ghost Vertex
else { // w is a local vertex
if (candidateMate[w - StartIndex] == v) {
Mate[v - StartIndex] = w; // v is local
Mate[w - StartIndex] = v; // w is local
// Q.push_back(u);
U.push_back(v);
U.push_back(w);
(*myCard)++;
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")MATCH: (" << v << "," << w << ") " << endl;
fflush(stdout);
#endif
} // End of if(CandidateMate(w) = v
} // End of Else
} // End of if(w >=0)
else { // No dominant edge found
adj11 = verLocPtr[v - StartIndex];
adj12 = verLocPtr[v - StartIndex + 1];
for (k1 = adj11; k1 < adj12; k1++) {
w = verLocInd[k1];
if ((w < StartIndex) || (w > EndIndex)) {
// A ghost
// Build the Message Packet:
Message[0] = v; // LOCAL
Message[1] = w; // GHOST
Message[2] = FAILURE; // TYPE
// Send a Request (Asynchronous)
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Sending a failure message: ";
cout << "\n(" << myRank << ")Ghost is " << w << " Owner is: " << findOwnerOfGhost(w, verDistance, myRank, numProcs) << endl;
fflush(stdout);
#endif
ghostOwner = findOwnerOfGhost(w, verDistance, myRank, numProcs);
//assert(ghostOwner != -1);
//assert(ghostOwner != myRank);
//cout << myRank<<" Sending to "<<ghostOwner<<endl;
MPI_Bsend(&Message[0], 3, TypeMap<MilanLongInt>(), ghostOwner, ComputeTag, comm);
(*msgInd)++;
(*msgActual)++;
} // End of if(GHOST)
} // End of for loop
} // End of Else: w == -1
// End: PARALLEL_PROCESS_EXPOSED_VERTEX_B(v)
} // End of if ( candidateMate[v-StartIndex] == u )
} // End of if ( Mate[v] == -1 )
} // End of if ( message_type == SUCCESS )
else {
// CASE III: FAILURE
#ifdef PRINT_DEBUG_INFO_
cout << "\n(" << myRank << ")Message type is FAILURE" << endl;
fflush(stdout);
#endif
GMate[Ghost2LocalMap[u]] = EndIndex + 1; // Set a Dummy Mate to make sure that we do not (u is a ghost) process this anymore
PROCESS_CROSS_EDGE(&Counter[Ghost2LocalMap[u]], S); // Decrease the counter
} // End of else: CASE III
} // End of else: CASE I
}
return;
}