Source code for tnetwork.readwrite.SN_graph_io

import networkx as nx
from tnetwork import DynGraphSN
from tnetwork.dyn_graph.encodings import code_length_LS,code_length_SN_E,code_length_SN_M
import os
import tnetwork as tn
import pandas as pd
import json

import bidict
__all__ = ["read_snapshots", "write_snapshots", "write_snapshots_single_file","write_as_SN_E"]

def _detectAutomaticallyFormat(networkFile):
    format = networkFile.split(".")[1]
    return format


def _write_network_file(graph, out_name, out_format=None, data=False,weight=False):
    """
    Write the graph representation on file using a user specified format

    :param graph: networkx graph
    :param out_name: pattern for the output filename
    :param out_format: output format. Accepted values: edges(edgelist)|ncol|gefx|gml|pajek|graphML
    """

    if out_format==None:
        out_format="edges"
    os.makedirs(os.path.dirname(out_name), exist_ok=True)
    #print("writing graph of format " + out_format + " at " + out_name)
    if out_format == 'edges':
        nx.write_edgelist(graph, "%s.edges" % (out_name), data=data)
    elif out_format == 'gefx':
        nx.write_gexf(graph, "%s.gefx" % (out_name))
    elif out_format == 'gml':
        nx.write_gml(graph, "%s.gml" % (out_name))
    elif out_format == 'pajek':
        nx.write_pajek(graph, "%s.pajek" % (out_name))
    elif out_format == 'ncol':
            nx.write_edgelist(graph, "%s.ncol" % (out_name), delimiter='\t',data=weight)
    elif out_format == 'graphML' :
        g = nx.write_graphml(graph, "%s.graphml" % (out_name))
    else:
        raise Exception("UNKNOWN FORMAT " + out_format)


def _read_network_file(in_name, in_format="", directed=False):
    """
    Read the graph representation on file using a user specified format

    :param in_name: pattern for the output filename
    :param in_format: output format. Accepted values: edgelist|ncol|gefx|gml|pajek|graphML

    """

    if in_format == 'edges':
        if directed:
            g = nx.read_edgelist(in_name, create_using=nx.DiGraph())
        else:
            g = nx.read_edgelist(in_name, data=False)
    elif in_format == 'gefx':
        g = nx.read_gexf(in_name)
    elif in_format == 'gml':
        g = nx.read_gml(in_name)
    elif in_format == 'graphML' or in_format == 'graphml':
        g = nx.read_graphml(in_name)
        nodesInfo = g.nodes(data=True)
        if len(nx.get_node_attributes(g,"label"))>0:
            node2Label = {nodeid: data["label"].replace(" ","_") for (nodeid, data) in nodesInfo}
            g = nx.relabel_nodes(g, node2Label, copy=False)
    elif in_format == 'pajek':
        g = nx.read_pajek(in_name)
    elif in_format == 'ncol':
        g = nx.read_edgelist(in_name)
    else:
        raise Exception("UNKNOWN FORMAT " + in_format)
    return g


