导入Ontology论文中709MB的test.json数据至neo4j数据库
郝伟 2021/02/20
对Ontology的论文中的源数据进行导入。
输入数据:test.json
文件大小:709MB
内容摘要:关于漏洞知识图谱的42.5万个节点和195万条关系。
目标网站:http://121.196.157.14:7474/browser/
以下代码将 test.json 转成了 vertices.cql 和 edges.cql 两个文件(代码中的默认输出文件名为 vertices.txt 和 edges.txt),分别存储节点和边的CQL插入语句。
# -*- coding: utf-8 -*-
"""
Created:  2021/02/20 06:38:08
Author:   Hao Wei
Summary:  Read test.json and write one CQL insert statement per line for
          every vertex and every edge it contains.
"""
import json

# Edge keys that describe the endpoints and the relationship type; they are
# used to build the statement and are not stored as relationship properties.
_EDGE_STRUCTURAL_KEYS = ('inVType', '_inV', '_label', 'outVType', '_outV')


def load_json(jsonfile):
    """Load the UTF-8 encoded JSON file ``jsonfile`` and return the parsed data."""
    with open(jsonfile, 'r', encoding='utf-8') as load_f:
        return json.load(load_f)


def _cql_literal(value):
    """Render ``value`` as a literal that can be embedded in a CQL statement.

    Strings are wrapped in double quotes. The backslash is escaped first so
    that the backslashes added while escaping the double quotes are not
    escaped a second time. Every other value is rendered with ``str.format``.
    """
    if isinstance(value, str):
        escaped = value.replace('\\', '\\\\').replace('"', '\\"')
        return '"{0}"'.format(escaped)
    return '{0}'.format(value)


def _format_properties(record, skipped_keys):
    """Return the ``key:value, ...`` property list of ``record`` without ``skipped_keys``."""
    return ', '.join(
        '{0}:{1}'.format(key, _cql_literal(value))
        for key, value in record.items()
        if key not in skipped_keys
    )


def extract_vertices(vertices, vertices_path='vertices.txt'):
    """Write one ``CREATE`` statement per vertex to ``vertices_path``.

    ``vertices`` is an iterable of dicts. The ``vertexType`` entry becomes the
    node label; every other entry becomes a node property.
    """
    with open(vertices_path, 'w', encoding='utf-8') as vfile:
        for vertex in vertices:
            properties = _format_properties(vertex, ('vertexType',))
            cql = 'CREATE (:{0}{{{1}}})'.format(vertex['vertexType'], properties)
            vfile.write(cql + '\n')


def extract_edges(edges, edges_path='edges.txt'):
    """Write one ``MATCH ... CREATE`` statement per edge to ``edges_path``.

    ``edges`` is an iterable of dicts. ``inVType``/``_inV`` and
    ``outVType``/``_outV`` select the two endpoint nodes by label and ``_id``,
    ``_label`` is the relationship type, and every other entry becomes a
    relationship property.
    """
    with open(edges_path, 'w', encoding='utf-8') as efile:
        for edge in edges:
            properties = _format_properties(edge, _EDGE_STRUCTURAL_KEYS)
            # The endpoint ids go through the same literal rendering as the
            # vertex ``_id`` property, so string ids are quoted on both sides.
            # NOTE(review): the relationship is created from the _inV node to
            # the _outV node. In TinkerPop-style data _outV is usually the
            # source vertex - confirm that this direction is intended.
            cql = 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'.format(
                edge['inVType'],
                edge['outVType'],
                _cql_literal(edge['_inV']),
                _cql_literal(edge['_outV']),
                edge['_label'],
                properties,
            )
            efile.write(cql + '\n')


def main():
    """Convert test.json into vertices.txt and edges.txt in the working directory."""
    jdata = load_json('test.json')
    extract_vertices(jdata['vertices'])
    extract_edges(jdata['edges'])


if __name__ == '__main__':
    main()
下面 1.0 版的代码有问题,未能执行成功,原因是未对字符串中的特殊字符(比如 \ 和 ")进行转义处理。
# -*- coding: utf-8 -*-
"""
Created:  2021/02/20 06:38:08
Updated:  2021/02/20 08:30:32
Author:   Hao Wei
Summary:  Build the CQL insert statement for every vertex and edge in
          test.json, append it to log.txt and execute it against Neo4j.
          Took about 2 hours: 1h40 of coding, 20 minutes of documentation.

This is the version 1.0 snapshot: string values are placed between double
quotes exactly as they are, without any escaping.
"""
import json
from neo4j import GraphDatabase

# Edge entries used to build the MATCH/CREATE clause instead of being
# written as relationship properties.
EDGE_META_KEYS = ['inVType', '_inV', '_label', 'outVType', '_outV']


def cql_executor(tx, cql):
    """Run ``cql`` on ``tx``; if anything at all is raised, print the statement."""
    try:
        tx.run(cql)
    except:  # noqa: E722 - every error is caught and only the statement is printed
        print(cql)


def _render_properties(record, ignored):
    """Return the ``key:value, ...`` list for ``record``, leaving out ``ignored`` keys."""
    rendered = []
    for name, val in record.items():
        if name in ignored:
            continue
        # Strings are quoted verbatim; everything else is formatted as-is.
        template = '{0}:"{1}"' if isinstance(val, str) else '{0}:{1}'
        rendered.append(template.format(name, val))
    return ', '.join(rendered)


print('Connecting 121.196.157.14:7687 ...')
driver = GraphDatabase.driver(
    'bolt://121.196.157.14:7687',
    auth=('neo4j', 'hd7iu2_X@v4u'),
)
print('Connected')

print('Loading test.json...')
with open('test.json', 'r', encoding='utf-8') as source:
    data = json.load(source)

log = open('log.txt', 'w', encoding='utf-8')  # statement log
with driver.session() as session:
    print("Begin verticies insertion...")
    for vertex in data['vertices']:
        node_props = _render_properties(vertex, ('vertexType',))
        statement = 'CREATE (:{0}{{{1}}})'.format(vertex['vertexType'], node_props)
        log.write(statement + '\n')
        session.write_transaction(cql_executor, statement)

    print("Begin edges insertion...")
    for edge in data['edges']:
        rel_props = _render_properties(edge, EDGE_META_KEYS)
        statement = 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'.format(
            edge['inVType'],
            edge['outVType'],
            edge['_inV'],
            edge['_outV'],
            edge['_label'],
            rel_props,
        )
        log.write(statement + '\n')
        session.write_transaction(cql_executor, statement)
