final version to send

main
Luca Lombardo 2 years ago
parent 55911adcde
commit 3b4d0e3200

File diff suppressed because one or more lines are too long

@ -9,131 +9,44 @@ import random
import time import time
from utils import * from utils import *
def parallel_omega(G: nx.Graph, k: float, nrand: int = 6, niter: int = 6, n_processes: int = None, seed: int = 42) -> float:
""" """
Computes the omega index for a given graph using parallelization. This script computes the omega index for a given graph using parallelization. To see the implementation of the omega index, see the file omega.py in the same folder and check the function parallel_omega
Parameters
----------
function to compute the omega index of a graph in parallel. This is a much faster approach then the standard omega function. It parallelizes the computation of the random graphs and lattice networks.
Parameters
----------
`G`: nx.Graph
The graph to compute the omega index
`k`: float
The percentage of nodes to sample from the graph.
`niter`: int
Approximate number of rewiring per edge to compute the equivalent random graph. Default is 6.
`nrand`: int
Number of random graphs generated to compute the maximal clustering coefficient (Cr) and average shortest path length (Lr). Default is 6
`n_processes`: int
Number of processes to use. Default is the number of cores of the machine.
`seed`: int
The seed to use to generate the random graphs. Default is 42.
Returns
-------
`omega`: float
""" """
if n_processes is None: ### PARSING ARGUMENTS ###
n_processes = multiprocessing.cpu_count()
if n_processes > nrand:
n_processes = nrand
random.seed(seed)
if not nx.is_connected(G):
G = G.subgraph(max(nx.connected_components(G), key=len))
if len(G) == 1:
return 0
if k > 0:
G = random_sample(G, k)
def worker(queue_seeds, queue_results): # worker function to be used in parallel
while True:
try:
seed = queue_seeds.get(False)
except Empty:
break
random_graph = nx.random_reference(G, niter, seed=seed)
lattice_graph = nx.lattice_reference(G, niter, seed=seed)
random_shortest_path = nx.average_shortest_path_length(random_graph)
lattice_clustering = nx.average_clustering(lattice_graph)
queue_results.put((random_shortest_path, lattice_clustering))
manager = multiprocessing.Manager() # manager to share the queue
queue_seeds = manager.Queue() # queue to give the seeds to the processes
queue_results = manager.Queue() # queue to share the results
processes = [multiprocessing.Process(target=worker, args=(queue_seeds, queue_results))
for _ in range(n_processes)] # processes to be used
for i in range(nrand): # put the tasks in the queue
queue_seeds.put(i + seed)
for process in processes: # start the processes
process.start()
for process in processes: # wait for the processes to finish
process.join()
# collect the results
shortest_paths = []
clustering_coeffs = []
while not queue_results.empty():
random_shortest_path, lattice_clustering = queue_results.get() # get the results from the queue
shortest_paths.append(random_shortest_path)
clustering_coeffs.append(lattice_clustering)
L = nx.average_shortest_path_length(G)
C = nx.average_clustering(G)
omega = (np.mean(shortest_paths) / L) - (C / np.mean(clustering_coeffs))
return omega
if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("graph", help="Name of the graph to be used.", choices=['checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite']) parser.add_argument("graph", help="Name of the graph to be used.", choices=['checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'])
parser.add_argument("--k", help="Percentage of nodes to be sampled. Needs to be a float between 0 and 1. Default is 0.", default=0, type=float) parser.add_argument("--k", help="Percentage of nodes to be sampled. Needs to be a float between 0 and 1. Default is 0.", default=0, type=float)
parser.add_argument("--nrand", help="Number of random graphs. Needs to be an integer. Default is 12", default=12, type=int) parser.add_argument("--nrand", help="Number of random graphs. Needs to be an integer. Default is 12", default=12, type=int)
parser.add_argument("--niter", help="Approximate number of rewiring per edge to compute the equivalent random graph. Default is 12", default=12, type=int) parser.add_argument("--niter", help="Approximate number of rewiring per edge to compute the equivalent random graph. Default is 12", default=12, type=int)
parser.add_argument("--processes", help="Number of processes to be used. Needs to be an integer. Default is the number of cores.", default=multiprocessing.cpu_count(), type=int) parser.add_argument("--processes", help="Number of processes to be used. Needs to be an integer. Default is the number of cores.", default=multiprocessing.cpu_count(), type=int)
parser.add_argument("--seed", help="Seed for the random number generator. Needs to be an integer. Default is 42", default=42, type=int) parser.add_argument("--seed", help="Seed for the random number generator. Needs to be an integer. Default is 42", default=42, type=int)
parser.add_help = True parser.add_help = True
args = parser.parse_args() args = parser.parse_args()
# check if the number of processes is valid
if args.processes > multiprocessing.cpu_count(): if args.processes > multiprocessing.cpu_count():
print("Number of processes is higher than available. Setting it to default value: all available") print("Number of processes is higher than available. Setting it to default value: all available")
args.processes = multiprocessing.cpu_count() args.processes = multiprocessing.cpu_count()
elif args.processes < 1: elif args.processes < 1:
raise ValueError("Number of processes needs to be at least 1") raise ValueError("Number of processes needs to be at least 1")
# the name of the graph is the first part of the input string
name = args.graph.split('-')[1] name = args.graph.split('-')[1]
if 'checkins' in args.graph: if 'checkins' in args.graph:
G = create_graph_from_checkins(name) G = create_graph_from_checkins(name) #function from utils.py, check it out there
elif 'friends' in args.graph: elif 'friends' in args.graph:
G = create_friendships_graph(name) G = create_friendships_graph(name) #function from utils.py, check it out there
G.name = str(args.graph) + " Checkins Graph" G.name = str(args.graph) + " Checkins Graph"
print("\nComputing omega for graph {} with {} nodes and {} edges".format(args.graph, len(G), G.number_of_edges())) print("\nThe full graph {} has {} nodes and {} edges".format(args.graph, len(G), G.number_of_edges()))
print("Number of processes used: ", args.processes) print("Number of processes used: ", args.processes)
start = time.time() start = time.time()
# function from utils.py, check it out there (it's the parallel version of the omega index)
omega = parallel_omega(G, k = args.k, nrand=args.nrand, niter=args.niter, n_processes=args.processes, seed=42) omega = parallel_omega(G, k = args.k, nrand=args.nrand, niter=args.niter, n_processes=args.processes, seed=42)
end = time.time() end = time.time()

