main
edoardocoli 12 months ago
parent c2865d50cf
commit 375024691c

@ -15,7 +15,7 @@ JOBNAME = Distributed_Sorting
PARTITION = production PARTITION = production
TIME = 12:00:00 TIME = 12:00:00
MEM = 3G MEM = 3G
NODELIST = steffe[12-14] NODELIST = steffe[9,12-20]
CPUS_PER_TASK = 1 CPUS_PER_TASK = 1
NTASKS_PER_NODE = 1 NTASKS_PER_NODE = 1
OUTPUT = ./%x.%j.out OUTPUT = ./%x.%j.out

@ -26,10 +26,10 @@ od -t d8 -A n --endian=little binaryfile.bin #For little-endian format
od -t d8 -A n --endian=big binaryfile.bin #For big-endian format od -t d8 -A n --endian=big binaryfile.bin #For big-endian format
*/ */
#define BUFFERSIZE 32768 #define BUFFERSIZE 32768
#define CACHENUM 130000 #define CACHENUM 131072
#define RAMNUM 268435456 #define RAMNUM 268435456
#define ALLOW_BUFFER 1 #define ALLOW_BUFFER 1
#define ALLOW_SNOWPLOW 1 #define ALLOW_SNOWPLOW 0
void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize) void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
{ {
@ -37,7 +37,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
double startTot, start, end; double startTot, start, end;
int64_t num; int64_t num;
std::vector<int64_t> bigVect; std::vector<int64_t> bigVect;
int64_t buffer[BUFFERSIZE]; int64_t buffer[static_cast<unsigned long long>(BUFFERSIZE)];
bigVect.reserve(sliceSize); bigVect.reserve(sliceSize);
startTot = MPI_Wtime(); //Microsecond precision. Can't use time(), because each process will have a different "zero" time startTot = MPI_Wtime(); //Microsecond precision. Can't use time(), because each process will have a different "zero" time
@ -56,7 +56,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
{ {
while (currentOffset < endOffset) while (currentOffset < endOffset)
{ {
unsigned long long elementsToRead = std::min(endOffset - currentOffset, static_cast<unsigned long long>(BUFFERSIZE)); //It's important to check because if the difference between endOffset and startOffset is smaller than BUFFERSIZE we don't have to read further unsigned long long elementsToRead = std::min(endOffset - currentOffset, static_cast<unsigned long long>(static_cast<unsigned long long>(BUFFERSIZE))); //It's important to check because if the difference between endOffset and startOffset is smaller than BUFFERSIZE we don't have to read further
unsigned long long elementsRead = fread(buffer, sizeof(int64_t), elementsToRead, file); unsigned long long elementsRead = fread(buffer, sizeof(int64_t), elementsToRead, file);
for (unsigned long long i = 0; i < elementsRead; ++i) for (unsigned long long i = 0; i < elementsRead; ++i)
@ -64,7 +64,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
bigVect.push_back(buffer[i]); bigVect.push_back(buffer[i]);
} }
currentOffset += elementsRead; //Increment currentOffset based on the number of elements read currentOffset += elementsRead; //Increment currentOffset based on the number of elements read
if (elementsRead < BUFFERSIZE) // Check if we have reached the end of the file if (elementsRead < static_cast<unsigned long long>(BUFFERSIZE)) // Check if we have reached the end of the file
break; break;
} }
} }
@ -97,10 +97,10 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
{ {
if (ALLOW_BUFFER) //Branch to test performance with and without buffer if (ALLOW_BUFFER) //Branch to test performance with and without buffer
{ {
buffer[i % BUFFERSIZE] = bigVect[i]; buffer[i % static_cast<unsigned long long>(BUFFERSIZE)] = bigVect[i];
if ((i + 1) % BUFFERSIZE == 0 || i == bigVect.size() - 1) if ((i + 1) % static_cast<unsigned long long>(BUFFERSIZE) == 0 || i == bigVect.size() - 1)
{ {
ssize_t tw = write(tmpFile, buffer, sizeof(int64_t) * ((i % BUFFERSIZE) + 1)); ssize_t tw = write(tmpFile, buffer, sizeof(int64_t) * ((i % static_cast<unsigned long long>(BUFFERSIZE)) + 1));
if (tw == -1) if (tw == -1)
{ {
std::cout << "Error writing to file" << std::endl; std::cout << "Error writing to file" << std::endl;
@ -149,11 +149,11 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
void snowPlowRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize) void snowPlowRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
{ {
if (ALLOW_SNOWPLOW) if (ALLOW_SNOWPLOW)
std::cout << "Can't compute files of size bigger then " << RAMNUM * mpiSize / 134217728 << "Gb with " << mpiSize << " processes (currently file is " << fileSize / 134217728 << "Gb)" << std::endl; std::cout << "Can't compute files of size bigger then " << static_cast<unsigned long long>(RAMNUM) * mpiSize / 134217728 << "Gb with " << mpiSize << " processes (currently file is " << fileSize / 134217728 << "Gb)" << std::endl;
else else
{ {
maxLoop = (fileSize / (RAMNUM * mpiSize)) + 1; maxLoop = (fileSize / (static_cast<unsigned long long>(RAMNUM) * mpiSize)) + 1;
sortRuns(fileSize, RAMNUM, maxLoop, file, id, mpiRank, mpiSize); sortRuns(fileSize, static_cast<unsigned long long>(RAMNUM), maxLoop, file, id, mpiRank, mpiSize);
} }
} }
@ -226,7 +226,7 @@ void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
int tmpfd; int tmpfd;
int64_t tmpValue2; int64_t tmpValue2;
int64_t buffer[BUFFERSIZE]; int64_t buffer[static_cast<unsigned long long>(BUFFERSIZE)];
unsigned long long i = 0; unsigned long long i = 0;
while (!minHeap.empty()) //Write sorted elements to the temporary file while (!minHeap.empty()) //Write sorted elements to the temporary file
{ {
@ -248,10 +248,10 @@ void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
} }
if (ALLOW_BUFFER) //Branch to test performance with and without buffer if (ALLOW_BUFFER) //Branch to test performance with and without buffer
{ {
buffer[i % BUFFERSIZE] = tmpValue; buffer[i % static_cast<unsigned long long>(BUFFERSIZE)] = tmpValue;
if ((i + 1) % BUFFERSIZE == 0 || minHeap.empty()) if ((i + 1) % static_cast<unsigned long long>(BUFFERSIZE) == 0 || minHeap.empty())
{ {
ssize_t tw = write(fdFinal, buffer, sizeof(int64_t) * ((i % BUFFERSIZE) + 1)); ssize_t tw = write(fdFinal, buffer, sizeof(int64_t) * ((i % static_cast<unsigned long long>(BUFFERSIZE)) + 1));
if (tw == -1) if (tw == -1)
{ {
std::cout << "Error writing to file" << std::endl; std::cout << "Error writing to file" << std::endl;
@ -325,15 +325,18 @@ int main(int argc, char* argv[])
if (mpiRank == 0) if (mpiRank == 0)
std::cout << "Sorting file '" << argv[1] << "' of " << fileSize << " elements" << std::endl << std::endl; std::cout << "Sorting file '" << argv[1] << "' of " << fileSize << " elements" << std::endl << std::endl;
if (fileSize < (CACHENUM * mpiSize)) //Load balancer
slices = (fileSize / CACHENUM) + 1; if (fileSize < static_cast<unsigned long long>(CACHENUM)) //Can add more granularity considering efficiency, now is used by default all nodes
else if (fileSize < (RAMNUM * mpiSize)) //TODO add more granularity considering double RAM for snow plow technique slices = 1;
slices = (fileSize / RAMNUM) + 1; else if (fileSize < (static_cast<unsigned long long>(CACHENUM) * mpiSize))
slices = mpiSize;
else if (fileSize < ((unsigned long long) static_cast<unsigned long long>(RAMNUM) * mpiSize))
slices = mpiSize;
else else
slices = mpiSize + 1; slices = mpiSize;
sliceSize = (fileSize / slices) + 1; //Each process divides a number of 8-byte integers based on the size of the starting file, Attualmente dentro create Runs sliceSize = (fileSize / slices) + 1; //Each process divides a number of 8-byte integers based on the size of the starting file
maxLoop = (slices / mpiSize) + 1; maxLoop = 1;
if (sliceSize > RAMNUM) if (sliceSize > static_cast<unsigned long long>(RAMNUM))
snowPlowRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize); snowPlowRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
else else
sortRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize); sortRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
@ -355,3 +358,4 @@ int main(int argc, char* argv[])
MPI_Finalize(); //Clean up the MPI environment MPI_Finalize(); //Clean up the MPI environment
return 0; return 0;
} }

Binary file not shown.

Binary file not shown.
Loading…
Cancel
Save