MultiThreading, hurray

main
Luca Lombardo 3 years ago
parent 599f49d8a4
commit bbc7bc5914

@ -1,4 +1,4 @@
// g++ -Wall -pedantic -std=c++17 -Ofast kenobi.cpp -o kenobi
// g++ -Wall -pedantic -std=c++17 -Ofast -pthread kenobi.cpp -o kenobi
#include <iostream>
#include <iomanip>
#include <vector>
@ -6,6 +6,8 @@
#include <string>
#include <queue>
#include <list>
#include <thread>
#include <mutex>
#include <stack>
#include <set>
#include <fstream> // getline
@ -29,6 +31,8 @@ map<int, Actor> A; // Dizionario {actor_id (key): Actor (value)}
map<int, Film> F; // Dizionario {film_id (value): Film (value)}
int MAX_ACTOR_ID = -1;
const int N_THREADS = 12; // Number of threads to use for some functions
void DataRead()
{
ifstream actors("data/Attori.txt"); // leggo il file
@ -174,66 +178,84 @@ vector<pair<int, double>> closeness(const size_t k) {
// We do not need to define Q either, as we will loop over each vertex anyway, and the order does not matter.
vector<pair<int, double>> top_actors; // Each pair is (actor_index, farness).
top_actors.reserve(k+1); // We need exactly k items, no more and no less.
vector<bool> enqueued(MAX_ACTOR_ID, false); // Vector to see which vertices with put in the queue during the BSF
// We loop over each vertex
for (const auto& [actor_id, actor] : A) {
// if |Top| ≥ k and L[v] > Farn[Top[k]] then return Top; => We can not exploit the lower bound of our vertex to stop the loop, as we are not updating lower bounds L.
// We just compute the farness of our vertex using a BFS
queue<pair<int,int>> q; // FIFO of pairs (actor_index, distance from our vertex).
for (size_t i = 0; i < enqueued.size(); i++)
enqueued[i] = false;
int r = 0; // |R|, where R is the set of vertices reachable from our vertex
long long int sum_distances = 0; // Sum of the distances to other nodes
int prev_distance = 0; // Previous distance, to see when we get to a deeper level of the BFS
q.push(make_pair(actor_id, 0));
enqueued[actor_id] = true;
bool skip = false;
while (!q.empty()) {
auto [bfs_actor_id, distance] = q.front();
q.pop();
// Try to set a lower bound on the farness
if (top_actors.size() == k && distance > prev_distance) { // We are in the first item of the next exploration level
// We assume r = A.size(), the maximum possible value
double farness_lower_bound = 1.0 / ((double)A.size() - 1) * (sum_distances + q.size() * distance);
if (top_actors[k-1].second <= farness_lower_bound) { // Stop the BFS
skip = true;
break;
}
}
// We compute the farness of our vertex actor_id
r++;
sum_distances += distance;
// We loop on the adjacencies of bfs_actor_id and add them to the queue
for (int bfs_film_id : A[bfs_actor_id].film_indices) {
for (int adj_actor_id : F[bfs_film_id].actor_indicies) {
if (!enqueued[adj_actor_id]) {
// The adjacent vertices have distance +1 w.r.t. the current vertex
q.push(make_pair(adj_actor_id, distance+1));
enqueued[adj_actor_id] = true;
vector<thread> threads;
mutex top_actors_mutex; // To prevent simultaneous accesses to top_actors
threads.reserve(N_THREADS);
for (int i = 0; i < N_THREADS; i++) {
threads.push_back(thread([&top_actors,&top_actors_mutex,&k](int start) {
vector<bool> enqueued(MAX_ACTOR_ID, false); // Vector to see which vertices with put in the queue during the BSF
// We loop over each vertex
for (int actor_id = start; actor_id <= MAX_ACTOR_ID; actor_id += N_THREADS) {
if (!A.count(actor_id)) // The actor must exist, otherwise A[actor_id] would attempt to write A, and this may produce a race condition if multiple threads do it at the same time
continue;
// if |Top| ≥ k and L[v] > Farn[Top[k]] then return Top; => We can not exploit the lower bound of our vertex to stop the loop, as we are not updating lower bounds L.
// We just compute the farness of our vertex using a BFS
queue<pair<int,int>> q; // FIFO of pairs (actor_index, distance from our vertex).
for (size_t i = 0; i < enqueued.size(); i++)
enqueued[i] = false;
int r = 0; // |R|, where R is the set of vertices reachable from our vertex
long long int sum_distances = 0; // Sum of the distances to other nodes
int prev_distance = 0; // Previous distance, to see when we get to a deeper level of the BFS
q.push(make_pair(actor_id, 0));
enqueued[actor_id] = true;
bool skip = false;
while (!q.empty()) {
auto [bfs_actor_id, distance] = q.front();
q.pop();
// Try to set a lower bound on the farness
if (distance > prev_distance) {
const lock_guard<mutex> top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed.
if (top_actors.size() == k) { // We are in the first item of the next exploration level
// We assume r = A.size(), the maximum possible value
double farness_lower_bound = 1.0 / ((double)A.size() - 1) * (sum_distances + q.size() * distance);
if (top_actors[k-1].second <= farness_lower_bound) { // Stop the BFS
skip = true;
break; // top_actors_lock gets destroyed also if we do this break
}
}
// top_actors_lock gets destroyed after this line, releasing the mutex
}
// We compute the farness of our vertex actor_id
r++;
sum_distances += distance;
// We loop on the adjacencies of bfs_actor_id and add them to the queue
for (int bfs_film_id : A[bfs_actor_id].film_indices) {
for (int adj_actor_id : F[bfs_film_id].actor_indicies) {
if (!enqueued[adj_actor_id]) {
// The adjacent vertices have distance +1 w.r.t. the current vertex
q.push(make_pair(adj_actor_id, distance+1));
enqueued[adj_actor_id] = true;
}
}
}
}
if (skip) {
cout << actor_id << " " << A[actor_id].name << " SKIPPED" << endl;
continue;
}
// BFS is over, we compute the farness
double farness = (A.size()-1) / pow((double)r-1, 2) * sum_distances;
if (isnan(farness)) // This happens when r = 1
continue;
// Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insert)
const lock_guard<mutex> top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed.
auto idx = find_if(top_actors.begin(), top_actors.end(),
[&farness](const pair<int, double>& p) { return p.second >= farness; });
if (top_actors.size() < k || idx != top_actors.end()) {
top_actors.insert(idx, make_pair(actor_id, farness));
if (top_actors.size() > k)
top_actors.pop_back();
}
cout << actor_id << " " << A[actor_id].name << " " << farness << endl;
// top_actors_lock gets destroyed after this line, releasing the mutex
}
}
if (skip) {
cout << actor_id << " " << A[actor_id].name << " SKIPPED" << endl;
continue;
}
// BFS is over, we compute the farness
double farness = (A.size()-1) / pow((double)r-1, 2) * sum_distances;
if (isnan(farness)) // This happens when r = 1
continue;
// Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insert)
auto idx = find_if(top_actors.begin(), top_actors.end(),
[&farness](const pair<int, double>& p) { return p.second >= farness; });
if (top_actors.size() < k || idx != top_actors.end()) {
top_actors.insert(idx, make_pair(actor_id, farness));
if (top_actors.size() > k)
top_actors.pop_back();
}
cout << actor_id << " " << A[actor_id].name << " " << farness << endl;
}, i));
}
for (auto& thread : threads)
thread.join();
return top_actors;
}
@ -284,7 +306,7 @@ int main()
// ------------------------------------------------------------- //
cout << "Grafo, grafo delle mie brame... chi è il più centrale del reame?" << endl;
for (const auto& [actor_id, farness] : closeness(3)) {
for (const auto& [actor_id, farness] : closeness(100)) {
cout << A[actor_id].name << " " << farness << endl;
}

Loading…
Cancel
Save