@ -1,38 +1,31 @@
#! /usr/bin/python3 #! /usr/bin/python3
import networkx as nx
from utils import *
import warnings
import time import time
import random
import argparse import argparse
warnings.filterwarnings("ignore") import networkx as nx
from utils import *
def random_sample(graph, k):
nodes = list(graph.nodes())
n = int(k*len(nodes))
nodes_sample = random.sample(nodes, n)
G = graph.subgraph(nodes_sample) """
Standard function to compute the omega index for a given graph. To see the implementation of the omega index, refer to the networkx documentation
if not nx.is_connected(G): https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.smallworld.omega.html#networkx.algorithms.smallworld.omega
print("Graph is not connected. Taking the largest connected component")
connected = max(nx.connected_components(G), key=len)
G_connected = graph.subgraph(connected)
print(nx.is_connected(G_connected)) This file has been created to be used with the server. It takes as input the name of the graph and the percentage of nodes to be sampled. It then computes the omega index for the sampled graph and returns the result. Run
print("Number of nodes in the sampled graph: ", G.number_of_nodes()) ```
print("Number of edges in the sampled graph: ", G.number_of_edges()) ./omega_sampled_server.py -h
```
return G_connected to see the list of available graphs and the other parameters that can be passed as input.
"""
if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("graph", help="Name of the graph to be used. Options are 'checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'")
parser.add_argument("k", help="Percentage of nodes to be sampled. Needs to be a float between 0 and 1") parser.add_argument("graph", help="Name of the graph to be used.", choices=['checkins-foursquare', 'checkins-gowalla', 'checkins-brightkite', 'friends-foursquare', 'friends-gowalla', 'friends-brightkite'])
parser.add_argument("--k", help="Percentage of nodes to be sampled. Needs to be a float between 0 and 1", default=0)
parser.add_argument("--niter", help="Number of rewiring per edge. Needs to be an integer. Default is 5", default=5) parser.add_argument("--niter", help="Number of rewiring per edge. Needs to be an integer. Default is 5", default=5)
parser.add_argument("--nrand", help="Number of random graphs. Needs to be an integer. Default is 5", default=5) parser.add_argument("--nrand", help="Number of random graphs. Needs to be an integer. Default is 5", default=5)
parser.add_help = True parser.add_help = True
args = parser.parse_args() args = parser.parse_args()
@ -45,12 +38,12 @@ if __name__ == "__main__":
G.name = str(args.graph) + " Checkins Graph" G.name = str(args.graph) + " Checkins Graph"
# sample the graph # sample the graph
G_sample = random_sample(G, float(args.k)) G_sample = random_sample(G, float(args.k)) # function from utils.py, check it out there
# compute omega # compute omega
start = time.time() start = time.time()
print("\nComputing omega for graph: ", G.name) print("\nComputing omega for graph: ", G.name)
omega = nx.omega(G_sample, niter = int(args.niter), nrand = int(args.nrand)) omega = nx.omega(G_sample, niter = int(args.niter), nrand = int(args.nrand))
end = time.time() end = time.time()
print("Omega coefficient for graph {}: {}".format(G.name, omega)) print("\nOmega coefficient for graph {}: {}".format(G.name, omega))
print("Time taken: ", round(end-start,2)) print("Time taken: ", round(end-start,2), " seconds")

