diff --git a/main (3).cpp b/main (3).cpp new file mode 100644 index 0000000..73e2342 --- /dev/null +++ b/main (3).cpp @@ -0,0 +1,642 @@ +/** + * @file main.c + * @author Carmen Di Nunzio + * @brief Quicksort with PSRS - Parallel sorting by regular sampling + * + * Al momento si ordinano int (sizeof(int) = 4 (bytes), quindi di 32 bit). + * + * Si è pensato di usare variabili di tipo 'long' o più larghe (ovvero unsigned long long) per salvare le informazioni + * relative alle partizioni (indice di partenza, fine e dimensione), per aggirare un eventuale problema di overflow + * qualora si cerchi di ordinare una grande mole di dati/elementi. + * Tuttavia, l'uso di variabili diverse da 'int' rende impossibile utilizzare funzioni come la MPI_Alltoallv, le quali + * richiedono che alcuni dei parametri (es: sendcounts[]) siano array di 'int'. + * Probabimente è un problema risolvibile ma al momento non sono state esplorate potenziali soluzioni. + */ + +// C++ includes +#include +#include +#include +#include +#include +#include + +// C includes +#include // Per fprintf +#include // Per qsort + +// MPI includes +#include "mpi.h" + +// Personal MACROs +#define ABORT_ALL MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE) // Abort-all shortcut. + +/** Gestione di funzioni che restituiscono valori diversi da MPI_SUCCESS in caso + * di errore. In tal caso: aborto di tutti i processi. + * + * Naming shcheme: ab_ec_neq_MPI_SUCCESS: + * ab : abort (se si verifica) + * ec : error (check, controllo se) + * neq : not equal (a) + * MPI_SUCCESS : valore contro cui testare + */ +#define ab_ec_neq_MPI_SUCCESS(x, err_string) ab_ec_cmp_ne(x, MPI_SUCCESS, err_string) + +#define ab_ec_cmp_ne(var, err_rtn, err_string) \ + { \ + if ((var) != (err_rtn)) { \ + std::cerr << (err_string) << std::endl; \ + MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); \ + } \ + } + +int g_nodes_per_debug = -1; // [Per debug] + +// Structures +struct partition_t { /* Partizioni come interalli chiusi [start,end) */ + std::vector vect; // Vettore della partizione + size_t start; // vect[start]: incluso + size_t end; // vect[ end ]: escluso + size_t length; // Lunghezza della partizione (intesa come n_memb) +}; +typedef struct partition_t partition; + +// ----------------------------------------------------------------------------- +// +// Functions declaration +// +// ----------------------------------------------------------------------------- +// Algorithm functions +// Parallel sorting by regular sampling + +double PSRS(std::vector &vect, int nodes, MPI_Offset *out_f_offest, long *sort_slice_size); +void create_prtn(std::vector &vect, partition *p, int *pivot, int nodes); + +// Other functinos + +double populate_vec(std::vector &vect, MPI_Offset file_size, int nodes, MPI_Offset slice_size, MPI_File file, int id, MPI_Offset total_nmemb); +int compare(const void *a, const void *b); +double print_to_file(std::vector vect, MPI_File f, int type_width_bytes, MPI_Offset offest, long sort_slice_size); +void print_to_screen(std::vector vect, int mpi_rank, int nodes); +void print_dbg_arr(int *a, int size, int mpi_rank, int nodes, char *msg); + +// ----------------------------------------------------------------------------- +// +// Functions definition +// +// ----------------------------------------------------------------------------- + +/** Main + * @param argc Numero di argomenti (Default) + * @param argv[0] Nome dell'eseguibile (Default) + * @param argv[1] Filepath (abs/rel) del file da ordinare + * @param argv[2] Dimensione (in bytes) degli elementi del file da ordinare + * + * @return int + */ +int main(int argc, char *argv[]) { + // --------------------Variables-------------------- + int op_result; // Salva risulatato delle operazioni/chiamate a funzioni + int mpi_rank; // Numero del processo + int mpi_size; // Numero di processi totali + int slices; // Numero di segmenti in cui dividere il file/array + int rest; // Resto della div. tra total_nmemb e numero di processi totali + int size_of_arr_elements; // Dimensione (in bytes) del tipo di elementi da ordinare + MPI_File file, output_file; // File(s) da aprire + MPI_Offset total_nmemb; // Numero di elementi (memb) da ordinare + MPI_Offset file_size; // Dimensione del file (in bytes) + MPI_Offset slice_size; // Dimensione dei segmenti (in bytes) in cui dividere il file/array + MPI_Offset sorted_file_nmemb_offest; // Offest (in elementi) dal quale scrivere il proprio vector ordinato + long sorted_vector_nmemb; // Dimensione (in elementi) del vector ordinato + std::string output_filename; // Nome del file di output + MPI_Status status; + + // Time(s) + double start_t; // Tempo di partenza + double end_t; // Tempo di fine + double vect_load_t; // Tempo di caricamento dei vettori + double vect_load_all_t; // Tempo di caricamento di tutti i vettori + double PSRS_t; // Tempo esecuzione algoritmo + double final_wrt_t; // Tempo scrittura finale su file + double tot_t; // Tempo totale + double aux1_t, aux2_t; // Tempi ausiliari per altro + + // Vector + std::vector local_vect; // Per-process local vector (of int type) + + // --------------------Code------------------------- + // Start + MPI_Init(&argc, &argv); // Initialize the MPI environment + MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); // Ottieni il numero di processi presenti + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); // Ottieni il proprio rank (ID) in rank + start_t = MPI_Wtime(); + + // Arguments check and handling (Made by node 0) + if (mpi_rank == 0) { + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + ABORT_ALL; + } else { + std::filesystem::path filePath = argv[1]; // Object that represent paths on a filesystem + if (!std::filesystem::exists(filePath)) { // If passed-file doesn't exists + std::cerr << "File or directory does not exist or cannot access to it. Aborting." << std::endl; + ABORT_ALL; + } + } + } + + // File opening + op_result = MPI_File_open(MPI_COMM_WORLD, argv[1], MPI_MODE_RDONLY, MPI_INFO_NULL, &file); // Try to open the file + ab_ec_neq_MPI_SUCCESS(op_result, "Error opening the passed file. Aborting all processes."); // Macro checking for errors + + // File size managing + op_result = MPI_File_get_size(file, &file_size); // Try to obtain filesize + ab_ec_neq_MPI_SUCCESS(op_result, "Error getting file size. Aborting all processes."); // Macro checking for errors + size_of_arr_elements = 4; // We are using int(s) -> + // elem_size is 4 + total_nmemb = file_size / size_of_arr_elements; // Get total number of elements + + // Compute additional info + slices = mpi_size; // Number of slices depends on number of nodes/process + slice_size = total_nmemb / slices; // Slices sizes (number of elements) + rest = total_nmemb % slices; // From 0 to (slices-1) + + // Start message printing + if (mpi_rank == 0) { + std::cout << "Starting sorting process." << std::endl + << "Loading file elements into " << mpi_rank << " local vector." << std::endl; + } + + // Populate all the local vectors + MPI_Barrier(MPI_COMM_WORLD); + aux1_t = MPI_Wtime(); + vect_load_t = populate_vec(local_vect, file_size, mpi_size, slice_size, file, mpi_rank, total_nmemb); + MPI_Barrier(MPI_COMM_WORLD); + aux2_t = MPI_Wtime(); + vect_load_all_t = aux2_t - aux1_t; // Ottengo tempo di caricamento di tutti i vettori, rilevante qualora i nodi + // avessero velocità non simili + MPI_File_close(&file); // Chiudo il file in input, non serve più + + // Execute the sorting algorithm + printf("[Nodo %d] Inizio algoritmo PSRS...\n", mpi_rank); + PSRS_t = PSRS(local_vect, mpi_size, &sorted_file_nmemb_offest, &sorted_vector_nmemb); + MPI_Barrier(MPI_COMM_WORLD); + printf("[Nodo %d] Ordinamento finito.\n", mpi_rank); + + // Creazione della stringa per il nome del file di output + std::string argv1 = argv[1]; // Creo un oggetto di tipo std::string + size_t pos = argv1.find_last_of('/') + 1; // Ottengo posiz. inio nome file (ignoro il path) + output_filename = "Sorted_" + argv1.substr(pos); // Creo il nome output_file + + // Apertura file di output + op_result = MPI_File_open(MPI_COMM_WORLD, output_filename.data(), (MPI_MODE_CREATE | MPI_MODE_WRONLY), MPI_INFO_NULL, &output_file); + ab_ec_neq_MPI_SUCCESS(op_result, "Error while opening/creating final file. Aborting."); + + // Scrittura collettiva parallela su file + final_wrt_t = print_to_file(local_vect, output_file, size_of_arr_elements, sorted_file_nmemb_offest, sorted_vector_nmemb); + + // Calcolo durata totale del programma + end_t = MPI_Wtime(); + tot_t = end_t - start_t; + + // Salvataggio delle statistiche su file .csv + if (mpi_rank == 0) { + std::cout << "Sorting completed" << std::endl; + + std::ofstream csv_file; + std::filesystem::path csv_path = "executions_stats.csv"; + if (!std::filesystem::exists(csv_path)) { + csv_file.open(csv_path, std::ios::out | std::ios::app); + csv_file << "Filename" + << "," + << "Num_elementi" + << "," + << "File_size_bytes" + << "," + << "Nodes" + << "," + << "vect_load_t" + << "," + << "vect_load_all_t" + << "," + << "PSRS_t" + << "," + << "final_wrt_t" + << "," + << "tot_t" << std::endl; + } else { + csv_file.open(csv_path, std::ios::out | std::ios::app); + } + csv_file << argv1 << "," + << total_nmemb << "," + << file_size << "," + << mpi_size << "," + << vect_load_t << "," + << vect_load_all_t << "," + << PSRS_t << "," + << final_wrt_t << "," + << tot_t << std::endl; + csv_file.close(); // Chiusura file .csv + } + MPI_File_close(&output_file); + MPI_Finalize(); // Clean up the MPI environment + return 0; +} + +/** @brief Populate the in-memory-local vector with the process's portion of the unsorted file/array + * + * In questo caso non si aspetta che tutti carichino il proprio vettore in quanto, essendo di dimensioni uguali se non + * per un elemento, i tempi di caricamento sono quasi identici, escludendo altre variabili. + * + * @param vect Local vector + * @param file_size File total size (in bytes) + * @param nodes Number of nodes/processes + * @param slice_size Size of main file slices (number of elements) + * @param file Input file + * @param id Node's id/rank + * @param total_nmemb Total numb + * + * @return double elapsed_time - Tempo trascorso + */ +double populate_vec(std::vector &vect, MPI_Offset file_size, int nodes, MPI_Offset slice_size, MPI_File file, int id, MPI_Offset total_nmemb) { + // --------------------Variables-------------------- + int op_result; // Esito operazione/chiamata di funzione + int temp; // Variabile ausliare per la letture di un intero + int elements_read_result = 0; // Numero di interi + MPI_Offset offset; // Offset (in bytes) usato per scorrere il file in lettura + MPI_Status status; // Struttura per informazioni extra sulle operazioni + double start_time, end_time, elapsed_time; // Time measuring + + // --------------------Code------------------------- + start_time = MPI_Wtime(); // Timestamp partenza + vect.reserve(slice_size + 1); // Riservo memoria per evitare reallocazioni (vedi: std::vector document.) + + /* Calcolo dell'offeset (in bytes) in base al proprio id (ovvero rank) + * + * Il calcolo viene eseguito moltiplicando: + * id : Rank del nodo (∈ [0, mpi_size - 1]) + * slice_size : Numero di elementi della porzione del file + * sizeof(int) : Dimensione in byte degli elementi da leggere dal file + * (di tipo binario, .dat) + * (Parametrico ma per ora gestiamo solo interi) + * + * In questo modo, ogni nodo si legge solamente i bytes della propria + * porzione di file. + * + * La lettura degli ultimi elementi, qualora la divisione + * total_nmemb / nodes + * non risulti esatta, è gestita successivamente. */ + offset = id * slice_size * sizeof(int); + +#ifdef READ_SINGLE + // Inserimento dei valori nel vector uno alla volta + // Esegui per: numero-di-elementi-della-propria-porzione volte + for (MPI_Offset i = 0; i < slice_size; i++) { + op_result = MPI_File_read_at(file, offset + (i * sizeof(int)), &temp, 1, MPI_INT, &status); // Leggi 1 int + ab_ec_neq_MPI_SUCCESS(op_result, "Error reading the main file content. Aborting."); // Controllo operaz. + op_result = MPI_Get_count(&status, MPI_INT, &elements_read_result); // Numero int letti + ab_ec_neq_MPI_SUCCESS(op_result, "Error checking the read result. Aborting."); + if (elements_read_result == 1) { // Lettura andata a buon fine ⇒ + vect.push_back(temp); // Aggiungi l'elemento al vector + } else { + fprintf(stderr, "[%d] Offest: %ld, valore non letto\n", id, offset + (i * sizeof(int))); + ABORT_ALL; + } + } +#else + // Inserimento dei valori nel vector sfruttando la possibilità di leggere + // più elementi in una volta sola con la MPI_File_read_at + vect.resize(slice_size); // Ridimensiono il vettore + op_result = MPI_File_read_at(file, offset, vect.data(), slice_size, MPI_INT, &status); // Lettura multipla + ab_ec_neq_MPI_SUCCESS(op_result, "Error when reading the main file content (using multiple read). Aborting."); + + op_result = MPI_Get_count(&status, MPI_INT, &elements_read_result); // Num. int letti + ab_ec_neq_MPI_SUCCESS(op_result, "Error checking the read result. Aborting."); // Lettura: Num. int. letti OK? + if (elements_read_result != slice_size) { // Letto tutta la mia porzione? + std::cerr << "Error: read less element than " << id << "'s portion length. Aborting." << std::endl; + ABORT_ALL; + } +#endif + + // Inserimento dei valori rimasti (caso in cui la divisione non è esatta) + MPI_Offset last_part_startpoint = nodes * slice_size * sizeof(int); // Indice in bytes primo elemento rimasto da leggere + int rest = total_nmemb % nodes; // From 0 to (slices-1) + + // Esegui per: numero-di-elementi-rimasti-da-leggere volte + for (int i = 0; i < rest; i++) { + if (id == i) { // "Assegna" un elemento rimasto in stile "round-robin" ai processi, ovvero: + // rest_0 -> 0; rest_1 -> 1, ... + op_result = MPI_File_read_at(file, last_part_startpoint + (i * sizeof(int)), &temp, 1, MPI_INT, &status); + ab_ec_neq_MPI_SUCCESS(op_result, "Error reading the last part of the file/array"); + op_result = MPI_Get_count(&status, MPI_INT, &elements_read_result); + ab_ec_neq_MPI_SUCCESS(op_result, "Error checking the read result. Aborting."); + if (elements_read_result == 1) { // Lettura andata a buon fine ⇒ + vect.push_back(temp); // Aggiungi l'elemento al vector + } else { + fprintf(stderr, "[%d] Offest: %lld, valore non letto\n", id, last_part_startpoint + (i * sizeof(int))); + ABORT_ALL; + } + } + } + + // Calcolo del tempo impiegato per il caricamento + end_time = MPI_Wtime(); // Timestamp fine + elapsed_time = end_time - start_time; // Get elapsed time + + return elapsed_time; +} + +/** @brief Stampa i vector(s) sul file passato in modo collettivo (e parallelo) + * + * @param vect Vettire da scrivere + * @param file File in output + * @param type_width_bytes Larghezza in bytes degli elementi del vettore + * @param offset Offset (logico, quindi degli elementi) dal quale scrivere su f. + * @param sort_slice_size Numero di elementi da scrivere + */ +double print_to_file(std::vector vect, MPI_File f, int type_width_bytes, MPI_Offset offset, long sort_slice_size) { + // --------------------Variables-------------------- + MPI_Status status; // Informazioni sulla scrittura + long offset_bytes; // Offset in bytes + int op_result; // Esito funzione + + // Time(s) + double start_time, end_time, elapsed_time; + + // --------------------Code------------------------- + offset_bytes = offset * type_width_bytes; // Ottengo l'offset in bytes + + MPI_Barrier(MPI_COMM_WORLD); // Fa si che tutti i nodi "inizino insieme", creando un sync-point + start_time = MPI_Wtime(); // Ottengo tempo d'inizio + + op_result = MPI_File_write_at_all(f, offset_bytes, vect.data(), sort_slice_size, MPI_INT, &status); + ab_ec_neq_MPI_SUCCESS(op_result, "Error writing the sorted local vectors to file. Aborting."); + + MPI_Barrier(MPI_COMM_WORLD); // Prosegue solo se tutti i nodi hanno finito + end_time = MPI_Wtime(); // Ottengo tempo di fine + elapsed_time = end_time - start_time; // Durata stampa su file + return elapsed_time; +} + +/** @brief Print array content in a synchonized way [DEBUG PURPOSES] + * Si utilizza lo stdout per avere un output meno spezzettato + * + * @param a Array (indirizzo iniziale) + * @param size Numero di elementi dell'array + * @param mpi_rank ID del nodo/processo + * @param nodes Numero di nodi/processi + * @param msg Stringa da stampare per informazionei aggiuntive sull'array e/o + * sulla fase del programma + */ +void print_dbg_arr(int *a, int size, int mpi_rank, int nodes, char *msg) { + for (int i = 0; i < nodes; i++) { // Ciclo sequenzial. dal nodo: [0, nodes) + if (mpi_rank == i) { // Sono il nodo i-esimo ⇒ Stampa + // Formattazione stampe + fprintf(stdout, "[%d %s] ", mpi_rank, msg); + for (int j = 0; j < size; j++) { // Cicla dall'elemento: [0,size) + fprintf(stdout, "%2d; ", a[j]); + } + fprintf(stdout, "\n"); + fflush(stdout); + } + // Se nodes (parametro) è effettivamente il numero di nodi presenti, + // allora metti tutti in attesa. Senza questo controllo, la MPI_Barrier + // manderebbe in stallo il programma. + if (nodes == g_nodes_per_debug) { + MPI_Barrier(MPI_COMM_WORLD); // Forza punto di sincronizzaziones + } + } +} + +/** @brief Stampa i vector(s) su schermo (scrittura sequenziale) + * + * @param vect Vector da stampare su schermo + * @param mpi_rank ID/Rank del nodo + * @param nodes Numero dei nodi/processi + */ +void print_to_screen(std::vector vect, int mpi_rank, int nodes) { + for (int i = 0; i < nodes; i++) { // For all nodes + if (mpi_rank == i) { // Iterate on nodes-IDs + fprintf(stderr, "[%d] ", mpi_rank); // Print node's ID, then + for (size_t j = 0; j < vect.size(); j++) { // Print all elements + fprintf(stderr, "%2d; ", vect[j]); + } + fprintf(stderr, "\n"); + } // If not interested node, wait for i-th node to complete + MPI_Barrier(MPI_COMM_WORLD); // Force sync + } +} + +/** @brief Compare due interi + * + * @param a Indirizzo del primo intero + * @param b Indirizzo del secondo intero + * @return Intero con segno che descrive l'ordinamento tra i due interi + */ +int compare(const void *a, const void *b) { + return (*(int *)a - *(int *)b); +} + +/** @brief Parallel sorting by random sampling + * + * @param vect Local vector + * @param nodes Numero di nodi/processi + * @param out_f_offest Offset logico (numero di elementi) dal quale scrivere successivamente su file + * @param sort_slice_size Dimensione (numero di elementi) della porzione dopo ordinamento + * @return double Tempo di esecuzione della funzione + */ +double PSRS(std::vector &vect, int nodes, MPI_Offset *out_f_offest, long *sort_slice_size) { + // --------------------Variables-------------------- + // Misc + int mpi_rank; // Rank nodo + int op_result; // Risultato operazione + double start_time, end_time, elapsed_time; // Tempi di esecuzione + MPI_Status status; // Status post-operazione + + // Informazioni vettore e pivot(s) + int n = vect.size(); // Dimensione vettore + int *samples = new int[nodes]; // Campioni scelti + int *rcv_samples = new int[nodes * nodes]; // Campioni ricevuti dai nodi + int *tmp_pvt = new int[nodes - 1]; // Pivot(s) temporanei + + // Partizioni + partition *prtn = new partition[nodes]; // Partizioni del vettore + int *p_sizes = new int[nodes]; // Array containing the sizes of vector's partitions + int *p_starts = new int[nodes]; // Array cointaining the starting points of the vector's partition + int *rcv_prt_sizes = new int[nodes]; // Array dove salvo la dimensione delle porzioni che riceverò + int *rcv_prt_starts = new int[nodes]; // Array dove salvo gli starting-point delle porzioni che riceverò + + // --------------------Code------------------------- + start_time = MPI_Wtime(); // Tempo di parteza + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); // Ottieni il proprio rank (ID) in rank + + // Local sorting + qsort(vect.data(), vect.size(), 4, compare); // Lib. function for sorting the local vector + + // Controllo se ho almeno due nodi, in caso contrario il vettore è già ordinato + if (nodes > 1) { + // Select samples + samples[0] = vect[0]; // Il primo campione è il primo elemento + for (int p = 1; p < nodes; p++) { // Selezioni altri campioni + /* Non abbiamo 'total_nmemb' ma possiamo ottenerlo come: nodes*n + (n=vect.size) + Questo calcolo ignora 'rest' elementi, ma per la selezione degli + indici mediani ciò non è molto influente. */ + // Formula originale ma semplificata + int a = nodes; + int b = n * p; + b = b / a; + samples[p] = vect[b]; + } + + // Sending the choosen samples to the node 0; + op_result = MPI_Gather(samples, nodes, MPI_INT, rcv_samples, nodes, MPI_INT, 0, MPI_COMM_WORLD); + ab_ec_neq_MPI_SUCCESS(op_result, "Error in gathering the samples from each vector. Aborting."); + + // Gathering all the (nodes*nodes) sampled elements (NODE 0) + if (mpi_rank == 0) { + // Sort them + qsort(rcv_samples, nodes * nodes, sizeof(int), compare); + // Choose (nodes - 1) pivot + for (int i = 0; i < nodes - 1; i++) { + tmp_pvt[i] = rcv_samples[nodes * (i + 1)]; + } + } + MPI_Barrier(MPI_COMM_WORLD); // E' davvero necessario questo sync point? + + // Broadcast the choosen (nodes - 1) pivot(s) + op_result = MPI_Bcast(tmp_pvt, nodes - 1, MPI_INT, 0, MPI_COMM_WORLD); + ab_ec_neq_MPI_SUCCESS(op_result, "Error broadcasting the (nodes-1) pivot(s) after getting them sorted. Aborting."); + + // Create the local (nodes - 1) partitions + create_prtn(vect, prtn, tmp_pvt, nodes); + + int *underlying_array = vect.data(); /* Ottengo l'indirizzo dell'array sottostante al vector per poi passarlo alla + * MPI_Alltoallv, la quale richiede, come primo parametro, l'indirizzo del + * buffer/array dal quale prendere i dati da inviare. */ + + // Every node sends the sizes of its partitions to every other node. + for (int i = 0; i < nodes; i++) { + p_sizes[i] = prtn[i].length; // Salvo le lunghezze delle partizioni del vettore locale + p_starts[i] = prtn[i].start; // Salvo le partenze delle partizioni del vettore locale + } + op_result = MPI_Alltoall(p_sizes, 1, MPI_INT, rcv_prt_sizes, 1, MPI_INT, MPI_COMM_WORLD); + ab_ec_neq_MPI_SUCCESS(op_result, "Erorr in getting the sizes of partition to be received. Aborting"); + + // Calcolo (in locale, per evitare comunicazioni superflue) degli starting points delle partizioni da ricevere + rcv_prt_starts[0] = 0; + for (int i = 1; i < nodes; i++) { + rcv_prt_starts[i] = rcv_prt_starts[i - 1] + rcv_prt_sizes[i - 1]; + } + MPI_Barrier(MPI_COMM_WORLD); + + // Calcolo dimensione del buffer da allocare per ricevere le porzioni + size_t aux_size = 0; + for (int i = 0; i < nodes; i++) { + aux_size += rcv_prt_sizes[i]; // aux_size indica quanti elementi avrà il processo a fine ordinamento + } + + //! WARNING! Potrebbe essere un problema il fatto di creare un'altro array di dimensioni ~ size(vect) + fprintf(stdout, "[Nodo %d] Sto per allocare l'array ausiliario per la MPI_Alltoallv\nProbabile crash\n", mpi_rank); + fflush(stdout); + int *aux_arr = new int[aux_size]; + + // Each process i keeps its ith partition and sends the jth partition to process j --- (for all j != i) + MPI_Barrier(MPI_COMM_WORLD); + op_result = MPI_Alltoallv(underlying_array, p_sizes, p_starts, MPI_INT, aux_arr, rcv_prt_sizes, rcv_prt_starts, MPI_INT, MPI_COMM_WORLD); + ab_ec_neq_MPI_SUCCESS(op_result, "Error when sending the partitions. Aborting."); + + // Riordina le partizioni e copia vettore ordinato dal buffer a vettore locale + qsort(aux_arr, aux_size, sizeof(int), compare); + vect.assign(aux_arr, aux_arr + aux_size); + delete[] aux_arr; + + // Salvataggio parametri da usare per scrittura su file + /* IDEA: + Ogni processo deve sapere, relativamente agli elementi: + 1) Dove iniziare a scrivere = Offset su file + 2) Per quanto scrivere = Dimensione del proprio vettore ordinato (equiv. a vect.size()) + + Per calcolare l'offset iniziale è necessario conoscere (offset + size) del vettore precedente. + Per esempio, se il primo vettore avesse 2 elementi, esso partirebbe dall'offeset 0, scriverebbe in posizione + [0,1] e la sua fine (ovvero da dove può scrivere il nodo successivo) sarebbe '2', ovvero 0 + vect.size(). + Comunicando la propria fine al nodo successivo, esso può procedere alla scrittura. Tutto ciò ripetuto fino + all'ultimo nodo (ovvero 'nodes-1', perchè numerati a partire da 0), il quale non dovrà inviare nulla. + + Alla fine, i dati necessari per la scrittura sono salvati negli ultimi due parametri passati + alla funzione PSRS. + */ + + *sort_slice_size = aux_size; // Dimensione vettore ordinato = dimensione arrey allocato per ricevere le partizioni + long buffer[2]; // buffer[0] = Offset, starting point (in termini di elementi) del vettore + // buffer[1] = Dimensione del vettore (in termini di elementi) + + if (mpi_rank == 0) { // Primo nodo non deve ricevere nulla + *out_f_offest = 0; // Parte da 0 (inizio file) + buffer[0] = *out_f_offest; + buffer[1] = *sort_slice_size; + if (nodes != 1) { + MPI_Send(&buffer[0], 1, MPI_LONG, mpi_rank + 1, 0, MPI_COMM_WORLD); // Invia offset + MPI_Send(&buffer[1], 1, MPI_LONG, mpi_rank + 1, 1, MPI_COMM_WORLD); // Invia dimensione vettore riordinato + } + } else { + if (mpi_rank == (nodes - 1)) { // Sono l'ultimo nodo, non devo inviare nulla + MPI_Recv(&buffer[0], 1, MPI_LONG, mpi_rank - 1, 0, MPI_COMM_WORLD, &status); // Riceve offeset nodo prec. + MPI_Recv(&buffer[1], 1, MPI_LONG, mpi_rank - 1, 1, MPI_COMM_WORLD, &status); // Riceve size v. nodo prec. + *out_f_offest = buffer[0] + buffer[1]; + } else { // Devo ricevere e inviare + MPI_Recv(&buffer[0], 1, MPI_LONG, mpi_rank - 1, 0, MPI_COMM_WORLD, &status); + MPI_Recv(&buffer[1], 1, MPI_LONG, mpi_rank - 1, 1, MPI_COMM_WORLD, &status); + *out_f_offest = buffer[0] + buffer[1]; + buffer[0] = *out_f_offest; + buffer[1] = *sort_slice_size; + MPI_Send(&buffer[0], 1, MPI_LONG, mpi_rank + 1, 0, MPI_COMM_WORLD); // Invia offeset + MPI_Send(&buffer[1], 1, MPI_LONG, mpi_rank + 1, 1, MPI_COMM_WORLD); // Invia dimensione vettore ordinato + } + } + } + + // Calcolo tempo trascorso + end_time = MPI_Wtime(); + elapsed_time = end_time - start_time; + + // Parte di 'clean-up' + delete[] samples; + delete[] rcv_samples; + delete[] tmp_pvt; + delete[] prtn; + delete[] p_sizes; + delete[] p_starts; + delete[] rcv_prt_sizes; + delete[] rcv_prt_starts; + return elapsed_time; +} + +/** @brief Partions the vector, given an array of pivots. + * + * @param vect Vector da partizionare + * @param p Array di partizioni da modificare + * @param pivot Array di pivot ricevuto in base al quale partizionare vect + * @param nodes Numero di nodi/macchine/processi (pivot.size() +1) + */ +void create_prtn(std::vector &vect, partition *p, int *pivot, int nodes) { + size_t pos = 0; + int n_pivot = nodes - 1; + size_t vect_last_el = vect.size() - 1; + + // Start of partition[0] = 0 + for (int i = 0; i < n_pivot; i++) { + p[i].vect = vect; + p[i].start = pos; + while (pos <= vect_last_el && vect[pos] <= pivot[i]) { + pos++; + } + p[i].end = pos; + p[i].length = p[i].end - p[i].start; + } + + // Gestione ultima partizione (ricevo n pivot, ottendo n+1 partizioni) + p[n_pivot].start = pos; + p[n_pivot].end = vect.size(); + p[n_pivot].length = p[n_pivot].end - p[n_pivot].start; +}