[docs]def read_snapshots(inputDir:str, format=None,frequency=1,prefix="") -> DynGraphSN: """ Read as one file per snapshot Read a dynamic graph as a directory containing one file per snapshot. If the format is not provided, it is infered automatically from file extensions :param inputDir: directory where the files are located :param format: a string among edges(edgelist)|ncol|gefx|gml|pajek|graphML, by default, the extension of the files :return: a DynGraphSN object """ anSnGraph = tn.DynGraphSN(frequency=frequency) files = os.listdir(inputDir) visibleFiles = [f for f in files if f[0] != "."] if format==None: format=_detectAutomaticallyFormat(visibleFiles[0]) for f in visibleFiles: g = _read_network_file(inputDir + "/" + f, format) # type:nx.Graph anSnGraph.add_snapshot(int(os.path.splitext(f)[0][len(prefix):]), g) return anSnGraph
[docs]def write_snapshots(dynGraph:DynGraphSN, outputDir:str, format:str=None): """ Write one file per snapshot Write a dynamic graph as a directory containing one file for each snapshot. The format of files can be chosen. :param dynGraph: a dynamic graph :param outputDir: address of the directory to write :param format: default edgelist, choose among edges(edgelist)|ncol|gefx|gml|pajek|graphML """ if format==None: format="edges" allGraphs = dynGraph.snapshots() for g in allGraphs: _write_network_file(allGraphs[g],os.path.join(outputDir,str(g)),out_format=format)
def write_snapshots_single_file(dynGraph: DynGraphSN, outputFile: str,both_directions=False): """ Write a single file with all edges from all steps Format: time n1 n2 1 :param dynGraph: a dynamic graph :param outputFile: address of the file to write """ f = open(outputFile,"w") allGraphs = dynGraph.snapshots() for t,g in allGraphs.items(): for e in g.edges(): weights=" "+str(1) f.write(str(t)+" "+str(e[0])+" "+str(e[1])+weights+"\n") if both_directions: f.write(str(t) + " " + str(e[1]) + " " + str(e[0]) + weights + "\n") f.close() def _readStaticSNByCom(inputFile, commentsChar="#", nodeSeparator=" ", nodeInBrackets=False, mainSeparator="\t", comIDposition=0, nodeListPosition=1): """ nodeSeparator: characters that separate the list of nodes nodeInBrackets : if true, list of nodes in the community is [x y z] instead of just x y z mainSeparator : character used to separate comID from nodesIDS """ #read community file from a static network # if asSN: # theDynCom = dn.dynamicCommunitiesSN() # if asTN: # theDynCom = dn.dynamicCommunitiesTN() coms = bidict.bidict() f = open(inputFile) for l in f: # for each line currentCom = set() if not l[0] == commentsChar: # if it is not a comment line l = l.rstrip().split("\t") comID = l[comIDposition] nodesIDs = l[nodeListPosition] if len(nodesIDs)>=1: # if nodeInBrackets: if "[" in nodesIDs: nodesIDs = nodesIDs[1:-1] if ", " in nodesIDs: nodeSeparator = ", " for n in nodesIDs.split(nodeSeparator): currentCom.add(n) # if asSN: # theDynCom.add_affiliation(n,startTime,comID) # if asTN: # theDynCom.add_affiliation(n,comID,startTime) #belongings without end coms[frozenset(currentCom)]=comID return coms # def read_graph_link_stream(inputFile:str) -> DynGraphSN: # """ # Format used by SOCIOPATTERN # # This format is a variation of snapshot_affiliations, in which all snapshot_affiliations are in a single file, adapted for occasional observations # at a high framerate (each SN is not really meaningful). # # Format: # :: # # DATE1 N1 N2 # DATE1 N2 N3 # DATE2 N1 N2 # DATE3 N1 N2 # DATE3 N2 N4 # DATE3 N5 N2 # # :param inputFile: address of the file to read # :return: DynGraphSN # """ # # theDynGraph = DynGraphSN() # # f = open(inputFile) # # # # for l in f: # # l = l.split("\t") # # date = int(l[0]) # # n1 = l[1] # # n2 = l[2] # # theDynGraph.add_interaction(n1,n2,date) # # return theDynGraph # return read_link_stream(inputFile,time_first_column=True) def write_as_SN_E( graph:tn.DynGraphSN, filename): """ :param filename: :return: """ nodes = list(graph.cumulated_graph().nodes()) dict_nodes = {n: i for i, n in enumerate(nodes)} times = list(graph.change_times()) dict_times = {t: i for i, t in enumerate(times)} interactions = [] for t,g in graph.snapshots().items(): renamed = [[ dict_nodes[e[0]],dict_nodes[e[1]]] for e in g.edges()] interactions.append(renamed) json.dump({"nodes": nodes, "times": times, "interactions": interactions}, open(filename, 'w'))