#include "MatchBoxPC.h"

// Pack the queued matching messages (QLocalVtx/QGhostVtx/QMsgType/QOwner)
// into one contiguous bundle per destination process and ship each bundle
// with a pair of non-blocking sends (a 3-integer size/type header, then the
// data packet). Afterwards, size and attach the MPI buffered-send buffer
// used for the remaining (unbundled) point-to-point messages.
//
// Parameters (in/out values are passed back through the *Ptr arguments):
//   numGhostEdgesPtr  - in/out: number of ghost edges (read, written back unchanged).
//   BufferSizePtr     - in/out: size in bytes of the Bsend buffer; recomputed here.
//   Buffer            - Bsend buffer pointer. NOTE(review): passed by value, so the
//                       malloc'd address is NOT visible to the caller; MPI itself
//                       retains it via MPI_Buffer_attach (recoverable with
//                       MPI_Buffer_detach) — confirm this is the intended contract.
//   PCumulative       - scratch: CSR-style prefix sums of per-process message counts;
//                       cleared before return.
//   PMessageBundle    - scratch: the packed bundles, 3 integers per message.
//   PSizeInfoMessages - scratch: 3-integer header packets, one per process; must stay
//                       alive until the ISends complete.
//   PCounter          - in: per-process message counts; reused internally as an
//                       insertion cursor and left zero-based on return.
//   NumMessagesBundled- in: total number of queued messages to bundle.
//   msgActualPtr      - in/out: count of nonempty messages actually sent.
//   MessageIndexPtr   - in/out: number of outstanding ISend requests in SRequest.
//   numProcs, myRank  - communicator size and this process's rank.
//   ComputeTag        - tag for the size-info header messages.
//   BundleTag         - tag for the data bundle messages.
//   comm              - MPI communicator used for all sends.
//   QLocalVtx, QGhostVtx, QMsgType, QOwner - queued message fields; cleared here.
//   SRequest, SStatus - request/status arrays for the ISends (reserved here,
//                       waited on by the caller).
//
// The work is organized as OpenMP tasks issued by the master thread; the
// depend clauses serialize tasks that share the structures above.
void sendBundledMessages(MilanLongInt *numGhostEdgesPtr,
                         MilanInt *BufferSizePtr,
                         MilanLongInt *Buffer,
                         vector<MilanLongInt> &PCumulative,
                         vector<MilanLongInt> &PMessageBundle,
                         vector<MilanLongInt> &PSizeInfoMessages,
                         MilanLongInt *PCounter,
                         MilanLongInt NumMessagesBundled,
                         MilanLongInt *msgActualPtr,
                         MilanLongInt *MessageIndexPtr,
                         MilanInt numProcs,
                         MilanInt myRank,
                         int ComputeTag,
                         int BundleTag,
                         MPI_Comm comm,
                         vector<MilanLongInt> &QLocalVtx,
                         vector<MilanLongInt> &QGhostVtx,
                         vector<MilanLongInt> &QMsgType,
                         vector<MilanInt> &QOwner, // ranks; element type inferred — TODO confirm against header
                         vector<MPI_Request> &SRequest,
                         vector<MPI_Status> &SStatus)
{
    // Local working copies of the in/out scalars; written back at the end.
    MilanLongInt myIndex = 0,
                 msgActual = *msgActualPtr,
                 MessageIndex = *MessageIndexPtr,
                 numGhostEdges = *numGhostEdgesPtr,
                 numMessagesToSend;
    const MilanLongInt SIZEINFO = 4; // message-type code for a size-info header packet
    MilanInt i = 0, OneMessageSize = 0, BufferSize = *BufferSizePtr;

#ifdef DEBUG_HANG_
    if (myRank == 0)
        cout << "\n(" << myRank << ") Send Bundles" << endl;
    fflush(stdout);
#endif

#pragma omp parallel private(i) default(shared) num_threads(NUM_THREAD)
    {
#pragma omp master
        {
// Data structures for Bundled Messages:
#pragma omp task depend(inout : PCumulative, PMessageBundle, PSizeInfoMessages) depend(in : NumMessagesBundled, numProcs)
            {
                try
                {
                    PMessageBundle.reserve(NumMessagesBundled * 3); // Three integers per message
                    PCumulative.reserve(numProcs + 1);              // Similar to Row Pointer vector in CSR data structure
                    PSizeInfoMessages.reserve(numProcs * 3);        // Buffer to hold the Size info message packets
                }
                catch (const length_error &) // catch by const ref (was by value)
                {
                    cout << "Error in function algoDistEdgeApproxDominatingEdgesMessageBundling: \n";
                    cout << "Not enough memory to allocate the internal variables \n";
                    exit(1);
                }
                PMessageBundle.resize(NumMessagesBundled * 3, -1); // Initialize
                PCumulative.resize(numProcs + 1, 0);               // Only initialize the counter variable
                PSizeInfoMessages.resize(numProcs * 3, 0);
            }

// Prefix-sum the per-process counts: PCumulative becomes the CSR row pointer.
#pragma omp task depend(inout : PCumulative) depend(in : PCounter)
            {
                for (i = 0; i < numProcs; i++)
                    PCumulative[i + 1] = PCumulative[i] + PCounter[i];
            }

#pragma omp task depend(inout : PCounter)
            {
                // Reuse PCounter to keep track of how many messages were inserted:
                for (MilanInt i = 0; i < numProcs; i++) // Changed by Fabio to be an integer, addresses needs to be integers!
                    PCounter[i] = 0;
            }

// Build the Message Bundle packet: scatter each queued message into its
// owner's segment, 3 integers (local vtx, ghost vtx, type) per message.
#pragma omp task depend(in : PCounter, QLocalVtx, QGhostVtx, QMsgType, QOwner, PMessageBundle, PCumulative) depend(out : myIndex, PMessageBundle, PCounter)
            {
                for (i = 0; i < NumMessagesBundled; i++)
                {
                    myIndex = (PCumulative[QOwner[i]] + PCounter[QOwner[i]]) * 3;
                    PMessageBundle[myIndex + 0] = QLocalVtx[i];
                    PMessageBundle[myIndex + 1] = QGhostVtx[i];
                    PMessageBundle[myIndex + 2] = QMsgType[i];
                    PCounter[QOwner[i]]++;
                }
            }

// Send the Bundled Messages: Use ISend
#pragma omp task depend(out : SRequest, SStatus)
            {
                try
                {
                    SRequest.reserve(numProcs * 2); // At most two messages per processor
                    SStatus.reserve(numProcs * 2);  // At most two messages per processor
                }
                catch (const length_error &) // catch by const ref (was by value)
                {
                    cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearchImmediateSend: \n";
                    cout << "Not enough memory to allocate the internal variables \n";
                    exit(1);
                }
            }

// Send the Messages
#pragma omp task depend(inout : SRequest, PSizeInfoMessages, PCumulative) depend(out : msgActual, MessageIndex)
            {
                for (i = 0; i < numProcs; i++)
                { // Changed by Fabio to be an integer, addresses needs to be integers!
                    if (i == myRank) // Do not send anything to yourself
                        continue;
                    // Send the Message with information about the size of next message:
                    // Build the Message Packet:
                    PSizeInfoMessages[i * 3 + 0] = (PCumulative[i + 1] - PCumulative[i]) * 3; // # of integers in the next message
                    PSizeInfoMessages[i * 3 + 1] = -1;       // Dummy packet
                    PSizeInfoMessages[i * 3 + 2] = SIZEINFO; // TYPE
                    // Send a Request (Asynchronous)
#ifdef PRINT_DEBUG_INFO_
                    cout << "\n(" << myRank << ")Sending bundled message to process " << i << " size: " << PSizeInfoMessages[i * 3 + 0] << endl;
                    fflush(stdout);
#endif
                    if (PSizeInfoMessages[i * 3 + 0] > 0)
                    { // Send only if it is a nonempty packet
                        MPI_Isend(&PSizeInfoMessages[i * 3 + 0], 3, TypeMap<MilanLongInt>(), i,
                                  ComputeTag, comm, &SRequest[MessageIndex]);
                        msgActual++;
                        MessageIndex++;
                        // Now Send the message with the data packet:
#ifdef PRINT_DEBUG_INFO_
                        cout << "\n(" << myRank << ")SendiFFng Bundle to : " << i << endl;
                        // k declared here (was undeclared in the debug-only loop)
                        for (MilanLongInt k = (PCumulative[i] * 3); k < (PCumulative[i] * 3 + PSizeInfoMessages[i * 3 + 0]); k++)
                            cout << PMessageBundle[k] << ",";
                        cout << endl;
                        fflush(stdout);
#endif
                        MPI_Isend(&PMessageBundle[PCumulative[i] * 3], PSizeInfoMessages[i * 3 + 0],
                                  TypeMap<MilanLongInt>(), i, BundleTag, comm, &SRequest[MessageIndex]);
                        MessageIndex++;
                    } // End of if size > 0
                }
            }

#pragma omp task depend(inout : PCumulative, QLocalVtx, QGhostVtx, QMsgType, QOwner)
            {
                // Free up temporary memory:
                PCumulative.clear();
                QLocalVtx.clear();
                QGhostVtx.clear();
                QMsgType.clear();
                QOwner.clear();
            }

#pragma omp task depend(inout : OneMessageSize, BufferSize) depend(out : numMessagesToSend) depend(in : numGhostEdges)
            {
#ifdef PRINT_DEBUG_INFO_
                cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
                cout << "\n(" << myRank << ")Total number of potential message X 2 = " << numGhostEdges * 2;
                cout << "\n(" << myRank << ")Number messages already sent in bundles = " << NumMessagesBundled;
                if (numGhostEdges > 0)
                {
                    cout << "\n(" << myRank << ")Percentage of total = " << ((double)NumMessagesBundled / (double)(numGhostEdges * 2)) * 100.0 << "% \n";
                }
                fflush(stdout);
#endif
                // Allocate memory for MPI Send messages:
                /* WILL COME BACK HERE - NO NEED TO STORE ALL THIS MEMORY !! */
                OneMessageSize = 0;
                MPI_Pack_size(3, TypeMap<MilanLongInt>(), comm, &OneMessageSize); // Size of one message packet
                // How many messages to send?
                // Potentially three kinds of messages will be sent/received:
                // Request, Success, Failure.
                // But only two will be sent from a given processor.
                // Substract the number of messages that have already been sent as bundled messages:
                numMessagesToSend = numGhostEdges * 2 - NumMessagesBundled;
                BufferSize = (OneMessageSize + MPI_BSEND_OVERHEAD) * numMessagesToSend;
            }

#pragma omp task depend(out : Buffer) depend(in : BufferSize)
            {
                Buffer = 0;
#ifdef PRINT_DEBUG_INFO_
                cout << "\n(" << myRank << ")Size of One Message from PACK= " << OneMessageSize;
                cout << "\n(" << myRank << ")Size of Message overhead = " << MPI_BSEND_OVERHEAD;
                cout << "\n(" << myRank << ")Number of Ghost edges = " << numGhostEdges;
                cout << "\n(" << myRank << ")Number of remaining message = " << numMessagesToSend;
                cout << "\n(" << myRank << ")BufferSize = " << BufferSize;
                cout << "\n(" << myRank << ")Attaching Buffer on.. ";
                fflush(stdout);
#endif
                if (BufferSize > 0)
                {
                    Buffer = (MilanLongInt *)malloc(BufferSize); // Allocate memory
                    if (Buffer == 0)
                    {
                        cout << "Error in function algoDistEdgeApproxDominatingEdgesLinearSearch: \n";
                        cout << "Not enough memory to allocate for send buffer on process " << myRank << "\n";
                        exit(1);
                    }
                    // MPI keeps the attached buffer; it can be recovered later
                    // with MPI_Buffer_detach (the caller never sees this pointer).
                    MPI_Buffer_attach(Buffer, BufferSize);
                }
            }
        }
    }

    // Write the updated scalars back to the caller.
    *MessageIndexPtr = MessageIndex;
    *msgActualPtr = msgActual;
    *numGhostEdgesPtr = numGhostEdges; // unchanged here, written back for symmetry
    *BufferSizePtr = BufferSize;
}