导入Ontology论文中709MB的test.json数据至neo4j数据库 郝伟 2021/02/20 [TOC]
1. 简介
对Ontology的论文中的源数据进行导入。
输入数据:test.json
文件大小:709MB
内容摘要:关于漏洞知识图谱的42.5万个节点和195万条关系。
目标网站:http://121.196.157.14:7474/browser/
2. V1:转成两个CQL内容文件
以下代码将 test.json 转成两个文件,分别存储节点和边的 CQL 插入语句(正文称其为 vertices.cql 和 edges.cql;注意代码中的默认输出文件名实际为 vertices.txt 和 edges.txt)。
# -*- coding: utf-8 -*-
"""
创建时间:2021/02/20 06:38:08
原创作者: 郝伟老师
功能简介: 根据输入的test.json,生成节点和边的 CQL插入语法
"""
import json
def load_json(jsonfile):
    """Parse the UTF-8 encoded JSON file at ``jsonfile`` and return the result.

    Args:
        jsonfile: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file content.
    """
    with open(jsonfile, 'r', encoding='utf-8') as source:
        return json.load(source)
def extract_vertices(vertices, vertices_path='vertices.txt'):
    """Write one Cypher ``CREATE`` statement per vertex to ``vertices_path``.

    Args:
        vertices: Iterable of dicts. The ``vertexType`` entry of each dict is
            used as the node label; every other key/value pair becomes a
            property of the created node.
        vertices_path: Path of the output file. It is overwritten and encoded
            as UTF-8, one statement per line.

    Raises:
        KeyError: If a vertex has no ``vertexType`` entry.
        TypeError: If a property value cannot be serialised by ``json.dumps``.
    """
    with open(vertices_path, 'w', encoding='utf-8') as vfile:
        for vertex in vertices:
            # json.dumps renders every value as a literal: strings come out
            # double-quoted with backslashes, quotes and control characters
            # escaped (the previous code escaped only the double quote), and
            # None/True/False become null/true/false instead of Python reprs.
            properties = [
                '{0}:{1}'.format(key, json.dumps(value, ensure_ascii=False))
                for key, value in vertex.items()
                if key != 'vertexType'
            ]
            cql = 'CREATE (:{0}{{{1}}})'.format(vertex['vertexType'], ', '.join(properties))
            vfile.write(cql + '\n')
def extract_edges(edges, edges_path='edges.txt'):
    """Write one Cypher ``MATCH ... CREATE`` statement per edge to ``edges_path``.

    Each statement looks up the two endpoint nodes by label and ``_id`` and
    creates a relationship of type ``_label`` between them. Keys other than
    ``inVType``, ``_inV``, ``_label``, ``outVType`` and ``_outV`` become
    relationship properties.

    Args:
        edges: Iterable of dicts, each containing the five keys listed above.
        edges_path: Path of the output file. It is overwritten and encoded as
            UTF-8, one statement per line.

    Raises:
        KeyError: If an edge lacks one of the five required keys.
        TypeError: If a value cannot be serialised by ``json.dumps``.
    """
    endpoint_keys = ('inVType', '_inV', '_label', 'outVType', '_outV')
    with open(edges_path, 'w', encoding='utf-8') as efile:
        for edge in edges:
            # json.dumps escapes backslashes, quotes and control characters in
            # strings (previously only the double quote was escaped) and maps
            # None/True/False to null/true/false.
            properties = [
                '{0}:{1}'.format(key, json.dumps(value, ensure_ascii=False))
                for key, value in edge.items()
                if key not in endpoint_keys
            ]
            # The ids go through the same literal rendering as node properties,
            # so a string id is quoted here exactly as it is on the node.
            # NOTE(review): the relationship is created from the `_inV` node to
            # the `_outV` node. If the input follows the GraphSON convention
            # (edge leaves `_outV`, enters `_inV`) this is reversed -- confirm
            # against the data before changing it.
            cql = 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'.format(
                edge['inVType'],
                edge['outVType'],
                json.dumps(edge['_inV'], ensure_ascii=False),
                json.dumps(edge['_outV'], ensure_ascii=False),
                edge['_label'],
                ', '.join(properties),
            )
            efile.write(cql + '\n')
