从零实现基于医疗知识图谱的问答系统(四)-构建回答

文章目录
  1. 1. 寻找答案-answer_search.py
    1. 1.1. Neo4j配置
    2. 1.2. 根据问题类型回调模板
    3. 1.3. 执行CQL查询并返回相关结果
  2. 2. 构建回答系统-chatbot_graph.py

一开始我预计分三个部分来写完这份KG的阅读记录的,然鹅没想到单单是问题的解析就有点点麻烦QAQ
所以多写了一篇来叙述如何构建基于知识图谱的问答系统,也是系列的最终篇(这莫名的激动是怎么肥四!

寻找答案-answer_search.py

在第三篇博客中讲了如何对问题进行分类并且构建了问题在Neo4j中的查询语句,现在要开始说回答咯~
下面的内容在chatbot文件中要进行调用,chatbot及上述代码中的数据调用在第二部分详细介绍。

Neo4j配置

因为是基于知识图谱的问答系统,所以要先把Neo4j连起来,于是在初始化中进行设置。

1
2
3
4
5
6
7
8
9
# 配置Neo4j
def __init__(self):
self.g=Graph(
host='127.0.0.1',
http_port='7474',
user='neo4j',
password='123456'
)
self.num_limit=20

根据问题类型回调模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# 根据对应的question_type,调用相应的回复模板
def answer_prettify(self,question_type,answers):
final_answer=[]
if not answers:
return ''

if question_type=='disease_symptom':
desc=[i['n.name'] for i in answers]
subject=answers[0]['m.name']
final_answer = '{0}的症状包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'symptom_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '症状{0}可能染上的疾病有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_cause':
desc = [i['m.cause'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}可能的成因有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_prevent':
desc = [i['m.prevent'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}的预防措施包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_lasttime':
desc = [i['m.cure_lasttime'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}治疗可能持续的周期为:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_cureway':
desc = [';'.join(i['m.cure_way']) for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}可以尝试如下治疗:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_cureprob':
desc = [i['m.cured_prob'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}治愈的概率为(仅供参考):{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_easyget':
desc = [i['m.easy_get'] for i in answers]
subject = answers[0]['m.name']

final_answer = '{0}的易感人群包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_desc':
desc = [i['m.desc'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0},熟悉一下:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_acompany':
desc1 = [i['n.name'] for i in answers]
desc2 = [i['m.name'] for i in answers]
subject = answers[0]['m.name']
desc = [i for i in desc1 + desc2 if i != subject]
final_answer = '{0}的症状包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_not_food':
desc = [i['n.name'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}忌食的食物包括有:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_do_food':
do_desc = [i['n.name'] for i in answers if i['r.name'] == '宜吃']
recommand_desc = [i['n.name'] for i in answers if i['r.name'] == '推荐食谱']
subject = answers[0]['m.name']
final_answer = '{0}宜食的食物包括有:{1}\n推荐食谱包括有:{2}'.format(subject, ';'.join(list(set(do_desc))[:self.num_limit]),
';'.join(list(set(recommand_desc))[:self.num_limit]))

elif question_type == 'food_not_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '患有{0}的人最好不要吃{1}'.format(';'.join(list(set(desc))[:self.num_limit]), subject)

elif question_type == 'food_do_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '患有{0}的人建议多试试{1}'.format(';'.join(list(set(desc))[:self.num_limit]), subject)

elif question_type == 'disease_drug':
desc = [i['n.name'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}通常的使用的药品包括:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'drug_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '{0}主治的疾病有{1},可以试试'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'disease_check':
desc = [i['n.name'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}通常可以通过以下方式检查出来:{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

elif question_type == 'check_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '通常可以通过{0}检查出来的疾病有{1}'.format(subject, ';'.join(list(set(desc))[:self.num_limit]))

return final_answer

执行CQL查询并返回相关结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 执行CQL查询并返回相应结果
def search_main(self,sqls):
final_answers=[]
for sql_ in sqls:
question_type=sql_['question_type']
queries=sql_['sql']
answers=[]
for query in queries:
ress=self.g.run(query).data()
answers+=ress
final_answer=self.answer_prettify(question_type,answers)
if final_answer:
final_answers.append(final_answer)
return final_answers

构建回答系统-chatbot_graph.py

这个python文件是整个项目里最短小的文件了,然鹅,如果前面的文件写不好,后面也是要粗事情的。
我对源代码进行了回答部分的一丢丢修改,其他内容基本不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
#coding:utf-8

from question_classifer import *
from question_parser import *
from answer_search import *

'''问答构建'''
class ChatBotGraph:
def __init__(self):
self.classifier = QuestionClassifier()
self.parser = QuestionParser()
self.searcher = AnswerSearcher()
def chat_main(self, sent):
answer = '您好,我是小勇医药智能助理,希望可以帮到您。'
res_classify = self.classifier.classify(sent)
#print(res_classify)
if not res_classify:
answer='不好意思噢这个问题我也不晓得'
return answer
res_sql = self.parser.parser_main(res_classify)
#print(res_sql)
final_answers = self.searcher.search_main(res_sql)
print(final_answers)
if not final_answers:
answer = '不好意思噢这个问题我也不晓得'
return answer
else:
return '\n'.join(final_answers)

if __name__ == '__main__':
handler = ChatBotGraph()
while 1:
question = input('用户:')
answer = handler.chat_main(question)
print('小勇:', answer)

测试一下效果:

OK,基于医疗知识图谱的问答系统已经复现完成,再次感谢刘老师提供的开源代码!
有问题也可以联系我,邮箱见侧栏-w-