From feb2dc7c6960976d251c07f721e58f7c8c23d882 Mon Sep 17 00:00:00 2001 From: Luca Lombardo Date: Thu, 9 Feb 2023 00:49:41 +0100 Subject: [PATCH] empty file, it's needed by the utilis.py module --- src/__init__.py | 1 + src/omega_parallel_server.py | 147 +++++++++++++-------- src/utils.py | 239 ++++++++++++++++++++++++++++------- 3 files changed, 291 insertions(+), 96 deletions(-) create mode 100644 src/__init__.py diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..d7d0642 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +# this file is voluntarily empty, it is here to make this directory a package. It's needed for the utils module to work. diff --git a/src/omega_parallel_server.py b/src/omega_parallel_server.py index 7414920..6d722f6 100755 --- a/src/omega_parallel_server.py +++ b/src/omega_parallel_server.py @@ -1,15 +1,45 @@ #! /usr/bin/python3 import multiprocessing -import random import networkx as nx import numpy as np -import math -from utils import * import argparse +import random import time +from utils import * + +def parallel_omega(G: nx.Graph, k: float, nrand: int, niter: int, n_processes: int, seed: int) -> float: + """ + Computes the omega index for a given graph using parallelization. + + Parameters + ---------- + function to compute the omega index of a graph in parallel. This is a much faster approach then the standard omega function. It parallelizes the computation of the random graphs and lattice networks. + + Parameters + ---------- + `G`: nx.Graph + The graph to compute the omega index + + `k`: float + The percentage of nodes to sample from the graph. -def parallel_omega(G, nrand=12, n_processes = multiprocessing.cpu_count(), seed=42): + `niter`: int + Approximate number of rewiring per edge to compute the equivalent random graph. Default is 12. + + `nrand`: int + Number of random graphs generated to compute the maximal clustering coefficient (Cr) and average shortest path length (Lr). Default is 12 + + `n_processes`: int + Number of processes to use. Default is the number of cores of the machine. + + `seed`: int + The seed to use to generate the random graphs. Default is 42. + + Returns + ------- + `omega`: float + """ random.seed(seed) if not nx.is_connected(G): @@ -18,16 +48,16 @@ def parallel_omega(G, nrand=12, n_processes = multiprocessing.cpu_count(), seed= if len(G) == 1: return 0 - niter_lattice_reference = nrand - niter_random_reference = nrand * 2 + if k is not None: + G = random_sample(G, k) def worker(queue): # worker function to be used in parallel while True: task = queue.get() if task is None: break - random_graph = nx.random_reference(G, niter_random_reference, seed=seed) - lattice_graph = nx.lattice_reference(G, niter_lattice_reference, seed=seed) + random_graph = nx.random_reference(G, niter, seed=seed) + lattice_graph = nx.lattice_reference(G, niter, seed=seed) random_shortest_path = nx.average_shortest_path_length(random_graph) lattice_clustering = nx.average_clustering(lattice_graph) queue.put((random_shortest_path, lattice_clustering)) @@ -65,24 +95,53 @@ def parallel_omega(G, nrand=12, n_processes = multiprocessing.cpu_count(), seed= graphs = ['checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'] if __name__ == "__main__": - results = {} + parser = argparse.ArgumentParser() - # loop in reverse order - for graph in graphs[::-1]: - print("\nComputing omega for graph: ", graph) - print("Number of processes used: ", multiprocessing.cpu_count()) + parser.add_argument("graph", help="Name of the graph to be used. Options are 'checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'") + + parser.add_argument("--k", help="Percentage of nodes to be sampled. Needs to be a float between 0 and 1. Default is 0.1", default=None, type=float) - if 'checkins' in graph: - G = create_graph_from_checkins(graph.split('-')[1]) - elif 'friends' in graph: - G = create_friendships_graph(graph.split('-')[1]) + parser.add_argument("--nrand", help="Number of random graphs. Needs to be an integer. Default is 5", default=12, type=int) + + parser.add_argument("--niter", help="Approximate number of rewiring per edge to compute the equivalent random graph. Default is 12", default=12, type=int) + + parser.add_argument("--processes", help="Number of processes to be used. Needs to be an integer. Default is all available", default=multiprocessing.cpu_count(), type=int) + + parser.add_argument("--seed", help="Seed for the random number generator. Needs to be an integer. Default is 42", default=42, type=int) + + parser.add_help = True + args = parser.parse_args() + + graphs = ['checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'] + + if args.graph not in graphs: + raise ValueError("Graph name is not valid. Options are 'checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'") + + if args.processes > multiprocessing.cpu_count(): + print("Number of processes is higher than available. Setting it to default value: all available") + args.processes = multiprocessing.cpu_count() + elif args.processes < 1: + raise ValueError("Number of processes needs to be at least 1") + + name = args.graph.split('-')[1] + if 'checkins' in args.graph: + G = create_graph_from_checkins(name) + elif 'friends' in args.graph: + G = create_friendships_graph(name) + G.name = str(args.graph) + " Checkins Graph" + + results = {} + for graph in graphs: + print("\nComputing omega for graph {} with {} nodes and {} edges".format(graph, len(G), G.number_of_edges())) + print("Number of processes used: ", multiprocessing.cpu_count()) start = time.time() - omega = parallel_omega(G) + omega = parallel_omega(G, k = float(args.k), nrand=int(args.nrand), niter=int(args.niter), n_processes=int(args.processes), seed=42) end = time.time() print("Omega: ", omega) - print("Number of random graphs: ", 12) + print("Number of random graphs: ", args.nrand) + print("Number of processes used: ", args.processes) print("Time: ", end - start) results[graph] = omega @@ -91,49 +150,31 @@ if __name__ == "__main__": for key in results.keys(): f.write("%s\t%s\n" % (key, results[key])) -# if __name__ == "__main__": -# parser = argparse.ArgumentParser() -# parser.add_argument("graph", help="Name of the graph to be used. Options are 'checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'") -# parser.add_argument("--nrand", help="Number of random graphs. Needs to be an integer. Default is 5", default=12, type=int) -# parser.add_argument("--processes", help="Number of processes to be used. Needs to be an integer. Default is all available", default=multiprocessing.cpu_count(), type=int) -# parser.add_argument("--seed", help="Seed for the random number generator. Needs to be an integer. Default is 42", default=42, type=int) -# parser.add_help = True -# args = parser.parse_args() - -# graphs = ['checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'] - -# if args.graph not in graphs: -# raise ValueError("Graph name is not valid. Options are 'checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'") - -# if args.processes > multiprocessing.cpu_count(): -# print("Number of processes is higher than available. Setting it to default value: all available") -# args.processes = multiprocessing.cpu_count() -# elif args.processes < 1: -# raise ValueError("Number of processes needs to be at least 1") - -# name = args.graph.split('-')[1] -# if 'checkins' in args.graph: -# G = create_graph_from_checkins(name) -# elif 'friends' in args.graph: -# G = create_friendships_graph(name) -# G.name = str(args.graph) + " Checkins Graph" +## Variant if you want to run it on a server for all the graphs +# if __name__ == "__main__": # results = {} -# for graph in graphs: -# print("\nComputing omega for graph: ", G.name) + +# # loop in reverse order +# for graph in graphs[::-1]: +# print("\nComputing omega for graph: ", graph) # print("Number of processes used: ", multiprocessing.cpu_count()) +# if 'checkins' in graph: +# G = create_graph_from_checkins(graph.split('-')[1]) +# elif 'friends' in graph: +# G = create_friendships_graph(graph.split('-')[1]) + # start = time.time() -# omega = parallel_omega(G, nrand=int(args.nrand), n_processes=int(args.processes), seed=42) +# omega = parallel_omega(G) # end = time.time() # print("Omega: ", omega) -# print("Number of random graphs: ", args.nrand) -# print("Number of processes used: ", args.processes) +# print("Number of random graphs: ", 12) # print("Time: ", end - start) # results[graph] = omega -# with open('results.tsv', 'w') as f: -# for key in results.keys(): -# f.write("%s\t%s\n" % (key, results[key])) +# with open('results.tsv', 'w') as f: +# for key in results.keys(): +# f.write("%s\t%s\n" % (key, results[key])) diff --git a/src/utils.py b/src/utils.py index 31b89cf..52018a4 100755 --- a/src/utils.py +++ b/src/utils.py @@ -1,29 +1,26 @@ """ -NOTEs: - -- This file is note meant to be run, it's just a collection of functions that are used in the other files. It's just a way to keep the code clean and organized. - -- Why do I use os.path.join and not the "/"? Because it's more portable, it works on every OS, while "/" works only on Linux and Mac. In windows you would have to change all the "/" with "\". With os.path.join you don't have to worry about it and, as always, f*** Microsoft. +This file is note meant to be run, it's just a collection of functions that are used in the other files. It's just a way to keep the code clean and organized. """ -from multiprocessing import Pool -import itertools import os -import random import wget +import gdown +import shutil +import random import zipfile +import itertools +import numpy as np import pandas as pd import tqdm as tqdm import networkx as nx -from typing import Literal -from itertools import combinations +import multiprocessing import plotly.graph_objects as go -from collections import Counter -import numpy as np -import gdown from networkx.utils import py_random_state -import shutil +from itertools import combinations from pyvis.network import Network +from multiprocessing import Pool +from collections import Counter +from typing import Literal # ------------------------------------------------------------------------# @@ -62,7 +59,6 @@ def download_datasets(): print("Created {} folder".format(folder)) ## DOWNLOADING ## - for folder in dict.keys(): for url in dict[folder]: if folder == "foursquare": @@ -79,7 +75,6 @@ def download_datasets(): print("{} already downloaded".format(url)) ## UNZIPPING ## - for folder in dict.keys(): for file in os.listdir(os.path.join("data", folder)): if file.endswith(".gz"): @@ -91,7 +86,6 @@ def download_datasets(): os.remove(os.path.join("data", folder, file)) ## FOURSQUARE CLEANING ## - for file in os.listdir(os.path.join("data", "foursquare", "dataset_WWW2019")): if file.endswith(".txt"): os.rename(os.path.join("data", "foursquare", "dataset_WWW2019", file), os.path.join("data", "foursquare", file)) @@ -142,21 +136,17 @@ def create_graph_from_checkins(dataset: Literal['brightkite', 'gowalla', 'foursq file = os.path.join("data", dataset, dataset + "_checkins.txt") - print("\nCreating the graph for the dataset {}...".format(dataset)) - df = pd.read_csv(file, sep="\t", header=None, names=["user_id", "venue_id"], engine='pyarrow') G = nx.Graph() venues_users = df.groupby("venue_id")["user_id"].apply(set) - for users in tqdm.tqdm(venues_users): for user1, user2 in combinations(users, 2): G.add_edge(user1, user2) # path to the file where we want to save the graph edges_path = os.path.join("data", dataset , dataset + "_checkins_edges.tsv") - print("Done! The graph has {} edges".format(G.number_of_edges()), " and {} nodes".format(G.number_of_nodes())) # delete from memory the dataframe @@ -195,23 +185,23 @@ def create_friendships_graph(dataset: Literal['brightkite', 'gowalla', 'foursqua if dataset not in ["brightkite", "gowalla", "foursquare"]: raise ValueError("The dataset must be brightkite, gowalla or foursquare") - file = os.path.join("data", dataset, dataset + "_friends_edges.txt") + # read the file with the edges of the friendship graph and get the unique users df_friends_all = pd.read_csv(file, sep="\t", header=None, names=["node1", "node2"], engine='pyarrow') unique_friends = set(df_friends_all["node1"].unique()).union(set(df_friends_all["node2"].unique())) + # read the file with the edges of the check-ins graph and get the unique users df_checkins = pd.read_csv(os.path.join("data", dataset, dataset + "_checkins_edges.tsv"), sep="\t", header=None, names=["node1", "node2"]) unique_checkins = set(df_checkins["node1"].unique()).union(set(df_checkins["node2"].unique())) + # get the intersection of the two sets and filter the friendship graph unique_users = unique_friends.intersection(unique_checkins) - df = df_friends_all[df_friends_all["node1"].isin(unique_users) & df_friends_all["node2"].isin(unique_users)] - df.to_csv(os.path.join("data", dataset, dataset + "_friends_edges_filtered.tsv"), sep="\t", header=False, index=False) G = nx.from_pandas_edgelist(df, "node1", "node2", create_using=nx.Graph()) - del df_friends_all, df_checkins, df + del df_friends_all, df_checkins, df # delete from memory the dataframes return G @@ -289,7 +279,6 @@ def chunks(l, n): `n` : int Number of chunks - """ l_c = iter(l) @@ -302,6 +291,7 @@ def chunks(l, n): # ------------------------------------------------------------------------# def betweenness_centrality_parallel(G, processes=None, k =None) -> dict: + """ Compute the betweenness centrality for nodes in a graph using multiprocessing. @@ -327,7 +317,6 @@ def betweenness_centrality_parallel(G, processes=None, k =None) -> dict: Notes ----- Do not use more then 6 process for big graphs, otherwise the memory will be full. Do it only if you have more at least 32 GB of RAM. For small graphs, you can use more processes. - """ # if process is None or 1, run the standard algorithm with one process @@ -388,7 +377,7 @@ def average_shortest_path(G: nx.Graph, k=None) -> float: `G` : networkx graph The graph to compute the average shortest path length of. `k` : float - percentage of nodes to remove from the graph. If k is None, the average shortest path length of each connected component is computed using all the nodes of the connected component. + percentage of nodes to remove from the graph. If `k` is None, the average shortest path length of each connected component is computed using all the nodes of the connected component. Returns ------- @@ -398,7 +387,7 @@ def average_shortest_path(G: nx.Graph, k=None) -> float: Raises ------ ValueError - If k is not between 0 and 1 + If `k` is not between 0 and 1 """ if k is not None and (k < 0 or k > 1): @@ -434,10 +423,10 @@ def average_clustering_coefficient(G: nx.Graph, k=None) -> float: Parameters ---------- - G : networkx graph + `G` : networkx graph The graph to compute the average clustering coefficient of. - k : int - percentage of nodes to remove from the graph. If k is None, the average clustering coefficient of each connected component is computed using all the nodes of the connected component. + `k` : int + percentage of nodes to remove from the graph. If `k` is None, the average clustering coefficient of each connected component is computed using all the nodes of the connected component. Returns ------- @@ -447,7 +436,7 @@ def average_clustering_coefficient(G: nx.Graph, k=None) -> float: Raises ------ ValueError - If k is not between 0 and 1 + If `k` is not between 0 and 1 """ if k is not None and (k < 0 or k > 1): @@ -470,7 +459,7 @@ def generalized_average_clustering_coefficient(G: nx.Graph) -> float: Parameters ---------- - G : networkx graph + `G` : networkx graph The graph to compute the generalized average clustering coefficient of. Returns @@ -485,24 +474,25 @@ def generalized_average_clustering_coefficient(G: nx.Graph) -> float: C += (3*(k-1))/(2*(2*k - 1)) return C/G.number_of_nodes() + # ------------------------------------------------------------------------# def create_random_graphs(G: nx.Graph, model = None, save = True) -> nx.Graph: - """Create a random graphs of the same model of the original graph G. + """Create a random graphs with about the same number of nodes and edges of the original graph. Parameters ---------- - G : nx.Graph + `G` : nx.Graph The original graph. - model : str + `model` : str The model to use to generate the random graphs. It can be one of the following: "erdos", "watts_strogatz" - save: bool + `save`: bool If True, the random graph is saved in the folder data/random/model Returns ------- - G_random : nx.Graph + `G_random` : nx.Graph """ @@ -544,6 +534,7 @@ def create_random_graphs(G: nx.Graph, model = None, save = True) -> nx.Graph: return G_random +# ------------------------------------------------------------------------# def visualize_graphs(G: nx.Graph, k: float, connected = True): @@ -552,18 +543,18 @@ def visualize_graphs(G: nx.Graph, k: float, connected = True): Parameters ---------- - G: nx.Graph + `G`: nx.Graph The graph to visualize - k: float + `k`: float The percentage of nodes to remove from the graph. Default is None, in which case it will be chosen such that there are about 1000 nodes in the sampled graph. I strongly suggest to use the default value, other wise the visualization will be very slow. - connected: bool + `connected`: bool If True, we will consider only the largest connected component of the graph Returns ------- - html file + `html file` The html file containing the visualization of the graph Notes: @@ -630,3 +621,165 @@ def visualize_graphs(G: nx.Graph, k: float, connected = True): net.show("html_graphs/{}.html".format(name)) print("The graph has been saved in the folder html_graphs with the name {}.html" .format(name)) + +# ------------------------------------------------------------------------# + +def random_sample(graph: nx.Graph, k: float) -> nx.Graph: + + """ + Function to take a random sample of a graph + + Parameters + ---------- + `graph`: nx.Graph + The graph to sample + + `k`: float + The percentage of nodes to sample from the graph. + + Returns + ------- + `G`: nx.Graph + + """ + + nodes = list(graph.nodes()) + n = int(k*len(nodes)) + nodes_sample = random.sample(nodes, n) + + G = graph.subgraph(nodes_sample) + + if not nx.is_connected(G): + print("Graph is not connected. Taking the largest connected component") + connected = max(nx.connected_components(G), key=len) + G_connected = graph.subgraph(connected) + + print(nx.is_connected(G_connected)) + + print("Number of nodes in the sampled graph: ", G.number_of_nodes()) + print("Number of edges in the sampled graph: ", G.number_of_edges()) + + return G_connected + +# ------------------------------------------------------------------------# + +def omega_sampled(G: nx.Graph, k: float, niter: int, nrand: int) -> float: + + """ + Function to compute the omega index of a graph + + Parameters + ---------- + `G`: nx.Graph + The graph to compute the omega index + + `k`: float + The percentage of nodes to sample from the graph. + + `niter`: int + Approximate number of rewiring per edge to compute the equivalent random graph. + + `nrand`: int + Number of random graphs generated to compute the maximal clustering coefficient (Cr) and average shortest path length (Lr). + + Returns + ------- + `omega`: float + The omega index of the graph + + """ + + # sample the graph + G_sampled = random_sample(G, k) + + # compute the omega index + omega = nx.omega(G_sampled, nrand, niter) + + return omega + +# ------------------------------------------------------------------------# + +def omega_parallel(G: nx.Graph, k: float, niter: int, nrand: int, n_processes = multiprocessing.cpu_count(), seed = 42) -> float: + + """ + function to compute the omega index of a graph in parallel. This is a much faster approach then the standard omega function. It parallelizes the computation of the random graphs and lattice networks. + + Parameters + ---------- + `G`: nx.Graph + The graph to compute the omega index + + `k`: float + The percentage of nodes to sample from the graph. + + `niter`: int + Approximate number of rewiring per edge to compute the equivalent random graph. + + `nrand`: int + Number of random graphs generated to compute the maximal clustering coefficient (Cr) and average shortest path length (Lr). + + `n_processes`: int + Number of processes to use. Default is the number of cores of the machine. + + `seed`: int + The seed to use to generate the random graphs. Default is 42. + + Returns + ------- + `omega`: float + + Notes + ----- + This is just a notebook version of the program omega_parallel_server.py that you can find in the src/ folder of the repository. This is supposed to be used just fo testing on small graphs. + + """ + + random.seed(seed) + if not nx.is_connected(G): + G = G.subgraph(max(nx.connected_components(G), key=len)) + + if len(G) == 1: + return 0 + + # sample the graph + G = random_sample(G, k) + + def worker(queue): # worker function to be used in parallel + while True: + task = queue.get() + if task is None: + break + random_graph = nx.random_reference(G, niter, seed=seed) + lattice_graph = nx.lattice_reference(G, niter, seed=seed) + random_shortest_path = nx.average_shortest_path_length(random_graph) + lattice_clustering = nx.average_clustering(lattice_graph) + queue.put((random_shortest_path, lattice_clustering)) + + manager = multiprocessing.Manager() # manager to share the queue + queue = manager.Queue() # queue to share the results + processes = [multiprocessing.Process(target=worker, args=(queue,)) for _ in range(n_processes)] # processes to be used + for process in processes: # start the processes + process.start() + + for _ in range(nrand): # put the tasks in the queue + queue.put(1) + + for _ in range(n_processes): # put the stop signals in the queue + queue.put(None) + + for process in processes: # wait for the processes to finish + process.join() + + # collect the results + shortest_paths = [] + clustering_coeffs = [] + while not queue.empty(): + random_shortest_path, lattice_clustering = queue.get() # get the results from the queue + shortest_paths.append(random_shortest_path) + clustering_coeffs.append(lattice_clustering) + + L = nx.average_shortest_path_length(G) + C = nx.average_clustering(G) + + omega = (np.mean(shortest_paths) / L) - (C / np.mean(clustering_coeffs)) + return omega