https://github.com/dmlc/dgl
Raw File
Tip revision: d4a1be3266857c963e4a8e76746baeba3be8f369 authored by Quan Gan on 07 April 2021, 04:41:15 UTC
update meta
Tip revision: d4a1be3
convert_partition.py
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
from pyarrow import csv

parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
                    help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
                    help='The graph name')
parser.add_argument('--schema', required=True, type=str,
                    help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
                    help='The number of partitions')
parser.add_argument('--num-node-weights', required=True, type=int,
                    help='The number of node weights used by METIS.')
parser.add_argument('--workspace', type=str, default='/tmp',
                    help='The directory to store the intermediate results')
parser.add_argument('--node-attr-dtype', type=str, default=None,
                    help='The data type of the node attributes')
parser.add_argument('--edge-attr-dtype', type=str, default=None,
                    help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
                    help='The output directory of the partitioned results')
args = parser.parse_args()

input_dir = args.input_dir
graph_name = args.graph_name
num_parts = args.num_parts
num_node_weights = args.num_node_weights
node_attr_dtype = args.node_attr_dtype
edge_attr_dtype = args.edge_attr_dtype
workspace_dir = args.workspace
output_dir = args.output

with open(args.schema) as json_file:
    schema = json.load(json_file)
nid_ranges = schema['nid']
eid_ranges = schema['eid']
nid_ranges = {key: np.array(nid_ranges[key]).reshape(1, 2) for key in nid_ranges}
eid_ranges = {key: np.array(eid_ranges[key]).reshape(1, 2) for key in eid_ranges}
id_map = dgl.distributed.id_map.IdMap(nid_ranges)

ntypes = [(key, nid_ranges[key][0,0]) for key in nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e:i for i, e in enumerate(ntypes)}
etypes = [(key, eid_ranges[key][0,0]) for key in eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e:i for i, e in enumerate(etypes)}