log.close()
print('\ndone')
中间因为字符串中包括 \ 和 "而导致语法错误,就会自动断开。所以最终版加入了相关处理,同时在cql_executor中加入了异常处理。经过调整后正常了,用了22个小时的时间完成了数据的导入(未优化数据批量导入的方法)。
# -*- coding: utf-8 -*-
"""
Created:  2021/02/20 06:38:08
Updated:  2021/02/20 08:30:32
          2021/02/20 09:14:43
Author:   Hao Wei
Summary:  Build the CQL insert statement for every vertex and edge in
          test.json, append it to log.txt and execute it against Neo4j.
          Took about 2.5 hours: 1h40 of coding, 20 minutes of documentation
          and 30 minutes of debugging.
          Backslash and double-quote characters in string values are escaped,
          and statement execution is wrapped in try/except so that a failing
          statement is reported instead of aborting the import.
"""
import json
from neo4j import GraphDatabase

# Edge entries used to build the MATCH/CREATE clause instead of being
# written as relationship properties.
EDGE_STRUCTURAL_KEYS = ('inVType', '_inV', '_label', 'outVType', '_outV')

# Labels that receive an index on ``_id`` after the vertices are inserted and
# before the edge statements look nodes up by ``_id``.
INDEXED_LABELS = ['Address', 'DNSName', 'IP', 'address', 'addressRange', 'flow', 'host', 'ip',
                  'malware', 'port', 'software', 'vulnerability', 'user', 'attack', 'attacker']


def cql_executor(tx, cql):
    """Run ``cql`` on transaction ``tx``.

    Any ``Exception`` is caught and printed together with its type and the
    failing statement, so the caller carries on with the next statement.
    """
    try:
        tx.run(cql)
    except Exception as ex:
        print(ex, type(ex), cql)


def _cql_literal(value):
    """Render ``value`` as a literal that can be embedded in a CQL statement.

    Strings are wrapped in double quotes. The backslash is escaped first so
    that the backslashes added while escaping the double quotes are not
    escaped a second time. Every other value is rendered with ``str.format``.
    """
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace("\"", "\\\"")
        return '"{0}"'.format(escaped)
    return '{0}'.format(value)


def _format_properties(record, skipped_keys):
    """Return the ``key:value, ...`` property list of ``record`` without ``skipped_keys``."""
    return ', '.join(
        '{0}:{1}'.format(key, _cql_literal(value))
        for key, value in record.items()
        if key not in skipped_keys
    )


def _vertex_cql(vertex):
    """Return the ``CREATE`` statement for one vertex dict (label taken from ``vertexType``)."""
    return 'CREATE (:{0}{{{1}}})'.format(
        vertex['vertexType'], _format_properties(vertex, ('vertexType',)))


def _edge_cql(edge):
    """Return the ``MATCH ... CREATE`` statement for one edge dict.

    The endpoint ids go through the same literal rendering as the vertex
    ``_id`` property, so string ids are quoted on both sides of the comparison.
    """
    # NOTE(review): the relationship is created from the _inV node to the
    # _outV node. In TinkerPop-style data _outV is usually the source vertex -
    # confirm that this direction is intended.
    return 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'.format(
        edge['inVType'],
        edge['outVType'],
        _cql_literal(edge['_inV']),
        _cql_literal(edge['_outV']),
        edge['_label'],
        _format_properties(edge, EDGE_STRUCTURAL_KEYS),
    )


print('Connecting 121.196.157.14:7687 ...')
# NOTE(review): the server address and credentials are hard-coded in the
# script; consider loading them from the environment or a config file.
driver = GraphDatabase.driver('bolt://121.196.157.14:7687', auth=('neo4j', 'hd7iu2_X@v4u'))
try:
    print('Connected.\nLoading test.json...')
    with open('test.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # log.txt records every generated vertex and edge statement.
    with open('log.txt', 'w', encoding='utf-8') as log_file, driver.session() as session:
        print("Begin verticies insertion...")
        for vertex in data['vertices']:
            cql = _vertex_cql(vertex)
            log_file.write(cql + '\n')
            session.write_transaction(cql_executor, cql)

        print("Create Indexes...")
        for entity in INDEXED_LABELS:
            session.write_transaction(cql_executor, 'Create Index On:{0}(_id)'.format(entity))

        print("Begin edges insertion...")
        for edge in data['edges']:
            cql = _edge_cql(edge)
            log_file.write(cql + '\n')
            session.write_transaction(cql_executor, cql)
finally:
    # Release the driver's connections even when the import stops early.
    driver.close()
print('\ndone')