diff --git a/MPI_basic/AllReduce/StdDeviation.cpp b/MPI_basic/AllReduce/StdDeviation.cpp
new file mode 100644
index 0000000..8d4b4a4
--- /dev/null
+++ b/MPI_basic/AllReduce/StdDeviation.cpp
@@ -0,0 +1,57 @@
+#include <mpi.h>
+#include <iostream>
+#include <cmath>
+
+/*
+MPI_Allreduce(
+    void* send_data,        //the element to apply the reduce to
+    void* recv_data,        //the reduce result
+    int count,              //the size of the result in recv_data is sizeof(datatype)*count
+    MPI_Datatype datatype,  //type of element
+    MPI_Op op,              //operation to apply. A custom operation can also be defined (see the MPI_Op_create sketch after Reduce/Average.cpp below)
+    MPI_Comm comm           //communicator
+    )
+*/
+int main(int argc, char* argv[])
+{
+    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO why not (NULL, NULL) here?
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    float rand_nums[10] = {1,2,3,42,55,66,71,82,93,190};
+    int size = 10; //Must match the actual size of the rand_nums array
+
+    float local_sum = 0;
+
+    for(int i=0; i<size; i++)
+        local_sum += rand_nums[i];
+
+    //With Allreduce every process, not only a root, receives the reduced result
+    float global_sum;
+    //MPI_Allreduce(send_data, recv_data, count, datatype, op, comm)
+    MPI_Allreduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
+    float mean = global_sum / (size * world_size);
+
+    //Every process can now compute the squared differences of its own numbers from the mean
+    float local_sq_diff = 0;
+    for(int i=0; i<size; i++)
+        local_sq_diff += (rand_nums[i] - mean) * (rand_nums[i] - mean);
+
+    float global_sq_diff;
+    MPI_Allreduce(&local_sq_diff, &global_sq_diff, 1, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
+
+    float stddev = sqrt(global_sq_diff / (size * world_size));
+    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << "): mean is " << mean << ", standard deviation is " << stddev << std::endl;
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
+#include <mpi.h>
+#include <iostream>
+#include <unistd.h>
+
+int main(int argc, char* argv[])
+{
+    double start, end;
+
+    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO why not (NULL, NULL) here?
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    MPI_Barrier(MPI_COMM_WORLD);
+    start = MPI_Wtime(); //Microsecond precision. Can't use time(), because each process would have a different "zero" time
+
+    sleep(30);
+
+    MPI_Barrier(MPI_COMM_WORLD);
+    end = MPI_Wtime();
+
+    if(world_rank == 0)
+        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << "): time of work is " << end-start << " seconds" << std::endl;
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
diff --git a/MPI_basic/Broadcast/BroadcastTest.cpp b/MPI_basic/Broadcast/BroadcastTest.cpp
new file mode 100644
index 0000000..7d73425
--- /dev/null
+++ b/MPI_basic/Broadcast/BroadcastTest.cpp
@@ -0,0 +1,44 @@
+#include <mpi.h>
+#include <iostream>
+#include <cstdlib>
+#include <ctime>
+
+/*
+MPI_Bcast(
+    void* buffer,           //the element to send
+    int count,              //number of elements to send
+    MPI_Datatype datatype,  //type of element
+    int root,               //rank of the process that sends the value to the others
+    MPI_Comm comm           //communicator
+    )
+*/
+int main(int argc, char* argv[])
+{
+    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO why not (NULL, NULL) here?
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    long long offset;
+
+    std::srand(std::time(0) + world_rank);
+    offset = std::rand() % 100; //Give each process a different random starting value
+
+    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") starts with: " << offset << std::endl;
+
+    MPI_Barrier(MPI_COMM_WORLD); //Used so the prints don't interleave
+
+    //MPI_Bcast(buffer, count, datatype, root, comm)
+    MPI_Bcast(&offset, 1, MPI_LONG_LONG, 0, MPI_COMM_WORLD);
+    /*Broadcast also works with a single node: the root simply sends to itself.
+    The root, however, cannot be a rank greater than or equal to the communicator size.*/
+
+    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received: " << offset << std::endl;
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
diff --git a/MPI_basic/Communicators/SplitComm.cpp b/MPI_basic/Communicators/SplitComm.cpp
new file mode 100644
index 0000000..36e12c3
--- /dev/null
+++ b/MPI_basic/Communicators/SplitComm.cpp
@@ -0,0 +1,42 @@
+#include <mpi.h>
+#include <iostream>
+
+/*
+Creates new communicators by "splitting" a communicator into a group of sub-communicators based on the input values color and key.
+MPI_Comm_split(
+    MPI_Comm comm,      //communicator to split
+    int color,          //determines to which new communicator each process will belong. Same color, same communicator
+    int key,            //determines the order (rank) within each new communicator. The smallest key value is assigned rank 0, the next rank 1, and so on
+    MPI_Comm* newcomm   //new communicator
+    )
+*/
+int main(int argc, char* argv[])
+{
+    MPI_Init(NULL, NULL); //Initialize the MPI environment
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    //In the following code we assume 10 processes and split them into "rows" of two, as if they formed a 5x2 matrix
+
+    int color = world_rank / 2; //Determine the color based on the row
+
+    MPI_Comm row_comm; //To free
+    //MPI_Comm_split(comm, color, key, newcomm)
+    MPI_Comm_split(MPI_COMM_WORLD, color, -world_rank, &row_comm); //The negative key reverses the rank order inside each row
+
+    int row_size;
+    MPI_Comm_size(row_comm, &row_size); //Get the number of processes in the new communicator
+    int row_rank;
+    MPI_Comm_rank(row_comm, &row_rank); //Get the rank of the process in the new communicator
+
+    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") was split into sub-group " << row_rank+1 << "/" << row_size << "-" << color << " (row/size-color)" << std::endl;
+
+    MPI_Comm_free(&row_comm); //We have to free the communicator we created above
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
diff --git a/MPI_basic/Gather/GatherTest.cpp b/MPI_basic/Gather/GatherTest.cpp
new file mode 100644
index 0000000..70a932e
--- /dev/null
+++ b/MPI_basic/Gather/GatherTest.cpp
@@ -0,0 +1,51 @@
+#include <mpi.h>
+#include <iostream>
+
+/*
+MPI_Gather(
+    const void* sendbuf,    //the elements to send
+    int sendcount,          //number of elements to send
+    MPI_Datatype sendtype,  //type of element
+    void* recvbuf,          //buffer to receive into
+    int recvcount,          //number of elements to receive from each process
+    MPI_Datatype recvtype,  //type of element
+    int root,               //rank of the process that receives
+    MPI_Comm comm           //communicator
+    )
+*/
+int main(int argc, char* argv[])
+{
+    int sendbuf[4] = {1,2,3,4};
+
+    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO why not (NULL, NULL) here?
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int recvbuf[4*world_size]; //A variable-length array: a compiler extension in C++, fine for this test
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    //MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)
+    MPI_Gather(sendbuf, 3, MPI_INT, recvbuf, 4, MPI_INT, 1, MPI_COMM_WORLD);
+    /*sendcount cannot be larger than the send buffer, but it can be smaller; the root process, however,
+    still expects to receive the whole original buffer from every rank.
+    For this reason, besides sizing
+    recvbuf appropriately, remember that the root lays out the data it receives as:
+    [expectedFrom0-0, expectedFrom0-1, expectedFrom0-2, expectedFrom0-3, expectedFrom1-0, ...] assuming a sendbuf of 4.
+    If fewer elements than the original are sent, say 3, the expectedFrom*-3 slots end up filled with garbage.
+    The root process also receives the data it sends to itself, as if it were one of the senders.
+    (MPI_Gatherv, sketched right after this file, is the API meant for genuinely uneven counts.)*/
+
+    if(world_rank == 1)
+    {
+        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received:" << std::endl;
+        for (int i=0; i<4*world_size; i++)
+        {
+            std::cout << " " << recvbuf[i] << " ";
+        }
+        std::cout << std::endl;
+    }
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
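The uneven-count situation described in the comment above is what MPI_Gatherv is designed for: the root passes one receive count and one displacement per rank, so no slot is ever left with garbage. A minimal sketch (not part of this repository; the per-rank counts are illustrative, assuming rank i contributes i+1 integers):

#include <mpi.h>
#include <iostream>
#include <vector>

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);

    int world_size, world_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    int sendcount = world_rank + 1;                  //Each rank sends a different number of elements
    std::vector<int> sendbuf(sendcount, world_rank); //Rank i sends i+1 copies of its own rank

    std::vector<int> recvcounts(world_size), displs(world_size);
    for (int i = 0; i < world_size; i++)
        recvcounts[i] = i + 1;                       //Root expects exactly i+1 elements from rank i
    displs[0] = 0;
    for (int i = 1; i < world_size; i++)
        displs[i] = displs[i-1] + recvcounts[i-1];   //Pack the contributions back to back

    std::vector<int> recvbuf(displs[world_size-1] + recvcounts[world_size-1]);

    //MPI_Gatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm)
    MPI_Gatherv(sendbuf.data(), sendcount, MPI_INT,
                recvbuf.data(), recvcounts.data(), displs.data(), MPI_INT, 0, MPI_COMM_WORLD);

    if (world_rank == 0)
    {
        for (int v : recvbuf)
            std::cout << v << " ";
        std::cout << std::endl; //Prints 0 1 1 2 2 2 ... with no garbage slots
    }

    MPI_Finalize();
    return 0;
}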
diff --git a/MPI_basic/Groups/Group.cpp b/MPI_basic/Groups/Group.cpp
new file mode 100644
index 0000000..5eec40b
--- /dev/null
+++ b/MPI_basic/Groups/Group.cpp
@@ -0,0 +1,79 @@
+#include <mpi.h>
+#include <iostream>
+
+/*
+A group is just the set of all processes in the communicator.
+A remote operation involves communication with other ranks whereas a local
+operation does not. Creating a new communicator is a remote operation
+because all processes need to decide on the same context and group, whereas
+creating a group is local because it isn't used for communication and
+therefore doesn't need to have the same context for each process. You can
+manipulate a group all you like without performing any communication at all.
+
+MPI_Comm_group(
+    MPI_Comm comm,      //context communicator
+    MPI_Group* group    //the group created
+    )
+MPI_Comm_create_group(
+    MPI_Comm comm,      //context communicator
+    MPI_Group group,    //group, which is a subset of the group of comm
+    int tag,            //tag to tell concurrent communicator creations apart
+    MPI_Comm* newcomm   //the communicator created on top of the group
+    )
+MPI_Group_union(
+    MPI_Group group1,
+    MPI_Group group2,
+    MPI_Group* newgroup //the group created by the union
+    )
+MPI_Group_intersection(
+    MPI_Group group1,
+    MPI_Group group2,
+    MPI_Group* newgroup //the group created by the intersection
+    )
+MPI_Group_incl(
+    MPI_Group group,    //group of processes that must contain, at least, the ranks listed below
+    int n,              //size of ranks
+    const int ranks[],  //ranks to choose
+    MPI_Group* newgroup //group with only the chosen ranks from the other group
+    )
+(A union/intersection sketch follows this file.)
+*/
+int main(int argc, char* argv[])
+{
+    MPI_Init(NULL, NULL); //Initialize the MPI environment
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    MPI_Group world_group; //To free
+    //MPI_Comm_group(comm, group)
+    MPI_Comm_group(MPI_COMM_WORLD, &world_group);
+
+    const int ranks[5] = {1,3,5,7,9}; //Only use ranks that actually exist in the group
+
+    MPI_Group spare_group; //To free
+    //MPI_Group_incl(group, n, ranks[], newgroup)
+    MPI_Group_incl(world_group, 5, ranks, &spare_group);
+
+    MPI_Comm spare_comm; //To free
+    //MPI_Comm_create_group(comm, group, tag, newcomm)
+    MPI_Comm_create_group(MPI_COMM_WORLD, spare_group, 0, &spare_comm);
+
+    int spare_rank = -1, spare_size = -1;
+    //If this rank isn't in the new communicator, spare_comm will be MPI_COMM_NULL.
+    //Using MPI_COMM_NULL with MPI_Comm_rank or MPI_Comm_size is erroneous
+    if(MPI_COMM_NULL != spare_comm)
+    {
+        MPI_Comm_size(spare_comm, &spare_size); //Get the number of processes in the new communicator
+        MPI_Comm_rank(spare_comm, &spare_rank); //Get the rank of the process in the new communicator
+    }
+
+    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") was split into sub-group " << spare_rank+1 << "/" << spare_size << " (rank/size)" << std::endl;
+
+    MPI_Group_free(&world_group); //We have to free the groups we created above
+    MPI_Group_free(&spare_group);
+    if(MPI_COMM_NULL != spare_comm)
+        MPI_Comm_free(&spare_comm); //Only the members of the new communicator have one to free; freeing MPI_COMM_NULL is erroneous
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
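MPI_Group_union and MPI_Group_intersection are documented in the comment above but never exercised by the code. A minimal sketch of both (not part of this repository; it assumes at least 4 processes, and the two overlapping rank lists are arbitrary):

#include <mpi.h>
#include <iostream>

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);

    int world_size, world_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    if (world_size < 4)
    {
        std::cerr << "This sketch assumes at least 4 processes" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    MPI_Group world_group;
    MPI_Comm_group(MPI_COMM_WORLD, &world_group);

    const int low[3]  = {0, 1, 2}; //Two overlapping subsets of ranks
    const int high[3] = {1, 2, 3};

    MPI_Group low_group, high_group, union_group, inter_group;
    MPI_Group_incl(world_group, 3, low, &low_group);
    MPI_Group_incl(world_group, 3, high, &high_group);

    MPI_Group_union(low_group, high_group, &union_group);        //{0,1,2,3}
    MPI_Group_intersection(low_group, high_group, &inter_group); //{1,2}

    int union_size, inter_size;
    MPI_Group_size(union_group, &union_size); //Group queries are local: no communication happens
    MPI_Group_size(inter_group, &inter_size);

    if (world_rank == 0)
        std::cout << "union size: " << union_size << ", intersection size: " << inter_size << std::endl;

    MPI_Group_free(&low_group);
    MPI_Group_free(&high_group);
    MPI_Group_free(&union_group);
    MPI_Group_free(&inter_group);
    MPI_Group_free(&world_group);

    MPI_Finalize();
    return 0;
}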
diff --git a/MPI_basic/Hello_World/HelloWorld.cpp b/MPI_basic/Hello_World/HelloWorld.cpp
new file mode 100644
index 0000000..9a5a340
--- /dev/null
+++ b/MPI_basic/Hello_World/HelloWorld.cpp
@@ -0,0 +1,21 @@
+#include <mpi.h>
+#include <iostream>
+
+int main(int argc, char* argv[])
+{
+    MPI_Init(NULL, NULL); //Initialize the MPI environment
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    char processor_name[MPI_MAX_PROCESSOR_NAME];
+    int name_len;
+    MPI_Get_processor_name(processor_name, &name_len); //Get the name of the processor
+
+    std::cout << "Hello World from '" << processor_name << "' (length:" << name_len << ") (rank " << world_rank+1 << " out of " << world_size << " processes)" << std::endl;
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
diff --git a/MPI_basic/Makefile b/MPI_basic/Makefile
new file mode 100644
index 0000000..523be38
--- /dev/null
+++ b/MPI_basic/Makefile
@@ -0,0 +1,119 @@
+# Program for compiling MPI cpp programs
+CC = mpicxx
+CXX = mpicxx
+# Extra flags to give to the C++ compiler
+CFLAGS = -g
+# -Wall -Werror -Wextra
+
+# Sources, objects and executable name
+SRC = $(wildcard *.cpp)
+OBJ = $(SRC:.cpp=.o)
+NAME = test_computation
+
+# SBATCH parameters
+JOBNAME = Distributed_Sorting
+PARTITION = production
+TIME = 12:00:00
+MEM = 3G
+NODELIST = steffe[1-10]
+CPUS_PER_TASK = 1
+NTASKS_PER_NODE = 1
+OUTPUT = ./%x.%j.out
+ERROR = ./e%x.%j.err
+
+.PHONY: all run detail clean fclean re
+
+%.o: %.cpp
+	$(CC) -c $(CFLAGS) $< -o $@
+
+all: $(NAME)
+
+$(NAME): $(OBJ)
+	$(CC) $(CFLAGS) -o $@ $^
+
+run: $(NAME)
+## If the sbatch parameters in this Makefile change, launcher.sh is recreated as well
+# @if ! [ -f launcher.sh ]; then \
+	@echo "#!/bin/bash" > launcher.sh; \
+	echo "## sbatch is the command line interpreter for Slurm" >> launcher.sh; \
+	echo "" >> launcher.sh; \
+	echo "## specify the name of the job in the queueing system" >> launcher.sh; \
+	echo "#SBATCH --job-name=$(JOBNAME)" >> launcher.sh; \
+	echo "## specify the partition for the resource allocation. if not specified, slurm is allowed to take the default (the one with a star *)" >> launcher.sh; \
+	echo "#SBATCH --partition=$(PARTITION)" >> launcher.sh; \
+	echo "## the time format is days-hours:minutes:seconds, used as a limit on the execution duration" >> launcher.sh; \
+	echo "#SBATCH --time=$(TIME)" >> launcher.sh; \
+	echo "## specify the real memory required per node. The suffix can be K-M-G-T; without a suffix the value is MegaBytes by default" >> launcher.sh; \
+	echo "#SBATCH --mem=$(MEM)" >> launcher.sh; \
+	echo "## hosts can be given as a range (steffe[1-4,10-15,20]), to specify the hosts needed to satisfy the resource requirements" >> launcher.sh; \
+	echo "#SBATCH --nodelist=$(NODELIST)" >> launcher.sh; \
+	echo "## to specify the number of processors per task, default is one" >> launcher.sh; \
+	echo "#SBATCH --cpus-per-task=$(CPUS_PER_TASK)" >> launcher.sh; \
+	echo "## to specify the number of tasks to be invoked on each node" >> launcher.sh; \
+	echo "#SBATCH --ntasks-per-node=$(NTASKS_PER_NODE)" >> launcher.sh; \
+	echo "## to specify the output and error files" >> launcher.sh; \
+	echo "#SBATCH --output $(OUTPUT)" >> launcher.sh; \
+	echo "#SBATCH --error $(ERROR)" >> launcher.sh; \
+	echo "" >> launcher.sh; \
+	echo "mpirun $(NAME)" >> launcher.sh; \
+	chmod +x launcher.sh; \
+	echo "The 'launcher.sh' script has been created and is ready to run."; \
+
+# else \
+#	chmod +x launcher.sh; \
+# fi
+	@echo; sbatch launcher.sh
+	@echo " To see the job list you can use 'squeue'."
+	@echo " To cancel a job you can use 'scancel jobid'."
+
+detail:
+	@echo "Compiler flags and options that mpicxx would use for compiling an MPI program: "
+	@mpicxx --showme:compile
+	@echo
+	@echo "Linker flags and options that mpicxx would use for linking an MPI program: "
+	@mpicxx --showme:link
+
+clean:
+## The interactive read prompt does not seem to work from make
+# read -p "rm: remove all files \"./$(JOBNAME).*.out\" and \"./e$(JOBNAME).*.err\"? (y/n)" choice
+# @if [ "$$choice" = "y" ]; then \
+	@echo rm -f ./$(JOBNAME).*.out
+	@for file in ./$(JOBNAME).*.out; do \
+		rm -f "$$file"; \
+	done
+	@echo rm -f ./e$(JOBNAME).*.err
+	@for file in ./e$(JOBNAME).*.err; do \
+		rm -f "$$file"; \
+	done
+# fi
+	rm -f *~ $(OBJ)
+
+fclean: clean
+	@if [ -f launcher.sh ]; then \
+		rm -i ./launcher.sh; \
+	fi
+	rm -f $(NAME)
+
+re: fclean all
+
+
+
+# mpicxx *.c
+
+# mpirun/mpiexec ... //will run X copies of the program in the current run-time environment, scheduling (by default) in a round-robin fashion by CPU slot.
+
+# SLIDE 5 Durastante
+# The Script
+# #!/bin/bash
+# #SBATCH --job-name=name_to_choose
+# #SBATCH --mem=size[units]
+# #SBATCH -n 10
+# #SBATCH --time=12:00:00
+# #SBATCH --nodelist=node_list
+# #SBATCH --partition=ports
+# #etc..
+# mpirun ...
diff --git a/MPI_basic/Messaging/PingPong.cpp b/MPI_basic/Messaging/PingPong.cpp
new file mode 100644
index 0000000..54d0882
--- /dev/null
+++ b/MPI_basic/Messaging/PingPong.cpp
@@ -0,0 +1,67 @@
+#include <mpi.h>
+#include <iostream>
+
+/*
+MPI_Send(
+    void* data,             //data buffer
+    int count,              //count of elements
+    MPI_Datatype datatype,  //type of element
+    int destination,        //rank of the receiver
+    int tag,                //tag of the message
+    MPI_Comm communicator   //communicator
+    )
+MPI_Recv(
+    void* data,             //data buffer
+    int count,              //count of elements
+    MPI_Datatype datatype,  //type of element
+    int source,             //rank of the sender
+    int tag,                //tag of the message
+    MPI_Comm communicator,  //communicator
+    MPI_Status* status      //information about the received message
+    )
+*/
+int main(int argc, char* argv[])
+{
+    const long long PINGPONG_MAX = 3;
+
+    MPI_Init(NULL, NULL); //Initialize the MPI environment
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    if (world_size % 2 != 0)
+    {
+        std::cerr << "World size must be even, but it is " << world_size << " (odd)" << std::endl;
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+
+    long long pingpong_count = 0;
+    int partner_rank;
+
+    if (world_rank % 2 == 0)
+        partner_rank = (world_rank + 1);
+    else
+        partner_rank = (world_rank - 1);
+    while (pingpong_count < PINGPONG_MAX)
+    {
+        if (world_rank%2 == pingpong_count%2) //Exactly one side of each pair sends while the other receives
+        {
+            pingpong_count++;
+            // MPI_Send(data, count, datatype, dest, tag, communicator)
+            MPI_Send(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD);
+            std::cout << "(PING)Process (rank " << world_rank+1 << "/" << world_size << ") sent '" << pingpong_count << "' to " << partner_rank+1 << std::endl;
+        }
+        else
+        {
+            // MPI_Recv(data, count, datatype, source, tag, communicator, status)
+            MPI_Recv(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+            std::cout << "(PONG)Process (rank " << world_rank+1 << "/" << world_size << ") received '" << pingpong_count << "' from " << partner_rank+1 << std::endl;
+        }
+    }
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
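The ping-pong above never deadlocks because the world_rank%2 == pingpong_count%2 test makes exactly one side of each pair send while the other receives. When both partners must exchange data at the same time, ordering blocking MPI_Send/MPI_Recv calls by hand becomes error-prone; MPI_Sendrecv issues the send and the receive in a single call and leaves the scheduling to the library. A minimal sketch of such an exchange (not used anywhere in this repository):

#include <mpi.h>
#include <iostream>

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);

    int world_size, world_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    if (world_size % 2 != 0)
    {
        std::cerr << "World size must be even, but it is " << world_size << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    int partner_rank = (world_rank % 2 == 0) ? world_rank + 1 : world_rank - 1;

    long long mine = world_rank, theirs = -1;
    //MPI_Sendrecv(sendbuf, sendcount, sendtype, dest, sendtag,
    //             recvbuf, recvcount, recvtype, source, recvtag, comm, status)
    MPI_Sendrecv(&mine, 1, MPI_LONG_LONG, partner_rank, 0,
                 &theirs, 1, MPI_LONG_LONG, partner_rank, 0,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE); //Both directions in one call: no manual ordering, no deadlock

    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") got '" << theirs << "' from " << partner_rank+1 << std::endl;

    MPI_Finalize();
    return 0;
}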
diff --git a/MPI_basic/MessagingBarrier/PingPongBarrier.cpp b/MPI_basic/MessagingBarrier/PingPongBarrier.cpp
new file mode 100644
index 0000000..fb4d87c
--- /dev/null
+++ b/MPI_basic/MessagingBarrier/PingPongBarrier.cpp
@@ -0,0 +1,72 @@
+#include <mpi.h>
+#include <iostream>
+
+/*
+MPI_Send(
+    void* data,             //data buffer
+    int count,              //count of elements
+    MPI_Datatype datatype,  //type of element
+    int destination,        //rank of the receiver
+    int tag,                //tag of the message
+    MPI_Comm communicator   //communicator
+    )
+MPI_Recv(
+    void* data,             //data buffer
+    int count,              //count of elements
+    MPI_Datatype datatype,  //type of element
+    int source,             //rank of the sender
+    int tag,                //tag of the message
+    MPI_Comm communicator,  //communicator
+    MPI_Status* status      //information about the received message
+    )
+*/
+int main(int argc, char* argv[])
+{
+    const long long PINGPONG_MAX = 3;
+
+    MPI_Init(NULL, NULL); //Initialize the MPI environment
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    if (world_size % 2 != 0)
+    {
+        std::cerr << "World size must be even, but it is " << world_size << " (odd)" << std::endl;
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+
+    long long pingpong_count = 0;
+    int partner_rank;
+    if (world_rank % 2 == 0)
+        partner_rank = (world_rank + 1);
+    else
+        partner_rank = (world_rank - 1);
+    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << "): my partner is " << partner_rank+1 << std::endl;
+
+    MPI_Barrier(MPI_COMM_WORLD); //No process goes on until everyone has printed its partner
+    if (world_rank == 0)
+        std::cout << "Passed Barrier" << std::endl;
+
+    while (pingpong_count < PINGPONG_MAX)
+    {
+        if (world_rank%2 == pingpong_count%2)
+        {
+            pingpong_count++;
+            // MPI_Send(data, count, datatype, dest, tag, communicator)
+            MPI_Send(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD);
+            std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") sent '" << pingpong_count << "' to " << partner_rank+1 << std::endl;
+        }
+        else
+        {
+            // MPI_Recv(data, count, datatype, source, tag, communicator, status)
+            MPI_Recv(&pingpong_count, 1, MPI_LONG_LONG, partner_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+            std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received '" << pingpong_count << "' from " << partner_rank+1 << std::endl;
+        }
+    }
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
diff --git a/MPI_basic/MessagingRing/Ring.cpp b/MPI_basic/MessagingRing/Ring.cpp
new file mode 100644
index 0000000..79fdf24
--- /dev/null
+++ b/MPI_basic/MessagingRing/Ring.cpp
@@ -0,0 +1,57 @@
+#include <mpi.h>
+#include <iostream>
+
+/*
+MPI_Send(
+    void* data,             //data buffer
+    int count,              //count of elements
+    MPI_Datatype datatype,  //type of element
+    int destination,        //rank of the receiver
+    int tag,                //tag of the message
+    MPI_Comm communicator   //communicator
+    )
+MPI_Recv(
+    void* data,             //data buffer
+    int count,              //count of elements
+    MPI_Datatype datatype,  //type of element
+    int source,             //rank of the sender
+    int tag,                //tag of the message
+    MPI_Comm communicator,  //communicator
+    MPI_Status* status      //information about the received message
+    )
+*/
+int main(int argc, char* argv[])
+{
+    MPI_Init(NULL, NULL); //Initialize the MPI environment
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    long long token;
+
+    if (world_rank != 0)
+    {
+        // MPI_Recv(data, count, datatype, source, tag, communicator, status)
+        MPI_Recv(&token, 1, MPI_LONG_LONG, world_rank-1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); //MPI_Recv blocks, which forces every non-zero process to wait for the token
+        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received '" << token << "' from " << world_rank << std::endl;
+    }
+    else
+        token = 19; //Set up the token value if I am process 0
+
+    // MPI_Send(data, count, datatype, dest, tag, communicator)
+    MPI_Send(&token, 1, MPI_LONG_LONG, (world_rank+1)%world_size, 0, MPI_COMM_WORLD); //Starting from process 0, each process passes the token to the next (+1) process after receiving it
+
+    //Finally let process 0 retrieve its token after passing it along
+    if (world_rank == 0)
+    {
+        // MPI_Recv(data, count, datatype, source, tag, communicator, status)
+        MPI_Recv(&token, 1, MPI_LONG_LONG, world_size-1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); //Receive the last sent token before ending; note that (world_rank-1)%world_size would be -1 here, not world_size-1
+        std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received '" << token << "' from " << world_size << std::endl;
+    }
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
\ No newline at end of file
diff --git a/MPI_basic/README b/MPI_basic/README
new file mode 100644
index 0000000..3030399
--- /dev/null
+++ b/MPI_basic/README
@@ -0,0 +1,28 @@
+# Usage of the cluster with Slurm
+
+Information about the cluster and its partitions can be obtained with:
+    sinfo
+    scontrol show partition
+
+With srun we can reserve a subset of the resources of the compute nodes for our tasks.
+More parameters are described in the documentation.
+    srun --partition=production --time=00:30:00 --nodes=1 --pty bash -i
+    (take 1 node from the production partition for half an hour and launch the command 'bash -i')
+
+To avoid typing all of this every time, we can create a basic batch file with all the directives and the code to run.
+It is good practice to redirect the output to a file so it is stored safely.
+    sbatch run.sh
+
+Example of a batch script
+    #!/bin/bash
+    #SBATCH -n 10
+    #SBATCH --time=00:30:00
+    ...
+    mpirun ./my_exec_code
+
+To see the queue of jobs in the system we can run
+    squeue
+    squeue --long
+
+To remove a job from the queue we can run
+    scancel
diff --git a/MPI_basic/Reduce/Average.cpp b/MPI_basic/Reduce/Average.cpp
new file mode 100644
index 0000000..4a2a074
--- /dev/null
+++ b/MPI_basic/Reduce/Average.cpp
@@ -0,0 +1,48 @@
+#include <mpi.h>
+#include <iostream>
+
+/*
+MPI_Reduce(
+    void* send_data,        //the element to apply the reduce to
+    void* recv_data,        //the reduce result
+    int count,              //the size of the result in recv_data is sizeof(datatype)*count
+    MPI_Datatype datatype,  //type of element
+    MPI_Op op,              //operation to apply. A custom operation can also be defined (see the MPI_Op_create sketch after this file)
+    int root,               //rank of the process that receives the result
+    MPI_Comm comm           //communicator
+    )
+*/
+int main(int argc, char* argv[])
+{
+    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO why not (NULL, NULL) here?
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    float rand_nums[4] = {1,2,3,42};
+    int size = 4; //Must match the actual size of the rand_nums array
+
+    float local_sum = 0;
+
+    for(int i=0; i<size; i++)
+        local_sum += rand_nums[i];
+
+    //Unlike Allreduce, only the root process receives the reduced result
+    float global_sum;
+    //MPI_Reduce(send_data, recv_data, count, datatype, op, root, comm)
+    MPI_Reduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);
+
+    if(world_rank == 0)
+        std::cout << "Total sum is " << global_sum << ", average is " << global_sum / (size * world_size) << std::endl;
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
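Both reduce examples note that op can also be a user-defined operation. A minimal MPI_Op_create sketch (not part of this repository; the operation, a component-wise maximum of absolute values, is only an illustration):

#include <mpi.h>
#include <iostream>
#include <algorithm>
#include <cmath>

//User-defined reduction: component-wise maximum of absolute values.
//The signature must match MPI_User_function: the result is accumulated into inout.
void abs_max(void* in, void* inout, int* len, MPI_Datatype* datatype)
{
    float* a = static_cast<float*>(in);
    float* b = static_cast<float*>(inout);
    for (int i = 0; i < *len; i++)
        b[i] = std::max(std::fabs(a[i]), std::fabs(b[i]));
}

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);

    int world_size, world_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    MPI_Op op;
    MPI_Op_create(&abs_max, 1, &op); //1 = the operation is commutative

    float local = (world_rank % 2 == 0) ? -2.0f * world_rank : 1.0f * world_rank;
    float result;
    MPI_Reduce(&local, &result, 1, MPI_FLOAT, op, 0, MPI_COMM_WORLD);

    if (world_rank == 0)
        std::cout << "Largest |value| across " << world_size << " processes: " << result << std::endl;

    MPI_Op_free(&op); //User-defined operations must be freed, like groups and communicators

    MPI_Finalize();
    return 0;
}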
+#include <mpi.h>
+#include <iostream>
+
+/*
+MPI_Scatter(
+    const void* sendbuf,    //the elements to send
+    int sendcount,          //number of elements to send to each process
+    MPI_Datatype sendtype,  //type of element
+    void* recvbuf,          //buffer to receive into
+    int recvcount,          //number of elements to receive
+    MPI_Datatype recvtype,  //type of element
+    int root,               //rank of the process that sends
+    MPI_Comm comm           //communicator
+    )
+*/
+int main(int argc, char* argv[])
+{
+    int sendbuf[16] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16};
+    int recvbuf[4];
+
+    MPI_Init(&argc, &argv); //Initialize the MPI environment //TODO why not (NULL, NULL) here?
+
+    int world_size;
+    MPI_Comm_size(MPI_COMM_WORLD, &world_size); //Get the number of processes
+
+    if (world_size != 4)
+    {
+        std::cerr << "World size must be 4 for this example, but it is " << world_size << std::endl;
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+
+    int world_rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); //Get the rank of the process
+
+    //MPI_Scatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)
+    MPI_Scatter(sendbuf, 5, MPI_INT, recvbuf, 6, MPI_INT, 0, MPI_COMM_WORLD);
+    /*The root node counts itself when dividing the data and sends one part to itself. The receive count
+    must be at least as large as the send count for the call to work. The send count can be any number:
+    elements that do not fit in recvbuf are simply not stored. Since the values run out sooner this way,
+    if only one number is left to send when N are expected, only that 1 is sent (the send buffer is
+    drained but never read past its end).*/
+
+    std::cout << "Process (rank " << world_rank+1 << "/" << world_size << ") received: " << recvbuf[0] << ", " << recvbuf[1] << ", " << recvbuf[2] << ", " << recvbuf[3] << std::endl;
+
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
+	@echo "#!/bin/bash" > launcher.sh; \
+	echo "## sbatch is the command line interpreter for Slurm" >> launcher.sh; \
+	echo "" >> launcher.sh; \
+	echo "## specify the name of the job in the queueing system" >> launcher.sh; \
+	echo "#SBATCH --job-name=$(JOBNAME)" >> launcher.sh; \
+	echo "## specify the partition for the resource allocation. if not specified, slurm is allowed to take the default (the one with a star *)" >> launcher.sh; \
+	echo "#SBATCH --partition=$(PARTITION)" >> launcher.sh; \
+	echo "## the time format is days-hours:minutes:seconds, used as a limit on the execution duration" >> launcher.sh; \
+	echo "#SBATCH --time=$(TIME)" >> launcher.sh; \
+	echo "## specify the real memory required per node. The suffix can be K-M-G-T; without a suffix the value is MegaBytes by default" >> launcher.sh; \
+	echo "#SBATCH --mem=$(MEM)" >> launcher.sh; \
+	echo "## hosts can be given as a range (steffe[1-4,10-15,20]), to specify the hosts needed to satisfy the resource requirements" >> launcher.sh; \
+	echo "#SBATCH --nodelist=$(NODELIST)" >> launcher.sh; \
+	echo "## to specify the number of processors per task, default is one" >> launcher.sh; \
+	echo "#SBATCH --cpus-per-task=$(CPUS_PER_TASK)" >> launcher.sh; \
+	echo "## to specify the number of tasks to be invoked on each node" >> launcher.sh; \
+	echo "#SBATCH --ntasks-per-node=$(NTASKS_PER_NODE)" >> launcher.sh; \
+	echo "## to specify the output and error files" >> launcher.sh; \
+	echo "#SBATCH --output $(OUTPUT)" >> launcher.sh; \
+	echo "#SBATCH --error $(ERROR)" >> launcher.sh; \
+	echo "" >> launcher.sh; \
+	echo "mpirun $(NAME) $(ARGS)" >> launcher.sh; \
+	chmod +x launcher.sh; \
+	echo "The 'launcher.sh' script has been created and is ready to run."; \
+
+# else \
+#	chmod +x launcher.sh; \
+# fi
+	@echo; sbatch launcher.sh
+	@echo " To see the job list you can use 'squeue'."
+	@echo " To cancel a job you can use 'scancel jobid'."
+
+detail:
+	@echo "Compiler flags and options that mpicxx would use for compiling an MPI program: "
+	@mpicxx --showme:compile
+	@echo
+	@echo "Linker flags and options that mpicxx would use for linking an MPI program: "
+	@mpicxx --showme:link
+
+clean:
+## The interactive read prompt does not seem to work from make
+# read -p "rm: remove all files \"./$(JOBNAME).*.out\" and \"./e$(JOBNAME).*.err\"? (y/n)" choice
+# @if [ "$$choice" = "y" ]; then \
+	@echo rm -f ./$(JOBNAME).*.out
+	@for file in ./$(JOBNAME).*.out; do \
+		rm -f "$$file"; \
+	done
+	@echo rm -f ./e$(JOBNAME).*.err
+	@for file in ./e$(JOBNAME).*.err; do \
+		rm -f "$$file"; \
+	done
+# fi
+	rm -f *~ $(OBJ)
+
+fclean: clean
+	rm -f ./launcher.sh;
+	rm -f ./nohup.out
+	rm -f /mnt/raid/tmp/SortedRun*
+	rm -f /mnt/raid/tmp/*.sort
+	rm -f $(NAME)
+
+re: fclean all
+
+
+
+# mpicxx *.c
+
+# mpirun/mpiexec ... //will run X copies of the program in the current run-time environment, scheduling (by default) in a round-robin fashion by CPU slot.
+
+# SLIDE 5 Durastante
+# The Script
+# #!/bin/bash
+# #SBATCH --job-name=name_to_choose
+# #SBATCH --mem=size[units]
+# #SBATCH -n 10
+# #SBATCH --time=12:00:00
+# #SBATCH --nodelist=node_list
+# #SBATCH --partition=ports
+# #etc..
+# mpirun ...
diff --git a/SortingAlg/checkSort.exe b/SortingAlg/checkSort.exe
new file mode 100755
index 0000000..45a90c9
Binary files /dev/null and b/SortingAlg/checkSort.exe differ
diff --git a/SortingAlg/fileManage/creator.c b/SortingAlg/fileManage/creator.c
new file mode 100644
index 0000000..a4aeaa1
--- /dev/null
+++ b/SortingAlg/fileManage/creator.c
@@ -0,0 +1,65 @@
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <sys/time.h>
+
+//The count of numbers that populate the file
+#define TEST_SIZE 1
+#define BUF_SIZE 2097152        //16 MegaBytes
+#define BUF_SIZE_HGB 67108864   //512 MegaBytes
+#define BUF_SIZE_GB 134217728   //1 GigaByte
+#define BUF_SIZE_10GB 1342177280//10 GigaBytes
+
+/*
+Generate a file of the given size, filling it with random 8-byte numbers.
+Returns the number of microseconds needed for the generation and the write to disk.
+*/
+long long benchmark_generate_file(const char *pathname, unsigned int seed)
+{
+    struct timeval start, end;
+    unsigned int size = 0;
+    FILE *file;
+    int64_t *buffer;
+
+    srand(seed);
+    // if (access(pathname, F_OK) == 0)
+    //     return -2;//File already exists
+    if (size == 0)
+    {
+        printf("Insert a multiple of %d (%d MegaBytes) for the size of the target file:\n", BUF_SIZE_GB, BUF_SIZE_GB/131072);
+        while (1)
+        {
+            if (scanf("%u", &size) != 1)
+            {
+                printf("Insert a valid size for the file:\n");
+                while (getchar() != '\n');//Clear the input buffer to prevent an infinite loop
+            }
+            else
+                break;
+        }
+        printf("Future file dimension: (%d * %u) MB\n", BUF_SIZE_GB/131072, size);
+    }
+    buffer = (int64_t*)malloc(BUF_SIZE_GB * sizeof(int64_t));
+    if (!buffer)
+        return -1;//Something went wrong
+    file = fopen(pathname, "wb");
+    if (!file){
+        free(buffer);
+        return -1;//Something went wrong
+    }
+    gettimeofday(&start, NULL);//Timer start
+    for (unsigned int i = 0; i < size; i++)
+    {
+        for(unsigned int j=0; j < BUF_SIZE_GB; j++)
+        {
+            buffer[j] = ((int64_t)rand() << 32) | rand(); //Two 32-bit draws glued into one 64-bit number
+            // printf("%ld\n", buffer[j]);
+        }
+        fwrite(buffer, sizeof(int64_t), BUF_SIZE_GB, file);
+    }
+    gettimeofday(&end, NULL);//Timer stop
+    free(buffer);
+    fclose(file);
+    return (end.tv_sec - start.tv_sec) * 1000000LL + (end.tv_usec - start.tv_usec);
+}
diff --git a/SortingAlg/fileManage/main.c b/SortingAlg/fileManage/main.c
new file mode 100644
index 0000000..78f2300
--- /dev/null
+++ b/SortingAlg/fileManage/main.c
@@ -0,0 +1,20 @@
+#include <stdio.h>
+
+long long benchmark_generate_file(const char *pathname, unsigned int seed);
+long long benchmark_reader_file(const char *pathname);
+int is_sorted(const char *pathname);
+void print_partial_file(const char *pathname, unsigned long long startOffset, unsigned long long endOffset);
+void print_all_file(const char *pathname);
+
+int main(int argc, char* argv[]){
+
+    printf("writing file %s: time(%lld microseconds)\n","testiamolo.bin",benchmark_generate_file("testiamolo.bin",42));//TODO change the seed once this is complete
+//    printf("THEN\n");
+//    printf("reading file %s: time(%lld microseconds)\n","testiamolo.bin",benchmark_reader_file("testiamolo.bin"));
+//    printf("THEN\n");
+//    printf("is it sorted? %d\n",is_sorted("testiamolo.bin"));
+//    print_partial_file("testiamolo.bin",2,4);
+//    printf("THEN\n");
+//    print_all_file("testiamolo.bin");
+    return 0;
+}
diff --git a/SortingAlg/fileManage/reader.c b/SortingAlg/fileManage/reader.c
new file mode 100644
index 0000000..8ff4744
--- /dev/null
+++ b/SortingAlg/fileManage/reader.c
@@ -0,0 +1,107 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <sys/time.h>
+
+/*
+Read the whole file one 8-byte number at a time and return the elapsed microseconds.
+*/
+long long benchmark_reader_file(const char *pathname)
+{
+    struct timeval start, end;
+    unsigned long long startOffset = 0;  // Start from the first number (0-based index)
+    unsigned long long endOffset = -1;   // Read up to the X-th number (exclusive); -1 wraps to the maximum because it is unsigned
+    FILE *file;
+    int64_t num;
+
+    file = fopen(pathname, "rb");
+    if (!file)
+        return -1;//Something went wrong
+    fseek(file, startOffset * sizeof(int64_t), SEEK_SET);
+    gettimeofday(&start, NULL);//Timer start
+    unsigned long long currentOffset = startOffset;
+    while (currentOffset < endOffset && fread(&num, sizeof(int64_t), 1, file) == 1)
+        currentOffset++;
+    gettimeofday(&end, NULL);//Timer stop
+    fclose(file);
+    return (end.tv_sec - start.tv_sec) * 1000000LL + (end.tv_usec - start.tv_usec);
+}
+
+/*
+Return 1 if the numbers in the file are in non-decreasing order, 0 otherwise (-1 on error).
+*/
+int is_sorted(const char *pathname)
+{
+    unsigned long long startOffset = 0;  // Start from the first number (0-based index)
+    unsigned long long endOffset = -1;   // Read up to the X-th number (exclusive); -1 wraps to the maximum because it is unsigned
+    FILE *file;
+    int64_t num;
+    long long int count=1;
+
+    file = fopen(pathname, "rb");
+    if (!file)
+        return -1;//Something went wrong
+    fseek(file, startOffset * sizeof(int64_t), SEEK_SET);
+    unsigned long long currentOffset = startOffset;
+    int64_t tmp;
+    if (fread(&tmp, sizeof(int64_t), 1, file) != 1) // Take the first element (number) in the file
+    {
+        fclose(file);
+        return -1;//Empty file or read error
+    }
+    currentOffset++;
+    while (currentOffset < endOffset && fread(&num, sizeof(int64_t), 1, file) == 1)
+    {
+        count++;
+        if(tmp > num){
+            fclose(file);
+            // printf("not sorted\n");
+            return 0;
+        }
+        tmp = num;
+        currentOffset++;
+    }
+    fclose(file);
+    // printf("%lld numbers in order", count);
+    return 1;
+}
+
+/*
+Start from the start number (0-based index), read up to the end number (exclusive).
+*/
+void print_partial_file(const char *pathname, unsigned long long startOffset, unsigned long long endOffset)
+{
+    FILE *file;
+    int64_t num;
+
+    file = fopen(pathname, "rb");
+    if (!file)
+        return;//Something went wrong
+    fseek(file, startOffset * sizeof(int64_t), SEEK_SET);
+    unsigned long long currentOffset = startOffset;
+    while (currentOffset < endOffset && fread(&num, sizeof(int64_t), 1, file) == 1)
+    {
+        printf("%lld\n", (long long)num);
+        currentOffset++;
+    }
+    fclose(file);
+    return;
+}
+
+/*
+Print every number in the file, separated by " - ".
+*/
+void print_all_file(const char *pathname)
+{
+    unsigned long long startOffset = 0;  // Start from the first number (0-based index)
+    unsigned long long endOffset = -1;   // Read up to the X-th number (exclusive); -1 wraps to the maximum because it is unsigned
+    FILE *file;
+    int64_t num;
+
+    file = fopen(pathname, "rb");
+    if (!file)
+        return;//Something went wrong
+    fseek(file, startOffset * sizeof(int64_t), SEEK_SET);
+    unsigned long long currentOffset = startOffset;
+    while (currentOffset < endOffset && fread(&num, sizeof(int64_t), 1, file) == 1)
+    {
+        printf("%lld - ", (long long)num);
+        currentOffset++;
+    }
+    printf("EOF\n");
+    fclose(file);
+    return;
+}
diff --git a/SortingAlg/main.cpp b/SortingAlg/main.cpp
new file mode 100644
index 0000000..739c587
--- /dev/null
+++ b/SortingAlg/main.cpp
@@ -0,0 +1,357 @@
+#include <mpi.h>
+#include <iostream>
+#include <vector>
+#include <algorithm>
+#include <string>
+#include <queue>
+#include <functional>
+#include <utility>
+#include <cstdio>
+#include <cstdlib>
+#include <cstdint>
+#include <ctime>
+#include <unistd.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <sys/stat.h>
+
+/*
+8-byte numbers in 256KB = 32768
+8-byte numbers in 1MB = 131072
+8-byte numbers in 1GB = 134217728
+
+The whole program assumes the numbers are 64-bit.
+To visualize binary files in bash you can use:
+od -t d8 -A n binaryfile.bin #For the native format
+od -t d8 -A n --endian=little binaryfile.bin #For little-endian format
+od -t d8 -A n --endian=big binaryfile.bin #For big-endian format
+*/
+#define BUFFERSIZE 32768
+#define CACHENUM 130000
+#define RAMNUM 268435456
+#define ALLOW_BUFFER 1
+#define ALLOW_SNOWPLOW 1
+
+void sortRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
+{
+    unsigned long long startOffset, endOffset, currentOffset; //The interval is [startOffset, endOffset)
+    double startTot, start, end;
+    int64_t num;
+    std::vector<int64_t> bigVect;
+    int64_t buffer[BUFFERSIZE];
+    bigVect.reserve(sliceSize);
+
+    startTot = MPI_Wtime(); //Microsecond precision. Can't use time(), because each process would have a different "zero" time
+    start = MPI_Wtime();
+    for(unsigned long long l = 0; l < maxLoop; l++) //Populate the vector with the values in the file
+    {
+        startOffset = sliceSize * (mpiRank + (mpiSize * l));
+        if (startOffset >= fileSize)
+            break;
+        endOffset = startOffset + sliceSize;
+        fseek(file, startOffset * sizeof(int64_t), SEEK_SET);
+        currentOffset = startOffset;
+        bigVect.clear();
+
+        if (ALLOW_BUFFER) //Branch to test performance with and without buffering
+        {
+            while (currentOffset < endOffset)
+            {
+                unsigned long long elementsToRead = std::min(endOffset - currentOffset, static_cast<unsigned long long>(BUFFERSIZE)); //If the slice tail is smaller than BUFFERSIZE we must not read past endOffset
+                unsigned long long elementsRead = fread(buffer, sizeof(int64_t), elementsToRead, file);
+
+                for (unsigned long long i = 0; i < elementsRead; ++i)
+                {
+                    bigVect.push_back(buffer[i]);
+                }
+                currentOffset += elementsRead; //Increment currentOffset based on the number of elements read
+                if (elementsRead < elementsToRead) //A short read means we reached the end of the file
+                    break;
+            }
+        }
+        else
+        {
+            while (currentOffset < endOffset && fread(&num, sizeof(int64_t), 1, file) == 1)
+            {
+                bigVect.push_back(num);
+                currentOffset++;
+            }
+        }
+        end = MPI_Wtime();
+        std::cout << " " << end-start << "s" << " => Time to read file from offset " << startOffset << " to " << endOffset << " into Process " << mpiRank+1 << "/" << mpiSize << " memory" << std::endl;
+        start = MPI_Wtime();
+
+        sort(bigVect.begin(), bigVect.end());
+
+        end = MPI_Wtime();
+        std::cout << " " << end-start << "s" << " => Time to sort elements in Process " << mpiRank+1 << "/" << mpiSize << " memory" << std::endl;
+        start = MPI_Wtime();
+
+        std::string templateName = "/mnt/raid/tmp/SortedRun" + std::to_string(id) + "_XXXXXX"; //If the absolute path does not exist the temporary file will not be created
+        int tmpFile = mkstemp(&templateName[0]); //Create a temporary file based on the template
+        if (tmpFile == -1)
+        {
+            std::cout << "Error creating temporary file" << std::endl;
+            MPI_Abort(MPI_COMM_WORLD, 1);
+        }
+        for (unsigned long long i = 0; i < bigVect.size(); ++i) //Write the ordered numbers to the temp file
+        {
+            if (ALLOW_BUFFER) //Branch to test performance with and without buffering
+            {
+                buffer[i % BUFFERSIZE] = bigVect[i];
+                if ((i + 1) % BUFFERSIZE == 0 || i == bigVect.size() - 1) //Flush when the buffer is full or at the last element
+                {
+                    ssize_t tw = write(tmpFile, buffer, sizeof(int64_t) * ((i % BUFFERSIZE) + 1));
+                    if (tw == -1)
+                    {
+                        std::cout << "Error writing to file" << std::endl;
+                        MPI_Abort(MPI_COMM_WORLD, 1);
+                    }
+                }
+            }
+            else
+            {
+                int64_t elem = bigVect[i];
+                ssize_t tw = write(tmpFile, &elem, sizeof(int64_t));
+                if (tw == -1)
+                {
+                    std::cout << "Error writing to file" << std::endl;
+                    MPI_Abort(MPI_COMM_WORLD, 1);
+                }
+            }
+        }
+        off_t sz = lseek(tmpFile, 0, SEEK_END);
+        if (sz == 0) //An empty run is useless: close and delete it
+        {
+            if (close(tmpFile) == -1)
+            {
+                std::cout << "Error closing the file" << std::endl;
+                MPI_Abort(MPI_COMM_WORLD, 1);
+            }
+            if (unlink(&templateName[0]) == -1)
+            {
+                std::cout << "Error unlinking file" << std::endl;
+                MPI_Abort(MPI_COMM_WORLD, 1);
+            }
+        }
+        else if (close(tmpFile) == -1) //The 'else' avoids closing the descriptor twice
+        {
+            std::cout << "Error closing the file" << std::endl;
+            MPI_Abort(MPI_COMM_WORLD, 1);
+        }
+        end = MPI_Wtime();
+        std::cout << " " << end-start << "s" << " => Time to write '" << templateName << "' and fill it up with " << sz/8 << " sorted elements by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+        start = MPI_Wtime();
+    }
+    end = MPI_Wtime();
+    std::cout << end-startTot << "s" << " => Time of function sortRuns() in Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+}
+
+void snowPlowRuns(unsigned long long fileSize, unsigned long long sliceSize, unsigned long long maxLoop, FILE* file, int id, int mpiRank, int mpiSize)
+{
+    if (!ALLOW_SNOWPLOW) //The snow plow (replacement selection) itself is not implemented; the fallback is plain multi-pass sorting with RAM-sized slices
+        std::cout << "Can't compute files bigger than " << RAMNUM * mpiSize / 134217728 << "GB with " << mpiSize << " processes (the current file is " << fileSize / 134217728 << "GB)" << std::endl;
+    else
+    {
+        maxLoop = (fileSize / (RAMNUM * mpiSize)) + 1;
+        sortRuns(fileSize, RAMNUM, maxLoop, file, id, mpiRank, mpiSize);
+    }
+}
+
+void kMerge(const std::string &argFile, int id, int mpiRank, int mpiSize)
+{
+    std::string fileDir = "/mnt/raid/tmp/";
+    std::string pattern = "SortedRun" + std::to_string(id) + "_";
+    std::vector<int> fds;         //To store the file descriptor of each file to merge
+    std::vector<std::string> fns; //To store the name of each file, so they can be deleted after the merge
+    size_t lastSlash = argFile.find_last_of('/');
+    std::string nameOnly = (lastSlash != std::string::npos) ? argFile.substr(lastSlash + 1) : argFile;
+    std::string finalFile = "/mnt/raid/tmp/" + nameOnly + (ALLOW_BUFFER == 1 ? ".buf" : "") + ".sort";
+    double start, end;
+    int fileCount = 0;
+
+    DIR *dir = opendir(fileDir.c_str());
+    if (dir)
+    {
+        struct dirent *entry;
+        while ((entry = readdir(dir)) != nullptr)
+        {
+            if (entry->d_type == DT_REG) //Check if it's a regular file
+            {
+                std::string filename = entry->d_name;
+                if (filename.find(pattern) != std::string::npos) //Check if the file name matches the pattern
+                {
+                    std::string tmpFile = fileDir + "/" + filename;
+                    int fd = open(tmpFile.c_str(), O_RDONLY); //Open the file and save the file descriptor
+                    if (fd != -1)
+                    {
+                        fds.push_back(fd);
+                        fns.push_back(tmpFile);
+                        fileCount++;
+                    }
+                    else
+                        std::cout << "Error opening file '" << tmpFile << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+                }
+            }
+        }
+        closedir(dir);
+    }
+    else
+    {
+        std::cout << "Error opening directory '" << fileDir << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+
+    int fdFinal = open(finalFile.c_str(), O_WRONLY | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); //Open the file for writing only, creating it if it doesn't exist and refusing to overwrite it if it does
+    if (fdFinal == -1)
+    {
+        std::cout << "Error opening or creating final file '" << finalFile << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+
+    std::cout << std::endl << "Starting the merge process for " << fileCount << " files" << std::endl << std::endl;
+    start = MPI_Wtime();
+
+    //Min-heap of {value, source fd} pairs implemented with a priority queue
+    std::priority_queue<std::pair<int64_t, int>, std::vector<std::pair<int64_t, int>>, std::greater<std::pair<int64_t, int>>> minHeap;
+    int64_t tmpValue;
+    for (int fd : fds) //Populate the min-heap with the first value from each file descriptor
+    {
+        if (read(fd, &tmpValue, sizeof(int64_t)) == sizeof(int64_t))
+            minHeap.push({tmpValue, fd});
+        else
+        {
+            std::cout << "Error reading from file descriptor by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+            MPI_Abort(MPI_COMM_WORLD, 1);
+        }
+    }
+
+    int tmpfd;
+    int64_t tmpValue2;
+    int64_t buffer[BUFFERSIZE];
+    unsigned long long i = 0;
+    while (!minHeap.empty()) //Write the sorted elements to the final file
+    {
+        tmpValue = minHeap.top().first;
+        tmpfd = minHeap.top().second;
+        if (read(tmpfd, &tmpValue2, sizeof(int64_t)) == sizeof(int64_t)) //Read another integer from the same file descriptor
+        {
+            minHeap.pop();
+            minHeap.push({tmpValue2, tmpfd});
+        }
+        else //If no more values can be read
+        {
+            minHeap.pop();
+            if (close(tmpfd) == -1)
+            {
+                std::cout << "Error closing the file descriptor by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+                MPI_Abort(MPI_COMM_WORLD, 1);
+            }
+        }
+        if (ALLOW_BUFFER) //Branch to test performance with and without buffering
+        {
+            buffer[i % BUFFERSIZE] = tmpValue;
+            if ((i + 1) % BUFFERSIZE == 0 || minHeap.empty()) //Flush when the buffer is full or the heap has been drained
+            {
+                ssize_t tw = write(fdFinal, buffer, sizeof(int64_t) * ((i % BUFFERSIZE) + 1));
+                if (tw == -1)
+                {
+                    std::cout << "Error writing to file" << std::endl;
+                    MPI_Abort(MPI_COMM_WORLD, 1);
+                }
+            }
+            i++;
+        }
+        else
+        {
+            ssize_t tw = write(fdFinal, &tmpValue, sizeof(int64_t));
+            if (tw == -1)
+            {
+                std::cout << "Error writing to file" << std::endl;
+                MPI_Abort(MPI_COMM_WORLD, 1);
+            }
+        }
+    }
+
+    for (const std::string &fn : fns) //Remove all the temporary files after merging them
+    {
+        if (unlink(&fn[0]) == -1)
+        {
+            std::cout << "Error unlinking file '" << fn << "' by Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+            MPI_Abort(MPI_COMM_WORLD, 1);
+        }
+    }
+    end = MPI_Wtime();
+    std::cout << end-start << "s" << " => Time of function kMerge() in Process " << mpiRank+1 << "/" << mpiSize << std::endl;
+    std::cout << std::endl << "Sorted file '" << finalFile << "'" << std::endl;
+}
+
+int main(int argc, char* argv[])
+{
+    MPI_Init(&argc, &argv); //Initialize the MPI environment
+
+    double startGlobal, endGlobal;
+    int id, mpiSize, mpiRank;
+    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); //Get the number of processes
+    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); //Get the rank of the process
+    if (mpiRank == 0)
+    {
+        startGlobal = MPI_Wtime();
+        std::srand(std::time(0));
+        id = std::rand() % 10000; //Get a random id number to tell the files of different executions apart
+    }
+    MPI_Bcast(&id, 1, MPI_INT, 0, MPI_COMM_WORLD);
+
+    if (argc != 2)
+    {
+        if (mpiRank == 0)
+        {
+            std::cout << "Usage: " << argv[0] << " <file to sort>" << std::endl;
+            std::cout << "It produces a file with extension '.sort' in the same directory as the unsorted one. Make sure there is enough free space first." << std::endl;
+            std::cout << "Pass arguments through make as ARGS=\"stuff\". Example: 'make run ARGS=\"/path/to/file\"'." << std::endl;
+        }
+        MPI_Finalize(); //Clean up the MPI environment
+        return 0;
+    }
+
+    FILE *file;
+    unsigned long long fileSize, slices, sliceSize, maxLoop;
+    file = fopen(argv[1], "rb"); //Open the file in mode rb (read binary)
+    if(!file)
+    {
+        std::cout << "Error opening file: " << argv[1] << std::endl;
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+    fseek(file,0,SEEK_END);
+    fileSize = ftell(file) / 8; //ftell gives the file size in bytes; divided by 8 it is the count of numbers to parse, each number being 8 bytes
+    if (mpiRank == 0)
+        std::cout << "Sorting file '" << argv[1] << "' of " << fileSize << " elements" << std::endl << std::endl;
+
+    if (fileSize < (CACHENUM * mpiSize)) //Choose the slice granularity: cache-sized runs, RAM-sized runs, or one slice per process
+        slices = (fileSize / CACHENUM) + 1;
+    else if (fileSize < (RAMNUM * mpiSize)) //TODO add more granularity considering double the RAM for the snow plow technique
+        slices = (fileSize / RAMNUM) + 1;
+    else
+        slices = mpiSize + 1;
+    sliceSize = (fileSize / slices) + 1; //Each process takes slices of 8-byte integers sized from the input file (currently handled inside sortRuns)
+    maxLoop = (slices / mpiSize) + 1;
+    if (sliceSize > RAMNUM)
+        snowPlowRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
+    else
+        sortRuns(fileSize, sliceSize, maxLoop, file, id, mpiRank, mpiSize);
+    fclose(file);
+
+    MPI_Barrier(MPI_COMM_WORLD); //Blocks the caller until all processes in the communicator have called it
+
+    if(mpiRank==0)
+    {
+        kMerge(argv[1], id, mpiRank, mpiSize);
+
+        endGlobal = MPI_Wtime();
+        std::cout << (endGlobal-startGlobal)/60.0 << "min" << " => FULL EXECUTION TIME" << std::endl;
+        std::cout << std::endl << "To visualize binary files in bash you can use:" << std::endl;
+        std::cout << "od -t d8 -A n binaryfile.bin #For the native format" << std::endl;
+        std::cout << "od -t d8 -A n --endian=little binaryfile.bin #For little-endian format" << std::endl;
+        std::cout << "od -t d8 -A n --endian=big binaryfile.bin #For big-endian format" << std::endl;
+    }
+    MPI_Finalize(); //Clean up the MPI environment
+    return 0;
+}
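The core of kMerge() above is the min-heap of {value, source} pairs: pop the globally smallest element, write it out, and refill the heap from whichever run it came from. The same logic reduced to in-memory runs, with the file descriptors and MPI stripped away (a standalone sketch for reference, not code from this repository; it assumes C++17 for structured bindings):

#include <iostream>
#include <queue>
#include <vector>
#include <cstdint>
#include <utility>

//Merge k sorted runs by always popping the globally smallest head element
std::vector<int64_t> kWayMerge(const std::vector<std::vector<int64_t>>& runs)
{
    using Entry = std::pair<int64_t, std::pair<size_t, size_t>>; //{value, {run index, next offset in that run}}
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> minHeap;

    for (size_t r = 0; r < runs.size(); r++)
        if (!runs[r].empty())
            minHeap.push({runs[r][0], {r, 1}}); //Seed the heap with each run's first element

    std::vector<int64_t> merged;
    while (!minHeap.empty())
    {
        auto [value, pos] = minHeap.top();
        minHeap.pop();
        merged.push_back(value);
        auto [r, next] = pos;
        if (next < runs[r].size())
            minHeap.push({runs[r][next], {r, next + 1}}); //Refill from the run the value came from
    }
    return merged;
}

int main()
{
    std::vector<std::vector<int64_t>> runs = {{1, 4, 9}, {2, 3, 10}, {5, 6, 7, 8}};
    for (int64_t v : kWayMerge(runs))
        std::cout << v << " ";
    std::cout << std::endl; //1 2 3 4 5 6 7 8 9 10
    return 0;
}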