MPI sorter with buffers, missing SnowPlow technique

main
edoardocoli 12 months ago
parent 7a05f4d1c0
commit c2865d50cf

@ -0,0 +1,57 @@
#include <mpi.h>
#include <cmath>
#include <iostream>
/*
MPI_Allreduce(
void* send_data, //the element to apply the reduce
void* recv_data, //the reduce result
int count, //size of result in recv_data is sizeof(datatype)*count
MPI_Datatype datatype, //type of element
MPI_Op op, //operation to apply. Can also be defined custom operation
MPI_Comm comm //communicator
)
*/
/*
Computes the mean and the standard deviation of a data set that every process
holds a full copy of.
STEP1: every process sums its local values; MPI_Allreduce gives all of them the global sum, hence the mean.
STEP2: every process sums its squared differences from the mean; MPI_Reduce delivers the total to one root
process, which prints the final result.
*/
int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO: why not (NULL, NULL) here?
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    //Rank that collects the final reduction: rank 1 whenever it exists, otherwise rank 0
    //(with a single process rank 1 would be an invalid root for MPI_Reduce)
    const int root = (world_size > 1) ? 1 : 0;
    float rand_nums[10] = {1,2,3,42,55,66,71,82,93,190};
    int size = 10; //Must match the number of elements of rand_nums
    float local_sum = 0;
    for(int i=0; i<size; i++) //each process computes the sum (locally)
        local_sum += rand_nums[i];
    std::cout << "STEP1-Process (rank " << world_rank+1 << "/" << world_size << ") sum is " << local_sum << std::endl;
    //Reduce all the local sums into the global sum in order to calculate the mean
    float global_sum;
    //MPI_Allreduce(send_data, recv_data, count, datatype, op, comm)
    //Like MPI_Reduce but without a root process: the result is distributed to every process
    MPI_Allreduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
    float mean = global_sum/(size * world_size);
    float local_sq_diff = 0;
    for(int i=0; i<size; i++) //each process computes the sum of the squared differences from the mean (locally)
        local_sq_diff += (rand_nums[i] - mean) * (rand_nums[i] - mean);
    std::cout << "STEP2-Process (rank " << world_rank+1 << "/" << world_size << ") sum of the squared differences from the mean is " << local_sq_diff << std::endl;
    float global_sq_diff;
    //Only the root receives a meaningful global_sq_diff
    MPI_Reduce(&local_sq_diff, &global_sq_diff, 1, MPI_FLOAT, MPI_SUM, root, MPI_COMM_WORLD);
    if(world_rank == root)
        std::cout << " FINAL RESULT:" << std::endl << "Process (rank " << world_rank+1 << "/" << world_size << ") mean is " << mean << ", standard deviation is " << sqrt(global_sq_diff/(size*world_size)) << std::endl;
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,30 @@
#include <mpi.h>
#include <iostream>
#include <unistd.h>
/*
Times a 30 second sleep between two barriers and lets rank 0 report the
elapsed wall-clock time measured with MPI_Wtime.
*/
int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO: why not (NULL, NULL) here?
    int world_size, world_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Number of processes in the world communicator
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Rank of this process
    MPI_Barrier(MPI_COMM_WORLD); //Every process reaches this point before the timer starts
    const double t_begin = MPI_Wtime(); //Wall-clock time in seconds
    sleep(30); //The "work" being timed
    MPI_Barrier(MPI_COMM_WORLD); //Wait for every process to finish its work
    const double t_end = MPI_Wtime();
    if(world_rank == 0)
    {
        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << "): time of work is " << t_end-t_begin << "seconds" << std::endl;
    }
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,44 @@
#include <mpi.h>
#include <iostream>
#include <cstdlib>
#include <ctime>
/*
MPI_Bcast(
void* buffer, //the element to send
int count, //number of element to send
MPI_Datatype datatype, //type of element
int root, //rank of the process that has to send the value to the others
MPI_Comm comm //communicator
)
*/
/*
Every process draws its own random value, then MPI_Bcast overwrites it with
the value owned by rank 0. Both values are printed so the effect is visible.
*/
int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO: why not (NULL, NULL) here?
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    long long offset;
    std::srand(std::time(0) + world_rank); //Different seed per rank, so the initial values differ
    offset = std::rand() % 100; //Give a random value to initialize the element of each process
    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") recived: " << offset << std::endl;
    MPI_Barrier(MPI_COMM_WORLD); //Used to not mess with the prints
    //MPI_Bcast(buffer, count, datatype, root, comm)
    MPI_Bcast(&offset, 1, MPI_LONG_LONG, 0, MPI_COMM_WORLD);
    /*Author's note: broadcast also works with a single node (it sends to itself).
    A rank greater than or equal to the communicator size cannot be used as root.*/
    std::cout <<"Process (rank " << world_rank+1 << "/" << world_size << ") recived: " << offset << std::endl;
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,42 @@
#include <mpi.h>
#include <iostream>
/*
Creates new communicators by "splitting" a communicator into a group of sub-communicators based on the input values color and key.
MPI_Comm_split(
MPI_Comm comm, //communicator to split
int color, //determines to which new communicator each process will belong. Same color same communicator
int key, //determ the order(rank) within each new communicator. The smallest key value is assigned to 0, next to 1 and so on
MPI_Comm* newcomm //new communicator
)
*/
/*
Splits MPI_COMM_WORLD into sub-communicators ("rows") and prints, for every
process, its rank and size inside the row it ended up in.
*/
int main(int argc, char* argv[])
{
    MPI_Init(NULL, NULL); //Initialize the MPI environment
    int world_size, world_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Number of processes in the world communicator
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Rank of this process in the world communicator

    //The example supposes 10 processes split in "rows" as if they were a 2x5 matrix.
    //Processes sharing the same color land in the same sub-communicator.
    const int color = world_rank / 2;
    //The key is the negated world rank, so inside a row the highest world rank gets the lowest row rank
    const int key = -world_rank;

    MPI_Comm row_comm; //Released with MPI_Comm_free below
    //MPI_Comm_split(comm, color, key, newcomm)
    MPI_Comm_split(MPI_COMM_WORLD, color, key, &row_comm);

    int row_size, row_rank;
    MPI_Comm_size(row_comm, &row_size); //Number of processes in this row
    MPI_Comm_rank(row_comm, &row_rank); //Rank of this process inside the row
    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") is splitted to sub-group " << row_rank+1 << "/" << row_size << "-" << color << " (row/size-color)" << std::endl;

    MPI_Comm_free(&row_comm); //The communicator created above has to be freed
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,51 @@
#include <mpi.h>
#include <iostream>
/*
MPI_Gather(
const void* sendbuf, //the element to send
int sendcount, //number of element to send
MPI_Datatype sendtype, //type of element
void* recvbuf, //buffer to receive
int recvcount, //number of elements to receive from each process
MPI_Datatype recvtype, //type of element
int root, //rank of the process that have to receive
MPI_Comm comm //communicator
)
*/
/*
Every process sends the first 3 elements of its 4-element buffer to a root
process with MPI_Gather; the root prints the whole receive buffer.
*/
int main(int argc, char* argv[])
{
    int sendbuf[4] = {1,2,3,4};
    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO: why not (NULL, NULL) here?
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    int recvbuf[4*world_size]; //One slot of 4 ints per process; only significant on the root
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    //Root of the gather: rank 1 whenever it exists, otherwise rank 0
    //(with a single process rank 1 would be an invalid root)
    const int root = (world_size > 1) ? 1 : 0;
    //MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)
    MPI_Gather(sendbuf, 3, MPI_INT, recvbuf, 4, MPI_INT, root, MPI_COMM_WORLD);
    /*Author's notes: sendcount cannot be larger than the size of sendbuf but it can be smaller; the root,
    however, expects to receive the whole original buffer. So, besides sizing recvbuf appropriately, remember
    that the root lays out the received data as
    [expectedFrom0-0, expectedFrom0-1, expectedFrom0-2, expectedFrom0-3, expectedFrom1-0, ...] assuming a sendbuf of 4.
    If fewer elements than the original are sent, say 3, the expectedFrom*-3 slots hold arbitrary values.
    The root also receives the data it sends to itself, as if it were one of the senders.
    NOTE(review): sendcount (3) and recvcount (4) are deliberately different here; the MPI standard asks for
    matching amounts between each sender and the root, so this behaviour is implementation-dependent - confirm.*/
    if(world_rank == root)
    {
        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") recived:" << std::endl;
        for (int i=0; i<4*world_size; i++)
        {
            std::cout << " " << recvbuf[i] << " ";
        }
        std::cout << std::endl;
    }
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,79 @@
#include <mpi.h>
#include <iostream>
/*
A group is just the set of all processes in the communicator.
A remote operation involves communication with other ranks whereas a local
operation does not. Creating a new communicator is a remote operation
because all processes need to decide on the same context and group, whereas
creating a group is local because it isnt used for communication and
therefore doesnt need to have the same context for each process. You can
manipulate a group all you like without performing any communication at all.
MPI_Comm_group(
MPI_Comm comm, //context communicator
MPI_Group* group //the group associated with comm
)
MPI_Comm_create_group(
MPI_Comm comm, //context communicator
MPI_Group group, //group, which is a subset of the group of comm
int tag, //tag used to tell apart concurrent MPI_Comm_create_group calls on the same comm
MPI_Comm* newcomm //the new communicator, made of the processes of group
)
MPI_Group_union(
MPI_Group group1,
MPI_Group group2,
MPI_Group* newgroup //the group created by the union
)
MPI_Group_intersection(
MPI_Group group1,
MPI_Group group2,
MPI_Group* newgroup //the group created by the intersection
)
MPI_Group_incl(
MPI_Group group, //group of processes where ranks value are contained, at least
int n, //size of ranks
const int ranks[], //ranks to chose
MPI_Group* newgroup //group with only the specific ranks from the other group
)
*/
int main(int argc, char* argv[])
{
MPI_Init(NULL, NULL); //Initialize the MPI environment
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the number of process
MPI_Group world_group; //To free
//MPI_Comm_group(comm, group)
MPI_Comm_group(MPI_COMM_WORLD, &world_group);
const int ranks[5] = {1,3,5,7,9}; //Just use ranks that are in the group
MPI_Group spare_group; //To free
//MPI_Group_incl(group, n, ranks[], newgroup)
MPI_Group_incl(world_group, 5, ranks, &spare_group);
MPI_Comm spare_comm; //To free
//MPI_Comm_create_group(comm, group, tag, newcomm)
MPI_Comm_create_group(MPI_COMM_WORLD, spare_group, 0, &spare_comm);
int spare_rank = -1, spare_size = -1; //If this rank isn't in the new communicator, it will be MPI_COMM_NULL. Using MPI_COMM_NULL for MPI_Comm_rank or MPI_Comm_size is erroneous
if(MPI_COMM_NULL != spare_comm)
{
MPI_Comm_size(spare_comm, &spare_size); //Get the number of processes related to the communicator
MPI_Comm_rank(spare_comm, &spare_rank); //Get the number of process related to the communicator
}
std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") is splitted to sub-group " << spare_rank+1 << "/" << spare_size << " (row/size)" << std::endl;
MPI_Group_free(&world_group); //We have to free the group that we created above
MPI_Group_free(&spare_group); //We have to free the group that we created above
MPI_Comm_free(&spare_comm); //We have to free the communicator that we created above
MPI_Finalize(); //Clean up the MPI environment
return 0;
}

@ -0,0 +1,21 @@
#include <mpi.h>
#include <iostream>
int main(int argc, char* argv[])
{
MPI_Init(NULL, NULL); //Initialize the MPI environment
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the number of process
char processor_name[MPI_MAX_PROCESSOR_NAME];
int name_len;
MPI_Get_processor_name(processor_name, &name_len); //Get the name of the processor
std::cout << "Hello World from '" << processor_name << "'(lenght:" << name_len << ") (rank " << world_rank+1 << " out of " << world_size << " process)" << std::endl;
MPI_Finalize(); //Clean up the MPI environment
return 0;
}

@ -0,0 +1,119 @@
# Program for compiling MPI cpp programs
# Both the C and the C++ compiler variables point to the MPI C++ wrapper.
CC = mpicxx
CXX = mpicxx
# Extra flags to give to the compiler (debug symbols only)
CFLAGS = -g
# Stricter warning flags, currently not enabled:
# -Wall -Werror -Wextra
#
# Every .cpp file of the directory is a source; objects take the same name with a .o extension.
SRC = $(wildcard *.cpp)
OBJ = $(SRC:.cpp=.o)
# Name of the executable to build
NAME = test_computation
# SBATCH parameters
# These values are written as '#SBATCH' directives into launcher.sh by the 'run' target.
JOBNAME = Distributed_Sorting
PARTITION = production
TIME = 12:00:00
MEM = 3G
NODELIST = steffe[1-10]
CPUS_PER_TASK = 1
NTASKS_PER_NODE = 1
# Output and error files written by the job; they match the patterns removed by the 'clean' target.
OUTPUT = ./%x.%j.out
ERROR = ./e%x.%j.err
#
.PHONY: all run detail clean fclean re
# Pattern rule: build every object file from the .cpp source with the same name.
# (The former '.o: .cpp' declared a literal target called '.o' and never matched an object file.)
%.o: %.cpp
	$(CC) -c $(CFLAGS) $< -o $@
# Default goal: build the executable.
all: $(NAME)
# Link all the objects into the executable.
$(NAME): $(OBJ)
	$(CC) $(CFLAGS) -o $@ $^
# Builds the executable, writes launcher.sh with the SBATCH parameters defined above and submits it with sbatch.
# NOTE(review): the commented-out if/else/fi lines inside the continued recipe end with a backslash; confirm the launcher is generated as intended.
run: $(NAME)
## launcher.sh is recreated on every run, so changes to the sbatch parameters in this Makefile are picked up
# @if ! [ -f launcher.sh ]; then \
@echo "#!/bin/bash" > launcher.sh; \
echo "## sbatch is the command line interpreter for Slurm" >> launcher.sh; \
echo "" >> launcher.sh; \
echo "## specify the name of the job in the queueing system" >> launcher.sh; \
echo "#SBATCH --job-name=$(JOBNAME)" >> launcher.sh; \
echo "## specify the partition for the resource allocation. if not specified, slurm is allowed to take the default(the one with a star *)" >> launcher.sh; \
echo "#SBATCH --partition=$(PARTITION)" >> launcher.sh; \
echo "## format for time is days-hours:minutes:seconds, is used as time limit for the execution duration" >> launcher.sh; \
echo "#SBATCH --time=$(TIME)" >> launcher.sh; \
echo "## specify the real memory required per node. suffix can be K-M-G-T but if not present is MegaBytes by default" >> launcher.sh; \
echo "#SBATCH --mem=$(MEM)" >> launcher.sh; \
echo "## format for hosts as a range(steffe[1-4,10-15,20]), to specify hosts needed to satisfy resource requirements" >> launcher.sh; \
echo "#SBATCH --nodelist=$(NODELIST)" >> launcher.sh; \
echo "## to specify the number of processors per task, default is one" >> launcher.sh; \
echo "#SBATCH --cpus-per-task=$(CPUS_PER_TASK)" >> launcher.sh; \
echo "## to specify the number of tasks to be invoked on each node" >> launcher.sh; \
echo "#SBATCH --ntasks-per-node=$(NTASKS_PER_NODE)" >> launcher.sh; \
echo "## to specify the file of utput and error" >> launcher.sh; \
echo "#SBATCH --output $(OUTPUT)" >> launcher.sh; \
echo "#SBATCH --error $(ERROR)" >> launcher.sh; \
echo "" >> launcher.sh; \
echo "mpirun $(NAME)" >> launcher.sh; \
chmod +x launcher.sh; \
echo "The 'launcher.sh' script has been created and is ready to run."; \
# else \
# chmod +x launcher.sh; \
# fi
@echo; sbatch launcher.sh
@echo " To see job list you can use 'squeue'."
@echo " To cancel a job you can use 'scancel jobid'."
# Prints the compile and link flags that the mpicxx wrapper would use.
detail:
@echo "Compiler flags and options that mpicxx would use for compiling an MPI program: "
@mpicxx --showme:compile
@echo
@echo "Linker flags and options that mpicxx would use for linking an MPI program: "
@mpicxx --showme:link
# Removes the job output/error files named after JOBNAME, editor backup files (*~) and the object files.
clean:
## The interactive confirmation with 'read' does not seem to work, so it is left disabled
# read -p "rm: remove all files \"./$(JOBNAME).*.out\" and \"./e$(JOBNAME).*.err\"? (y/n)" choice
# @if [ "$$choice" = "y" ]; then \
@echo rm -f ./$(JOBNAME).*.out
@for file in ./$(JOBNAME).*.out; do \
rm -f "$$file"; \
done
@echo rm -f ./e$(JOBNAME).*.err
@for file in ./e$(JOBNAME).*.err; do \
rm -f "$$file"; \
done
# fi
rm -f *~ $(OBJ)
# Full clean: everything 'clean' removes, plus launcher.sh (asking for confirmation via rm -i) and the executable.
fclean: clean
@if [ -f launcher.sh ]; then \
rm -i ./launcher.sh; \
fi
rm -f $(NAME)
# Rebuild from scratch.
re: fclean all
# mpicxx *.c
# mpirun/mpiexec ... //will run X copies of the program in the current run-time environment, scheduling(by default) in a round-robin fashion by CPU slot.
# SLIDE 5 Durastante
# The Script
# #!/bin/bash
# #SBATCH --job-name=dascegliere
# #SBATCH --mem=size[unis]
# #SBATCH -n 10
# #SBATCH --time=12:00:00
# #SBATCH --nodelist=lista
# #SBATCH --partition=ports
# #ecc..
# mpirun ...

@ -0,0 +1,67 @@
#include <mpi.h>
#include <iostream>
/*
MPI_Send(
void* data, //data buffer
int count, //count of elements
MPI_Datatype datatype, //type of element
int destination, //rank of receiver
int tag, //tag of message
MPI_Comm communicator //communicator
)
MPI_Recv(
void* data, //data buffer
int count, //count of elements
MPI_Datatype datatype, //type of element
int source, //rank of sender
int tag, //tag of message
MPI_Comm communicator, //communicator
MPI_Status* status //about received message
)
*/
/*
Ping-pong between pairs of processes: ranks (0,1), (2,3), ... bounce a counter back and forth
with blocking MPI_Send/MPI_Recv until it reaches PINGPONG_MAX.
Requires an even number of processes so that every process has a partner.
*/
int main(int argc, char* argv[])
{
    const long long PINGPONG_MAX = 3;
    MPI_Init(NULL, NULL); //Initialize the MPI environment
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    if (world_size % 2 != 0)
    {
        //Report the offending world size (the rank used to be printed here by mistake)
        std::cerr << "World size must be pair, actually is " << world_size << " (odd)" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    long long pingpong_count = 0;
    int partner_rank; //Even ranks pair with the next rank, odd ranks with the previous one
    if (world_rank % 2 == 0)
        partner_rank = (world_rank + 1);
    else
        partner_rank = (world_rank - 1);
    while (pingpong_count < PINGPONG_MAX)
    {
        //The parity of the counter decides whose turn it is to send
        if (world_rank%2 == pingpong_count%2)
        {
            pingpong_count++;
            // MPI_Send(data, count, datatype, dest, tag, communicator)
            MPI_Send(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD);
            std::cout << "(PING)Process (rank " << world_rank+1 << "/" << world_size << ") sent '" << pingpong_count << "' to " << partner_rank+1 << std::endl;
        }
        else
        {
            // MPI_Recv(data, count, datatype, source, tag, communicator, status)
            MPI_Recv(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            std::cout << "(PONG)Process (rank " << world_rank+1 << "/" << world_size << ") received '" << pingpong_count << "' from " << partner_rank+1 << std::endl;
        }
    }
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,72 @@
#include <mpi.h>
#include <iostream>
/*
MPI_Send(
void* data, //data buffer
int count, //count of elements
MPI_Datatype datatype, //type of element
int destination, //rank of receiver
int tag, //tag of message
MPI_Comm communicator //communicator
)
MPI_Recv(
void* data, //data buffer
int count, //count of elements
MPI_Datatype datatype, //type of element
int source, //rank of sender
int tag, //tag of message
MPI_Comm communicator, //communicator
MPI_Status* status //about received message
)
*/
/*
Ping-pong between pairs of processes, preceded by a barrier: every process announces its partner,
all of them synchronize, then ranks (0,1), (2,3), ... bounce a counter with blocking
MPI_Send/MPI_Recv until it reaches PINGPONG_MAX.
Requires an even number of processes so that every process has a partner.
*/
int main(int argc, char* argv[])
{
    const long long PINGPONG_MAX = 3;
    MPI_Init(NULL, NULL); //Initialize the MPI environment
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    if (world_size % 2 != 0)
    {
        //Report the offending world size (the rank used to be printed here by mistake)
        std::cerr << "World size must be pair, actually is " << world_size << " (odd)" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    long long pingpong_count = 0;
    int partner_rank; //Even ranks pair with the next rank, odd ranks with the previous one
    if (world_rank % 2 == 0)
        partner_rank = (world_rank + 1);
    else
        partner_rank = (world_rank - 1);
    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << "): my partner is " << partner_rank+1 << std::endl;
    MPI_Barrier(MPI_COMM_WORLD); //No process starts the exchange before all of them got here
    if (world_rank == 0)
        std::cout << "Passed Barrier" << std::endl;
    while (pingpong_count < PINGPONG_MAX)
    {
        //The parity of the counter decides whose turn it is to send
        if (world_rank%2 == pingpong_count%2)
        {
            pingpong_count++;
            // MPI_Send(data, count, datatype, dest, tag, communicator)
            MPI_Send(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD);
            std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") sent '" << pingpong_count << "' to " << partner_rank+1 << std::endl;
        }
        else
        {
            // MPI_Recv(data, count, datatype, source, tag, communicator, status)
            MPI_Recv(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received '" << pingpong_count << "' from " << partner_rank+1 << std::endl;
        }
    }
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,57 @@
#include <mpi.h>
#include <iostream>
/*
MPI_Send(
void* data, //data buffer
int count, //count of elements
MPI_Datatype datatype, //type of element
int destination, //rank of receiver
int tag, //tag of message
MPI_Comm communicator //communicator
)
MPI_Recv(
void* data, //data buffer
int count, //count of elements
MPI_Datatype datatype, //type of element
int source, //rank of sender
int tag, //tag of message
MPI_Comm communicator, //communicator
MPI_Status* status //about received message
)
*/
/*
Token ring: rank 0 creates a token and sends it to rank 1, every other rank waits for the token
from its predecessor and forwards it to its successor; the last rank sends it back to rank 0.
*/
int main(int argc, char* argv[])
{
    MPI_Init(NULL, NULL); //Initialize the MPI environment
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    //Neighbours on the ring. world_size is added before taking the modulo because in C++
    //(0 - 1) % world_size evaluates to -1, which is not a valid source rank.
    const int prev_rank = (world_rank + world_size - 1) % world_size;
    const int next_rank = (world_rank + 1) % world_size;
    long long token;
    if (world_rank != 0)
    {
        // MPI_Recv(data, count, datatype, source, tag, communicator, status)
        MPI_Recv(&token, 1, MPI_LONG_LONG, prev_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); //MPI_Recv is blocking, so every process but 0 waits here for the token
        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received '" << token << "' from " << prev_rank+1 << std::endl;
    }
    else
        token = 19; //Setup the token value if this is process 0
    // MPI_Send(data, count, datatype, dest, tag, communicator)
    MPI_Send(&token, 1, MPI_LONG_LONG, next_rank, 0, MPI_COMM_WORLD); //Process 0 sends first; every other process forwards the token to the next (+1) process after receiving it
    //Process 0 finally retrieves its token after it went around the whole ring
    if (world_rank == 0)
    {
        // MPI_Recv(data, count, datatype, source, tag, communicator, status)
        MPI_Recv(&token, 1, MPI_LONG_LONG, prev_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); //Receive the token sent by the last process before ending
        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received '" << token << "' from " << prev_rank+1 << std::endl;
    }
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,28 @@
# Usage of cluster with slurm
Information about the cluster and its partitions can be obtained with:
sinfo
scontrol show partition
With srun we can reserve a subset of the resources of the compute nodes for our tasks:
More parameters are provided in the documentation.
srun --partition=production --time=00:30:00 --nodes=1 --pty bash -i
(take for half hour 1 node from the production partition and launch the command 'bash -i')
To simplify this whole command we can create a basic batch file containing all the directives and the code to run.
It is good practice to redirect the output to a file so that it is saved safely.
sbatch run.sh
Example of batch script
#!/bin/bash
#SBATCH -n 10
#SBATCH --time=00:30:00
...
mpirun ./my_exec_code
To see the queue of jobs in the system we can run
squeue
squeue --long
To remove a job from the queue we can run
scancel <job/process_id>

@ -0,0 +1,48 @@
#include <mpi.h>
#include <iostream>
/*
MPI_Reduce(
void* send_data, //the element to apply the reduce
void* recv_data, //the reduce result
int count, //size of result in recv_data is sizeof(datatype)*count
MPI_Datatype datatype, //type of element
MPI_Op op, //operation to apply. Can also be defined custom operation
int root, //rank of the process that receive result
MPI_Comm comm //communicator
)
*/
/*
MPI_Reduce example: every process computes a local sum (even ranks then replace it with 1, just to
make the contributions distinguishable) and a root process receives the sum of all contributions.
*/
int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO: why not (NULL, NULL) here?
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    //Root of the reduction: rank 2 whenever it exists, otherwise the last rank
    //(with fewer than 3 processes rank 2 would be an invalid root)
    const int root = (world_size > 2) ? 2 : world_size - 1;
    float rand_nums[4] = {1,2,3,42};
    int size = 4; //Must match the number of elements of rand_nums
    float local_sum = 0;
    for(int i=0; i<size; i++) //each process computes the sum locally
        local_sum += rand_nums[i];
    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") sum is " << local_sum << " avg is " << local_sum/size << std::endl;
    if(world_rank%2 == 0) //explicative example part
        local_sum = 1;
    float global_sum;
    //MPI_Reduce(send_data, recv_data, count, datatype, op, root, comm)
    MPI_Reduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, root, MPI_COMM_WORLD);
    /*Author's note: the root process (I suppose) applies the given operation to the send_data element filled in by every process*/
    if(world_rank == root)
        std::cout << " USING REDUCE:" << std::endl << "Process (rank " << world_rank+1 << "/" << world_size << ") sum is " << global_sum << " avg of averages is " << global_sum/(world_size*size) << std::endl;
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,47 @@
#include <mpi.h>
#include <iostream>
/*
MPI_Scatter(
const void* sendbuf, //the elements to send (significant only at root)
int sendcount, //number of elements sent to each process
MPI_Datatype sendtype, //type of element
void* recvbuf, //buffer to receive
int recvcount, //number of elements each process receives
MPI_Datatype recvtype, //type of element
int root, //rank of the process that sends (scatters) the data
MPI_Comm comm //communicator
)
*/
/*
MPI_Scatter example: rank 0 splits its 16-element buffer in 4 chunks of 4 elements and every
process (the root included) receives one chunk. Requires exactly 4 processes.
*/
int main(int argc, char* argv[])
{
    int sendbuf[16] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16};
    int recvbuf[4];
    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO: why not (NULL, NULL) here?
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
    if (world_size != 4)
    {
        std::cerr << "World size must be 4 for this example, actually is " << world_size << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of this process
    //MPI_Scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)
    //4 elements per process: 4 processes * 4 elements is exactly the 16 elements of sendbuf, and a chunk
    //fits exactly in recvbuf. The previous counts (5 sent, 6 expected) made the root read past the end of
    //sendbuf and every process write past the end of the 4-element recvbuf.
    MPI_Scatter(sendbuf, 4, MPI_INT, recvbuf, 4, MPI_INT, 0, MPI_COMM_WORLD);
    /*The root node includes itself when splitting the data and sends one part to itself as well.*/
    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") recived: " << recvbuf[0] << ", " << recvbuf[1] << ", " << recvbuf[2] << ", " << recvbuf[3] <<std::endl;
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}

@ -0,0 +1,120 @@
# Program for compiling MPI cpp programs
# Both the C and the C++ compiler variables point to the MPI C++ wrapper.
CC = mpicxx
CXX = mpicxx
# Extra flags to give to the compiler (debug symbols only)
CFLAGS = -g
#TODO enable the stricter warning flags: -Wall -Werror -Wextra
#
# Single source file; the object takes the same name with a .o extension.
SRC = main.cpp
OBJ = $(SRC:.cpp=.o)
# Name of the executable to build
NAME = merge_sort_enhanced
# SBATCH parameters
# These values are written as '#SBATCH' directives into launcher.sh by the 'run' target.
JOBNAME = Distributed_Sorting
PARTITION = production
TIME = 12:00:00
MEM = 3G
NODELIST = steffe[12-14]
CPUS_PER_TASK = 1
NTASKS_PER_NODE = 1
# Output and error files written by the job; they match the patterns removed by the 'clean' target.
OUTPUT = ./%x.%j.out
ERROR = ./e%x.%j.err
#
.PHONY: all run detail clean fclean re
# Pattern rule: build every object file from the .cpp source with the same name.
# (The former '.o: .cpp' declared a literal target called '.o' and never matched an object file.)
%.o: %.cpp
	$(CC) -c $(CFLAGS) $< -o $@
# Default goal: build the executable.
all: $(NAME)
# Link all the objects into the executable.
$(NAME): $(OBJ)
	$(CC) $(CFLAGS) -o $@ $^
# Builds the executable, writes launcher.sh with the SBATCH parameters defined above and submits it with sbatch.
# ARGS is not defined in this Makefile; presumably it is passed on the command line (make run ARGS=...) - confirm.
# NOTE(review): the commented-out if/else/fi lines inside the continued recipe end with a backslash; confirm the launcher is generated as intended.
run: $(NAME)
## launcher.sh is recreated on every run, so changes to the sbatch parameters in this Makefile are picked up
# @if ! [ -f launcher.sh ]; then \
@echo "#!/bin/bash" > launcher.sh; \
echo "## sbatch is the command line interpreter for Slurm" >> launcher.sh; \
echo "" >> launcher.sh; \
echo "## specify the name of the job in the queueing system" >> launcher.sh; \
echo "#SBATCH --job-name=$(JOBNAME)" >> launcher.sh; \
echo "## specify the partition for the resource allocation. if not specified, slurm is allowed to take the default(the one with a star *)" >> launcher.sh; \
echo "#SBATCH --partition=$(PARTITION)" >> launcher.sh; \
echo "## format for time is days-hours:minutes:seconds, is used as time limit for the execution duration" >> launcher.sh; \
echo "#SBATCH --time=$(TIME)" >> launcher.sh; \
echo "## specify the real memory required per node. suffix can be K-M-G-T but if not present is MegaBytes by default" >> launcher.sh; \
echo "#SBATCH --mem=$(MEM)" >> launcher.sh; \
echo "## format for hosts as a range(steffe[1-4,10-15,20]), to specify hosts needed to satisfy resource requirements" >> launcher.sh; \
echo "#SBATCH --nodelist=$(NODELIST)" >> launcher.sh; \
echo "## to specify the number of processors per task, default is one" >> launcher.sh; \
echo "#SBATCH --cpus-per-task=$(CPUS_PER_TASK)" >> launcher.sh; \
echo "## to specify the number of tasks to be invoked on each node" >> launcher.sh; \
echo "#SBATCH --ntasks-per-node=$(NTASKS_PER_NODE)" >> launcher.sh; \
echo "## to specify the file of utput and error" >> launcher.sh; \
echo "#SBATCH --output $(OUTPUT)" >> launcher.sh; \
echo "#SBATCH --error $(ERROR)" >> launcher.sh; \
echo "" >> launcher.sh; \
echo "mpirun $(NAME) $(ARGS)" >> launcher.sh; \
chmod +x launcher.sh; \
echo "The 'launcher.sh' script has been created and is ready to run."; \
# else \
# chmod +x launcher.sh; \
# fi
@echo; sbatch launcher.sh
@echo " To see job list you can use 'squeue'."
@echo " To cancel a job you can use 'scancel jobid'."
# Prints the compile and link flags that the mpicxx wrapper would use.
detail:
@echo "Compiler flags and options that mpicxx would use for compiling an MPI program: "
@mpicxx --showme:compile
@echo
@echo "Linker flags and options that mpicxx would use for linking an MPI program: "
@mpicxx --showme:link
# Removes the job output/error files named after JOBNAME, editor backup files (*~) and the object files.
clean:
## The interactive confirmation with 'read' does not seem to work, so it is left disabled
# read -p "rm: remove all files \"./$(JOBNAME).*.out\" and \"./e$(JOBNAME).*.err\"? (y/n)" choice
# @if [ "$$choice" = "y" ]; then \
@echo rm -f ./$(JOBNAME).*.out
@for file in ./$(JOBNAME).*.out; do \
rm -f "$$file"; \
done
@echo rm -f ./e$(JOBNAME).*.err
@for file in ./e$(JOBNAME).*.err; do \
rm -f "$$file"; \
done
# fi
rm -f *~ $(OBJ)
# Full clean: everything 'clean' removes, plus launcher.sh, nohup.out, the executable and the
# SortedRun* and *.sort files under /mnt/raid/tmp.
# NOTE(review): the /mnt/raid/tmp wildcards delete every matching file in that directory, whoever created it - confirm this is intended.
fclean: clean
rm -f ./launcher.sh;
rm -f ./nohup.out
rm -f /mnt/raid/tmp/SortedRun*
rm -f /mnt/raid/tmp/*.sort
rm -f $(NAME)
# Rebuild from scratch.
re: fclean all
# mpicxx *.c
# mpirun/mpiexec ... //will run X copies of the program in the current run-time environment, scheduling(by default) in a round-robin fashion by CPU slot.
# SLIDE 5 Durastante
# The Script
# #!/bin/bash
# #SBATCH --job-name=dascegliere
# #SBATCH --mem=size[unis]
# #SBATCH -n 10
# #SBATCH --time=12:00:00
# #SBATCH --nodelist=lista
# #SBATCH --partition=ports
# #ecc..
# mpirun ...

Binary file not shown.

@ -0,0 +1,65 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/time.h>
//The number of numbers that populate the file (not referenced by benchmark_generate_file)
#define TEST_SIZE 1
//Buffer sizes expressed as a count of 8-byte numbers; the trailing comment is the size in bytes
#define BUF_SIZE 2097152//16 MegaBytes
#define BUF_SIZE_HGB 67108864//512 MegaBytes
#define BUF_SIZE_GB 134217728//1 GigaBytes
#define BUF_SIZE_10GB 1342177280//10 GigaBytes
/*
Generate a file filling it with random 8 bytes numbers.
The size is asked interactively on stdin as a number of chunks; every chunk holds BUF_SIZE_GB numbers.
pathname: file to create (truncated if it already exists).
seed: seed passed to srand(), so the same seed reproduces the same file.
Return the number of microseconds needed for the generation and writing on disk, or -1 when the
input ends before a valid size is read, the buffer cannot be allocated, the file cannot be opened
or a write fails.
*/
long long benchmark_generate_file(const char *pathname, unsigned int seed)
{
	struct timeval start, end;
	unsigned int size = 0;
	FILE *file;
	int64_t *buffer;
	srand(seed);
	// if (access(pathname, F_OK) == 0)
	// return -2;//File already exist
	printf("Insert a multiple of %d(%d MegaBytes) for the size of the target file:\n", BUF_SIZE_GB, BUF_SIZE_GB/131072);
	while (1)
	{
		int c;
		int parsed = scanf("%u", &size);
		if (parsed == 1)
			break;
		if (parsed == EOF)
			return -1;//No more input: asking again would loop forever
		printf("Insert a valid size for the file:\n");
		do//Discard the rest of the invalid line
			c = getchar();
		while (c != '\n' && c != EOF);
		if (c == EOF)
			return -1;//Input ended while discarding the invalid line
	}
	printf("Future file dimension: (%d * %u) Mb\n",BUF_SIZE_GB/131072, size);
	buffer = (int64_t*)malloc(BUF_SIZE_GB * sizeof(int64_t));
	if (!buffer)
		return -1;//Something went wrong
	file = fopen(pathname, "wb");
	if (!file){
		free(buffer);
		return -1;//Something went wrong
	}
	gettimeofday(&start, NULL);//Timer Start
	for (unsigned int i = 0; i < size; i++)
	{
		//Two rand() calls are combined into one 8 bytes number.
		//NOTE(review): if RAND_MAX is 2^31-1 the bits 31 and 63 are always zero - confirm this is acceptable.
		for(unsigned int j=0; j < BUF_SIZE_GB; j++)
			buffer[j] = ((int64_t)rand() << 32) | rand();
		if (fwrite(buffer, sizeof(int64_t), BUF_SIZE_GB, file) != BUF_SIZE_GB)
		{
			//Short write (e.g. disk full): do not report a timing for an incomplete file
			free(buffer);
			fclose(file);
			return -1;
		}
	}
	gettimeofday(&end, NULL);//Timer Stop
	free(buffer);
	if (fclose(file) != 0)
		return -1;//The final flush failed, so the file is incomplete
	return (end.tv_sec - start.tv_sec) * 1000000LL + (end.tv_usec - start.tv_usec);
}

@ -0,0 +1,20 @@
#include <stdio.h>
long long benchmark_generate_file(const char *pathname, unsigned int seed);
long long benchmark_reader_file(const char *pathname);
int is_sorted(const char *pathname);
void print_partial_file(const char *pathname, unsigned long long startOffset, unsigned long long endOffset);
void print_all_file(const char *pathname);
/* Manual test driver: generates "testiamolo.bin" and prints how long the generation took. */
int main(int argc, char* argv[]){
	const char *target = "testiamolo.bin";
	long long elapsed = benchmark_generate_file(target, 42);//TODO change the seed once the work is completed
	printf("scrittura file %s: time(%lld microseconds)\n", target, elapsed);
	/* Further checks, currently disabled: */
	// printf("POI\n");
	// printf("lettura file %s: time(%lld microseconds)\n","testiamolo.bin",benchmark_reader_file("testiamolo.bin"));
	// printf("POI\n");
	// printf("sono uguali? %d\n",is_sorted("testiamolo.bin"));
	// print_partial_file("testiamolo.bin",2,4);
	// printf("POI\n");
	// print_all_file("testiamolo.bin");
	return 0;
}

@ -0,0 +1,107 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/time.h>
/*
Measure how long it takes to read a binary file one 64-bit number at a time.
pathname: file to read
Returns the elapsed microseconds (the fopen is not timed), or -1 if the file cannot be opened.
*/
long long benchmark_reader_file(const char *pathname)
{
    const unsigned long long firstIndex = 0; // 0-based index of the first number to read
    const unsigned long long lastIndex = ~0ULL; // Exclusive upper bound: the biggest value, i.e. "up to the end of the file"
    struct timeval tic, toc;
    unsigned long long index;
    int64_t value;
    FILE *in = fopen(pathname, "rb");

    if (in == NULL)
        return -1;//Something went wrong
    fseek(in, firstIndex * sizeof(int64_t), SEEK_SET);
    gettimeofday(&tic, NULL);//Timer Start
    for (index = firstIndex; index < lastIndex; index++)
    {
        if (fread(&value, sizeof(int64_t), 1, in) != 1)
            break;//No complete number left
    }
    gettimeofday(&toc, NULL);//Timer Stop
    fclose(in);
    return (toc.tv_sec - tic.tv_sec) * 1000000LL + (toc.tv_usec - tic.tv_usec);
}
/*
Check whether the 64-bit numbers stored in a binary file are in non-decreasing order.
pathname: file to check
Returns 1 when every number is >= the previous one (also for files holding less than two numbers),
0 at the first number smaller than its predecessor, -1 if the file cannot be opened.
*/
int is_sorted(const char *pathname)
{
    const unsigned long long limit = ~0ULL; // Exclusive upper bound: the biggest value, i.e. "up to the end of the file"
    unsigned long long position = 0; // 0-based index of the next number to read
    int64_t previous, current;
    int verdict = 1;
    FILE *in = fopen(pathname, "rb");

    if (in == NULL)
        return -1;//Something went wrong
    fseek(in, position * sizeof(int64_t), SEEK_SET);
    if (fread(&previous, sizeof(int64_t), 1, in) == 1) // Take first element(number) in the file
    {
        for (position = 1; position < limit; position++)
        {
            if (fread(&current, sizeof(int64_t), 1, in) != 1)
                break;//No complete number left
            if (previous > current)
            {
                verdict = 0;//Found a pair in the wrong order
                break;
            }
            previous = current;
        }
    }
    fclose(in);
    return verdict;
}
/*
Print a range of the 64-bit numbers stored in a binary file, one per line.
pathname: file to read
startOffset: 0-based index of the first number to print
endOffset: index where the printing stops (exclusive)
Nothing is printed if the file cannot be opened; the printing stops early at the end of the file.
*/
void print_partial_file(const char *pathname, unsigned long long startOffset, unsigned long long endOffset)
{
    unsigned long long index;
    int64_t value;
    FILE *in = fopen(pathname, "rb");

    if (in == NULL)
        return;//Something went wrong
    fseek(in, startOffset * sizeof(int64_t), SEEK_SET);
    for (index = startOffset; index < endOffset; index++)
    {
        if (fread(&value, sizeof(int64_t), 1, in) != 1)
            break;//No complete number left
        printf("%lld\n", (long long)value);
    }
    fclose(in);
}
/*
Print every 64-bit number stored in a binary file on a single line, each one followed by " - ",
and close the line with "EOF".
pathname: file to read
Nothing is printed if the file cannot be opened.
*/
void print_all_file(const char *pathname)
{
    const unsigned long long firstIndex = 0; // 0-based index of the first number to print
    const unsigned long long lastIndex = ~0ULL; // Exclusive upper bound: the biggest value, i.e. "up to the end of the file"
    unsigned long long index;
    int64_t value;
    FILE *in = fopen(pathname, "rb");

    if (in == NULL)
        return;//Something went wrong
    fseek(in, firstIndex * sizeof(int64_t), SEEK_SET);
    for (index = firstIndex; index < lastIndex; index++)
    {
        if (fread(&value, sizeof(int64_t), 1, in) != 1)
            break;//No complete number left
        printf("%lld - ", (long long)value);
    }
    printf("EOF\n");
    fclose(in);
}

@ -0,0 +1,357 @@
#include <mpi.h>
#include <fcntl.h>
#include <dirent.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <queue>
#include <ctime>
#include <string>
#include <vector>
#include <cstdio>
#include <fstream>
#include <cstdlib>
#include <iostream>
#include <algorithm>
/*
8-Byte numbers in 256KB = 32768
8-Byte numbers in 1MB = 131072
8-Byte numbers in 1GB = 134217728
The whole program assumes that every number is a 64-bit integer.
To visualize binary files in bash can be used:
od -t d8 -A n binaryfile.bin #For in use format
od -t d8 -A n --endian=little binaryfile.bin #For little-endian format
od -t d8 -A n --endian=big binaryfile.bin #For big-endian format
*/
//Amount of numbers held by the I/O buffers: 32768 8-byte numbers = 256KB
#define BUFFERSIZE 32768
//Threshold (in numbers) used by main() to size the slices of small files; presumably tuned on the CPU cache - TODO confirm
#define CACHENUM 130000
//Biggest slice (in numbers) that sortRuns() is asked to sort in memory: 268435456 8-byte numbers = 2GB
#define RAMNUM 268435456
//1 = read/write the files in blocks of BUFFERSIZE numbers, 0 = one number per call (to compare performance)
#define ALLOW_BUFFER 1
//1 = snowPlowRuns() only reports that the file is too big, 0 = it falls back to sortRuns() with slices of RAMNUM numbers
#define ALLOW_SNOWPLOW 1
/*
Create the sorted runs assigned to the calling process.
The file is seen as a sequence of slices of sliceSize numbers handed out round-robin: at loop l the process
reads slice (mpiRank + mpiSize * l), sorts it in memory and stores it in a temporary file
'/mnt/raid/tmp/SortedRun<id>_XXXXXX'.
  fileSize  - amount of 8-byte numbers in the file
  sliceSize - amount of numbers of each slice (and so of each run)
  maxLoop   - upper bound on the number of slices handled by a process
  file      - input file, already opened for binary reading
  id        - identifier of the execution, used in the name of the temporary files
  mpiRank   - rank of the calling process
  mpiSize   - number of processes
Any error on a temporary file aborts the whole MPI job.
*/
void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
{
    //write() is allowed to store fewer bytes than requested: keep writing until everything is out
    const auto writeAll = [](int fd, const void* data, size_t bytes) -> bool
    {
        const char* cursor = static_cast<const char*>(data);
        while (bytes > 0)
        {
            const ssize_t written = write(fd, cursor, bytes);
            if (written <= 0) //-1 is an error, 0 would make no progress
                return false;
            cursor += written;
            bytes -= static_cast<size_t>(written);
        }
        return true;
    };

    unsigned long long startOffset, endOffset, currentOffset; //The interval is [startOffset, endOffset)
    double startTot, start, end;
    int64_t num;
    std::vector<int64_t> bigVect;
    int64_t buffer[BUFFERSIZE];
    bigVect.reserve(sliceSize);
    startTot = MPI_Wtime(); //Microsecond precision. Can't use time(), because each process will have a different "zero" time
    start = MPI_Wtime();
    for (unsigned long long l = 0; l < maxLoop; l++) //One sorted run per loop
    {
        startOffset = sliceSize * (mpiRank + (mpiSize * l));
        if (startOffset >= fileSize)
            break;
        endOffset = startOffset + sliceSize;
        fseek(file, startOffset * sizeof(int64_t), SEEK_SET);
        currentOffset = startOffset;
        bigVect.clear();
        if (ALLOW_BUFFER) //Branch to test performance with and without buffer
        {
            while (currentOffset < endOffset)
            {
                //Never ask for more than what is left in the slice, otherwise numbers of the next slice would be read
                const unsigned long long elementsToRead = std::min(endOffset - currentOffset, static_cast<unsigned long long>(BUFFERSIZE));
                const size_t elementsRead = fread(buffer, sizeof(int64_t), elementsToRead, file);
                bigVect.insert(bigVect.end(), buffer, buffer + elementsRead);
                currentOffset += elementsRead; //Increment currentOffset based on the number of elements read
                if (elementsRead < elementsToRead) //Short read: end of the file reached (or read error)
                    break;
            }
        }
        else
        {
            while (currentOffset < endOffset && fread(&num, sizeof(int64_t), 1, file) == 1)
            {
                bigVect.push_back(num);
                currentOffset++;
            }
        }
        end = MPI_Wtime();
        std::cout << " " << end-start << "s" << " => Time to read file from offset " << startOffset << " to " << endOffset << " in Process " << mpiRank+1 << "/" << mpiSize << " memory" << std::endl;
        start = MPI_Wtime();
        std::sort(bigVect.begin(), bigVect.end());
        end = MPI_Wtime();
        std::cout << " " << end-start << "s" << " => Time to sort elements in Process " << mpiRank+1 << "/" << mpiSize << " memory" << std::endl;
        start = MPI_Wtime();
        std::string templateName = "/mnt/raid/tmp/SortedRun" + std::to_string(id) + "_XXXXXX"; //If absolute path does not exist the temporary file will not be created
        int tmpFile = mkstemp(&templateName[0]); //Create a temporary file based on template
        if (tmpFile == -1)
        {
            std::cout << "Error creating temporary file" << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        bool stored = true;
        if (ALLOW_BUFFER) //Branch to test performance with and without buffer
        {
            //The sorted numbers are already contiguous: write them in blocks of BUFFERSIZE straight from the vector
            for (size_t first = 0; stored && first < bigVect.size(); first += BUFFERSIZE)
            {
                const size_t chunk = std::min(bigVect.size() - first, static_cast<size_t>(BUFFERSIZE));
                stored = writeAll(tmpFile, bigVect.data() + first, chunk * sizeof(int64_t));
            }
        }
        else
        {
            for (size_t i = 0; stored && i < bigVect.size(); ++i)
                stored = writeAll(tmpFile, &bigVect[i], sizeof(int64_t));
        }
        if (!stored)
        {
            std::cout << "Error writing to file" << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        const off_t sz = lseek(tmpFile, 0, SEEK_END);
        //The descriptor is closed exactly once: a second close() fails and would abort the job
        if (close(tmpFile) == -1)
        {
            std::cout << "Error closing the file" << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        if (sz == 0 && unlink(templateName.c_str()) == -1) //An empty run is useless for the merge
        {
            std::cout << "Error unlinking file" << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        end = MPI_Wtime();
        std::cout << " " << end-start << "s" << " => Time to write '" << templateName << "' and fill it up with " << sz/8 << " sorted elements by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
        start = MPI_Wtime();
    }
    end = MPI_Wtime();
    std::cout << end-startTot << "s" << " => Time function sortRuns() in Process " << mpiRank+1 << "/" << mpiSize << std::endl;
}
/*
Entry point for the slices that don't fit in memory (more than RAMNUM numbers).
The snow plow technique is not implemented yet: with ALLOW_SNOWPLOW enabled the function only reports that
the file is too big, otherwise it falls back to sortRuns() with slices of RAMNUM numbers.
  fileSize - amount of 8-byte numbers in the file
  sliceSize - not used, kept to have the same signature of sortRuns()
  maxLoop - recomputed for the fallback
  file, id, mpiRank, mpiSize - forwarded to sortRuns()
*/
void snowPlowRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
{
    (void)sliceSize;
    //Computed in 64 bits: as int, RAMNUM * mpiSize overflows from 8 processes on
    const unsigned long long totalRamNum = static_cast<unsigned long long>(RAMNUM) * static_cast<unsigned long long>(mpiSize);
    if (ALLOW_SNOWPLOW)
        std::cout << "Can't compute files of size bigger then " << totalRamNum / 134217728 << "Gb with " << mpiSize << " processes (currently file is " << fileSize / 134217728 << "Gb)" << std::endl;
    else
    {
        maxLoop = (fileSize / totalRamNum) + 1;
        sortRuns(fileSize, RAMNUM, maxLoop, file, id, mpiRank, mpiSize);
    }
}
/*
Merge all the sorted runs of this execution in a single sorted file (k-way merge with a Min Heap).
The runs are the regular files of '/mnt/raid/tmp/' whose name contains 'SortedRun<id>_'; they are removed
after the merge. The result is '/mnt/raid/tmp/<name of argFile>[.buf].sort' and must not exist already.
  argFile - path of the file that has been sorted, only its last component is used to name the result
  id      - identifier of the execution, used to recognize the temporary files
  mpiRank - rank of the calling process (used in the messages)
  mpiSize - number of processes (used in the messages)
Any unrecoverable error aborts the whole MPI job.
*/
void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
{
    using HeapEntry = std::pair<int64_t, int>; //(value, file descriptor the value comes from)

    //write() is allowed to store fewer bytes than requested: keep writing until everything is out
    const auto writeAll = [](int fd, const void* data, size_t bytes) -> bool
    {
        const char* cursor = static_cast<const char*>(data);
        while (bytes > 0)
        {
            const ssize_t written = write(fd, cursor, bytes);
            if (written <= 0) //-1 is an error, 0 would make no progress
                return false;
            cursor += written;
            bytes -= static_cast<size_t>(written);
        }
        return true;
    };

    std::string fileDir = "/mnt/raid/tmp/";
    std::string pattern = "SortedRun" + std::to_string(id) + "_";
    std::vector<int> fds; //To store the file descriptor of each file to merge
    std::vector<std::string> fns; //To store the file name of each file to delete after merge
    size_t lastSlash = argFile.find_last_of('/');
    std::string nameOnly = (lastSlash != std::string::npos) ? argFile.substr(lastSlash + 1) : argFile;
    std::string finalFile = "/mnt/raid/tmp/" + nameOnly + (ALLOW_BUFFER == 1 ? ".buf" : "") + ".sort";
    double start, end;
    int fileCount = 0;
    DIR *dir = opendir(fileDir.c_str());
    if (dir)
    {
        struct dirent *entry;
        while ((entry = readdir(dir)) != nullptr)
        {
            if (entry->d_type != DT_REG) //Only regular files can be a run
                continue;
            std::string filename = entry->d_name;
            if (filename.find(pattern) == std::string::npos) //The file doesn't belong to this execution
                continue;
            std::string tmpFile = fileDir + "/" + filename;
            int fd = open(tmpFile.c_str(), O_RDONLY); //Open the file and save the file descriptor
            if (fd != -1)
            {
                fds.push_back(fd);
                fns.push_back(tmpFile);
                fileCount++;
            }
            else
                std::cout << "Error opening file '" << tmpFile << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
        }
        closedir(dir);
    }
    else
    {
        std::cout << "Error opening directory '" << fileDir << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    int fdFinal = open(finalFile.c_str(), O_WRONLY | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); //Open the file for writing only, creating it if it doesn't exist and not overwrite if it exists
    if (fdFinal == -1)
    {
        std::cout << "Error opening or creating final file '" << finalFile << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    std::cout << std::endl << "Starting the merge process for " << fileCount << " files" << std::endl << std::endl;
    start = MPI_Wtime();
    std::priority_queue<HeapEntry, std::vector<HeapEntry>, std::greater<HeapEntry>> minHeap; //Creating a Min Heap using a priority queue
    for (int fd : fds) //Populate the Min Heap with initial values from each file descriptor
    {
        int64_t firstValue;
        if (read(fd, &firstValue, sizeof(int64_t)) == static_cast<ssize_t>(sizeof(int64_t)))
            minHeap.push({firstValue, fd});
        else
        {
            std::cout << "Error reading from file descriptor by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
    }
    int64_t buffer[BUFFERSIZE];
    size_t pending = 0; //Numbers stored in the buffer and not written yet
    while (!minHeap.empty()) //Write sorted elements to the final file
    {
        const int64_t smallest = minHeap.top().first;
        const int sourceFd = minHeap.top().second;
        minHeap.pop();
        int64_t nextValue;
        if (read(sourceFd, &nextValue, sizeof(int64_t)) == static_cast<ssize_t>(sizeof(int64_t))) //Replace the extracted number with the next one of the same run
            minHeap.push({nextValue, sourceFd});
        else if (close(sourceFd) == -1) //The run is finished
        {
            std::cout << "Error closing the file descriptor by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        if (ALLOW_BUFFER) //Branch to test performance with and without buffer
        {
            buffer[pending++] = smallest;
            if (pending == static_cast<size_t>(BUFFERSIZE) || minHeap.empty()) //Buffer full, or last number of the merge
            {
                if (!writeAll(fdFinal, buffer, pending * sizeof(int64_t)))
                {
                    std::cout << "Error writing to file" << std::endl;
                    MPI_Abort(MPI_COMM_WORLD, 1);
                }
                pending = 0;
            }
        }
        else if (!writeAll(fdFinal, &smallest, sizeof(int64_t)))
        {
            std::cout << "Error writing to file" << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
    }
    //Closed before removing the runs: if the result can't be completed the sorted runs are still there
    if (close(fdFinal) == -1)
    {
        std::cout << "Error closing final file '" << finalFile << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    for (const std::string &fn : fns) //Remove all temporary files after merging them
    {
        if (unlink(fn.c_str()) == -1)
        {
            std::cout << "Error unlinking file '" << fn << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
    }
    end = MPI_Wtime();
    std::cout << end-start << "s" << " => Time function kMerge() in Process " << mpiRank+1 << "/" << mpiSize << std::endl;
    std::cout << std::endl << "Sorted file '" << finalFile << "'" << std::endl;
}
/*
Sort a binary file of 8-byte integers using MPI.
Every process creates its sorted runs (sortRuns() or snowPlowRuns()), then the process with rank 0 merges
them with kMerge() and prints the total execution time.
Usage: <program> <file_to_parse>
*/
int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv); //Initialize the MPI environment
    double startGlobal = 0.0, endGlobal;
    int id = 0, mpiSize, mpiRank;
    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); //Get the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); //Get the rank of the process
    if (mpiRank == 0)
    {
        startGlobal = MPI_Wtime();
        std::srand(std::time(0));
        id = std::rand() % 10000; //Get a random id number to recognize files of different executions
    }
    MPI_Bcast(&id, 1, MPI_INT, 0, MPI_COMM_WORLD); //Every process has to use the id chosen by rank 0
    if (argc != 2)
    {
        if (mpiRank == 0)
        {
            //NOTE(review): kMerge() writes the result in '/mnt/raid/tmp/', the first message looks outdated - confirm
            std::cout << "Usage: " << argv[0] << " <file_to_parse>" << std::endl;
            std::cout << "It returns a file with extension '.sort' in the same directory of the not-parsed one. Make sure to have space before." << std::endl;
            std::cout << "Use arguments in the make as ARGS=\"stuff\". Example 'make run ARGS=\"/path/to/file\"'." << std::endl;
        }
        MPI_Finalize(); //Clean up the MPI environment
        return 0;
    }
    FILE *file;
    unsigned long long fileSize, slices, sliceSize, maxLoop;
    file = fopen(argv[1], "rb"); //Open the file in mode rb (read binary)
    if(!file)
    {
        std::cout << "Error opening file: " << argv[1] << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    const long fileBytes = (fseek(file,0,SEEK_END) == 0) ? ftell(file) : -1L;
    if (fileBytes < 0) //Without the size the slices can't be computed
    {
        std::cout << "Error reading the size of file: " << argv[1] << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    fileSize = fileBytes / 8; //Number of numbers to parse: the size in bytes of the file divided by 8, because each number is 8 bytes
    if (mpiRank == 0)
        std::cout << "Sorting file '" << argv[1] << "' of " << fileSize << " elements" << std::endl << std::endl;
    //The thresholds are computed in 64 bits: as int, RAMNUM * mpiSize overflows from 8 processes on
    const unsigned long long processes = static_cast<unsigned long long>(mpiSize);
    if (fileSize < (CACHENUM * processes))
        slices = (fileSize / CACHENUM) + 1;
    else if (fileSize < (RAMNUM * processes)) //TODO add more granularity considering double RAM for snow plow technique
        slices = (fileSize / RAMNUM) + 1;
    else
        slices = mpiSize + 1;
    sliceSize = (fileSize / slices) + 1; //Amount of 8-byte integers of each slice, based on the size of the starting file
    maxLoop = (slices / mpiSize) + 1; //Upper bound on the slices handled by each process
    if (sliceSize > RAMNUM)
        snowPlowRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
    else
        sortRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
    fclose(file);
    MPI_Barrier(MPI_COMM_WORLD); //Blocks the caller until all processes in the communicator have called it
    if(mpiRank==0)
    {
        kMerge(argv[1], id, mpiRank, mpiSize);
        endGlobal = MPI_Wtime();
        std::cout << (endGlobal-startGlobal)/60.0 << "min" << " => FULL EXECUTION TIME" << std::endl;
        std::cout << std::endl << "To visualize binary files in bash can be used:" << std::endl;
        std::cout << "od -t d8 -A n binaryfile.bin #For in use format" << std::endl;
        std::cout << "od -t d8 -A n --endian=little binaryfile.bin #For little-endian format" << std::endl;
        std::cout << "od -t d8 -A n --endian=big binaryfile.bin #For big-endian format" << std::endl;
    }
    MPI_Finalize(); //Clean up the MPI environment
    return 0;
}
Loading…
Cancel
Save