@ -23,7 +23,6 @@ from subprocess import run
from typing import Literal from typing import Literal
from queue import Empty from queue import Empty
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(SCRIPT_DIR, "data") DATA_DIR = os.path.join(SCRIPT_DIR, "data")
@ -47,7 +46,6 @@ def download_datasets():
The datasets are downloaded in the "data" folder. If the folder doesn't exist, it will be created. If the dataset is already downloaded, it will be skipped. The files are renamed to make them more readable. The datasets are downloaded in the "data" folder. If the folder doesn't exist, it will be created. If the dataset is already downloaded, it will be skipped. The files are renamed to make them more readable.
""" """
dict = { dict = {
"brightkite": ["https://snap.stanford.edu/data/loc-brightkite_edges.txt.gz", "https://snap.stanford.edu/data/loc-brightkite_totalCheckins.txt.gz"], "brightkite": ["https://snap.stanford.edu/data/loc-brightkite_edges.txt.gz", "https://snap.stanford.edu/data/loc-brightkite_totalCheckins.txt.gz"],
"gowalla": ["https://snap.stanford.edu/data/loc-gowalla_edges.txt.gz", "https://snap.stanford.edu/data/loc-gowalla_totalCheckins.txt.gz"], "gowalla": ["https://snap.stanford.edu/data/loc-gowalla_edges.txt.gz", "https://snap.stanford.edu/data/loc-gowalla_totalCheckins.txt.gz"],
@ -141,7 +139,7 @@ def create_graph_from_checkins(dataset: Literal['brightkite', 'gowalla', 'foursq
Raises Raises
------ ------
ValueError `ValueError`
If the dataset is not valid. If the dataset is not valid.
""" """
@ -149,7 +147,6 @@ def create_graph_from_checkins(dataset: Literal['brightkite', 'gowalla', 'foursq
if dataset not in ['brightkite', 'gowalla', 'foursquare']: if dataset not in ['brightkite', 'gowalla', 'foursquare']:
raise ValueError("Dataset not valid. Please choose between brightkite, gowalla, foursquare") raise ValueError("Dataset not valid. Please choose between brightkite, gowalla, foursquare")
file = os.path.join(DATA_DIR, dataset, dataset + "_checkins.txt") file = os.path.join(DATA_DIR, dataset, dataset + "_checkins.txt")
print("\nCreating the graph for the dataset {}...".format(dataset)) print("\nCreating the graph for the dataset {}...".format(dataset))
df = pd.read_csv(file, sep="\t", header=None, names=["user_id", "venue_id"], engine='pyarrow') df = pd.read_csv(file, sep="\t", header=None, names=["user_id", "venue_id"], engine='pyarrow')
@ -173,20 +170,21 @@ def create_graph_from_checkins(dataset: Literal['brightkite', 'gowalla', 'foursq
return G return G
# ------------------------------------------------------------------------# # ------------------------------------------------------------------------#
def create_friendships_graph(dataset: Literal['brightkite', 'gowalla', 'foursquareEU', 'foursquareIT']) -> nx.Graph: def create_friendships_graph(dataset: Literal['brightkite', 'gowalla', 'foursquareEU', 'foursquareIT'], create_file = True) -> nx.Graph:
""" """
Create the graph of friendships for the dataset brightkite, gowalla or foursquare. Create the graph of friendships for the dataset brightkite, gowalla or foursquare. The graph is saved in a file.
The graph is saved in a file.
Parameters Parameters
---------- ----------
`dataset` : str `dataset` : str
The dataset for which we want to create the graph of friendships. The dataset for which we want to create the graph of friendships.
`create_file` : bool, optional
If True, the graph is saved in a file, by default True
Returns Returns
------- -------
`G` : networkx.Graph `G` : networkx.Graph
@ -213,10 +211,14 @@ def create_friendships_graph(dataset: Literal['brightkite', 'gowalla', 'foursqua
# get the intersection of the two sets and filter the friendship graph # get the intersection of the two sets and filter the friendship graph
unique_users = unique_friends.intersection(unique_checkins) unique_users = unique_friends.intersection(unique_checkins)
df = df_friends_all[df_friends_all["node1"].isin(unique_users) & df_friends_all["node2"].isin(unique_users)] df = df_friends_all[df_friends_all["node1"].isin(unique_users) & df_friends_all["node2"].isin(unique_users)]
# save the graph in a file
if create_file:
df.to_csv(os.path.join(DATA_DIR, dataset, dataset + "_friends_edges_filtered.tsv"), sep="\t", header=False, index=False) df.to_csv(os.path.join(DATA_DIR, dataset, dataset + "_friends_edges_filtered.tsv"), sep="\t", header=False, index=False)
G = nx.from_pandas_edgelist(df, "node1", "node2", create_using=nx.Graph()) G = nx.from_pandas_edgelist(df, "node1", "node2", create_using=nx.Graph())
del df_friends_all, df_checkins, df # delete from memory the dataframes del df_friends_all, df_checkins, df # delete from memory the dataframes
print("Created the graph for the dataset {} with {} edges".format(dataset, G.number_of_edges()), "and {} nodes".format(G.number_of_nodes()))
return G return G
@ -327,8 +329,16 @@ def betweenness_centrality_parallel(G, processes=None, k =None) -> dict:
Returns Returns
------- -------
dict `dict`
Dictionary of nodes with betweenness centrality as the value.
Raises
------
`ValueError`
If the number of processes is greater than the number of cores in the system
`ValueError`
If k is not None and k is not between 0 and 1
Notes Notes
----- -----
Do not use more then 6 process for big graphs, otherwise the memory will be full. Do it only if you have more at least 32 GB of RAM. For small graphs, you can use more processes. Do not use more then 6 process for big graphs, otherwise the memory will be full. Do it only if you have more at least 32 GB of RAM. For small graphs, you can use more processes.
@ -357,10 +367,11 @@ def betweenness_centrality_parallel(G, processes=None, k =None) -> dict:
if k is None: if k is None:
G_copy = G.copy() G_copy = G.copy()
p = Pool(processes=processes) p = Pool(processes=processes) # create a pool of processes
node_divisor = len(p._pool) * 4 node_divisor = len(p._pool) * 4 # number of nodes to be processed by each process
node_chunks = list(chunks(G_copy.nodes(), G_copy.order() // node_divisor)) node_chunks = list(chunks(G_copy.nodes(), G_copy.order() // node_divisor)) # divide the nodes in chunks
num_chunks = len(node_chunks) num_chunks = len(node_chunks)
# run the algorithm on each chunk
bt_sc = p.starmap( bt_sc = p.starmap(
nx.betweenness_centrality_subset, nx.betweenness_centrality_subset,
zip( zip(
@ -396,12 +407,12 @@ def average_shortest_path(G: nx.Graph, k=None) -> float:
Returns Returns
------- -------
float `float`
The average shortest path length of the graph. The average shortest path length of the graph.
Raises Raises
------ ------
ValueError `ValueError`
If `k` is not between 0 and 1 If `k` is not between 0 and 1
""" """
@ -412,12 +423,11 @@ def average_shortest_path(G: nx.Graph, k=None) -> float:
connected_components = list(nx.connected_components(G)) connected_components = list(nx.connected_components(G))
else: else:
G_copy = G.copy() G_copy = G.copy()
# remove the k% of nodes from G
G_copy.remove_nodes_from(random.sample(G_copy.nodes(), int((k)*G_copy.number_of_nodes()))) G_copy.remove_nodes_from(random.sample(G_copy.nodes(), int((k)*G_copy.number_of_nodes())))
print("\tNumber of nodes after removing {}% of nodes: {}" .format((k)*100, G_copy.number_of_nodes())) print("\tNumber of nodes after removing {}% of nodes: {}" .format((k)*100, G_copy.number_of_nodes()))
print("\tNumber of edges after removing {}% of nodes: {}" .format((k)*100, G_copy.number_of_edges())) print("\tNumber of edges after removing {}% of nodes: {}" .format((k)*100, G_copy.number_of_edges()))
tmp = 0 tmp = 0 # temporary variable to store the sum of the average shortest path length of each connected component
connected_components = list(nx.connected_components(G_copy)) connected_components = list(nx.connected_components(G_copy))
# remove all the connected components with less than 10 nodes # remove all the connected components with less than 10 nodes
connected_components = [c for c in connected_components if len(c) > 10] connected_components = [c for c in connected_components if len(c) > 10]
@ -445,12 +455,12 @@ def average_clustering_coefficient(G: nx.Graph, k=None) -> float:
Returns Returns
------- -------
float `float`
The average clustering coefficient of the graph. The average clustering coefficient of the graph.
Raises Raises
------ ------
ValueError `ValueError`
If `k` is not between 0 and 1 If `k` is not between 0 and 1
""" """
@ -461,9 +471,7 @@ def average_clustering_coefficient(G: nx.Graph, k=None) -> float:
return nx.average_clustering(G) return nx.average_clustering(G)
else: else:
G_copy = G.copy() G_copy = random_sample(G, k)
G_copy.remove_nodes_from(random.sample(list(G_copy.nodes()), int((k)*G_copy.number_of_nodes())))
print("\tNumber of nodes after removing {}% of nodes: {}" .format((k)*100, G_copy.number_of_nodes()))
return nx.average_clustering(G_copy) return nx.average_clustering(G_copy)
@ -479,7 +487,7 @@ def generalized_average_clustering_coefficient(G: nx.Graph) -> float:
Returns Returns
------- -------
float `float`
The generalized average clustering coefficient of the graph. The generalized average clustering coefficient of the graph.
""" """
@ -509,6 +517,10 @@ def create_random_graphs(G: nx.Graph, model = None, save = True) -> nx.Graph:
------- -------
`G_random` : nx.Graph `G_random` : nx.Graph
Notes
-----
This is just a time-saving and approximate way to create random graphs. If you want more accurate random graphs, you should use the function `random_reference` from the `networkx` library.
""" """
if model is None: if model is None:
@ -595,7 +607,7 @@ def visualize_graphs(G: nx.Graph, k: float, connected = True):
# create a networkx graph # create a networkx graph
net = net = Network(directed=False, bgcolor='#1e1f29', font_color='white') net = Network(directed=False, bgcolor='#1e1f29', font_color='white')
# for some reasons, if I put % values, the graph is not displayed correctly. So I use pixels, sorry non FHD users # for some reasons, if I put % values, the graph is not displayed correctly. So I use pixels, sorry non FHD users
net.width = '1920px' net.width = '1920px'
@ -652,14 +664,27 @@ def random_sample(graph: nx.Graph, k: float) -> nx.Graph:
`k`: float `k`: float
The percentage of nodes to remove from the graph The percentage of nodes to remove from the graph
Raises
------
`ValueError`
If k is not between 0 and 1
Returns Returns
------- -------
`G`: nx.Graph `G`: nx.Graph
The sampled graph
""" """
# edge cases
if not 0 <= k <= 1: if not 0 <= k <= 1:
raise ValueError("Percentage of nodes needs to be between 0 and 1") raise ValueError("Percentage of nodes needs to be between 0 and 1")
elif k == 0:
print("k is 0. Returning the original graph")
return graph
elif k == 1:
print("k is 1. Returning an empty graph")
return nx.Graph()
nodes = list(graph.nodes()) nodes = list(graph.nodes())
nodes_sample = np.random.choice(nodes, size=int((1-k)*len(nodes)), replace=False) nodes_sample = np.random.choice(nodes, size=int((1-k)*len(nodes)), replace=False)
@ -667,7 +692,7 @@ def random_sample(graph: nx.Graph, k: float) -> nx.Graph:
G = graph.subgraph(nodes_sample) G = graph.subgraph(nodes_sample)
if not nx.is_connected(G): if not nx.is_connected(G):
print("Graph is not connected. Taking the largest connected component") print("\nGraph is not connected. Taking the largest connected component")
connected = max(nx.connected_components(G), key=len) connected = max(nx.connected_components(G), key=len)
G_connected = graph.subgraph(connected) G_connected = graph.subgraph(connected)
@ -681,7 +706,7 @@ def random_sample(graph: nx.Graph, k: float) -> nx.Graph:
def omega_sampled(G: nx.Graph, k: float, niter: int, nrand: int) -> float: def omega_sampled(G: nx.Graph, k: float, niter: int, nrand: int) -> float:
""" """
Function to compute the omega index of a graph Function to compute the omega index on a sampled graph
Parameters Parameters
---------- ----------
@ -701,7 +726,6 @@ def omega_sampled(G: nx.Graph, k: float, niter: int, nrand: int) -> float:
------- -------
`omega`: float `omega`: float
The omega index of the graph The omega index of the graph
""" """
# sample the graph # sample the graph
@ -739,23 +763,30 @@ def parallel_omega(G: nx.Graph, k: float, nrand: int = 6, niter: int = 6, n_proc
`seed`: int `seed`: int
The seed to use to generate the random graphs. Default is 42. The seed to use to generate the random graphs. Default is 42.
Raises
------
`ValueError`
If n_processes is less than 1
Returns Returns
------- -------
`omega`: float `omega`: float
Notes Notes
----- -----
This is just a notebook version of the program omega_parallel_server.py that you can find in the repository. This is supposed to be used just fo testing on small graphs. This is an experimental function that has not been fully tested.
""" """
if n_processes is None: if n_processes is None:
n_processes = multiprocessing.cpu_count() n_processes = multiprocessing.cpu_count()
if n_processes > nrand: if n_processes > nrand:
n_processes = nrand n_processes = nrand
if n_processes < 1:
raise ValueError("Number of processes needs to be at least 1")
random.seed(seed) random.seed(seed)
if not nx.is_connected(G): if not nx.is_connected(G):
# take the largest connected component
G = G.subgraph(max(nx.connected_components(G), key=len)) G = G.subgraph(max(nx.connected_components(G), key=len))
if len(G) == 1: if len(G) == 1:
@ -764,6 +795,7 @@ def parallel_omega(G: nx.Graph, k: float, nrand: int = 6, niter: int = 6, n_proc
# sample the graph # sample the graph
G = random_sample(G, k) G = random_sample(G, k)
# we are using two queues to share the seeds and the results between the processes
def worker(queue_seeds, queue_results): # worker function to be used in parallel def worker(queue_seeds, queue_results): # worker function to be used in parallel
while True: while True:
try: try:

Loading…
Cancel
Save