cherry_be/lib/public/crawler.py
2021-09-04 10:50:46 +08:00

403 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from json.encoder import JSONEncoder
import requests
from urllib.parse import quote
import base64
from bs4 import BeautifulSoup
import random
import sys
from werkzeug.utils import redirect
from utils import btoa, signCode
from ocr import getCaptcha
class Crawler(object):
    """Session-based client for the CUST SSO login and academic-affairs API.

    The instance keeps one ``requests.Session`` plus the last SSO response,
    and exposes helpers that log in, fetch grades and fetch timetables.
    Public methods return a ``(payload, status_code)`` tuple, except
    ``getUserInfo`` and ``defaultInit``.
    """

    # CAS login endpoint; used for both the password and the SMS-code step.
    _LOGIN_URL = 'https://mysso.cust.edu.cn/cas/login?service=https://jwgl.cust.edu.cn/welcome'
    _CAPTCHA_URL = 'https://mysso.cust.edu.cn/cas/captcha'
    # Host prefix used when no login has populated ``baseUrl`` yet.  It is the
    # same host that getCurWeek() has always hard-coded.
    # NOTE(review): confirm this is the right node for cookie-only sessions.
    _DEFAULT_BASE_URL = 'https://jwgls1'
    _JSON_HEADERS = {'Content-Type': 'application/json'}
    # Keys of the three parts of a day inside the timetable payload.
    _TIME_PIECE_KEYS = ('AM__TimePieces', 'PM__TimePieces', 'EV__TimePieces')
    # Remote content keys -> keys used in the lesson dicts we return.
    _CONTENT_KEY_ALIASES = {
        'Teacher': 'teacher',
        'Lesson': 'course',
        'Room': 'room',
        'Time': 'weeks',
    }
    # Character that terminates the week ranges, e.g. "1-16周".
    # NOTE(review): the scraped source showed an empty literal here (which
    # makes str.find always return 0); '周' is inferred from the original
    # variable name ``zhou_pos`` -- confirm against the upstream file.
    _WEEK_MARKER = '周'
    # Length of the per-week occupancy list (index == week number).
    _WEEK_SLOTS = 23
    _PASS_SCORE = 60

    def __init__(self):
        self.__session = requests.Session()
        self.__response = None
        self.__pwd = None
        self.__phone = None
        self.cid = None
        self.sid = None
        self.uid = None
        self.real_name = None
        self.baseUrl = None

    # ------------------------------------------------------------ helpers
    @staticmethod
    def _ratio(numerator, denominator):
        """Return numerator / denominator, or 0 when the denominator is 0."""
        return numerator / denominator if denominator else 0

    @staticmethod
    def _gradePoint(credit, score):
        """Return the credit-weighted grade point: credit * (score - 50) / 10."""
        return credit * (score - 50) / 10

    def _apiUrl(self, path):
        """Build an API URL from ``baseUrl`` (or the default host) and *path*."""
        base = self.baseUrl or self._DEFAULT_BASE_URL
        return base + '.cust.edu.cn/api/' + path

    def _postJson(self, url, payload):
        """POST *payload* as a JSON body and return the decoded JSON reply."""
        r = self.__session.post(
            url=url, data=json.dumps(payload), headers=self._JSON_HEADERS)
        return json.loads(r.content.decode('utf-8'))

    def _termSummary(self, term, c_x_g, credit, kill, dead, grades):
        """Assemble the per-term entry of the grade report."""
        return {
            'term_time': term,
            'term_GPA': self._ratio(c_x_g, credit),
            'term_kill': kill,
            'term_dead': dead,
            'term_credit': credit,
            'term_grade': grades
        }

    def _parseWeeks(self, text):
        """Split a timetable "Time" string into a label and a week bitmap.

        Returns ``(label, weeks_split)`` where *label* is *text* up to and
        including the first week marker, and *weeks_split* is a list of
        ``_WEEK_SLOTS`` 0/1 flags indexed by week number.  When the text
        contains '单周' / '双周', ranges only mark odd / even weeks; single
        week numbers are always marked.
        """
        if '单周' in text:
            parity = 1
        elif '双周' in text:
            parity = 0
        else:
            parity = None
        marker_pos = text.find(self._WEEK_MARKER)
        if marker_pos == -1:
            # No marker at all: treat the whole string as the range list
            # instead of slicing with -1 and silently dropping a character.
            label, ranges_text = text, text
        else:
            label, ranges_text = text[0:marker_pos + 1], text[0:marker_pos]
        weeks_split = [0] * self._WEEK_SLOTS
        for segment in ranges_text.split(','):
            bounds = segment.split('-')
            if len(bounds) > 1:
                for week in range(int(bounds[0]), int(bounds[1]) + 1):
                    if parity is None or week % 2 == parity:
                        weeks_split[week] = 1
                    else:
                        weeks_split[week] = 0
            elif bounds[0].strip():
                # Empty segments (e.g. from an empty string) are skipped.
                weeks_split[int(bounds[0])] = 1
        return label, weeks_split

    # Basic user information
    def getUserInfo(self):
        """Return the stored identity fields; the password goes through signCode."""
        return {
            'cid': self.cid,
            'pwd': signCode(self.__pwd),
            'sid': self.sid,
            'uid': self.uid,
            'real_name': self.real_name,
        }

    # Connect to the academic-affairs system ------------------------------
    def connection(self):
        """Log in through the SSO page using cid, password and an OCR'd captcha.

        Returns a ``(message, status)`` tuple: 200 on success, 511 when the
        page title is still the SSO login title, 515 on any exception.
        The final response is kept for the follow-up steps.
        """
        try:
            # Fetch the SSO login page to obtain the "execution" token.
            r = self.__session.get(url=self._LOGIN_URL)
            soup = BeautifulSoup(r.text, 'html.parser')
            execution = soup.find_all(name='input')[3]['value']
            r = self.__session.get(self._CAPTCHA_URL)
            formdata = {
                'username': self.cid,
                'password': self.__pwd,
                'execution': execution,
                'captcha': getCaptcha(r.content),
                '_eventId': 'submit',
                'geolocation': ''
            }
            r = self.__session.post(url=self._LOGIN_URL, data=formdata)
            soup = BeautifulSoup(r.text, 'html.parser')
            # Still on the login page => the credentials were rejected.
            loginSuccess = soup.find(name='title').text != "统一身份认证系统"
            self.__response = r
            if not loginSuccess:
                return '账号或者密码错误', 511
            return '登录成功', 200
        except Exception as e:
            print(e)
            return '教务挂了', 515

    def submitVerificationCode(self, data):
        """Submit the verification code ``data['vc']`` to the SSO form.

        Uses the "execution" token from the previously stored response and
        stores the new response.  Returns ``('gotcha!', 200)`` or, on any
        exception, ``('教务挂了', 515)``.
        """
        try:
            soup = BeautifulSoup(self.__response.text, 'html.parser')
            execution = soup.find_all(name='input')[2]['value']
            formdata = {
                'yzm': data['vc'],
                'msgType': '',
                'execution': execution,
                '_eventId': 'submit'
            }
            r = self.__session.post(
                url=self._LOGIN_URL, data=formdata, allow_redirects=True)
            self.__response = r
            return 'gotcha!', 200
        except Exception as e:
            print(e)
            return '教务挂了', 515

    # Grades --------------------------------------------------------------
    def getGrade(self):
        """Exchange the SSO ticket for a local login, then fetch all grades.

        Sets ``baseUrl`` from the stored response URL.  Returns
        ``(report, 200)`` where *report* has ``total_grade`` and
        ``grade_list``, or ``('教务挂了', 515)`` when the API reports a
        non-zero state.
        """
        r = self.__response
        urlBase = r.url.split('.')[0]
        ticket = r.url.split('?')[-1]
        param = base64.b64encode(quote(json.dumps({
            "Ticket": ticket.split('=')[-1],
            "Url": "https://jwgl.cust.edu.cn/welcome"
        })).encode('utf-8'))
        self.baseUrl = urlBase
        self.__session.post(
            url=urlBase + '.cust.edu.cn/api/LoginApi/LGSSOLocalLogin',
            data={
                "param": param,
                "__log": {},
                "__permission": {}
            })
        data = self._postJson(
            urlBase + '.cust.edu.cn/api/ClientStudent/QueryService/GradeQueryApi/GetDataByStudent',
            {
                "param": "JTdCJTIyU2hvd0dyYWRlVHlwZSUyMiUzQTAlN0Q=",
                "__permission": {
                    "MenuID": "4443798E-EB6E-4D88-BFBD-BB0A76FF6BD5",
                    "Operate": "select",
                    "Operation": 0
                },
                "__log": {
                    "MenuID": "4443798E-EB6E-4D88-BFBD-BB0A76FF6BD5",
                    "Logtype": 6,
                    "Context": "查询"
                }
            })
        if data['state'] != 0:
            return '教务挂了', 515
        report = self._buildGradeReport(
            data['data']['GradeStatistics'], data['data']['GradeList'])
        return report, 200

    def _buildGradeReport(self, total, split):
        """Turn the raw statistics and grade rows into the report dict.

        *split* is walked in the order received; the first row is taken as
        the newest term and the last row as the oldest.  Consecutive rows
        with the same course name are treated as attempts at one course.
        Credits that sum to zero yield a GPA of 0 instead of raising.
        """
        # Overall summary, renamed from the remote field names.
        total_grade = {
            'total_GPA': total['PJJD'],
            'total_credit': total['SDXF'],
            'total_kill': total['TGMS'],
            'total_dead': total['WTGMS']
        }
        grade_list = []
        if not split:
            # Nothing to walk: avoid indexing an empty list.
            total_grade['total_bixiu_GPA'] = 0
            return {'total_grade': total_grade, 'grade_list': grade_list}

        # Term codes of the first and last rows as (year, term) int tuples.
        # presumably KSXNXQ is a 4-digit year followed by a term digit.
        newest = split[0]['KSXNXQ']
        oldest = split[-1]['KSXNXQ']
        newest = (int(newest[0:4]), int(newest[4:5]))
        oldest = (int(oldest[0:4]), int(oldest[4:5]))
        # Every term from the oldest year up to the newest term, newest first.
        total_term = []
        for year in range(oldest[0], newest[0] + 1):
            for half in range(1, 3):
                total_term.append(str(year) + str(half))
                if year == newest[0] and half == newest[1]:
                    break
        total_term.reverse()

        this_term = 0            # index into total_term of the term in progress
        term_kill = 0            # courses passed in the term in progress
        term_dead = 0            # courses failed in the term in progress
        term_credit = 0          # credits attempted in the term in progress
        term_c_x_g = 0           # sum of credit * grade point in that term
        term_grades = []         # rows of the term in progress
        lesson_kill = 0          # passing attempts of the course in progress
        lesson_name = ''         # name of the course in progress
        seen_course = False      # becomes True once a scored row was handled
        total_bixiu_credit = 0   # credits of passed required courses
        total_bixiu_c_x_g = 0    # credit * grade point of those courses

        for item in split:
            # Rows without a score are skipped (a falsy score, so 0 as well).
            if not item['YXCJ']:
                continue
            name = item['LessonInfo']['KCMC']
            passed = item['YXCJ'] >= self._PASS_SCORE
            if name != lesson_name:
                # A new course starts: settle the previous one first.
                if seen_course:
                    if lesson_kill > 0:
                        term_kill += 1
                    else:
                        term_dead += 1
                seen_course = True
                lesson_kill = 0
                lesson_name = name
                if item['KSXNXQ'] != total_term[this_term]:
                    # The term changed: flush the finished term and reset.
                    grade_list.append(self._termSummary(
                        total_term[this_term], term_c_x_g, term_credit,
                        term_kill, term_dead, term_grades))
                    while item['KSXNXQ'] != total_term[this_term]:
                        this_term += 1
                    term_kill, term_dead, term_credit, term_c_x_g = 0, 0, 0, 0
                    term_grades = []
                # Repeated attempts do not add to the term's credits again.
                term_credit += item['XF']
            if passed:
                points = self._gradePoint(item['XF'], item['YXCJ'])
                lesson_kill += 1
                term_c_x_g += points
                if item['KCXZ'] == '必修':
                    total_bixiu_c_x_g += points
                    total_bixiu_credit += item['XF']
            term_grades.append({
                'title': name,
                'credit': item['XF'],
                'grade': item['YXCJ'],
                'kill': 'yes' if passed else 'no',
                'class': item['KSXZ']
            })

        # Flush the last course and term, but only if any row had a score.
        if seen_course:
            if lesson_kill > 0:
                term_kill += 1
            else:
                term_dead += 1
            grade_list.append(self._termSummary(
                total_term[this_term], term_c_x_g, term_credit,
                term_kill, term_dead, term_grades))
        total_grade['total_bixiu_GPA'] = self._ratio(
            total_bixiu_c_x_g, total_bixiu_credit)
        return {
            'total_grade': total_grade,
            'grade_list': grade_list
        }

    # Current week number
    def getCurWeek(self):
        """Return ``(CurWeek, 200)`` as reported by the student home API."""
        data = self._postJson(
            'https://jwgls1.cust.edu.cn/api/ClientStudent/Home/StudentHomeApi/GetHomeCurWeekTime?sf_request_type=ajax',
            {
                "param": "JTdCJTdE",
                "__permission": {
                    "MenuID": "F71C97D5-D3E2-4FDA-9209-D7FA8626390E",
                    "Operation": 0
                },
                "__log": {
                    "MenuID": "F71C97D5-D3E2-4FDA-9209-D7FA8626390E",
                    "Logtype": 6,
                    "Context": "查询"
                }
            })
        return data['data']['CurWeek'], 200

    # Timetable post-processing
    def manageSchedule(self, data):
        """Flatten a timetable payload into a list of lesson dicts.

        Reads ``data['data']['AdjustDays']`` for 7 days, 3 parts per day and
        2 slots per part.  Each lesson dict carries ``sid``, ``real_name``,
        ``day`` (0-6), ``period`` (part * 2 + slot), the content fields
        (teacher/course/room/weeks, comma-joined when repeated) and
        ``weeks_split``.  Returns ``(lessons, 200)``.
        """
        days = data['data']['AdjustDays']
        lessons = []
        for day in range(7):
            for part, piece_key in enumerate(self._TIME_PIECE_KEYS):
                for slot in range(2):
                    entries = days[day][piece_key][slot]['Dtos']
                    if not entries:
                        continue
                    for entry in entries:
                        lesson = {
                            'sid': self.sid,
                            'real_name': self.real_name,
                            'is_personal': False,
                            'day': day,
                            'period': part * 2 + slot,
                            'is_groups_course': False,
                        }
                        for field in entry['Content']:
                            key = self._CONTENT_KEY_ALIASES.get(
                                field['Key'], field['Key'])
                            if lesson.get(key):
                                lesson[key] += ',' + field['Name']
                            else:
                                lesson[key] = field['Name']
                        label, weeks_split = self._parseWeeks(lesson['weeks'])
                        lesson['weeks'] = label
                        lesson['weeks_split'] = weeks_split
                        lessons.append(lesson)
        return lessons, 200

    # Own timetable
    def getOwnSchedule(self):
        """Fetch the logged-in student's timetable and flatten it.

        Returns the result of manageSchedule(), or ``('教务挂了', 515)``
        when the API reports a non-zero state.
        """
        data = self._postJson(
            self._apiUrl('ClientStudent/Home/StudentHomeApi/QueryStudentScheduleData'),
            {
                "param": "JTdCJTdE",
                "__permission": {
                    "MenuID": "00000000-0000-0000-0000-000000000000",
                    "Operate": "select",
                    "Operation": "0"
                },
                "__log": {
                    "MenuID": "00000000-0000-0000-0000-000000000000",
                    "Logtype": 6,
                    "Context": "查询"
                }
            })
        if data['state'] != 0:
            return ('教务挂了', 515)
        return self.manageSchedule(data)

    # Another user's timetable
    def getOtherschedule(self, term='20202'):
        """Fetch the timetable of ``self.uid`` for *term* and flatten it.

        *term* is sent as the ``XNXQ`` query field; it defaults to the value
        that used to be hard-coded.  Returns the result of
        manageSchedule(), or ``('教务挂了', 515)`` on a non-zero state.
        """
        params = {"KBLX": "2", "CXLX": "0", "XNXQ": term,
                  "CXID": self.uid, "CXZC": "0", "JXBLX": ""}
        # str(...)[2:-1] strips the b'...' wrapper of the value btoa returns.
        params = str(btoa(json.dumps(params)))[2:-1]
        data = self._postJson(
            self._apiUrl('ClientStudent/QueryService/OccupyQueryApi/QueryScheduleData?sf_request_type=ajax'),
            {
                "param": params,
                "__permission": {
                    "MenuID": "F71C97D5-D3E2-4FDA-9209-D7FA8626390E",
                    "Operation": 0
                },
                "__log": {
                    "MenuID": "F71C97D5-D3E2-4FDA-9209-D7FA8626390E",
                    "Logtype": 6,
                    "Context": "查询"
                }
            })
        if data['state'] != 0:
            return ('教务挂了', 515)
        return self.manageSchedule(data)

    # Read cookies
    def getCookie(self):
        """Return ``(cookie_items, 200)`` for the current session."""
        return self.__session.cookies.items(), 200

    # Write cookies
    def setCookie(self, cookies):
        """Add *cookies* (anything ``dict()`` accepts) to the session jar."""
        requests.utils.add_dict_to_cookiejar(
            self.__session.cookies, dict(cookies))
        return 'OK', 200

    # Default initialisation
    def defaultInit(self, data):
        """Store cid/pwd/phone from *data* and start a fresh session."""
        self.cid = data['cid']
        self.__pwd = data['pwd']
        self.__phone = data['phone']
        self.__session = requests.Session()

    # Initialise from saved cookies to refresh the timetable quickly
    def cookieInit(self, cookies, uid, cid, sid, real_name):
        """Start a fresh session from saved cookies and fetch the timetable.

        Returns whatever getOtherschedule() returns.
        """
        self.cid = cid
        self.sid = sid
        self.uid = uid
        self.real_name = real_name
        self.__session = requests.Session()
        self.setCookie(cookies)
        return self.getOtherschedule()