"""Export the exam-student roster of one exam to a CSV file.

The script walks every course that has marking records for
``config.examId``, pages through the valid exam records of each course,
resolves the exam student behind every record and finally writes
``examstudent.csv`` into ``<config.outputpath>/<config.examId>``.
"""
import csv
import hashlib
import os
import time

import requests

import config

# Output directory for this exam; created on import if it is missing.
output = config.outputpath + "/" + str(config.examId)
os.makedirs(output, exist_ok=True)

# NOTE: a second export ("mediaimport.csv", fed from ``mediaimportlist``)
# was disabled in the original script; the list is kept for compatibility.
savestudentpath = output + "/" + "examstudent.csv"
mediaimportlist = []
examstudentlist = []


def main():
    """Collect all exam students and write them to ``savestudentpath``."""
    queryExamMarkingInfo()
    with open(savestudentpath, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["课程代码", "课程名称", "准考证号", "学号", "姓名"])
        for row in examstudentlist:
            print(row)
            writer.writerow(row)


def queryExamMarkingInfo():
    """Fetch the marking info of the exam and collect records per course.

    Calls ``queryExamValidRecordData`` (starting at page cursor 1) for the
    ``courseId`` of every bean returned by the service. Prints a failure
    message when the service does not answer with HTTP 200.
    """
    url = config.oedomain + "api/core/oe/admin/examRecordForMarking/findExamRecordForMarkingInfo"
    data = {
        "examId": config.examId,
        "courseId": "",
        "batchNum": "",
    }
    headers = createRpcheader("4", "OE")
    result = requests.post(url, headers=headers, json=data)
    if result.status_code != 200:
        print("查询阅卷信息失败")
        return
    for marking_bean in result.json()['examRecordForMarkingBeanList']:
        queryExamValidRecordData(marking_bean['courseId'], 1)


def queryExamValidRecordData(courseId, pageNo):
    """Collect the exam-student rows of every valid record of one course.

    Pages through the record service, starting at cursor ``pageNo`` and
    following the ``next`` value of each response, and appends one row per
    record to the module-level ``examstudentlist``.

    Args:
        courseId: Course whose records are fetched.
        pageNo: Value sent as ``start`` for the first request.
    """
    url = config.oedomain + "api/core/oe/admin/examRecordForMarking/queryValidExamRecordInfoPage"
    start = pageNo
    # Iterate instead of recursing once per page so that a course with many
    # pages cannot exhaust the interpreter's recursion limit.
    while True:
        data = {
            "examId": config.examId,
            "courseId": courseId,
            "start": start,
            "size": 200,
        }
        # Headers are rebuilt per request because the token embeds a timestamp.
        headers = createRpcheader("4", "OE")
        result = requests.post(url, headers=headers, json=data)
        if result.status_code != 200:
            # NOTE(review): a failed request ends paging with the same
            # message as an empty page.
            print("没有剩余课程")
            return

        payload = result.json()
        records = payload['examRecordForMarkingBeanList']
        if not records:
            print("没有剩余课程")
            return

        for record in records:
            examstudent = queryExamStudent(record["examStudentId"])
            print(examstudent)
            if examstudent is None:
                # The lookup failed (already reported by queryExamStudent);
                # skip the record rather than crash on a missing student.
                continue
            examstudentlist.append([
                examstudent["courseCode"],
                examstudent["courseName"],
                record["examRecordDataId"],
                examstudent["identityNumber"],
                examstudent["studentName"],
            ])

        # Throttle between pages before asking for the next cursor.
        time.sleep(1)
        start = payload["next"]


def queryExamStudent(examstudentid):
    """Look up one exam student by id.

    Args:
        examstudentid: Identifier sent as ``examStudentId``.

    Returns:
        The ``examStudentBean`` object of the response, or ``None`` (after
        printing a message) when the service does not answer with HTTP 200.
    """
    url = config.examworkdomain + "api/core/examwork/examStudent/getExamStudent"
    headers = createRpcheader("2", "E")
    data = {
        "rootOrgId": config.rootOrgId,
        "examStudentId": examstudentid,
    }
    result = requests.post(url, headers=headers, json=data)
    if result.status_code == 200:
        return result.json()["examStudentBean"]
    print(f"query examstudent {examstudentid} failed")
    return None


def getCourse(courseId):
    """Fetch a single course by id.

    Args:
        courseId: Identifier sent as the only entry of ``courseIdList``.

    Returns:
        The first element of the response's ``courseList``, or ``None`` when
        the request fails or the list is empty.
    """
    url = config.basedomain + "api/core/basic/course/getCoursesByIdList"
    headers = createRpcheader("1", "B")
    data = {
        "courseIdList": [courseId],
    }
    result = requests.post(url, headers=headers, json=data)
    if result.status_code != 200:
        return None
    courses = result.json()["courseList"]
    return courses[0] if courses else None


def createRpcheader(appid, appcode):
    """Build the signed RPC headers for one request.

    The access token is the SHA-256 hex digest of
    ``appid + appcode + <millisecond timestamp> + secret``; the same
    timestamp is sent in the ``timestamp`` header.

    Args:
        appid: Value of the ``App-Id`` header (string).
        appcode: Value of the ``App-Code`` header (string).

    Returns:
        A dict of HTTP headers.
    """
    # NOTE(review): hard-coded shared secret; consider loading it from config.
    secretkey = "123456"
    # Compute the timestamp once so the signed value and the header agree.
    timestamp = str(int(round(time.time() * 1000)))
    secretstr = ''.join([appid, appcode, timestamp, secretkey])
    access_token = hashlib.sha256(secretstr.encode("utf-8")).hexdigest()
    return {
        "App-Id": appid,
        "App-Code": appcode,
        "Trace-Id": "1222222",
        "Access-Token": access_token,
        "timestamp": timestamp,
    }


if __name__ == "__main__":
    main()