Carica file su ''

main
c.dinunzio 8 months ago
parent d87ada04aa
commit ab54aefa93

@ -0,0 +1,642 @@
/**
* @file main.c
* @author Carmen Di Nunzio
* @brief Quicksort with PSRS - Parallel sorting by regular sampling
*
* Al momento si ordinano int (sizeof(int) = 4 (bytes), quindi di 32 bit).
*
* Si è pensato di usare variabili di tipo 'long' o più larghe (ovvero unsigned long long) per salvare le informazioni
* relative alle partizioni (indice di partenza, fine e dimensione), per aggirare un eventuale problema di overflow
* qualora si cerchi di ordinare una grande mole di dati/elementi.
* Tuttavia, l'uso di variabili diverse da 'int' rende impossibile utilizzare funzioni come la MPI_Alltoallv, le quali
* richiedono che alcuni dei parametri (es: sendcounts[]) siano array di 'int'.
* Probabimente è un problema risolvibile ma al momento non sono state esplorate potenziali soluzioni.
*/
// C++ includes
#include <cstdio>
#include <cstdlib>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <vector>
// C includes
#include <stdio.h> // Per fprintf
#include <stdlib.h> // Per qsort
// MPI includes
#include "mpi.h"
// Personal MACROs
#define ABORT_ALL MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE) // Abort-all shortcut.
/** Gestione di funzioni che restituiscono valori diversi da MPI_SUCCESS in caso
* di errore. In tal caso: aborto di tutti i processi.
*
* Naming shcheme: ab_ec_neq_MPI_SUCCESS:
* ab : abort (se si verifica)
* ec : error (check, controllo se)
* neq : not equal (a)
* MPI_SUCCESS : valore contro cui testare
*/
#define ab_ec_neq_MPI_SUCCESS(x, err_string) ab_ec_cmp_ne(x, MPI_SUCCESS, err_string)
#define ab_ec_cmp_ne(var, err_rtn, err_string) \
{ \
if ((var) != (err_rtn)) { \
std::cerr << (err_string) << std::endl; \
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); \
} \
}
int g_nodes_per_debug = -1; // [Per debug]
// Structures
struct partition_t { /* Partizioni come interalli chiusi [start,end) */
std::vector<int> vect; // Vettore della partizione
size_t start; // vect[start]: incluso
size_t end; // vect[ end ]: escluso
size_t length; // Lunghezza della partizione (intesa come n_memb)
};
typedef struct partition_t partition;
// -----------------------------------------------------------------------------
//
// Functions declaration
//
// -----------------------------------------------------------------------------
// Algorithm functions
// Parallel sorting by regular sampling
double PSRS(std::vector<int> &vect, int nodes, MPI_Offset *out_f_offest, long *sort_slice_size);
void create_prtn(std::vector<int> &vect, partition *p, int *pivot, int nodes);
// Other functinos
double populate_vec(std::vector<int> &vect, MPI_Offset file_size, int nodes, MPI_Offset slice_size, MPI_File file, int id, MPI_Offset total_nmemb);
int compare(const void *a, const void *b);
double print_to_file(std::vector<int> vect, MPI_File f, int type_width_bytes, MPI_Offset offest, long sort_slice_size);
void print_to_screen(std::vector<int> vect, int mpi_rank, int nodes);
void print_dbg_arr(int *a, int size, int mpi_rank, int nodes, char *msg);
// -----------------------------------------------------------------------------
//
// Functions definition
//
// -----------------------------------------------------------------------------
/** Main
* @param argc Numero di argomenti (Default)
* @param argv[0] Nome dell'eseguibile (Default)
* @param argv[1] Filepath (abs/rel) del file da ordinare
* @param argv[2] Dimensione (in bytes) degli elementi del file da ordinare
*
* @return int
*/
int main(int argc, char *argv[]) {
// --------------------Variables--------------------
int op_result; // Salva risulatato delle operazioni/chiamate a funzioni
int mpi_rank; // Numero del processo
int mpi_size; // Numero di processi totali
int slices; // Numero di segmenti in cui dividere il file/array
int rest; // Resto della div. tra total_nmemb e numero di processi totali
int size_of_arr_elements; // Dimensione (in bytes) del tipo di elementi da ordinare
MPI_File file, output_file; // File(s) da aprire
MPI_Offset total_nmemb; // Numero di elementi (memb) da ordinare
MPI_Offset file_size; // Dimensione del file (in bytes)
MPI_Offset slice_size; // Dimensione dei segmenti (in bytes) in cui dividere il file/array
MPI_Offset sorted_file_nmemb_offest; // Offest (in elementi) dal quale scrivere il proprio vector ordinato
long sorted_vector_nmemb; // Dimensione (in elementi) del vector ordinato
std::string output_filename; // Nome del file di output
MPI_Status status;
// Time(s)
double start_t; // Tempo di partenza
double end_t; // Tempo di fine
double vect_load_t; // Tempo di caricamento dei vettori
double vect_load_all_t; // Tempo di caricamento di tutti i vettori
double PSRS_t; // Tempo esecuzione algoritmo
double final_wrt_t; // Tempo scrittura finale su file
double tot_t; // Tempo totale
double aux1_t, aux2_t; // Tempi ausiliari per altro
// Vector
std::vector<int> local_vect; // Per-process local vector (of int type)
// --------------------Code-------------------------
// Start
MPI_Init(&argc, &argv); // Initialize the MPI environment
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); // Ottieni il numero di processi presenti
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); // Ottieni il proprio rank (ID) in rank
start_t = MPI_Wtime();
// Arguments check and handling (Made by node 0)
if (mpi_rank == 0) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <input_file_path>" << std::endl;
ABORT_ALL;
} else {
std::filesystem::path filePath = argv[1]; // Object that represent paths on a filesystem
if (!std::filesystem::exists(filePath)) { // If passed-file doesn't exists
std::cerr << "File or directory does not exist or cannot access to it. Aborting." << std::endl;
ABORT_ALL;
}
}
}
// File opening
op_result = MPI_File_open(MPI_COMM_WORLD, argv[1], MPI_MODE_RDONLY, MPI_INFO_NULL, &file); // Try to open the file
ab_ec_neq_MPI_SUCCESS(op_result, "Error opening the passed file. Aborting all processes."); // Macro checking for errors
// File size managing
op_result = MPI_File_get_size(file, &file_size); // Try to obtain filesize
ab_ec_neq_MPI_SUCCESS(op_result, "Error getting file size. Aborting all processes."); // Macro checking for errors
size_of_arr_elements = 4; // We are using int(s) ->
// elem_size is 4
total_nmemb = file_size / size_of_arr_elements; // Get total number of elements
// Compute additional info
slices = mpi_size; // Number of slices depends on number of nodes/process
slice_size = total_nmemb / slices; // Slices sizes (number of elements)
rest = total_nmemb % slices; // From 0 to (slices-1)
// Start message printing
if (mpi_rank == 0) {
std::cout << "Starting sorting process." << std::endl
<< "Loading file elements into " << mpi_rank << " local vector." << std::endl;
}
// Populate all the local vectors
MPI_Barrier(MPI_COMM_WORLD);
aux1_t = MPI_Wtime();
vect_load_t = populate_vec(local_vect, file_size, mpi_size, slice_size, file, mpi_rank, total_nmemb);
MPI_Barrier(MPI_COMM_WORLD);
aux2_t = MPI_Wtime();
vect_load_all_t = aux2_t - aux1_t; // Ottengo tempo di caricamento di tutti i vettori, rilevante qualora i nodi
// avessero velocità non simili
MPI_File_close(&file); // Chiudo il file in input, non serve più
// Execute the sorting algorithm
printf("[Nodo %d] Inizio algoritmo PSRS...\n", mpi_rank);
PSRS_t = PSRS(local_vect, mpi_size, &sorted_file_nmemb_offest, &sorted_vector_nmemb);
MPI_Barrier(MPI_COMM_WORLD);
printf("[Nodo %d] Ordinamento finito.\n", mpi_rank);
// Creazione della stringa per il nome del file di output
std::string argv1 = argv[1]; // Creo un oggetto di tipo std::string
size_t pos = argv1.find_last_of('/') + 1; // Ottengo posiz. inio nome file (ignoro il path)
output_filename = "Sorted_" + argv1.substr(pos); // Creo il nome output_file
// Apertura file di output
op_result = MPI_File_open(MPI_COMM_WORLD, output_filename.data(), (MPI_MODE_CREATE | MPI_MODE_WRONLY), MPI_INFO_NULL, &output_file);
ab_ec_neq_MPI_SUCCESS(op_result, "Error while opening/creating final file. Aborting.");
// Scrittura collettiva parallela su file
final_wrt_t = print_to_file(local_vect, output_file, size_of_arr_elements, sorted_file_nmemb_offest, sorted_vector_nmemb);
// Calcolo durata totale del programma
end_t = MPI_Wtime();
tot_t = end_t - start_t;
// Salvataggio delle statistiche su file .csv
if (mpi_rank == 0) {
std::cout << "Sorting completed" << std::endl;
std::ofstream csv_file;
std::filesystem::path csv_path = "executions_stats.csv";
if (!std::filesystem::exists(csv_path)) {
csv_file.open(csv_path, std::ios::out | std::ios::app);
csv_file << "Filename"
<< ","
<< "Num_elementi"
<< ","
<< "File_size_bytes"
<< ","
<< "Nodes"
<< ","
<< "vect_load_t"
<< ","
<< "vect_load_all_t"
<< ","
<< "PSRS_t"
<< ","
<< "final_wrt_t"
<< ","
<< "tot_t" << std::endl;
} else {
csv_file.open(csv_path, std::ios::out | std::ios::app);
}
csv_file << argv1 << ","
<< total_nmemb << ","
<< file_size << ","
<< mpi_size << ","
<< vect_load_t << ","
<< vect_load_all_t << ","
<< PSRS_t << ","
<< final_wrt_t << ","
<< tot_t << std::endl;
csv_file.close(); // Chiusura file .csv
}
MPI_File_close(&output_file);
MPI_Finalize(); // Clean up the MPI environment
return 0;
}
/** @brief Populate the in-memory-local vector with the process's portion of the unsorted file/array
*
* In questo caso non si aspetta che tutti carichino il proprio vettore in quanto, essendo di dimensioni uguali se non
* per un elemento, i tempi di caricamento sono quasi identici, escludendo altre variabili.
*
* @param vect Local vector
* @param file_size File total size (in bytes)
* @param nodes Number of nodes/processes
* @param slice_size Size of main file slices (number of elements)
* @param file Input file
* @param id Node's id/rank
* @param total_nmemb Total numb
*
* @return double elapsed_time - Tempo trascorso
*/
double populate_vec(std::vector<int> &vect, MPI_Offset file_size, int nodes, MPI_Offset slice_size, MPI_File file, int id, MPI_Offset total_nmemb) {
// --------------------Variables--------------------
int op_result; // Esito operazione/chiamata di funzione
int temp; // Variabile ausliare per la letture di un intero
int elements_read_result = 0; // Numero di interi
MPI_Offset offset; // Offset (in bytes) usato per scorrere il file in lettura
MPI_Status status; // Struttura per informazioni extra sulle operazioni
double start_time, end_time, elapsed_time; // Time measuring
// --------------------Code-------------------------
start_time = MPI_Wtime(); // Timestamp partenza
vect.reserve(slice_size + 1); // Riservo memoria per evitare reallocazioni (vedi: std::vector document.)
/* Calcolo dell'offeset (in bytes) in base al proprio id (ovvero rank)
*
* Il calcolo viene eseguito moltiplicando:
* id : Rank del nodo ( [0, mpi_size - 1])
* slice_size : Numero di elementi della porzione del file
* sizeof(int) : Dimensione in byte degli elementi da leggere dal file
* (di tipo binario, .dat)
* (Parametrico ma per ora gestiamo solo interi)
*
* In questo modo, ogni nodo si legge solamente i bytes della propria
* porzione di file.
*
* La lettura degli ultimi elementi, qualora la divisione
* total_nmemb / nodes
* non risulti esatta, è gestita successivamente. */
offset = id * slice_size * sizeof(int);
#ifdef READ_SINGLE
// Inserimento dei valori nel vector uno alla volta
// Esegui per: numero-di-elementi-della-propria-porzione volte
for (MPI_Offset i = 0; i < slice_size; i++) {
op_result = MPI_File_read_at(file, offset + (i * sizeof(int)), &temp, 1, MPI_INT, &status); // Leggi 1 int
ab_ec_neq_MPI_SUCCESS(op_result, "Error reading the main file content. Aborting."); // Controllo operaz.
op_result = MPI_Get_count(&status, MPI_INT, &elements_read_result); // Numero int letti
ab_ec_neq_MPI_SUCCESS(op_result, "Error checking the read result. Aborting.");
if (elements_read_result == 1) { // Lettura andata a buon fine ⇒
vect.push_back(temp); // Aggiungi l'elemento al vector
} else {
fprintf(stderr, "[%d] Offest: %ld, valore non letto\n", id, offset + (i * sizeof(int)));
ABORT_ALL;
}
}
#else
// Inserimento dei valori nel vector sfruttando la possibilità di leggere
// più elementi in una volta sola con la MPI_File_read_at
vect.resize(slice_size); // Ridimensiono il vettore
op_result = MPI_File_read_at(file, offset, vect.data(), slice_size, MPI_INT, &status); // Lettura multipla
ab_ec_neq_MPI_SUCCESS(op_result, "Error when reading the main file content (using multiple read). Aborting.");
op_result = MPI_Get_count(&status, MPI_INT, &elements_read_result); // Num. int letti
ab_ec_neq_MPI_SUCCESS(op_result, "Error checking the read result. Aborting."); // Lettura: Num. int. letti OK?
if (elements_read_result != slice_size) { // Letto tutta la mia porzione?
std::cerr << "Error: read less element than " << id << "'s portion length. Aborting." << std::endl;
ABORT_ALL;
}
#endif
// Inserimento dei valori rimasti (caso in cui la divisione non è esatta)
MPI_Offset last_part_startpoint = nodes * slice_size * sizeof(int); // Indice in bytes primo elemento rimasto da leggere
int rest = total_nmemb % nodes; // From 0 to (slices-1)
// Esegui per: numero-di-elementi-rimasti-da-leggere volte
for (int i = 0; i < rest; i++) {
if (id == i) { // "Assegna" un elemento rimasto in stile "round-robin" ai processi, ovvero:
// rest_0 -> 0; rest_1 -> 1, ...
op_result = MPI_File_read_at(file, last_part_startpoint + (i * sizeof(int)), &temp, 1, MPI_INT, &status);
ab_ec_neq_MPI_SUCCESS(op_result, "Error reading the last part of the file/array");
op_result = MPI_Get_count(&status, MPI_INT, &elements_read_result);
ab_ec_neq_MPI_SUCCESS(op_result, "Error checking the read result. Aborting.");
if (elements_read_result == 1) { // Lettura andata a buon fine ⇒
vect.push_back(temp); // Aggiungi l'elemento al vector
} else {
fprintf(stderr, "[%d] Offest: %lld, valore non letto\n", id, last_part_startpoint + (i * sizeof(int)));
ABORT_ALL;
}
}
}
// Calcolo del tempo impiegato per il caricamento
end_time = MPI_Wtime(); // Timestamp fine
elapsed_time = end_time - start_time; // Get elapsed time
return elapsed_time;
}
/** @brief Stampa i vector(s) sul file passato in modo collettivo (e parallelo)
*
* @param vect Vettire da scrivere
* @param file File in output
* @param type_width_bytes Larghezza in bytes degli elementi del vettore
* @param offset Offset (logico, quindi degli elementi) dal quale scrivere su f.
* @param sort_slice_size Numero di elementi da scrivere
*/
double print_to_file(std::vector<int> vect, MPI_File f, int type_width_bytes, MPI_Offset offset, long sort_slice_size) {
// --------------------Variables--------------------
MPI_Status status; // Informazioni sulla scrittura
long offset_bytes; // Offset in bytes
int op_result; // Esito funzione
// Time(s)
double start_time, end_time, elapsed_time;
// --------------------Code-------------------------
offset_bytes = offset * type_width_bytes; // Ottengo l'offset in bytes
MPI_Barrier(MPI_COMM_WORLD); // Fa si che tutti i nodi "inizino insieme", creando un sync-point
start_time = MPI_Wtime(); // Ottengo tempo d'inizio
op_result = MPI_File_write_at_all(f, offset_bytes, vect.data(), sort_slice_size, MPI_INT, &status);
ab_ec_neq_MPI_SUCCESS(op_result, "Error writing the sorted local vectors to file. Aborting.");
MPI_Barrier(MPI_COMM_WORLD); // Prosegue solo se tutti i nodi hanno finito
end_time = MPI_Wtime(); // Ottengo tempo di fine
elapsed_time = end_time - start_time; // Durata stampa su file
return elapsed_time;
}
/** @brief Print array content in a synchonized way [DEBUG PURPOSES]
* Si utilizza lo stdout per avere un output meno spezzettato
*
* @param a Array (indirizzo iniziale)
* @param size Numero di elementi dell'array
* @param mpi_rank ID del nodo/processo
* @param nodes Numero di nodi/processi
* @param msg Stringa da stampare per informazionei aggiuntive sull'array e/o
* sulla fase del programma
*/
void print_dbg_arr(int *a, int size, int mpi_rank, int nodes, char *msg) {
for (int i = 0; i < nodes; i++) { // Ciclo sequenzial. dal nodo: [0, nodes)
if (mpi_rank == i) { // Sono il nodo i-esimo ⇒ Stampa
// Formattazione stampe
fprintf(stdout, "[%d %s] ", mpi_rank, msg);
for (int j = 0; j < size; j++) { // Cicla dall'elemento: [0,size)
fprintf(stdout, "%2d; ", a[j]);
}
fprintf(stdout, "\n");
fflush(stdout);
}
// Se nodes (parametro) è effettivamente il numero di nodi presenti,
// allora metti tutti in attesa. Senza questo controllo, la MPI_Barrier
// manderebbe in stallo il programma.
if (nodes == g_nodes_per_debug) {
MPI_Barrier(MPI_COMM_WORLD); // Forza punto di sincronizzaziones
}
}
}
/** @brief Stampa i vector(s) su schermo (scrittura sequenziale)
*
* @param vect Vector da stampare su schermo
* @param mpi_rank ID/Rank del nodo
* @param nodes Numero dei nodi/processi
*/
void print_to_screen(std::vector<int> vect, int mpi_rank, int nodes) {
for (int i = 0; i < nodes; i++) { // For all nodes
if (mpi_rank == i) { // Iterate on nodes-IDs
fprintf(stderr, "[%d] ", mpi_rank); // Print node's ID, then
for (size_t j = 0; j < vect.size(); j++) { // Print all elements
fprintf(stderr, "%2d; ", vect[j]);
}
fprintf(stderr, "\n");
} // If not interested node, wait for i-th node to complete
MPI_Barrier(MPI_COMM_WORLD); // Force sync
}
}
/** @brief Compare due interi
*
* @param a Indirizzo del primo intero
* @param b Indirizzo del secondo intero
* @return Intero con segno che descrive l'ordinamento tra i due interi
*/
int compare(const void *a, const void *b) {
return (*(int *)a - *(int *)b);
}
/** @brief Parallel sorting by random sampling
*
* @param vect Local vector
* @param nodes Numero di nodi/processi
* @param out_f_offest Offset logico (numero di elementi) dal quale scrivere successivamente su file
* @param sort_slice_size Dimensione (numero di elementi) della porzione dopo ordinamento
* @return double Tempo di esecuzione della funzione
*/
double PSRS(std::vector<int> &vect, int nodes, MPI_Offset *out_f_offest, long *sort_slice_size) {
// --------------------Variables--------------------
// Misc
int mpi_rank; // Rank nodo
int op_result; // Risultato operazione
double start_time, end_time, elapsed_time; // Tempi di esecuzione
MPI_Status status; // Status post-operazione
// Informazioni vettore e pivot(s)
int n = vect.size(); // Dimensione vettore
int *samples = new int[nodes]; // Campioni scelti
int *rcv_samples = new int[nodes * nodes]; // Campioni ricevuti dai nodi
int *tmp_pvt = new int[nodes - 1]; // Pivot(s) temporanei
// Partizioni
partition *prtn = new partition[nodes]; // Partizioni del vettore
int *p_sizes = new int[nodes]; // Array containing the sizes of vector's partitions
int *p_starts = new int[nodes]; // Array cointaining the starting points of the vector's partition
int *rcv_prt_sizes = new int[nodes]; // Array dove salvo la dimensione delle porzioni che riceverò
int *rcv_prt_starts = new int[nodes]; // Array dove salvo gli starting-point delle porzioni che riceverò
// --------------------Code-------------------------
start_time = MPI_Wtime(); // Tempo di parteza
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); // Ottieni il proprio rank (ID) in rank
// Local sorting
qsort(vect.data(), vect.size(), 4, compare); // Lib. function for sorting the local vector
// Controllo se ho almeno due nodi, in caso contrario il vettore è già ordinato
if (nodes > 1) {
// Select samples
samples[0] = vect[0]; // Il primo campione è il primo elemento
for (int p = 1; p < nodes; p++) { // Selezioni altri campioni
/* Non abbiamo 'total_nmemb' ma possiamo ottenerlo come: nodes*n
(n=vect.size)
Questo calcolo ignora 'rest' elementi, ma per la selezione degli
indici mediani ciò non è molto influente. */
// Formula originale ma semplificata
int a = nodes;
int b = n * p;
b = b / a;
samples[p] = vect[b];
}
// Sending the choosen samples to the node 0;
op_result = MPI_Gather(samples, nodes, MPI_INT, rcv_samples, nodes, MPI_INT, 0, MPI_COMM_WORLD);
ab_ec_neq_MPI_SUCCESS(op_result, "Error in gathering the samples from each vector. Aborting.");
// Gathering all the (nodes*nodes) sampled elements (NODE 0)
if (mpi_rank == 0) {
// Sort them
qsort(rcv_samples, nodes * nodes, sizeof(int), compare);
// Choose (nodes - 1) pivot
for (int i = 0; i < nodes - 1; i++) {
tmp_pvt[i] = rcv_samples[nodes * (i + 1)];
}
}
MPI_Barrier(MPI_COMM_WORLD); // E' davvero necessario questo sync point?
// Broadcast the choosen (nodes - 1) pivot(s)
op_result = MPI_Bcast(tmp_pvt, nodes - 1, MPI_INT, 0, MPI_COMM_WORLD);
ab_ec_neq_MPI_SUCCESS(op_result, "Error broadcasting the (nodes-1) pivot(s) after getting them sorted. Aborting.");
// Create the local (nodes - 1) partitions
create_prtn(vect, prtn, tmp_pvt, nodes);
int *underlying_array = vect.data(); /* Ottengo l'indirizzo dell'array sottostante al vector per poi passarlo alla
* MPI_Alltoallv, la quale richiede, come primo parametro, l'indirizzo del
* buffer/array dal quale prendere i dati da inviare. */
// Every node sends the sizes of its partitions to every other node.
for (int i = 0; i < nodes; i++) {
p_sizes[i] = prtn[i].length; // Salvo le lunghezze delle partizioni del vettore locale
p_starts[i] = prtn[i].start; // Salvo le partenze delle partizioni del vettore locale
}
op_result = MPI_Alltoall(p_sizes, 1, MPI_INT, rcv_prt_sizes, 1, MPI_INT, MPI_COMM_WORLD);
ab_ec_neq_MPI_SUCCESS(op_result, "Erorr in getting the sizes of partition to be received. Aborting");
// Calcolo (in locale, per evitare comunicazioni superflue) degli starting points delle partizioni da ricevere
rcv_prt_starts[0] = 0;
for (int i = 1; i < nodes; i++) {
rcv_prt_starts[i] = rcv_prt_starts[i - 1] + rcv_prt_sizes[i - 1];
}
MPI_Barrier(MPI_COMM_WORLD);
// Calcolo dimensione del buffer da allocare per ricevere le porzioni
size_t aux_size = 0;
for (int i = 0; i < nodes; i++) {
aux_size += rcv_prt_sizes[i]; // aux_size indica quanti elementi avrà il processo a fine ordinamento
}
//! WARNING! Potrebbe essere un problema il fatto di creare un'altro array di dimensioni ~ size(vect)
fprintf(stdout, "[Nodo %d] Sto per allocare l'array ausiliario per la MPI_Alltoallv\nProbabile crash\n", mpi_rank);
fflush(stdout);
int *aux_arr = new int[aux_size];
// Each process i keeps its ith partition and sends the jth partition to process j --- (for all j != i)
MPI_Barrier(MPI_COMM_WORLD);
op_result = MPI_Alltoallv(underlying_array, p_sizes, p_starts, MPI_INT, aux_arr, rcv_prt_sizes, rcv_prt_starts, MPI_INT, MPI_COMM_WORLD);
ab_ec_neq_MPI_SUCCESS(op_result, "Error when sending the partitions. Aborting.");
// Riordina le partizioni e copia vettore ordinato dal buffer a vettore locale
qsort(aux_arr, aux_size, sizeof(int), compare);
vect.assign(aux_arr, aux_arr + aux_size);
delete[] aux_arr;
// Salvataggio parametri da usare per scrittura su file
/* IDEA:
Ogni processo deve sapere, relativamente agli elementi:
1) Dove iniziare a scrivere = Offset su file
2) Per quanto scrivere = Dimensione del proprio vettore ordinato (equiv. a vect.size())
Per calcolare l'offset iniziale è necessario conoscere (offset + size) del vettore precedente.
Per esempio, se il primo vettore avesse 2 elementi, esso partirebbe dall'offeset 0, scriverebbe in posizione
[0,1] e la sua fine (ovvero da dove può scrivere il nodo successivo) sarebbe '2', ovvero 0 + vect.size().
Comunicando la propria fine al nodo successivo, esso può procedere alla scrittura. Tutto ciò ripetuto fino
all'ultimo nodo (ovvero 'nodes-1', perchè numerati a partire da 0), il quale non dovrà inviare nulla.
Alla fine, i dati necessari per la scrittura sono salvati negli ultimi due parametri passati
alla funzione PSRS.
*/
*sort_slice_size = aux_size; // Dimensione vettore ordinato = dimensione arrey allocato per ricevere le partizioni
long buffer[2]; // buffer[0] = Offset, starting point (in termini di elementi) del vettore
// buffer[1] = Dimensione del vettore (in termini di elementi)
if (mpi_rank == 0) { // Primo nodo non deve ricevere nulla
*out_f_offest = 0; // Parte da 0 (inizio file)
buffer[0] = *out_f_offest;
buffer[1] = *sort_slice_size;
if (nodes != 1) {
MPI_Send(&buffer[0], 1, MPI_LONG, mpi_rank + 1, 0, MPI_COMM_WORLD); // Invia offset
MPI_Send(&buffer[1], 1, MPI_LONG, mpi_rank + 1, 1, MPI_COMM_WORLD); // Invia dimensione vettore riordinato
}
} else {
if (mpi_rank == (nodes - 1)) { // Sono l'ultimo nodo, non devo inviare nulla
MPI_Recv(&buffer[0], 1, MPI_LONG, mpi_rank - 1, 0, MPI_COMM_WORLD, &status); // Riceve offeset nodo prec.
MPI_Recv(&buffer[1], 1, MPI_LONG, mpi_rank - 1, 1, MPI_COMM_WORLD, &status); // Riceve size v. nodo prec.
*out_f_offest = buffer[0] + buffer[1];
} else { // Devo ricevere e inviare
MPI_Recv(&buffer[0], 1, MPI_LONG, mpi_rank - 1, 0, MPI_COMM_WORLD, &status);
MPI_Recv(&buffer[1], 1, MPI_LONG, mpi_rank - 1, 1, MPI_COMM_WORLD, &status);
*out_f_offest = buffer[0] + buffer[1];
buffer[0] = *out_f_offest;
buffer[1] = *sort_slice_size;
MPI_Send(&buffer[0], 1, MPI_LONG, mpi_rank + 1, 0, MPI_COMM_WORLD); // Invia offeset
MPI_Send(&buffer[1], 1, MPI_LONG, mpi_rank + 1, 1, MPI_COMM_WORLD); // Invia dimensione vettore ordinato
}
}
}
// Calcolo tempo trascorso
end_time = MPI_Wtime();
elapsed_time = end_time - start_time;
// Parte di 'clean-up'
delete[] samples;
delete[] rcv_samples;
delete[] tmp_pvt;
delete[] prtn;
delete[] p_sizes;
delete[] p_starts;
delete[] rcv_prt_sizes;
delete[] rcv_prt_starts;
return elapsed_time;
}
/** @brief Partions the vector, given an array of pivots.
*
* @param vect Vector da partizionare
* @param p Array di partizioni da modificare
* @param pivot Array di pivot ricevuto in base al quale partizionare vect
* @param nodes Numero di nodi/macchine/processi (pivot.size() +1)
*/
void create_prtn(std::vector<int> &vect, partition *p, int *pivot, int nodes) {
size_t pos = 0;
int n_pivot = nodes - 1;
size_t vect_last_el = vect.size() - 1;
// Start of partition[0] = 0
for (int i = 0; i < n_pivot; i++) {
p[i].vect = vect;
p[i].start = pos;
while (pos <= vect_last_el && vect[pos] <= pivot[i]) {
pos++;
}
p[i].end = pos;
p[i].length = p[i].end - p[i].start;
}
// Gestione ultima partizione (ricevo n pivot, ottendo n+1 partizioni)
p[n_pivot].start = pos;
p[n_pivot].end = vect.size();
p[n_pivot].length = p[n_pivot].end - p[n_pivot].start;
}
Loading…
Cancel
Save