def read_feats(file_name):
    attrs = csv.read_csv(file_name, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
                         parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
    num_cols = len(attrs.columns)
    return np.stack([attrs.columns[i].to_numpy() for i in range(num_cols)], 1)

num_edges = 0
num_nodes = 0
node_map_val = {ntype:[] for ntype in ntypes}
edge_map_val = {etype:[] for etype in etypes}
for part_id in range(num_parts):
    node_file = 'p{:03}-{}_nodes.txt'.format(part_id, graph_name)
    # The format of each line in the node file:
    # <node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
    # The node file contains nodes that belong to a partition. It doesn't include HALO nodes.
    orig_type_nid_col = 3 + num_node_weights
    first_attr_col = 4 + num_node_weights

    # Get the first two columns which is the node ID and node type.
    tmp_output = workspace_dir + '/' + node_file + '.tmp'
    os.system('awk \'{print $1, $2, $' + str(orig_type_nid_col) + '}\''
              + ' {} > {}'.format(input_dir + '/' + node_file, tmp_output))
    nodes = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
                         parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
    nids, ntype_ids, orig_type_nid = nodes.columns[0].to_numpy(), nodes.columns[1].to_numpy(), \
            nodes.columns[2].to_numpy()
    orig_homo_nid = ntype_offset_np[ntype_ids] + orig_type_nid
    assert np.all(nids[1:] - nids[:-1] == 1)
    nid_range = (nids[0], nids[-1])
    num_nodes += len(nodes)

    if node_attr_dtype is not None:
        # Get node attributes
        # Here we just assume all nodes have the same attributes.
        # In practice, this is not the same, in which we need more complex solution to
        # encode and decode node attributes.
        os.system('cut -d\' \' -f {}- {} > {}'.format(first_attr_col,
                                                      input_dir + '/' + node_file,
                                                      tmp_output))
        node_attrs = read_feats(tmp_output)
        node_feats = {}
        # nodes in a partition has been sorted based on node types.
        for ntype_name in nid_ranges:
            ntype_id = ntypes_map[ntype_name]
            type_nids = nids[ntype_ids == ntype_id]
            assert np.all(type_nids == np.arange(type_nids[0], type_nids[-1] + 1))
            node_feats[ntype_name + '/feat'] = th.as_tensor(node_attrs[ntype_ids == ntype_id])
        dgl.data.utils.save_tensors(os.path.join(part_dir, "node_feat.dgl"), node_feats)

    # Determine the node ID ranges of different node types.
    for ntype_name in nid_ranges:
        ntype_id = ntypes_map[ntype_name]
        type_nids = nids[ntype_ids == ntype_id]
        node_map_val[ntype_name].append([int(type_nids[0]), int(type_nids[-1]) + 1])

    edge_file = 'p{:03}-{}_edges.txt'.format(part_id, graph_name)
    # The format of each line in the edge file:
    # <src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_edge_id> <edge_type> <attributes>

    tmp_output = workspace_dir + '/' + edge_file + '.tmp'
    os.system('awk \'{print $1, $2, $3, $4, $5, $6}\'' + ' {} > {}'.format(input_dir + '/' + edge_file,
                                                                           tmp_output))
    edges = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
                         parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
    num_cols = len(edges.columns)
    src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = [edges.columns[i].to_numpy() for i in range(num_cols)]
    # It's not guaranteed that the edges are sorted based on edge type.
    # Let's sort edges and all attributes on the edges.
    sort_idx = np.argsort(etype_ids)
    src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = src_id[sort_idx], dst_id[sort_idx], \
            orig_src_id[sort_idx], orig_dst_id[sort_idx], orig_edge_id[sort_idx], etype_ids[sort_idx]
    assert np.all(np.diff(etype_ids) >= 0)

    if edge_attr_dtype is not None:
        # Get edge attributes
        # Here we just assume all edges have the same attributes.
        # In practice, this is not the same, in which we need more complex solution to
        # encode and decode edge attributes.
        os.system('cut -d\' \' -f 7- {} > {}'.format(input_dir + '/' + edge_file, tmp_output))
        edge_attrs = th.as_tensor(read_feats(tmp_output))[sort_idx]
        edge_feats = {}
        for etype_name in eid_ranges:
            etype_id = etypes_map[etype_name]
            edge_feats[etype_name + '/feat'] = th.as_tensor(edge_attrs[etype_ids == etype_id])
        dgl.data.utils.save_tensors(os.path.join(part_dir, "edge_feat.dgl"), edge_feats)

    # Determine the edge ID range of different edge types.
    edge_id_start = num_edges
    for etype_name in eid_ranges:
        etype_id = etypes_map[etype_name]
        edge_map_val[etype_name].append([int(edge_id_start),
                                         int(edge_id_start + np.sum(etype_ids == etype_id))])
        edge_id_start += np.sum(etype_ids == etype_id)

    # Here we want to compute the unique IDs in the edge list.
    # It is possible that a node that belongs to the partition but it doesn't appear
    # in the edge list. That is, the node is assigned to this partition, but its neighbor
    # belongs to another partition so that the edge is assigned to another partition.
    # This happens in a directed graph.
    # To avoid this kind of nodes being removed from the graph, we add node IDs that
    # belong to this partition.
    ids = np.concatenate([src_id, dst_id, np.arange(nid_range[0], nid_range[1] + 1)])
    uniq_ids, idx, inverse_idx = np.unique(ids, return_index=True, return_inverse=True)
    assert len(uniq_ids) == len(idx)
    # We get the edge list with their node IDs mapped to a contiguous ID range.
    local_src_id, local_dst_id = np.split(inverse_idx[:len(src_id) * 2], 2)
    compact_g = dgl.graph((local_src_id, local_dst_id))
    compact_g.edata['orig_id'] = th.as_tensor(orig_edge_id)
    compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
    compact_g.edata['inner_edge'] = th.ones(compact_g.number_of_edges(), dtype=th.bool)
    compact_g.edata[dgl.EID] = th.arange(num_edges, num_edges + compact_g.number_of_edges())
    num_edges += compact_g.number_of_edges()

    # The original IDs are homogeneous IDs.
    # Similarly, we need to add the original homogeneous node IDs
    orig_ids = np.concatenate([orig_src_id, orig_dst_id, orig_homo_nid])
    orig_homo_ids = orig_ids[idx]
    ntype, per_type_ids = id_map(orig_homo_ids)
    compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids)
    compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype)
    compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids)
    compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and(uniq_ids >= nid_range[0], uniq_ids <= nid_range[1]))
    local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()]
    assert np.all((local_nids == th.arange(local_nids[0], local_nids[-1] + 1)).numpy())
    print('|V|={}'.format(compact_g.number_of_nodes()))
    print('|E|={}'.format(compact_g.number_of_edges()))

    # We need to reshuffle nodes in a partition so that all local nodes are labelled starting from 0.
    reshuffle_nodes = th.arange(compact_g.number_of_nodes())
    reshuffle_nodes = th.cat([reshuffle_nodes[compact_g.ndata['inner_node'].bool()],
                              reshuffle_nodes[compact_g.ndata['inner_node'] == 0]])
    compact_g1 = dgl.node_subgraph(compact_g, reshuffle_nodes)
    compact_g1.ndata['orig_id'] = compact_g.ndata['orig_id'][reshuffle_nodes]
    compact_g1.ndata[dgl.NTYPE] = compact_g.ndata[dgl.NTYPE][reshuffle_nodes]
    compact_g1.ndata[dgl.NID] = compact_g.ndata[dgl.NID][reshuffle_nodes]
    compact_g1.ndata['inner_node'] = compact_g.ndata['inner_node'][reshuffle_nodes]
    compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
    compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]]
    compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]]
    compact_g1.edata[dgl.EID] = compact_g.edata[dgl.EID][compact_g1.edata[dgl.EID]]

    part_dir = output_dir + '/part' + str(part_id)
    os.makedirs(part_dir, exist_ok=True)
    dgl.save_graphs(part_dir + '/graph.dgl', [compact_g1])

part_metadata = {'graph_name': graph_name,
                 'num_nodes': num_nodes,
                 'num_edges': num_edges,
                 'part_method': 'metis',
                 'num_parts': num_parts,
                 'halo_hops': 1,
                 'node_map': node_map_val,
                 'edge_map': edge_map_val,
                 'ntypes': ntypes_map,
                 'etypes': etypes_map}

for part_id in range(num_parts):
    part_dir = 'part' + str(part_id)
    node_feat_file = os.path.join(part_dir, "node_feat.dgl")
    edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
    part_graph_file = os.path.join(part_dir, "graph.dgl")
    part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
                                                'edge_feats': edge_feat_file,
                                                'part_graph': part_graph_file}
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
    json.dump(part_metadata, outfile, sort_keys=True, indent=4)
back to top