导入Ontology论文中709MB的test.json数据至neo4j数据库 郝伟 2021/02/20 [TOC]

1. 简介

对Ontology的论文中的源数据进行导入。 输入数据:test.json 文件大小:709MB 内容摘要:关于漏洞知识图谱的42.5万个节点和195万条关系。 目标网站:http://121.196.157.14:7474/browser/

2. V1:转成两个CQL内容文件

以下代码将 test.json 转成了 vertices.cql 和 edges.cql 两个文件(代码中的默认输出文件名为 vertices.txt 和 edges.txt),分别存储节点和边的CQL插入语句。

# -*- coding: utf-8 -*-
"""
创建时间:2021/02/20 06:38:08
原创作者: 郝伟老师
功能简介: 根据输入的test.json,生成节点和边的 CQL插入语法
"""
import json

def load_json(jsonfile):
    """Parse the UTF-8 encoded JSON file at ``jsonfile`` and return the result.

    The return value is whatever ``json.load`` produces for the file's
    top-level value.
    """
    with open(jsonfile, mode='r', encoding='utf-8') as source:
        return json.load(source)


def extract_vertices(vertices, vertices_path='vertices.txt'):
    """Write one Cypher ``CREATE`` statement per vertex to ``vertices_path``.

    Args:
        vertices: iterable of dicts. Each dict must contain a ``'vertexType'``
            entry, which is used as the node label; every other entry becomes
            a node property.
        vertices_path: output file; it is overwritten and written as UTF-8,
            one statement per line.

    Raises:
        KeyError: if a vertex has no ``'vertexType'`` entry.
    """
    with open(vertices_path, 'w', encoding='utf-8') as vfile:
        for vertex in vertices:
            properties = []
            for key, value in vertex.items():
                if key == 'vertexType':
                    # Emitted as the node label, not as a property.
                    continue
                if isinstance(value, str):
                    # Backslashes must be escaped BEFORE quotes: otherwise a
                    # value such as  a\"b  or one ending in a backslash
                    # produces a malformed string literal.
                    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
                    properties.append('{0}:"{1}"'.format(key, escaped))
                else:
                    # Non-string values are written using their Python text form.
                    properties.append('{0}:{1}'.format(key, value))
            cql = 'CREATE (:{0}{{{1}}})'.format(vertex['vertexType'], ', '.join(properties))
            vfile.write(cql + '\n')

def extract_edges(edges, edges_path='edges.txt'):
    """Write one Cypher ``MATCH ... CREATE`` statement per edge to ``edges_path``.

    Args:
        edges: iterable of dicts. Each dict must contain ``'inVType'``,
            ``'outVType'``, ``'_inV'``, ``'_outV'`` and ``'_label'``; every
            other entry becomes a relationship property.
        edges_path: output file; it is overwritten and written as UTF-8,
            one statement per line.

    Raises:
        KeyError: if an edge lacks one of the five required entries.
    """
    # Keys consumed by the statement template rather than stored as properties.
    structural_keys = ('inVType', '_inV', '_label', 'outVType', '_outV')
    template = ('MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} '
                'CREATE (n1)-[:{4}{{{5}}}]->(n2)')
    with open(edges_path, 'w', encoding='utf-8') as efile:
        for edge in edges:
            properties = []
            for key, value in edge.items():
                if key in structural_keys:
                    continue
                if isinstance(value, str):
                    # Backslashes must be escaped BEFORE quotes, otherwise a
                    # value containing a backslash yields a malformed literal.
                    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
                    properties.append('{0}:"{1}"'.format(key, escaped))
                else:
                    properties.append('{0}:{1}'.format(key, value))
            # NOTE(review): the relationship is created from the '_inV' node to
            # the '_outV' node. If the input follows the usual GraphSON meaning
            # (edge leaves '_outV' and enters '_inV') this is reversed - confirm
            # against the data before relying on edge direction.
            # NOTE(review): '_inV' / '_outV' are inserted unquoted, so this
            # presumably expects numeric ids - verify.
            cql = template.format(edge['inVType'], edge['outVType'], edge['_inV'],
                                  edge['_outV'], edge['_label'], ', '.join(properties))
            efile.write(cql + '\n')