# Script driver: load the source graph once, then export its vertex list and
# its edge list (in that order) using each exporter's default output path.
jdata = load_json('test.json')
for section, exporter in (('vertices', extract_vertices), ('edges', extract_edges)):
    exporter(jdata[section])
3. V2:加入数据库导入
V1 的代码有问题,生成的语句未能执行成功,原因是未对字符串中的特殊字符(如 \ 和 ")进行转义处理。
# -*- coding: utf-8 -*-
"""
创建时间:2021/02/20 06:38:08
更新时间:2021/02/20 08:30:32
原创作者: 郝伟老师
功能简介: 根据输入的test.json,生成节点和边的 CQL插入语法
用时约2小时,写代码1小时40分钟,20分钟整理文档
"""
import json
from neo4j import GraphDatabase
def cql_executor(tx, cql):
    """Run one Cypher statement on ``tx`` without letting a failure propagate.

    Args:
        tx: Transaction-like object exposing ``run(cql)``.
        cql: The Cypher statement text to execute.

    Returns:
        None. When ``tx.run`` raises, the error, its type and the offending
        statement are printed and the exception is suppressed.
    """
    try:
        tx.run(cql)
    except Exception as ex:
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still stop a long-running import,
        # and print the cause alongside the statement that triggered it.
        print(ex, type(ex), cql)
def _cql_literal(value):
    """Render a JSON-decoded value as literal text for a Cypher statement.

    Strings come out double-quoted with backslashes, double quotes and control
    characters escaped; None/True/False become null/true/false.
    """
    return json.dumps(value, ensure_ascii=False)


def _cql_properties(record, skipped):
    """Return the ``key:value, ...`` property-map body for ``record``.

    Keys listed in ``skipped`` are left out.
    """
    return ', '.join(
        '{0}:{1}'.format(key, _cql_literal(value))
        for key, value in record.items()
        if key not in skipped
    )


# NOTE(review): the server address and credentials are hard-coded below;
# consider moving them out of the source file.
print('Connecting 121.196.157.14:7687 ...')
driver=GraphDatabase.driver('bolt://121.196.157.14:7687', auth=('neo4j', 'hd7iu2_X@v4u'))
print('Connected')
try:
    print('Loading test.json...')
    with open('test.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # The log receives every generated statement. Opening it in the `with`
    # closes it even when the import aborts half-way.
    with open('log.txt', 'w', encoding='utf-8') as log_file, driver.session() as session:
        print("Begin verticies insertion...")
        for vertex in data['vertices']:
            # `vertexType` is the node label; all other entries are properties.
            cql = 'CREATE (:{0}{{{1}}})'.format(
                vertex['vertexType'],
                _cql_properties(vertex, ('vertexType',)),
            )
            log_file.write(cql + '\n')
            session.write_transaction(cql_executor, cql)

        print("Begin edges insertion...")
        for edge in data['edges']:
            # The endpoint ids are rendered like node properties so a string
            # id is quoted the same way on both sides of the comparison.
            # NOTE(review): the relationship runs from the `_inV` node to the
            # `_outV` node; confirm this matches the direction intended by
            # the source data.
            cql = 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'.format(
                edge['inVType'],
                edge['outVType'],
                _cql_literal(edge['_inV']),
                _cql_literal(edge['_outV']),
                edge['_label'],
                _cql_properties(edge, ('inVType', '_inV', '_label', 'outVType', '_outV')),
            )
            log_file.write(cql + '\n')
            session.write_transaction(cql_executor, cql)
finally:
    # Release the driver's connections whether or not the import succeeded.
    driver.close()
print('\ndone')
4. V3:加入容错并执行成功
导入过程中,如果字符串中包含 \ 或 ",生成的语句会出现语法错误,连接随即自动断开。因此最终版加入了对这两个字符的转义处理,同时在 cql_executor 中加入了异常处理。调整后运行正常,共用 22 个小时完成了数据导入(尚未采用批量导入等优化方法)。
# -*- coding: utf-8 -*-
"""
创建时间:2021/02/20 06:38:08
更新时间:2021/02/20 08:30:32
2021/02/20 09:14:43
原创作者: 郝伟老师
功能简介: 根据输入的test.json,生成节点和边的 CQL插入语法
用时约2.5小时,写代码1小时40分钟,20分钟整理文档,30分钟调试
加入了对 \ " 的处理,同时添加了 try-except 容错。
"""
import json
from neo4j import GraphDatabase
def cql_executor(tx, cql):
    """Execute a single Cypher statement on ``tx``, reporting failures.

    Args:
        tx: Transaction-like object exposing ``run(cql)``.
        cql: The Cypher statement text to execute.

    Returns:
        None. If ``tx.run`` raises an ``Exception``, the error, its type and
        the statement are printed on one line and the exception is suppressed.
    """
    try:
        tx.run(cql)
    except Exception as error:
        # Best-effort: report the failing statement and let the caller go on.
        report = (error, type(error), cql)
        print(*report)
def _cql_literal(value):
    """Render a JSON-decoded value as literal text for a Cypher statement.

    Strings come out double-quoted with backslashes, double quotes and control
    characters escaped; None/True/False become null/true/false.
    """
    return json.dumps(value, ensure_ascii=False)


def _cql_properties(record, skipped):
    """Return the ``key:value, ...`` property-map body for ``record``.

    Keys listed in ``skipped`` are left out.
    """
    return ', '.join(
        '{0}:{1}'.format(key, _cql_literal(value))
        for key, value in record.items()
        if key not in skipped
    )


# NOTE(review): the server address and credentials are hard-coded below;
# consider moving them out of the source file.
print('Connecting 121.196.157.14:7687 ...')
driver=GraphDatabase.driver('bolt://121.196.157.14:7687', auth=('neo4j', 'hd7iu2_X@v4u'))
print('Connected.\nLoading test.json...')
try:
    with open('test.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # The log receives every generated vertex and edge statement.
    with open('log.txt', 'w', encoding='utf-8') as log_file, driver.session() as session:
        print("Begin verticies insertion...")
        for vertex in data['vertices']:
            # `vertexType` is the node label; all other entries are properties.
            cql = 'CREATE (:{0}{{{1}}})'.format(
                vertex['vertexType'],
                _cql_properties(vertex, ('vertexType',)),
            )
            log_file.write(cql + '\n')
            session.write_transaction(cql_executor, cql)

        # Index `_id` on each listed label before inserting edges, because
        # every edge statement looks its two endpoints up by label and `_id`.
        print("Create Indexes...")
        for entity in ['Address','DNSName','IP','address','addressRange','flow','host','ip','malware','port','software','vulnerability','user','attack','attacker']:
            session.write_transaction(cql_executor, 'Create Index On:{0}(_id)'.format(entity))

        print("Begin edges insertion...")
        for edge in data['edges']:
            # The endpoint ids are rendered like node properties so a string
            # id is quoted the same way on both sides of the comparison.
            # NOTE(review): the relationship runs from the `_inV` node to the
            # `_outV` node; confirm this matches the direction intended by
            # the source data.
            cql = 'MATCH (n1:{0}),(n2:{1}) WHERE n1._id={2} AND n2._id={3} CREATE (n1)-[:{4}{{{5}}}]->(n2)'.format(
                edge['inVType'],
                edge['outVType'],
                _cql_literal(edge['_inV']),
                _cql_literal(edge['_outV']),
                edge['_label'],
                _cql_properties(edge, ('inVType', '_inV', '_label', 'outVType', '_outV')),
            )
            log_file.write(cql + '\n')
            session.write_transaction(cql_executor, cql)
finally:
    # Release the driver's connections whether or not the import succeeded.
    driver.close()
print('\ndone')