123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- """
- 功能:导出json试卷
- 作者:pengchen
- 时间:2020/3/10
- """
- import time
- import hashlib
- import requests
- import config
- import logger
- import json
- import os
- from bs4 import BeautifulSoup
- from bs4 import element
- import re
- def main():
- queryExamMarkingInfo()
- # questionlist=querySubjectInfo("8887593","A","B149")
- # with open("8887593.json", "w", encoding="utf-8") as file:
- # json_str = json.dumps(questionlist, indent=4, ensure_ascii=False)
- # file.write(json_str)
- def queryExamMarkingInfo():
- """
- 获取阅卷信息
- """
- url = config.oedomain + "api/core/oe/admin/examRecordForMarking/findExamRecordForMarkingInfo"
- data = {
- "examId": config.examId,
- "courseId": "",
- "batchNum": ""
- }
- headers = createRpcheader("4", "OE")
- result = requests.post(
- url, headers=headers,
- json=data)
- if result.status_code == 200:
- examRecordForMarkingBeanList = result.json()['examRecordForMarkingBeanList']
- for examRecordForMarkingBean in examRecordForMarkingBeanList:
- queryExamValidRecordData(examRecordForMarkingBean['courseId'], 1)
- else:
- print("查询阅卷信息失败")
- def queryExamValidRecordData(courseId, pageNo):
- """
- 获取全部考生的阅卷信息
- """
- url = config.oedomain + "api/core/oe/admin/examRecordForMarking/queryValidExamRecordInfoPage"
- data = {
- "examId": config.examId,
- "courseId": courseId,
- "start": pageNo,
- "size": 200
- }
- headers = createRpcheader("4", "OE")
- result = requests.post(
- url, headers=headers,
- json=data)
- if result.status_code == 200:
- examRecordForMarkingBeanList = result.json()['examRecordForMarkingBeanList']
- if len(examRecordForMarkingBeanList) > 0:
- for examRecordForMarkingBean in examRecordForMarkingBeanList:
- # examRecordDataId = str(examRecordForMarkingBean["examRecordDataId"]);
- # if examRecordDataId != "39881554" and examRecordDataId != "39879641":
- # # logger.info("不用处理,跳过!examRecordDataId = %s" % examRecordDataId)
- # continue
- # 取试卷题目
- course = getCourse(courseId)
- output = config.outputpath + "/" + str(config.examId) + "/" + str(course["code"]) + "-" + \
- examRecordForMarkingBean["paperType"]
- if not os.path.exists(output):
- os.makedirs(output)
- savepath = output + "/" + str(examRecordForMarkingBean["examRecordDataId"]) + ".json"
- if os.path.exists(savepath):
- logger.info("已处理,跳过!examRecordDataId = %s" % str(examRecordForMarkingBean["examRecordDataId"]))
- continue
- questionbeanlist = querySubjectInfo(examRecordForMarkingBean["examRecordDataId"],
- examRecordForMarkingBean["paperType"], course["code"])
- # 存储到本地
- time.sleep(1)
- with open(savepath, "w", encoding="utf-8") as file:
- json_str = json.dumps(questionbeanlist, indent=4, ensure_ascii=False)
- file.write(json_str)
- next = result.json()["next"]
- queryExamValidRecordData(courseId, next)
- else:
- print("没有剩余课程")
- def querySubjectInfo(examRecordDataId, paperType, courseCode):
- """
- paperId:
- paperType:
- courseCode:
- """
- print("开始处理%s" % (examRecordDataId))
- questionBeanList = []
- url = config.oedomain + "api/core/oe/admin/examRecordQuestions/querySubjectiveAnswerList"
- headers = createRpcheader("4", "OE")
- data = {
- "examRecordDataId": examRecordDataId
- }
- result = requests.post(
- url, headers=headers,
- json=data)
- if result.status_code == 200:
- querySubjectiveAnswerBeanList = result.json()["querySubjectiveAnswerBeanList"]
- question_unit_number = 1
- lastquestionId = None
- for querySubjectiveAnswerBean in querySubjectiveAnswerBeanList:
- questionId = querySubjectiveAnswerBean["questionId"]
- main_number = querySubjectiveAnswerBean["mainNumber"]
- sub_number = querySubjectiveAnswerBean["order"]
- studentanswer = querySubjectiveAnswerBean["studentAnswer"]
- print("studentanswer:", studentanswer)
- questiondata = getQuestion(questionId, courseCode, paperType)
- mainbodyblocklist = None
- if lastquestionId == questionId:
- question_unit_number = question_unit_number + 1 # 下一道主观题
- else:
- lastquestionId = questionId
- question_unit_number = 1
- if questiondata["defaultQuestion"]["masterVersion"]["body"]:
- # 套题处理
- mainquestionbody = questiondata["defaultQuestion"]["masterVersion"]["body"]
- mainbodyblocklist = splitQuestionBody(mainquestionbody)
- questionUnitList = questiondata["defaultQuestion"]["masterVersion"]["questionUnitList"]
- current_subjective_number = 1
- for questionUnit in questionUnitList:
- sub_ques_body = questionUnit["body"]
- if questionUnit["questionType"] == "FILL_UP" or questionUnit["questionType"] == "ESSAY":
- if current_subjective_number == question_unit_number:
- questionBean = {
- "mainNumber": main_number,
- "subNumber": sub_number,
- "studentAnswer": {
- "sections": []
- },
- "answer": {
- "sections": []
- },
- "body": {
- "sections": []
- }
- }
- quesbodyblocklist = splitQuestionBody(sub_ques_body)
- if mainbodyblocklist:
- questionBean["body"]["sections"] = mainbodyblocklist + quesbodyblocklist
- else:
- questionBean["body"]["sections"] = quesbodyblocklist
- questionBean["studentAnswer"]["sections"] = splitQuesitonStudentAnswer(studentanswer,
- questionUnit[
- "answerType"],
- questionUnit[
- "questionType"])
- questionBean["answer"]["sections"] = splitQuesitonAnswer(questionUnit["rightAnswer"][0])
- questionBeanList.append(questionBean)
- current_subjective_number = current_subjective_number + 1
- return questionBeanList
- def splittagElement(blocklist, childrenelement):
- if type(childrenelement) == element.NavigableString:
- block = {
- "type": "text",
- "value": childrenelement,
- "playTime": None,
- "param": None
- }
- blocklist.append(block)
- elif type(childrenelement) == element.Tag:
- for child in childrenelement.children:
- if child.name == "a":
- playTime = None
- if "playtime" in child.attrs:
- playTime = child.attrs["playtime"]
- block = {
- "type": "audio",
- "value": child.attrs["url"],
- "playTime": playTime,
- "param": None
- }
- blocklist.append(block)
- elif child.name == "img":
- width = None
- height = None
- if "width" in child.attrs and "height" in child.attrs:
- width = child.attrs["width"]
- height = child.attrs["height"]
- else:
- style = child.attrs["style"]
- width = re.search("width:([^;]*)", style).group(1)
- height = re.search("height:([^;]*)", style).group(1)
- block = {
- "type": "image",
- "value": child.attrs["src"],
- "playTime": None,
- "param": {
- "width": width,
- "height": height
- }
- }
- blocklist.append(block)
- else:
- splittagElement(blocklist, child)
- def splitQuestionBody(questionBody):
- """
- 切分quesitonBody
- """
- sectionlist = []
- bodyhtml = BeautifulSoup(questionBody, features="html.parser")
- plist = bodyhtml.find_all("p")
- for p in plist:
- # 遍历p标签
- section = {
- "blocks": []
- }
- splittagElement(section["blocks"], p)
- sectionlist.append(section)
- return sectionlist
- def splitQuesitonAnswer(quesitonanswer):
- """
- 切分问题标答
- """
- sectionlist = []
- # print(quesitonanswer)
- if quesitonanswer:
- bodyhtml = BeautifulSoup(quesitonanswer, features="html.parser")
- plist = bodyhtml.find_all("p")
- for p in plist:
- # 遍历p标签
- section = {
- "blocks": []
- }
- splittagElement(section["blocks"], p)
- sectionlist.append(section)
- else:
- sectionlist = None
- return sectionlist
- def splitQuesitonStudentAnswer(studentanswer, answerType, questionType):
- """
- 切分学生答案
- """
- print(answerType, questionType)
- if studentanswer:
- sectionlist = []
- if questionType == "ESSAY":
- if answerType == "SINGLE_AUDIO":
- answer = {
- "blocks": [{
- "type": "audio",
- "value": studentanswer,
- "playTime": None,
- "param": None
- }]}
- sectionlist.append(answer)
- else:
- # 取图片
- bodyhtml = BeautifulSoup(studentanswer, features="html.parser")
- blocks = {
- "blocks": []
- }
- for body in bodyhtml:
- if type(body) == element.NavigableString:
- textanswer = {
- "type": "text",
- "value": body,
- "playTime": None,
- "param": None
- }
- blocks["blocks"].append(textanswer)
- else:
- alist = body.find_all("a")
- if len(alist) > 0:
- for atag in alist:
- imageanswer = {
- "type": "image",
- "value": atag.attrs["href"],
- "playTime": None,
- "param": {
- "width": 200,
- "height": 200
- }
- }
- blocks["blocks"].append(imageanswer)
- else:
- textanswer = {
- "type": "text",
- "value": body.text,
- "playTime": None,
- "param": None
- }
- blocks["blocks"].append(textanswer)
- sectionlist.append(blocks)
- else:
- answer = {
- "blocks": [{
- "type": "text",
- "value": studentanswer,
- "playTime": None,
- "param": None
- }]
- }
- sectionlist.append(answer)
- print(sectionlist)
- else:
- sectionlist = None
- return sectionlist
- def getQuestion(questionId, courseCode, paperType):
- url = config.questiondomain + "api/core/questions/extract_config/getQuestion"
- headers = createRpcheader("3", "Q")
- data = {
- "examId": config.examId,
- "courseCode": courseCode,
- "groupCode": paperType,
- "questionId": questionId
- }
- result = requests.post(
- url, headers=headers,
- json=data)
- if result.status_code == 200:
- return result.json()
- def getCourse(courseId):
- url = config.basedomain + "api/core/basic/course/getCoursesByIdList"
- headers = createRpcheader("1", "B")
- data = {
- "courseIdList": [courseId]
- }
- result = requests.post(
- url, headers=headers, json=data)
- if result.status_code == 200:
- return result.json()["courseList"][0]
- def createRpcheader(appid, appcode):
- secretkey = "123456"
- t = time.time()
- secretstr = ''.join([appid, appcode, str(int(round(t * 1000))), secretkey])
- accssToken = hashlib.sha256(secretstr.encode("utf-8")).hexdigest()
- headers = {
- "App-Id": appid,
- "App-Code": appcode,
- "Trace-Id": "1222222",
- "Access-Token": accssToken,
- "timestamp": str(int(round(t * 1000)))
- }
- return headers
- if __name__ == "__main__":
- main()
|