@ -26,10 +26,10 @@ od -t d8 -A n --endian=little binaryfile.bin #For little-endian format
od - t d8 - A n - - endian = big binaryfile . bin # For big - endian format
od - t d8 - A n - - endian = big binaryfile . bin # For big - endian format
*/
*/
# define BUFFERSIZE 32768
# define BUFFERSIZE 32768
# define CACHENUM 13 1072
# define CACHENUM 13 0000
# define RAMNUM 268435456
# define RAMNUM 268435456
# define ALLOW_BUFFER 1
# define ALLOW_BUFFER 1
# define ALLOW_SNOWPLOW 0
# define ALLOW_SNOWPLOW 1
void sortRuns ( unsigned long long fileSize , unsigned long long sliceSize , unsigned long long maxLoop , FILE * file , int id , int mpiRank , int mpiSize )
void sortRuns ( unsigned long long fileSize , unsigned long long sliceSize , unsigned long long maxLoop , FILE * file , int id , int mpiRank , int mpiSize )
{
{
@ -37,7 +37,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
double startTot , start , end ;
double startTot , start , end ;
int64_t num ;
int64_t num ;
std : : vector < int64_t > bigVect ;
std : : vector < int64_t > bigVect ;
int64_t buffer [ static_cast < unsigned long long > ( BUFFERSIZE ) ] ;
int64_t buffer [ BUFFERSIZE ] ;
bigVect . reserve ( sliceSize ) ;
bigVect . reserve ( sliceSize ) ;
startTot = MPI_Wtime ( ) ; //Microsecond precision. Can't use time(), because each process will have a different "zero" time
startTot = MPI_Wtime ( ) ; //Microsecond precision. Can't use time(), because each process will have a different "zero" time
@ -56,7 +56,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
{
{
while ( currentOffset < endOffset )
while ( currentOffset < endOffset )
{
{
unsigned long long elementsToRead = std : : min ( endOffset - currentOffset , static_cast < unsigned long long > ( static_cast < unsigned long long > ( BUFFERSIZE ) ) ) ; //It's important to check because if the difference between endOffset and startOffset is smaller than BUFFERSIZE we don't have to read further
unsigned long long elementsToRead = std : : min ( endOffset - currentOffset , static_cast < unsigned long long > ( BUFFERSIZE ) ) ; //It's important to check because if the difference between endOffset and startOffset is smaller than BUFFERSIZE we don't have to read further
unsigned long long elementsRead = fread ( buffer , sizeof ( int64_t ) , elementsToRead , file ) ;
unsigned long long elementsRead = fread ( buffer , sizeof ( int64_t ) , elementsToRead , file ) ;
for ( unsigned long long i = 0 ; i < elementsRead ; + + i )
for ( unsigned long long i = 0 ; i < elementsRead ; + + i )
@ -64,7 +64,7 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
bigVect . push_back ( buffer [ i ] ) ;
bigVect . push_back ( buffer [ i ] ) ;
}
}
currentOffset + = elementsRead ; //Increment currentOffset based on the number of elements read
currentOffset + = elementsRead ; //Increment currentOffset based on the number of elements read
if ( elementsRead < static_cast < unsigned long long > ( BUFFERSIZE ) ) // Check if we have reached the end of the file
if ( elementsRead < BUFFERSIZE ) // Check if we have reached the end of the file
break ;
break ;
}
}
}
}
@ -97,10 +97,10 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
{
{
if ( ALLOW_BUFFER ) //Branch to test performance with and without buffer
if ( ALLOW_BUFFER ) //Branch to test performance with and without buffer
{
{
buffer [ i % static_cast < unsigned long long > ( BUFFERSIZE ) ] = bigVect [ i ] ;
buffer [ i % BUFFERSIZE ] = bigVect [ i ] ;
if ( ( i + 1 ) % static_cast < unsigned long long > ( BUFFERSIZE ) = = 0 | | i = = bigVect . size ( ) - 1 )
if ( ( i + 1 ) % BUFFERSIZE = = 0 | | i = = bigVect . size ( ) - 1 )
{
{
ssize_t tw = write ( tmpFile , buffer , sizeof ( int64_t ) * ( ( i % static_cast < unsigned long long > ( BUFFERSIZE ) ) + 1 ) ) ;
ssize_t tw = write ( tmpFile , buffer , sizeof ( int64_t ) * ( ( i % BUFFERSIZE ) + 1 ) ) ;
if ( tw = = - 1 )
if ( tw = = - 1 )
{
{
std : : cout < < " Error writing to file " < < std : : endl ;
std : : cout < < " Error writing to file " < < std : : endl ;
@ -149,11 +149,11 @@ void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigne
void snowPlowRuns ( unsigned long long fileSize , unsigned long long sliceSize , unsigned long long maxLoop , FILE * file , int id , int mpiRank , int mpiSize )
void snowPlowRuns ( unsigned long long fileSize , unsigned long long sliceSize , unsigned long long maxLoop , FILE * file , int id , int mpiRank , int mpiSize )
{
{
if ( ALLOW_SNOWPLOW )
if ( ALLOW_SNOWPLOW )
std : : cout < < " Can't compute files of size bigger then " < < static_cast < unsigned long long > ( RAMNUM ) * mpiSize / 134217728 < < " Gb with " < < mpiSize < < " processes (currently file is " < < fileSize / 134217728 < < " Gb) " < < std : : endl ;
std : : cout < < " Can't compute files of size bigger then " < < RAMNUM * mpiSize / 134217728 < < " Gb with " < < mpiSize < < " processes (currently file is " < < fileSize / 134217728 < < " Gb) " < < std : : endl ;
else
else
{
{
maxLoop = ( fileSize / ( static_cast < unsigned long long > ( RAMNUM ) * mpiSize ) ) + 1 ;
maxLoop = ( fileSize / ( RAMNUM * mpiSize ) ) + 1 ;
sortRuns ( fileSize , static_cast < unsigned long long > ( RAMNUM ) , maxLoop , file , id , mpiRank , mpiSize ) ;
sortRuns ( fileSize , RAMNUM , maxLoop , file , id , mpiRank , mpiSize ) ;
}
}
}
}
@ -226,7 +226,7 @@ void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
int tmpfd ;
int tmpfd ;
int64_t tmpValue2 ;
int64_t tmpValue2 ;
int64_t buffer [ static_cast < unsigned long long > ( BUFFERSIZE ) ] ;
int64_t buffer [ BUFFERSIZE ] ;
unsigned long long i = 0 ;
unsigned long long i = 0 ;
while ( ! minHeap . empty ( ) ) //Write sorted elements to the temporary file
while ( ! minHeap . empty ( ) ) //Write sorted elements to the temporary file
{
{
@ -248,10 +248,10 @@ void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
}
}
if ( ALLOW_BUFFER ) //Branch to test performance with and without buffer
if ( ALLOW_BUFFER ) //Branch to test performance with and without buffer
{
{
buffer [ i % static_cast < unsigned long long > ( BUFFERSIZE ) ] = tmpValue ;
buffer [ i % BUFFERSIZE ] = tmpValue ;
if ( ( i + 1 ) % static_cast < unsigned long long > ( BUFFERSIZE ) = = 0 | | minHeap . empty ( ) )
if ( ( i + 1 ) % BUFFERSIZE = = 0 | | minHeap . empty ( ) )
{
{
ssize_t tw = write ( fdFinal , buffer , sizeof ( int64_t ) * ( ( i % static_cast < unsigned long long > ( BUFFERSIZE ) ) + 1 ) ) ;
ssize_t tw = write ( fdFinal , buffer , sizeof ( int64_t ) * ( ( i % BUFFERSIZE ) + 1 ) ) ;
if ( tw = = - 1 )
if ( tw = = - 1 )
{
{
std : : cout < < " Error writing to file " < < std : : endl ;
std : : cout < < " Error writing to file " < < std : : endl ;
@ -325,18 +325,15 @@ int main(int argc, char* argv[])
if ( mpiRank = = 0 )
if ( mpiRank = = 0 )
std : : cout < < " Sorting file ' " < < argv [ 1 ] < < " ' of " < < fileSize < < " elements " < < std : : endl < < std : : endl ;
std : : cout < < " Sorting file ' " < < argv [ 1 ] < < " ' of " < < fileSize < < " elements " < < std : : endl < < std : : endl ;
//Load balancer
if ( fileSize < ( CACHENUM * mpiSize ) )
if ( fileSize < static_cast < unsigned long long > ( CACHENUM ) ) //Can add more granularity considering efficiency, now is used by default all nodes
slices = ( fileSize / CACHENUM ) + 1 ;
slices = 1 ;
else if ( fileSize < ( RAMNUM * mpiSize ) ) //TODO add more granularity considering double RAM for snow plow technique
else if ( fileSize < ( static_cast < unsigned long long > ( CACHENUM ) * mpiSize ) )
slices = ( fileSize / RAMNUM ) + 1 ;
slices = mpiSize ;
else if ( fileSize < ( ( unsigned long long ) static_cast < unsigned long long > ( RAMNUM ) * mpiSize ) )
slices = mpiSize ;
else
else
slices = mpiSize ;
slices = mpiSize + 1 ;
sliceSize = ( fileSize / slices ) + 1 ; //Each process divides a number of 8-byte integers based on the size of the starting file
sliceSize = ( fileSize / slices ) + 1 ; //Each process divides a number of 8-byte integers based on the size of the starting file, Attualmente dentro create Runs
maxLoop = 1 ;
maxLoop = ( slices / mpiSize ) + 1 ;
if ( sliceSize > static_cast < unsigned long long > ( RAMNUM ) )
if ( sliceSize > RAMNUM )
snowPlowRuns ( fileSize , sliceSize , maxLoop , file , id , mpiRank , mpiSize ) ;
snowPlowRuns ( fileSize , sliceSize , maxLoop , file , id , mpiRank , mpiSize ) ;
else
else
sortRuns ( fileSize , sliceSize , maxLoop , file , id , mpiRank , mpiSize ) ;
sortRuns ( fileSize , sliceSize , maxLoop , file , id , mpiRank , mpiSize ) ;
@ -358,4 +355,3 @@ int main(int argc, char* argv[])
MPI_Finalize ( ) ; //Clean up the MPI environment
MPI_Finalize ( ) ; //Clean up the MPI environment
return 0 ;
return 0 ;
}
}