now every thing is done and complete. TODO: better comments, documentation

main
Luca Lombardo 3 years ago
parent 1adca4bfe7
commit 99989720db

@ -107,7 +107,7 @@ filtered_tconsts = df_film["tconst"].to_list()
```
Then we can generate the final filtered file `FilmFiltrati.txt` that has only two columns: `nconst` and `primaryName`
Then we can generate the final filtered file `FilmFiltrati.txt` that has only two columns: `tconst` and `primaryName`
---
@ -348,7 +348,7 @@ The crucial point of the algorithm is the definition of the lower bounds, that i
What we are changing in this code is that since $L=0$ is never updated, we do not need to definite it. We will just loop over each vertex, in the order the map prefers. We do not need to define `Q` either, as we will loop over each vertex anyway, and the order does not matter.
#### Multi-threaded BFS
#### Multi-threaded implementation
We are working on a web-scale graph, multi-threading was a must. At first, we definite a `vector<thread>` and a mutex to prevent simultaneous accesses to the `top_actors` vector. Then preallocate the number of threads we want to use.

@ -5,7 +5,7 @@ import numpy as np
import os
import csv
MIN_MOVIES = 72 # Only keep relations for actors that have made more than this many movies
MIN_MOVIES = 30 # Only keep relations for actors that have made more than this many movies
#-----------------DOWNLOAD .GZ FILES FROM IMDB DATABASE-----------------#
@ -33,6 +33,8 @@ for url in urls:
os.makedirs("data", exist_ok=True) # Generate (recursively) folders, ignores the comand if they already exists
#------------------------------FILTERING------------------------------#
print("Filtering actors...")
df_attori = pd.read_csv(
'name.basics.tsv.gz', sep='\t', compression='gzip',

@ -27,29 +27,29 @@ struct Actor {
vector<int> film_indices;
};
map<int, Actor> A; // Dizionario {actor_id (key): Actor (value)}
map<int, Film> F; // Dizionario {film_id (value): Film (value)}
int MAX_ACTOR_ID = -1;
map<int, Actor> A; // Dictionary {actor_id (key): Actor (value)}
map<int, Film> F; // Dictionary {film_id (value): Film (value)}
int MAX_ACTOR_ID = -1; // Here DataRead() puts the larges actor_id loaded from Attori.txt
const int N_THREADS = 12; // Number of threads to use for some functions
void DataRead()
{
ifstream actors("data/Attori.txt"); // leggo il file
ifstream movies("data/FilmFiltrati.txt"); // leggo il file
ifstream actors("data/Attori.txt"); // read the file
ifstream movies("data/FilmFiltrati.txt"); // read the file
string s,t; // creo delle stringhe per dopo, notazione triste? Si
const string space /* the final frontier */ = "\t"; // stringa spazio per dopo
string s,t;
const string space /* the final frontier */ = "\t";
for (int i = 1; getline(actors,s); i++)
{
if (s.empty()) // serve per saltare le righe vuote, a volte capita
if (s.empty()) // jumps empty lines, sometimes can happen
continue;
try {
Actor TmpObj; // creo un oggetto temporaneo della classe Actor
Actor TmpObj; // Temporary object for the actor class
int id = stoi(s.substr(0, s.find(space)));
TmpObj.name = s.substr(s.find(space)+1);
A[id] = TmpObj; // Notazione di Matlab/Python, ma da C++17 funziona
A[id] = TmpObj; // Matlab/Python notation, works since C++17
if (id > MAX_ACTOR_ID)
MAX_ACTOR_ID = id;
} catch (...) {
@ -73,8 +73,6 @@ void DataRead()
}
}
// Inizio a costruire il grafo
void BuildGraph()
{
ifstream relations("data/Relazioni.txt");
@ -85,9 +83,9 @@ void BuildGraph()
if (s.empty())
continue;
try {
int id_film = stoi(s.substr(0, s.find(space))); // Indice del film
int id_attore = stoi(s.substr(s.find(space)+1)); // Indice dell'attore
if (A.count(id_attore) && F.count(id_film)) { // Escludi film e attori filtrati
int id_film = stoi(s.substr(0, s.find(space))); // Index of the movie
int id_attore = stoi(s.substr(s.find(space)+1)); // Index of the actor
if (A.count(id_attore) && F.count(id_film)) { // Do not consider the filtered ones
A[id_attore].film_indices.push_back(id_film);
F[id_film].actor_indicies.push_back(id_attore);
}
@ -97,29 +95,9 @@ void BuildGraph()
}
}
// // OLD VERSION
// void PrintGraph(size_t max_n_film = 100)
// {
// const size_t n = min(max_n_film, F.size()); // Potrebbero esserci meno film di max_n_film
// size_t i = 0;
// for (const auto& [id_film, film] : F) { // Loop sulle coppie id:film della mappa
// cout << id_film << "(" << film.name << ")";
// if (!film.actor_indicies.empty()) {
// cout << ":";
// for (int id_attore : film.actor_indicies)
// cout << " " << id_attore << "(" << A[id_attore].name << ")";
// }
// cout << endl;
// i++; // Tengo il conto di quanti ne ho stampati
// if (i >= n) // e smetto quando arrivo ad n
// break;
// }
// }
void PrintGraph(size_t max_n_actors = 10)
void PrintGraph(size_t max_n_actors = 3)
{
const size_t n = min(max_n_actors, A.size()); // Potrebbero esserci meno film di max_n_actors
const size_t n = min(max_n_actors, A.size()); // There could be less film than max actors!
size_t i = 0;
for (const auto& [id_attore, attore] : A) {
cout << id_attore << " (" << attore.name << ")";
@ -134,13 +112,13 @@ void PrintGraph(size_t max_n_actors = 10)
}
cout << endl;
i++; // Tengo il conto di quanti ne ho stampati
if (i >= n) // e smetto quando arrivo ad n
i++; // Taking count of how many are getting printed
if (i >= n) // Stop when I arrive ad n
break;
}
}
// Trova un film in base al titolo, restituisce -1 se non lo trova
// Find a movie by the title. Gives -1 if there is no match
int FindFilm(string title)
{
for (const auto& [id, film] : F)
@ -149,7 +127,7 @@ int FindFilm(string title)
return -1;
}
// Trova un film in base al titolo, restituisce -1 se non lo trova
// Find an actor by the name. Gives -1 if there is no match
int FindActor(string name)
{
for (const auto& [id, actor] : A)
@ -159,6 +137,7 @@ int FindActor(string name)
}
vector<pair<int, double>> closeness(const size_t k) {
/* **************************** ALGORITHM ****************************
Input : A graph G = (V, E)
@ -196,7 +175,8 @@ vector<pair<int, double>> closeness(const size_t k) {
- updateBounds(w):
the conservative strategy updateBoundsBFSCut(w) does not improve L, and it cuts the BFS as soon as it is sure that the farness of w is smaller than the k-th biggest farness found until now, that is, Farn[Top[k]]. If the BFS is cut, the function returns +, otherwise, at the end of the BFS we have computed the farness of v, and we can return it. The running time of this procedure is O(m) in the worst case, but
it can be much better in practice. It remains to define how the procedure can be sure that the farness of v is at least x: to this purpose, during the BFS, we update a lower bound on the farness of v. The idea behind this bound is that, if we have already visited all nodes up to distance d, we can upper bound the closeness centrality of v by setting distance d + 1 to a number of vertices equal to the number of edges leaving level d, and distance d + 2 to all the remaining vertices.
*/
**************************** END OF ALGORITHM **************************** */
// L = 0 for all vertices and is never update, so we do not need to define it. We will just loop over each vertex, in the order the map prefers.
@ -206,9 +186,10 @@ vector<pair<int, double>> closeness(const size_t k) {
top_actors.reserve(k+1); // We need exactly k items, no more and no less.
vector<thread> threads;
mutex top_actors_mutex; // To prevent simultaneous accesses to top_actors
mutex top_actors_mutex; // The threads write to top_actors, so another thread reading top_actors at the same time may find it in an invalid state (if the read happens while the other thread is still writing)
threads.reserve(N_THREADS);
for (int i = 0; i < N_THREADS; i++) {
// Lancio i thread
threads.push_back(thread([&top_actors,&top_actors_mutex,&k](int start) {
vector<bool> enqueued(MAX_ACTOR_ID, false); // Vector to see which vertices with put in the queue during the BSF
// We loop over each vertex
@ -223,33 +204,34 @@ vector<pair<int, double>> closeness(const size_t k) {
int r = 0; // |R|, where R is the set of vertices reachable from our vertex
long long int sum_distances = 0; // Sum of the distances to other nodes
int prev_distance = 0; // Previous distance, to see when we get to a deeper level of the BFS
q.push(make_pair(actor_id, 0));
q.push(make_pair(actor_id, 0)); // This vertex, which is at distance 0
enqueued[actor_id] = true;
bool skip = false;
while (!q.empty()) {
auto [bfs_actor_id, distance] = q.front();
auto [bfs_actor_id, distance] = q.front(); // Prendo l'elemento in cima alla coda
q.pop();
// Try to set a lower bound on the farness
if (distance > prev_distance) {
const lock_guard<mutex> top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed.
top_actors_mutex.lock(); // Acquire ownership of the mutex, wait if another thread already owns it
if (top_actors.size() == k) { // We are in the first item of the next exploration level
// We assume r = A.size(), the maximum possible value
double farness_lower_bound = 1.0 / ((double)A.size() - 1) * (sum_distances + q.size() * distance);
if (top_actors[k-1].second <= farness_lower_bound) { // Stop the BFS
skip = true;
break; // top_actors_lock gets destroyed also if we do this break
top_actors_mutex.unlock(); // Release the ownership
break;
}
}
// top_actors_lock gets destroyed after this line, releasing the mutex
top_actors_mutex.unlock(); // Release the ownership
}
// We compute the farness of our vertex actor_id
r++;
sum_distances += distance;
// We loop on the adjacencies of bfs_actor_id and add them to the queue
// We loop on each actor on each film that bfs_actor_id played in, and add them to the queue
for (int bfs_film_id : A[bfs_actor_id].film_indices) {
for (int adj_actor_id : F[bfs_film_id].actor_indicies) {
if (!enqueued[adj_actor_id]) {
// The adjacent vertices have distance +1 w.r.t. the current vertex
// The adjacent vertices have distance +1 with respect to the current vertex
q.push(make_pair(adj_actor_id, distance+1));
enqueued[adj_actor_id] = true;
}
@ -261,18 +243,21 @@ vector<pair<int, double>> closeness(const size_t k) {
continue;
}
// BFS is over, we compute the farness
double farness = numeric_limits<double>::infinity();
if (r > 1)
double farness;
if (r <= 1) // Avoid computing something/0
farness = numeric_limits<double>::infinity();
else
farness = (double)(A.size()-1) / pow((double)r-1, 2) * (double)sum_distances;
// Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insert)
const lock_guard<mutex> top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed.
auto idx = find_if(top_actors.begin(), top_actors.end(),
top_actors_mutex.lock(); // Acquire ownership of the mutex, wait if another thread already owns it
// Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insertion)
auto index = find_if(top_actors.begin(), top_actors.end(),
[&farness](const pair<int, double>& p) { return p.second > farness; });
if (top_actors.size() < k || idx != top_actors.end()) {
top_actors.insert(idx, make_pair(actor_id, farness));
if (top_actors.size() > k)
top_actors.pop_back();
}
top_actors.insert(index, make_pair(actor_id, farness));
if (top_actors.size() > k)
top_actors.pop_back();
top_actors_mutex.unlock(); // Release the ownerhsip (we are done with top_actors)
cout << actor_id << " " << A[actor_id].name << "\n\tCC: " << 1.0/farness << endl;
// top_actors_lock gets destroyed after this line, releasing the mutex
}
@ -280,12 +265,13 @@ vector<pair<int, double>> closeness(const size_t k) {
}
for (auto& thread : threads)
// Aspetto che tutti i thread abbiano finito
thread.join();
return top_actors;
}
vector<pair<int, double>> harmonic(const size_t k) { // NON RIESCO AD INVERTIRE L'ARGOMENTO DELLA SOMMA
vector<pair<int, double>> harmonic(const size_t k) { //
vector<pair<int, double>> top_actors; // Each pair is (actor_index, harmonic centrality).
top_actors.reserve(k+1); // We need exactly k items, no more and no less.
@ -316,15 +302,16 @@ vector<pair<int, double>> harmonic(const size_t k) { // NON RIESCO AD INVERTIRE
q.pop();
// Try to set an upper bound on the centrality
if (distance > prev_distance) {
const lock_guard<mutex> top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed.
top_actors_mutex.lock(); // Acquire ownership of the mutex, wait if another thread already owns it
if (top_actors.size() == k) { // We are in the first item of the next exploration level
double harmonic_centrality_upper_bound = sum_reverse_distances + q.size() / (double)distance + (A.size() - r - q.size()) / (double)(distance + 1);
if (top_actors[k-1].second >= harmonic_centrality_upper_bound) { // Stop the BFS
skip = true;
break; // top_actors_lock gets destroyed also if we do this break
top_actors_mutex.unlock(); // Release the ownership
break;
}
}
// top_actors_lock gets destroyed after this line, releasing the mutex
top_actors_mutex.unlock(); // Release the ownership
}
// We compute the farness of our vertex actor_id
r++;
@ -334,7 +321,7 @@ vector<pair<int, double>> harmonic(const size_t k) { // NON RIESCO AD INVERTIRE
for (int bfs_film_id : A[bfs_actor_id].film_indices) {
for (int adj_actor_id : F[bfs_film_id].actor_indicies) {
if (!enqueued[adj_actor_id]) {
// The adjacent vertices have distance +1 w.r.t. the current vertex
// The adjacent vertices have distance +1 with respect to the current vertex
q.push(make_pair(adj_actor_id, distance+1));
enqueued[adj_actor_id] = true;
}
@ -349,17 +336,16 @@ vector<pair<int, double>> harmonic(const size_t k) { // NON RIESCO AD INVERTIRE
double harmonic_centrality = sum_reverse_distances;
if (!isfinite(harmonic_centrality))
continue;
// Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insert)
const lock_guard<mutex> top_actors_lock(top_actors_mutex); // Acquire ownership of the mutex, wait if another thread already owns it. Release the mutex when destroyed.
auto idx = find_if(top_actors.begin(), top_actors.end(),
top_actors_mutex.lock(); // Acquire ownership of the mutex, wait if another thread already owns it
// Insert the actor in top_actors, before the first element with farness >= than our actor's (i.e. sorted insertion)
auto index = find_if(top_actors.begin(), top_actors.end(),
[&harmonic_centrality](const pair<int, double>& p) { return p.second < harmonic_centrality; });
if (top_actors.size() < k || idx != top_actors.end()) {
top_actors.insert(idx, make_pair(actor_id, harmonic_centrality));
if (top_actors.size() > k)
top_actors.pop_back();
}
top_actors.insert(index, make_pair(actor_id, harmonic_centrality));
if (top_actors.size() > k)
top_actors.pop_back();
cout << actor_id << " " << A[actor_id].name << "\n\tHC: " << harmonic_centrality << endl;
// top_actors_lock gets destroyed after this line, releasing the mutex
top_actors_mutex.unlock(); // Release the ownership
}
}, i));
}
@ -375,9 +361,6 @@ int main()
{
srand(time(NULL));
// # info.txt valore massimo di un identificativo di un attore dentro Relazioni.txt, non so scriverlo in python quindi eccolo in bash
// echo "$(cut -f2 -d' ' data/Relazioni.txt | sort --numeric-sort | tail -1)" > data/info.txt
DataRead();
BuildGraph();
cout << "Numero film: " << F.size() << endl;
@ -417,7 +400,7 @@ int main()
// ------------------------------------------------------------- //
cout << "Grafo, grafo delle mie brame... chi è il più centrale del reame?\n" <<endl;
const size_t k = 10;
const size_t k = 500;
auto top_by_closeness = closeness(k);
auto top_by_harmonic = harmonic(k);
printf("\n%36s %36s\n", "CLOSENESS CENTRALITY", "HARMONIC CENTRALITY");

Loading…
Cancel
Save