# Convert test.json: the 'vertices' section is exported first, then 'edges',
# each to the exporter's default output file.
jdata = load_json('test.json')
for section, exporter in (('vertices', extract_vertices), ('edges', extract_edges)):
    exporter(jdata[section])

3. V2:加入数据库导入

1.0的代码有问题,未能执行成功,原因是未对字符串中的特殊字符进行转义处理,比如 \"

# -*- coding: utf-8 -*-
"""
创建时间:2021/02/20 06:38:08
更新时间:2021/02/20 08:30:32 
原创作者: 郝伟老师
功能简介: 根据输入的test.json,生成节点和边的 CQL插入语法
         用时约2小时,写代码1小时40分钟,20分钟整理文档
"""
import json
from neo4j import GraphDatabase

def cql_executor(tx, cql):
    """Run one Cypher statement on ``tx``, reporting failures instead of raising.

    Args:
        tx: object exposing ``run(statement)``; here it is the transaction
            handed to this function by ``session.write_transaction``.
        cql: the Cypher statement text.

    Errors raised by ``tx.run`` are printed together with the offending
    statement so a long import can continue. Only ``Exception`` is caught:
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate, so the job can
    be stopped.
    """
    try:
        tx.run(cql)
    except Exception as ex:
        print(ex, type(ex), cql)

# Create the Neo4j driver and read the whole of test.json into memory.
# NOTE(review): the server address and credentials are hard-coded in source;
# consider moving them to configuration.
print('Connecting 121.196.157.14:7687 ...')
driver = GraphDatabase.driver(
    'bolt://121.196.157.14:7687',
    auth=('neo4j', 'hd7iu2_X@v4u'),
)
print('Connected')

print('Loading test.json...')
with open('test.json', mode='r', encoding='utf-8') as f:
    data = json.load(f)

# Insert every vertex and then every edge, one Cypher statement per write
# transaction. Each statement is appended to log.txt before it is executed.
# The log is opened with a context manager so it is closed even if the import
# aborts part-way.
with open('log.txt', 'w', encoding='utf-8') as f:  # statement log
    with driver.session() as session:
        print("Begin verticies insertion...")
        for vertex in data['vertices']:
            properties = []
            for key, value in vertex.items():
                if key == 'vertexType':
                    # Emitted as the node label, not as a property.
                    continue
                if isinstance(value, str):
                    # Escape backslashes first, then double quotes; without
                    # this, values containing either break the statement.
                    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
                    properties.append('{0}:"{1}"'.format(key, escaped))
                else:
                    properties.append('{0}:{1}'.format(key, value))
            cql = 'CREATE (:{0}{{{1}}})'.format(vertex['vertexType'], ', '.join(properties))
            f.write(cql + '\n')
            session.write_transaction(cql_executor, cql)

        print("Begin edges insertion...")
        for edge in data['edges']:
            properties = []
            for key, value in edge.items():
                if key in ['inVType', '_inV', '_label', 'outVType', '_outV']:
                    # Consumed by the MATCH/CREATE template below.
                    continue
                if isinstance(value, str):
                    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
                    properties.append('{0}:"{1}"'.format(key, escaped))
                else:
                    properties.append('{0}:{1}'.format(key, value))
            # NOTE(review): the relationship runs from the '_inV' node to the
            # '_outV' node; if the data uses the usual GraphSON meaning this is
            # reversed - confirm before relying on edge direction.
            cql = 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'.format(edge['inVType'], edge['outVType'], edge['_inV'], edge['_outV'], edge['_label'], ', '.join(properties))
            f.write(cql + '\n')
            session.write_transaction(cql_executor, cql)
