""" 功能:导出json试卷 作者:pengchen 时间:2020/3/10 """ import time import hashlib import requests import config import logger import json import os from bs4 import BeautifulSoup from bs4 import element import re def main(): queryExamMarkingInfo() # questionlist=querySubjectInfo("8887593","A","B149") # with open("8887593.json", "w", encoding="utf-8") as file: # json_str = json.dumps(questionlist, indent=4, ensure_ascii=False) # file.write(json_str) def queryExamMarkingInfo(): """ 获取阅卷信息 """ url = config.oedomain + "api/core/oe/admin/examRecordForMarking/findExamRecordForMarkingInfo" data = { "examId": config.examId, "courseId": "", "batchNum": "" } headers = createRpcheader("4", "OE") result = requests.post( url, headers=headers, json=data) if result.status_code == 200: examRecordForMarkingBeanList = result.json()['examRecordForMarkingBeanList'] for examRecordForMarkingBean in examRecordForMarkingBeanList: queryExamValidRecordData(examRecordForMarkingBean['courseId'], 1) else: print("查询阅卷信息失败") def queryExamValidRecordData(courseId, pageNo): """ 获取全部考生的阅卷信息 """ url = config.oedomain + "api/core/oe/admin/examRecordForMarking/queryValidExamRecordInfoPage" data = { "examId": config.examId, "courseId": courseId, "start": pageNo, "size": 200 } headers = createRpcheader("4", "OE") result = requests.post( url, headers=headers, json=data) if result.status_code == 200: examRecordForMarkingBeanList = result.json()['examRecordForMarkingBeanList'] if len(examRecordForMarkingBeanList) > 0: for examRecordForMarkingBean in examRecordForMarkingBeanList: # examRecordDataId = str(examRecordForMarkingBean["examRecordDataId"]); # if examRecordDataId != "39881554" and examRecordDataId != "39879641": # # logger.info("不用处理,跳过!examRecordDataId = %s" % examRecordDataId) # continue # 取试卷题目 course = getCourse(courseId) output = config.outputpath + "/" + str(config.examId) + "/" + str(course["code"]) + "-" + \ examRecordForMarkingBean["paperType"] if not os.path.exists(output): os.makedirs(output) savepath = output + "/" + str(examRecordForMarkingBean["examRecordDataId"]) + ".json" if os.path.exists(savepath): logger.info("已处理,跳过!examRecordDataId = %s" % str(examRecordForMarkingBean["examRecordDataId"])) continue questionbeanlist = querySubjectInfo(examRecordForMarkingBean["examRecordDataId"], examRecordForMarkingBean["paperType"], course["code"]) # 存储到本地 time.sleep(1) with open(savepath, "w", encoding="utf-8") as file: json_str = json.dumps(questionbeanlist, indent=4, ensure_ascii=False) file.write(json_str) next = result.json()["next"] queryExamValidRecordData(courseId, next) else: print("没有剩余课程") def querySubjectInfo(examRecordDataId, paperType, courseCode): """ paperId: paperType: courseCode: """ print("开始处理%s" % (examRecordDataId)) questionBeanList = [] url = config.oedomain + "api/core/oe/admin/examRecordQuestions/querySubjectiveAnswerList" headers = createRpcheader("4", "OE") data = { "examRecordDataId": examRecordDataId } result = requests.post( url, headers=headers, json=data) if result.status_code == 200: querySubjectiveAnswerBeanList = result.json()["querySubjectiveAnswerBeanList"] question_unit_number = 1 lastquestionId = None for querySubjectiveAnswerBean in querySubjectiveAnswerBeanList: questionId = querySubjectiveAnswerBean["questionId"] main_number = querySubjectiveAnswerBean["mainNumber"] sub_number = querySubjectiveAnswerBean["order"] studentanswer = querySubjectiveAnswerBean["studentAnswer"] print("studentanswer:", studentanswer) questiondata = getQuestion(questionId, courseCode, paperType) mainbodyblocklist = None if lastquestionId == questionId: question_unit_number = question_unit_number + 1 # 下一道主观题 else: lastquestionId = questionId question_unit_number = 1 if questiondata["defaultQuestion"]["masterVersion"]["body"]: # 套题处理 mainquestionbody = questiondata["defaultQuestion"]["masterVersion"]["body"] mainbodyblocklist = splitQuestionBody(mainquestionbody) questionUnitList = questiondata["defaultQuestion"]["masterVersion"]["questionUnitList"] current_subjective_number = 1 for questionUnit in questionUnitList: sub_ques_body = questionUnit["body"] if questionUnit["questionType"] == "FILL_UP" or questionUnit["questionType"] == "ESSAY": if current_subjective_number == question_unit_number: questionBean = { "mainNumber": main_number, "subNumber": sub_number, "studentAnswer": { "sections": [] }, "answer": { "sections": [] }, "body": { "sections": [] } } quesbodyblocklist = splitQuestionBody(sub_ques_body) if mainbodyblocklist: questionBean["body"]["sections"] = mainbodyblocklist + quesbodyblocklist else: questionBean["body"]["sections"] = quesbodyblocklist questionBean["studentAnswer"]["sections"] = splitQuesitonStudentAnswer(studentanswer, questionUnit[ "answerType"], questionUnit[ "questionType"]) questionBean["answer"]["sections"] = splitQuesitonAnswer(questionUnit["rightAnswer"][0]) questionBeanList.append(questionBean) current_subjective_number = current_subjective_number + 1 return questionBeanList def splittagElement(blocklist, childrenelement): if type(childrenelement) == element.NavigableString: block = { "type": "text", "value": childrenelement, "playTime": None, "param": None } blocklist.append(block) elif type(childrenelement) == element.Tag: for child in childrenelement.children: if child.name == "a": playTime = None if "playtime" in child.attrs: playTime = child.attrs["playtime"] block = { "type": "audio", "value": child.attrs["url"], "playTime": playTime, "param": None } blocklist.append(block) elif child.name == "img": width = None height = None if "width" in child.attrs and "height" in child.attrs: width = child.attrs["width"] height = child.attrs["height"] else: style = child.attrs["style"] width = re.search("width:([^;]*)", style).group(1) height = re.search("height:([^;]*)", style).group(1) block = { "type": "image", "value": child.attrs["src"], "playTime": None, "param": { "width": width, "height": height } } blocklist.append(block) else: splittagElement(blocklist, child) def splitQuestionBody(questionBody): """ 切分quesitonBody """ sectionlist = [] bodyhtml = BeautifulSoup(questionBody, features="html.parser") plist = bodyhtml.find_all("p") for p in plist: # 遍历p标签 section = { "blocks": [] } splittagElement(section["blocks"], p) sectionlist.append(section) return sectionlist def splitQuesitonAnswer(quesitonanswer): """ 切分问题标答 """ sectionlist = [] # print(quesitonanswer) if quesitonanswer: bodyhtml = BeautifulSoup(quesitonanswer, features="html.parser") plist = bodyhtml.find_all("p") for p in plist: # 遍历p标签 section = { "blocks": [] } splittagElement(section["blocks"], p) sectionlist.append(section) else: sectionlist = None return sectionlist def splitQuesitonStudentAnswer(studentanswer, answerType, questionType): """ 切分学生答案 """ print(answerType, questionType) if studentanswer: sectionlist = [] if questionType == "ESSAY": if answerType == "SINGLE_AUDIO": answer = { "blocks": [{ "type": "audio", "value": studentanswer, "playTime": None, "param": None }]} sectionlist.append(answer) else: # 取图片 bodyhtml = BeautifulSoup(studentanswer, features="html.parser") blocks = { "blocks": [] } for body in bodyhtml: if type(body) == element.NavigableString: textanswer = { "type": "text", "value": body, "playTime": None, "param": None } blocks["blocks"].append(textanswer) else: alist = body.find_all("a") if len(alist) > 0: for atag in alist: imageanswer = { "type": "image", "value": atag.attrs["href"], "playTime": None, "param": { "width": 200, "height": 200 } } blocks["blocks"].append(imageanswer) else: textanswer = { "type": "text", "value": body.text, "playTime": None, "param": None } blocks["blocks"].append(textanswer) sectionlist.append(blocks) else: answer = { "blocks": [{ "type": "text", "value": studentanswer, "playTime": None, "param": None }] } sectionlist.append(answer) print(sectionlist) else: sectionlist = None return sectionlist def getQuestion(questionId, courseCode, paperType): url = config.questiondomain + "api/core/questions/extract_config/getQuestion" headers = createRpcheader("3", "Q") data = { "examId": config.examId, "courseCode": courseCode, "groupCode": paperType, "questionId": questionId } result = requests.post( url, headers=headers, json=data) if result.status_code == 200: return result.json() def getCourse(courseId): url = config.basedomain + "api/core/basic/course/getCoursesByIdList" headers = createRpcheader("1", "B") data = { "courseIdList": [courseId] } result = requests.post( url, headers=headers, json=data) if result.status_code == 200: return result.json()["courseList"][0] def createRpcheader(appid, appcode): secretkey = "123456" t = time.time() secretstr = ''.join([appid, appcode, str(int(round(t * 1000))), secretkey]) accssToken = hashlib.sha256(secretstr.encode("utf-8")).hexdigest() headers = { "App-Id": appid, "App-Code": appcode, "Trace-Id": "1222222", "Access-Token": accssToken, "timestamp": str(int(round(t * 1000))) } return headers if __name__ == "__main__": main()