Compare commits

..

2 Commits

Author SHA1 Message Date
edoardocoli 0456a627ac fix bugs 12 months ago
edoardocoli 375024691c fix bugs 12 months ago

@ -15,7 +15,7 @@ JOBNAME = Distributed_Sorting
PARTITION = production
TIME = 12:00:00
MEM = 3G
NODELIST = steffe[12-14]
NODELIST = steffe[9,12-20]
CPUS_PER_TASK = 1
NTASKS_PER_NODE = 1
OUTPUT = ./%x.%j.out

@ -26,10 +26,10 @@ od -t d8 -A n --endian=little binaryfile.bin #For little-endian format
od -t d8 -A n --endian=big binaryfile.bin #For big-endian format
*/
#define BUFFERSIZE 32768
#define CACHENUM 130000
#define CACHENUM 131072
#define RAMNUM 268435456
#define ALLOW_BUFFER 1
#define ALLOW_SNOWPLOW 1
#define ALLOW_SNOWPLOW 0
void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
{
@ -37,7 +37,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
double startTot, start, end;
int64_t num;
std::vector<int64_t> bigVect;
int64_t buffer[BUFFERSIZE];
int64_t buffer[static_cast<unsigned long long>(BUFFERSIZE)];
bigVect.reserve(sliceSize);
startTot = MPI_Wtime(); //Microsecond precision. Can't use time(), because each process will have a different "zero" time
@ -56,7 +56,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
{
while (currentOffset < endOffset)
{
unsigned long long elementsToRead = std::min(endOffset - currentOffset, static_cast<unsigned long long>(BUFFERSIZE)); //It's important to check because if the difference between endOffset and startOffset is smaller than BUFFERSIZE we don't have to read further
unsigned long long elementsToRead = std::min(endOffset - currentOffset, static_cast<unsigned long long>(static_cast<unsigned long long>(BUFFERSIZE))); //It's important to check because if the difference between endOffset and startOffset is smaller than BUFFERSIZE we don't have to read further
unsigned long long elementsRead = fread(buffer, sizeof(int64_t), elementsToRead, file);
for (unsigned long long i = 0; i < elementsRead; ++i)
@ -64,7 +64,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
bigVect.push_back(buffer[i]);
}
currentOffset += elementsRead; //Increment currentOffset based on the number of elements read
if (elementsRead < BUFFERSIZE) // Check if we have reached the end of the file
if (elementsRead < static_cast<unsigned long long>(BUFFERSIZE)) // Check if we have reached the end of the file
break;
}
}
@ -97,10 +97,10 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
{
if (ALLOW_BUFFER) //Branch to test performance with and without buffer
{
buffer[i % BUFFERSIZE] = bigVect[i];
if ((i + 1) % BUFFERSIZE == 0 || i == bigVect.size() - 1)
buffer[i % static_cast<unsigned long long>(BUFFERSIZE)] = bigVect[i];
if ((i + 1) % static_cast<unsigned long long>(BUFFERSIZE) == 0 || i == bigVect.size() - 1)
{
ssize_t tw = write(tmpFile, buffer, sizeof(int64_t) * ((i % BUFFERSIZE) + 1));
ssize_t tw = write(tmpFile, buffer, sizeof(int64_t) * ((i % static_cast<unsigned long long>(BUFFERSIZE)) + 1));
if (tw == -1)
{
std::cout << "Error writing to file" << std::endl;
@ -149,11 +149,11 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
void snowPlowRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
{
if (ALLOW_SNOWPLOW)
std::cout << "Can't compute files of size bigger then " << RAMNUM * mpiSize / 134217728 << "Gb with " << mpiSize << " processes (currently file is " << fileSize / 134217728 << "Gb)" << std::endl;
std::cout << "Can't compute files of size bigger then " << static_cast<unsigned long long>(RAMNUM) * mpiSize / 134217728 << "Gb with " << mpiSize << " processes (currently file is " << fileSize / 134217728 << "Gb)" << std::endl;
else
{
maxLoop = (fileSize / (RAMNUM * mpiSize)) + 1;
sortRuns(fileSize, RAMNUM, maxLoop, file, id, mpiRank, mpiSize);
maxLoop = (fileSize / (static_cast<unsigned long long>(RAMNUM) * mpiSize)) + 1;
sortRuns(fileSize, static_cast<unsigned long long>(RAMNUM), maxLoop, file, id, mpiRank, mpiSize);
}
}
@ -226,7 +226,7 @@ void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
int tmpfd;
int64_t tmpValue2;
int64_t buffer[BUFFERSIZE];
int64_t buffer[static_cast<unsigned long long>(BUFFERSIZE)];
unsigned long long i = 0;
while (!minHeap.empty()) //Write sorted elements to the temporary file
{
@ -248,10 +248,10 @@ void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
}
if (ALLOW_BUFFER) //Branch to test performance with and without buffer
{
buffer[i % BUFFERSIZE] = tmpValue;
if ((i + 1) % BUFFERSIZE == 0 || minHeap.empty())
buffer[i % static_cast<unsigned long long>(BUFFERSIZE)] = tmpValue;
if ((i + 1) % static_cast<unsigned long long>(BUFFERSIZE) == 0 || minHeap.empty())
{
ssize_t tw = write(fdFinal, buffer, sizeof(int64_t) * ((i % BUFFERSIZE) + 1));
ssize_t tw = write(fdFinal, buffer, sizeof(int64_t) * ((i % static_cast<unsigned long long>(BUFFERSIZE)) + 1));
if (tw == -1)
{
std::cout << "Error writing to file" << std::endl;
@ -325,15 +325,18 @@ int main(int argc, char* argv[])
if (mpiRank == 0)
std::cout << "Sorting file '" << argv[1] << "' of " << fileSize << " elements" << std::endl << std::endl;
if (fileSize < (CACHENUM * mpiSize))
slices = (fileSize / CACHENUM) + 1;
else if (fileSize < (RAMNUM * mpiSize)) //TODO add more granularity considering double RAM for snow plow technique
slices = (fileSize / RAMNUM) + 1;
//Load balancer
if (fileSize < static_cast<unsigned long long>(CACHENUM)) //Can add more granularity considering efficiency, now is used by default all nodes
slices = 1;
else if (fileSize < (static_cast<unsigned long long>(CACHENUM) * mpiSize))
slices = mpiSize;
else if (fileSize < ((unsigned long long) static_cast<unsigned long long>(RAMNUM) * mpiSize))
slices = mpiSize;
else
slices = mpiSize + 1;
sliceSize = (fileSize / slices) + 1; //Each process divides a number of 8-byte integers based on the size of the starting file, Attualmente dentro create Runs
maxLoop = (slices / mpiSize) + 1;
if (sliceSize > RAMNUM)
slices = mpiSize;
sliceSize = (fileSize / slices) + 1; //Each process divides a number of 8-byte integers based on the size of the starting file
maxLoop = 1;
if (sliceSize > static_cast<unsigned long long>(RAMNUM))
snowPlowRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
else
sortRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
@ -355,3 +358,4 @@ int main(int argc, char* argv[])
MPI_Finalize(); //Clean up the MPI environment
return 0;
}

Loading…
Cancel
Save