From bbc7bc59148ffd13b07263d1e9795419fcdec0b4 Mon Sep 17 00:00:00 2001 From: Luca Lombardo Date: Mon, 31 Jan 2022 22:46:31 +0100 Subject: [PATCH] MultiThreading, hurray --- kenobi.cpp | 136 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 79 insertions(+), 57 deletions(-) diff --git a/kenobi.cpp b/kenobi.cpp index 7b68688..d869ae3 100644 --- a/kenobi.cpp +++ b/kenobi.cpp @@ -1,4 +1,4 @@ -// g++ -Wall -pedantic -std=c++17 -Ofast kenobi.cpp -o kenobi +// g++ -Wall -pedantic -std=c++17 -Ofast -pthread kenobi.cpp -o kenobi #include #include #include @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include #include // getline @@ -29,6 +31,8 @@ map A; // Dizionario {actor_id (key): Actor (value)} map F; // Dizionario {film_id (value): Film (value)} int MAX_ACTOR_ID = -1; +const int N_THREADS = 12; // Number of threads to use for some functions + void DataRead() { ifstream actors("data/Attori.txt"); // leggo il file @@ -174,66 +178,84 @@ vector> closeness(const size_t k) { // We do not need to define Q either, as we will loop over each vertex anyway, and the order does not matter. vector> top_actors; // Each pair is (actor_index, farness). top_actors.reserve(k+1); // We need exactly k items, no more and no less. - vector enqueued(MAX_ACTOR_ID, false); // Vector to see which vertices with put in the queue during the BSF - - // We loop over each vertex - for (const auto& [actor_id, actor] : A) { - // if |Top| ≥ k and L[v] > Farn[Top[k]] then return Top; => We can not exploit the lower bound of our vertex to stop the loop, as we are not updating lower bounds L. - // We just compute the farness of our vertex using a BFS - queue> q; // FIFO of pairs (actor_index, distance from our vertex). - for (size_t i = 0; i < enqueued.size(); i++) - enqueued[i] = false; - int r = 0; // |R|, where R is the set of vertices reachable from our vertex - long long int sum_distances = 0; // Sum of the distances to other nodes - int prev_distance = 0; // Previous distance, to see when we get to a deeper level of the BFS - q.push(make_pair(actor_id, 0)); - enqueued[actor_id] = true; - bool skip = false; - while (!q.empty()) { - auto [bfs_actor_id, distance] = q.front(); - q.pop(); - // Try to set a lower bound on the farness - if (top_actors.size() == k && distance > prev_distance) { // We are in the first item of the next exploration level - // We assume r = A.size(), the maximum possible value - double farness_lower_bound = 1.0 / ((double)A.size() - 1) * (sum_distances + q.size() * distance); - if (top_actors[k-1].second <= farness_lower_bound) { // Stop the BFS - skip = true; - break; - } - } - // We compute the farness of our vertex actor_id - r++; - sum_distances += distance; - // We loop on the adjacencies of bfs_actor_id and add them to the queue - for (int bfs_film_id : A[bfs_actor_id].film_indices) { - for (int adj_actor_id : F[bfs_film_id].actor_indicies) { - if (!enqueued[adj_actor_id]) { - // The adjacent vertices have distance +1 w.r.t. the current vertex - q.push(make_pair(adj_actor_id, distance+1)); - enqueued[adj_actor_id] = true; + + vector threads; + mutex top_actors_mutex; // To prevent simultaneous accesses to top_actors + threads.reserve(N_THREADS); + for (int i = 0; i < N_THREADS; i++) { + threads.push_back(thread([&top_actors,&top_actors_mutex,&k](int start) { + vector enqueued(MAX_ACTOR_ID, false); // Vector to see which vertices with put in the queue during the BSF + // We loop over each vertex + for (int actor_id = start; actor_id <= MAX_ACTOR_ID; actor_id += N_THREADS) { + if (!A.count(actor_id)) // The actor must exist, otherwise A[actor_id] would attempt to write A, and this may produce a race condition if multiple threads do it at the same time + continue; + // if |Top| ≥ k and L[v] > Farn[Top[k]] then return Top; => We can not exploit the lower bound of our vertex to stop the loop, as we are not updating lower bounds L. + // We just compute the farness of our vertex using a BFS + queue> q; // FIFO of pairs (actor_index, distance from our vertex). + for (size_t i = 0; i < enqueued.size(); i++) + enqueued[i] = false; + int r = 0; // |R|, where R is the set of vertices reachable from our vertex + long long int sum_distances = 0; // Sum of the distances to other nodes + int prev_distance = 0; // Previous distance, to see when we get to a deeper level of the BFS + q.push(make_pair(actor_id, 0)); + enqueued[actor_id] = true; + bool skip = false; + while (!q.empty()) { + auto [bfs_actor_id, distance] = q.front(); + q.pop(); + // Try to set a lower bound on the farness + if (distance > prev_distance) { + const lock_guard top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed. + if (top_actors.size() == k) { // We are in the first item of the next exploration level + // We assume r = A.size(), the maximum possible value + double farness_lower_bound = 1.0 / ((double)A.size() - 1) * (sum_distances + q.size() * distance); + if (top_actors[k-1].second <= farness_lower_bound) { // Stop the BFS + skip = true; + break; // top_actors_lock gets destroyed also if we do this break + } + } + // top_actors_lock gets destroyed after this line, releasing the mutex } + // We compute the farness of our vertex actor_id + r++; + sum_distances += distance; + // We loop on the adjacencies of bfs_actor_id and add them to the queue + for (int bfs_film_id : A[bfs_actor_id].film_indices) { + for (int adj_actor_id : F[bfs_film_id].actor_indicies) { + if (!enqueued[adj_actor_id]) { + // The adjacent vertices have distance +1 w.r.t. the current vertex + q.push(make_pair(adj_actor_id, distance+1)); + enqueued[adj_actor_id] = true; + } + } + } + } + if (skip) { + cout << actor_id << " " << A[actor_id].name << " SKIPPED" << endl; + continue; + } + // BFS is over, we compute the farness + double farness = (A.size()-1) / pow((double)r-1, 2) * sum_distances; + if (isnan(farness)) // This happens when r = 1 + continue; + // Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insert) + const lock_guard top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed. + auto idx = find_if(top_actors.begin(), top_actors.end(), + [&farness](const pair& p) { return p.second >= farness; }); + if (top_actors.size() < k || idx != top_actors.end()) { + top_actors.insert(idx, make_pair(actor_id, farness)); + if (top_actors.size() > k) + top_actors.pop_back(); } + cout << actor_id << " " << A[actor_id].name << " " << farness << endl; + // top_actors_lock gets destroyed after this line, releasing the mutex } - } - if (skip) { - cout << actor_id << " " << A[actor_id].name << " SKIPPED" << endl; - continue; - } - // BFS is over, we compute the farness - double farness = (A.size()-1) / pow((double)r-1, 2) * sum_distances; - if (isnan(farness)) // This happens when r = 1 - continue; - // Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insert) - auto idx = find_if(top_actors.begin(), top_actors.end(), - [&farness](const pair& p) { return p.second >= farness; }); - if (top_actors.size() < k || idx != top_actors.end()) { - top_actors.insert(idx, make_pair(actor_id, farness)); - if (top_actors.size() > k) - top_actors.pop_back(); - } - cout << actor_id << " " << A[actor_id].name << " " << farness << endl; + }, i)); } + for (auto& thread : threads) + thread.join(); + return top_actors; } @@ -284,7 +306,7 @@ int main() // ------------------------------------------------------------- // cout << "Grafo, grafo delle mie brame... chi è il più centrale del reame?" << endl; - for (const auto& [actor_id, farness] : closeness(3)) { + for (const auto& [actor_id, farness] : closeness(100)) { cout << A[actor_id].name << " " << farness << endl; }