print('\ndone')

4. V3:加入容错并执行成功

中间因为字符串中包括 \"而导致语法错误,就会自动断开。所以最终版加入了相关处理,同时在cql_executor中加入了异常处理。经过调整后正常了,用了22个小时的时间完成了数据的导入(未优化数据批量导入的方法)。

# -*- coding: utf-8 -*-
"""
创建时间:2021/02/20 06:38:08
更新时间:2021/02/20 08:30:32
        2021/02/20 09:14:43
原创作者: 郝伟老师
功能简介: 根据输入的test.json,生成节点和边的 CQL插入语法
         用时约2.5小时,写代码1小时40分钟,20分钟整理文档,30分钟调试
         加入了对 \ " 的处理,同时添加了try-except 容错。
"""
import json
from neo4j import GraphDatabase

def cql_executor(tx, cql):
    """Execute ``cql`` through ``tx.run``; on failure print the error and continue.

    Any ``Exception`` raised by ``tx.run`` is caught and printed together
    with its type and the statement text, so the caller never sees it.
    """
    try:
        tx.run(cql)
    except Exception as error:
        error_kind = type(error)
        print(error, error_kind, cql)

# Create the Neo4j driver and read the whole of test.json into memory.
# NOTE(review): the server address and credentials are hard-coded in source;
# consider moving them to configuration.
print('Connecting 121.196.157.14:7687 ...')
driver = GraphDatabase.driver(
    'bolt://121.196.157.14:7687',
    auth=('neo4j', 'hd7iu2_X@v4u'),
)

print('Connected.\nLoading test.json...')
with open('test.json', mode='r', encoding='utf-8') as f:
    data = json.load(f)

def _format_properties(record, skipped):
    """Render ``record`` as the inside of a Cypher property map, omitting ``skipped`` keys."""
    parts = []
    for name, val in record.items():
        if name in skipped:
            continue
        if isinstance(val, str):
            # Escape backslashes first, then double quotes, and wrap in quotes.
            val = '"{0}"'.format(val.replace("\\", "\\\\").replace("\"", "\\\""))
        parts.append('{0}:{1}'.format(name, val))
    return ', '.join(parts)


# Import order: all vertices, then one index per label on _id, then all edges.
# Every vertex/edge statement is written to log.txt before it is executed in
# its own write transaction.
with open('log.txt', 'w', encoding='utf-8') as log_file, driver.session() as session:  # statement log
    print("Begin verticies insertion...")
    for node in data['vertices']:
        # 'vertexType' becomes the node label rather than a property.
        node_props = _format_properties(node, ('vertexType',))
        statement = 'CREATE (:{0}{{{1}}})'.format(node['vertexType'], node_props)
        log_file.write(statement + '\n')
        session.write_transaction(cql_executor, statement)

    print("Create Indexes...")
    # NOTE(review): uses the legacy "CREATE INDEX ON :Label(prop)" form -
    # confirm the target Neo4j version still accepts it.
    indexed_labels = ['Address','DNSName','IP','address','addressRange','flow','host','ip','malware','port','software','vulnerability','user','attack','attacker']
    for label in indexed_labels:
        session.write_transaction(cql_executor, 'Create Index On:{0}(_id)'.format(label))

    print("Begin edges insertion...")
    edge_template = 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'
    for link in data['edges']:
        # These five keys feed the template; the rest become relationship properties.
        link_props = _format_properties(link, ('inVType', '_inV', '_label', 'outVType', '_outV'))
        # NOTE(review): the relationship runs from the '_inV' node to the
        # '_outV' node; if the data uses the usual GraphSON meaning this is
        # reversed - confirm before relying on edge direction.
        statement = edge_template.format(link['inVType'], link['outVType'], link['_inV'], link['_outV'], link['_label'], link_props)
        log_file.write(statement + '\n')
        session.write_transaction(cql_executor, statement)
print('\ndone')

results matching ""

    No results matching ""