Source code for tnetwork.dyn_graph.dyn_graph_ls


import networkx as nx
import math
from tnetwork.dyn_graph.dyn_graph import DynGraph
from tnetwork.utils.intervals import Intervals
import tnetwork as tn
import sortedcontainers
from tnetwork.dyn_graph.encodings import code_length_LS
from collections.abc import Iterable


[docs]class DynGraphLS(DynGraph): """ A class to represent dynamic graphs as link streams. It is represented using a networkx Graph, using an attribute ("t") for each node and each edge representing its time of presence. The representation is done using a list of integer. """
[docs] def __init__(self,edges=None, nodes=None , frequency=1, start=None,end=None): """ Instanciate a dynamic graph A start and end dates can be used to give a "duration" to the graph independently from its nodes and edges (for instance, to study activity during a whole year, the graph might start on January 1st at 00:00 while the first recorded activity occurs in the afternoon or on another day) :param start: set a start time, by default will be the first added time :param end: set an end time, by default will be the last added time :param frequency: minimal time difference between two observations. Default: 1 :param edges: data to initialize the dynamic graph, dictionary {(n1,n2):[int]}. Keys are edges, time is ordered list of int :param nodes: data to initialize the dynamic graph, dictionary {n:time}. Keys are nodes, time is Intervals object (see interval graph) """ self._start=start self._end=end self.frequency(frequency) if start==None: self._start=math.inf if end==None: self._end=-math.inf self._graph = nx.Graph() if nodes!=None: self._graph.add_nodes_from(nodes.keys()) nx.set_node_attributes(self._graph,nodes,"t") if start == None or end == None: times = set([x.start() for x in nodes.values()] + [x.end() for x in nodes.values()]) if start == None: self._start = min(times) start = min(times) if end == None: self._end = max(times) end = max(times) if edges!=None: self._graph.add_weighted_edges_from([(k[0],k[1],sortedcontainers.SortedSet(v)) for k,v in edges.items()],"t") #nx.set_edge_attributes(self._graph,edges,"t") if start==None or end==None: if start == None or end == None: times = set([min(x) for x in edges.values()] + [max(x) for x in edges.values()]) if start == None: self._start = min(times) if end == None: self._end = max(times)
[docs] def start(self): return self._start
[docs] def end(self): return self._end
[docs] def node_presence(self, nodes=None): """ Presence period of nodes Several usages: * If nodes==None (default), return a dict for each node, its existing times * If nodes is a single node, return the interval of presence of this node * If nodes is a set of nodes, return interval of presence of those nodes as a dictionary :param nodes: :return: dictionary, for each node, its presence Intervals, or single Interval for single node """ toReturn = {} ns = nx.get_node_attributes(self._graph,"t") if nodes==None: if len(ns)>0: return ns else: return {n:None for n in self._graph.nodes()} if isinstance(nodes,str): return ns[nodes] return {k:v for k,v in ns.items() if k in nodes}
[docs] def add_interaction(self, u,v, time): """ Add an interaction between nodes u and v at time time :param u: first node :param b: second node :param time: integer or list of integers :return: """ if isinstance(time,int): time=[time] if not self._graph.has_edge(u, v): self._graph.add_edge(u, v, t=sortedcontainers.SortedSet(time)) else: self._graph[u][v]["t"].update(time) # self._graph.add_edge(u,v,t=time) start = min(time) end=max(time) self._start = min(self._start, start) self._end = max(self._end, end)
[docs] def cumulated_graph(self,times=None,weighted=True): """ Compute the cumulated graph. Return a networkx graph corresponding to the cumulated graph of the given period (whole graph by default) :param times: a pair (start,end) :return: a networkx (weighted) graph """ if times==None: times=(self._start, self._end) times_interval = Intervals(times) to_return = nx.Graph() for n,t in nx.get_node_attributes(self._graph,"t").items(): intersect = t.intersection(times_interval) if weighted: to_return.add_node(n,weight=intersect.duration()) else: to_return.add_node(n) for (u,v),t in nx.get_edge_attributes(self._graph,"t").items(): intersect = list(t.irange(times[0], times[1], inclusive=(True, False))) if len(intersect)>0: if weighted: to_return.add_edge(u,v,weight=len(intersect)) else: to_return.add_edge(u,v) return to_return
[docs] def graph_at_time(self,t:int) -> nx.Graph: """ Graph as it is at time t Return a networkx graph corresponding to the graphs as it is at time t, i.e., edges and nodes present at that time :param t: timestep :return: a networkx Graph """ to_return = nx.Graph() for n,intv in nx.get_node_attributes(self._graph,"t").items(): if intv.contains_t(t): to_return.add_node(n) for e, times in nx.get_edge_attributes(self._graph, "t").items(): if t in times: to_return.add_edge(e[0],e[1]) return to_return
[docs] def add_interactions_from(self, nodePairs, times): """ Add interactions between the provided node pairs for the provided times. Add each provided nodePair at each provided time :param nodePairs: list of pairs of nodes, or a single pair of nodes as a tuple or set :param times: list of times as integer or a single integer """ print(nodePairs) list_element_example = list(nodePairs)[0] if not isinstance(list_element_example, Iterable) or isinstance(list_element_example, str): nodePairs = [nodePairs] if not isinstance(times, Iterable): times = [times] for i,nodePair in enumerate(nodePairs): npp = list(nodePair) self.add_interaction(npp[0], npp[1], times)
[docs] def add_node_presence(self, n, time): """ Add presence for a node for a period :param n: node :param time: a period, couple (start, stop) or an interval """ if not isinstance(time,Intervals): time= Intervals(time) if not self._graph.has_node(n): self._graph.add_node(n, t=time) else: self._graph.nodes[n]["t"]+=time self._start = min(self._start, time.start()) self._end = max(self._end, time.end())
[docs] def add_nodes_presence_from(self, nodes,times): """ Add interactions between provided pairs for the provided periods :param nodes: list of nodes, or a single node :param times: list of times or a single time (integer) """ if not isinstance(nodes,Iterable): nodes = [nodes] if not isinstance(times,Iterable): #Single Interval object times = [times] else: #single interaval as a pair or multiple things list_element_example = list(times)[0] if not isinstance(list_element_example, Iterable): #if an element of a list is an int, it's a single pair times = [times] for i,node in enumerate(nodes): self.add_node_presence(node, times)
[docs] def remove_node_presence(self,node,time): """ Remove node and its interactions over the period :param node: node to remove :param time: a period, couple (start, stop) or an interval """ if not isinstance(time,Intervals): time= Intervals(time) if self._graph.has_node(node): self._graph.nodes()[node]["t"]= self._graph.nodes()[node]["t"]-time if self._graph.nodes()[node]["t"].duration()==0: self._graph.remove_node(node) if self._start in time or self._end in time or time.end()==self._end: new_max = -math.inf new_min = math.inf for k,v in self.node_presence().items(): new_max = max(new_max,v.end()) new_min = min(new_min,v.start()) self._start = new_min self._end = new_max
[docs] def remove_interaction(self,u,v,time): """ Remove an interaction between nodes u and v at time time :param u: first node :param v: second node :param time: integer :return: """ if self._graph.has_edge(u,v): self._graph[u][v]["t"].remove(time)
[docs] def remove_interactions_from(self, nodePairs, times): """ Remove interactions between provided pairs for the provided periods :param nodePairs: a node pair, or a list of node pairs :param times: a list of integer (applied to all pairs) or a list of lsit of integer (one per nodePairs) """ if not isinstance(nodePairs[0],tuple): #means it is a single pair, not list of pairs nodePairs=[nodePairs]*len(times) if isinstance(times[0], int): #means it is a single list, not list of list times = [times]*len(nodePairs) for i, nodePair in enumerate(nodePairs): self.remove_interaction(nodePair[0], nodePair[1], times[i])
def nodes_interactions(self): to_return={} for e,pres in self.edge_presence().items(): elist = list(e) to_return.setdefault(elist[0],[]) to_return[elist[0]]+=list(pres) to_return.setdefault(elist[1], []) to_return[elist[1]] += list(pres) for n,pres in to_return.items(): to_return[n]=sortedcontainers.SortedSet(pres) return to_return
[docs] def edge_presence(self, edges=None): """ Return the periods of interactions for each pair of nodes with at least an interaction :param edges: the list of edges to get interactions for, all by default :return: dictionary, keys : pair of nodes, values : list of integer """ list_element_example = list(edges)[0] if not isinstance(list_element_example, Iterable) or isinstance(list_element_example, str): edges = [edges] to_return = {frozenset(e):pres for e,pres in nx.get_edge_attributes(self._graph,"t").items()} if edges!=None: edges = [frozenset(e) for e in edges] to_return = {e:pres for e,pres in to_return.items() if e in edges} if len(edges)==1: return list(to_return.values())[0] return to_return
#return self.interactions_intervals(edges)
[docs] def change_times(self) ->[int]: """ List of all times with a node/edge change Return the list of all times at which a change node change/link :return: list of int """ to_return = set() for e,interv in self.edge_presence().items(): to_return.update(interv) for n,interv in self.node_presence().items(): if interv!=None: for period in interv.periods(): to_return.update(period) return sorted(list(to_return))
[docs] def slice(self,start,end): """ Keep only the selected period :param start: time of the beginning of the slice (inclusive) :param end: time of the end of the slice (exclusive) """ to_return = tn.DynGraphLS() slice_time = Intervals((start,end)) for n,presence in self.node_presence().items(): duration = slice_time.intersection(presence) if duration.duration()>0: to_return.add_node_presence(n,duration) for e,presence in self.interactions_intervals().items(): to_return.add_interaction(e[0],e[1],presence.islice(start,end)) to_return.add_interaction(e[0],e[1],slice_time.intersection(presence)) return to_return
[docs] def to_DynGraphSN(self,slices=None,weighted=True): """ Convert to a snapshot representation. :param slices: can be one of - None, snapshot_affiliations are created according to the frequency of the dynamic network (default one), - an integer, snapshot_affiliations are created using a sliding window - a list of periods, represented as pairs (start, end), each period yielding a snapshot :return: a dynamic graph represented as snapshot_affiliations, the weight of nodes/edges correspond to their presence time during the snapshot """ dgSN = tn.DynGraphSN() if slices==None: slices=self.frequency() if isinstance(slices,int): duration = slices dgSN.frequency(slices) slices = [] start = self._start end = start+duration slices.append((start, end)) while(end<=self._end): start=end end = start+duration slices.append((start,end)) all_times = sortedcontainers.SortedList(self.change_times()) for ts in slices: #print(ts) if len(list(all_times.irange(ts[0],ts[1],inclusive=(True,False))))>0: dgSN.add_snapshot(t=ts[0], graphSN=self.cumulated_graph(ts, weighted=weighted)) #dgSN.add_snapshot(t=ts[0],graphSN=nx.Graph()) #sorted_times = [x[0] for x in slices] #print(self.node_presence()) #to_add = {} # for n,interv in self.node_presence().items(): # intersection = interv._discretize(sorted_times) # for t, duration in intersection.items(): # to_add.setdefault(t, []).append((n, {"weight": duration})) # for t in to_add: # dgSN.snapshots(t).add_nodes_from(to_add[t]) # # to_add = {} # for e,interv in self.interactions_intervals().items(): # intersection = interv._discretize(sorted_times) # for t,duration in intersection.items(): # to_add.setdefault(t,[]).append((e[0],e[1],{"weight":duration})) # for t in to_add: # dgSN.snapshots(t).add_edges_from(to_add[t]) return dgSN
[docs] def code_length(self): return code_length_LS(self)
# def code_length(self): # # # code_time = np.log2(len(self.change_times())+1) # # # # #g_cumulated = self.cumulated_graph() # node_encoding = np.log2(len(self._graph.nodes())) # edge_encoding = node_encoding * 2 # nb_time = len(self.change_times()) # # nb_unique_edges = len(self._graph.edges()) # nb_periods = 0 # for e,ts in self.interactions_intervals().items(): # nb_periods+=len(ts.periods()) # time_encoding = np.log2(nb_time) # #time_encoding = np.log2(nb_periods*2) # # #(N1,N2)_(T1,T2)_(T3,T4)_STOP_(N2,N3) # # total_code = edge_encoding*nb_unique_edges + 2*nb_periods*time_encoding + nb_unique_edges*time_encoding # print("ig: ",edge_encoding,time_encoding,nb_unique_edges,nb_periods) # # return total_code
[docs] def write_interactions(self,filename): """ :param filename: :return: """ tn.write_as_LS(self, filename)
[docs] def aggregate_sliding_window(self, bin_size=None, shift=None, t_start=None, t_end=None,weighted=True): """ Return a new dynamic graph without modifying the original one, aggregated using sliding windows of the desired size. If Shift is not provided or equal to bin_size, windows are non overlapping. If no parameter is provided, creates a single graph aggregating the whole period. Yielded graphs are weighted (weight: number of apparition of edges during the period) :param bin_size: desired size of bins, in the internal time unit (not necessarily equals to the number of snapshot_affiliations) :param shift: time distance (shift) between the start of two successive bins, in the internal time unit (not necessarily number of sn) :param t_start: time step to start the binning (default: first) :param t_end: time step (not included) to stop the binning (default: last) :return: a DynGraph_SN object """ if t_start==None: t_start = self.start() if t_end==None: t_end=self.end() if bin_size == None: bin_size=t_end-t_start if shift==None: shift=bin_size bins = [] for t in range(t_start, t_end, shift): bins.append((t, t + bin_size)) return self.to_DynGraphSN(bins,weighted=weighted)