refact: merge common logic

This commit is contained in:
lit
2025-06-24 22:17:28 +08:00
parent 2dd9ddc520
commit d84ff711ad
+46 -32
View File
@@ -28,7 +28,7 @@ class Loginer:
log = print, log = print,
): ):
self.interface = interface self.interface = interface
self.device = device self.device_str = str(device)
self.log = log self.log = log
self.interface_def = interface is None self.interface_def = interface is None
def check_connectivity(self): def check_connectivity(self):
@@ -54,42 +54,56 @@ class Loginer:
return True return True
except: except:
return False return False
referrer = "http://192.168.101.201/"
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33",
"Referer": referrer
}
@staticmethod
def url(last, dr, v):
return "/eportal/portal/page/" + last + "?callback=dr" + dr + "&v=" + v + \
"&lang=zh-CN&program_index=ctshNw1713845951&page_index=V5fmKw1713845966&wlan_user_ip=0.0.0.0&wlan_user_mac=000000000000&jsVersion=4.1"
ip0 = "&wlan_user_ip=0.0.0.0"
urlloadUserInfo = referrer + url("loadUserInfo", "1004", "599") + ip0
urlloadOnlineRecord = referrer + url("loadOnlineRecord", "1006", "2399") + "&wlan_user_ip=10.169.0.241&start_time=2010-01-01&end_time=2100-01-01&start_rn=1&end_rn=5"
urllogin = referrer + url("loadOnlineRecord", "1006", "2399") + ip0 + "&login_method=1&wlan_user_ipv6=&wlan_user_mac=000000000000&wlan_ac_ip=&wlan_ac_name=&terminal_type=1"
@staticmethod
def json_from_drnnnn(t):
"drNNNN(`result`); <- e.g. t.replace('dr1006(', '').replace(')', '').replace(';', '')"
return json.loads(t[7:-2])
def login(self, user, pwd): def login(self, user, pwd):
interface = self.interface interface = self.interface
interface_def = self.interface_def interface_def = self.interface_def
url = f'http://192.168.101.201:801/eportal/portal/page/loadUserInfo?callback=dr1004&lang=zh-CN&program_index=ctshNw1713845951&page_index=V5fmKw1713845966&user_account={user}&wlan_user_ip=0.0.0.0&wlan_user_mac=000000000000&jsVersion=4.1&v=599&lang=zh' u = "&user_account=" + user
url = self.urlloadUserInfo + u
try: try:
t = request_get_text(url, headers={ t = request_get_text(url, headers=self.header)
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33',
'Referer': 'http://192.168.101.201/'
})
except URLError as e: except URLError as e:
if e.reason.errno == ENETUNREACH: if e.reason.errno == ENETUNREACH:
return LoginStatus.no_wifi return LoginStatus.no_wifi
raise raise
text = t.replace('dr1004(', '').replace(')', '').replace(';', '') j = self.json_from_drnnnn(t)
j = json.loads(text)
# 检查是否付费 # 检查是否付费
if j['user_info']['user_state'] == "正常" and j['user_info']['available_flow'] in ("0MB", "无限制"): if j['user_info']['user_state'] == "正常" and j['user_info']['available_flow'] in ("0MB", "无限制"):
url = f"http://192.168.101.201:801/eportal/portal/page/loadOnlineRecord?callback=dr1006&lang=zh-CN&program_index=ctshNw1713845951&page_index=V5fmKw1713845966&user_account={user}&wlan_user_ip=10.169.0.241&wlan_user_mac=000000000000&start_time=2010-01-01&end_time=2100-01-01&start_rn=1&end_rn=5&jsVersion=4.1&v=2399&lang=zh" url = self.urlloadOnlineRecord + u
# 获取在线设备 # 获取在线设备
t = request_get_text(url, headers={ t = request_get_text(url, headers=self.header)
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33', j1 = self.json_from_drnnnn(t)
'Referer': 'http://192.168.101.201/' result = LoginStatus.unknown
}) if j1['count'] == self.device_str:
text = t.replace('dr1006(', '').replace(')', '').replace(';', '')
j1 = json.loads(text)
if int(j1['count']) == self.device:
# 判断密码是否正确 # 判断密码是否正确
url = f"https://xha.ouc.edu.cn:802/eportal/portal/login?callback=dr1003&login_method=1&user_account={user}&user_password={pwd}&wlan_user_ip=0.0.0.0&wlan_user_ipv6=&wlan_user_mac=000000000000&wlan_ac_ip=&wlan_ac_name=&jsVersion=4.1&terminal_type=1&lang=zh-cn&v=2425&lang=zh" url = self.urllogin + u + "&user_password=" + pwd
if interface_def: if interface_def:
res = request_get_text(url) res = request_get_text(url)
else: else:
command = ["curl", url, "--interface", interface] command = ["curl", url, "--interface", interface]
res = subprocess.check_output(command, text=True) res = subprocess.check_output(command, text=True)
result = json.loads(res[7:-2]) j_res = self.json_from_drnnnn(res)
""" """
dr1003({"result":0,"msg":"账号不存在","ret_code":1}); dr1003({"result":0,"msg":"账号不存在","ret_code":1});
dr1003({"result":0,"msg":"密码错误","ret_code":1}); dr1003({"result":0,"msg":"密码错误","ret_code":1});
@@ -100,26 +114,26 @@ class Loginer:
if not interface_def: if not interface_def:
self.log(f"使用接口 {interface} 进行请求") self.log(f"使用接口 {interface} 进行请求")
msg += f"接口 {interface} " msg += f"接口 {interface} "
if '密码错误' in result['msg']:
self.log(f"{user} {pwd} 密码错误") j_msg = j_res['msg']
self.log(msg) if '密码错误' in j_msg:
return LoginStatus.bad_pwd msg += f"{user} {pwd} 密码错误"
elif '已经在线' in result['msg']: result = LoginStatus.bad_pwd
elif '已经在线' in j_msg:
msg += "正常在线!" msg += "正常在线!"
self.log(msg) result = LoginStatus.used_online
return LoginStatus.used_online elif '认证成功' in j_msg:
elif '认证成功' in result['msg']:
msg += f"使用账号{user}登录成功!" msg += f"使用账号{user}登录成功!"
result = LoginStatus.succ
self.log(msg) self.log(msg)
return LoginStatus.succ return result
return LoginStatus.unknown
else: else:
return LoginStatus.not_unlimit return LoginStatus.not_unlimit
def login_till_succ(self, user_pwd_gen, bad_user_callback=lambda u: None): def login_till_succ(self, user_pwd_gen, bad_user_callback=lambda u: None):
''' """
user_pwd_gen will be called multiply times until login succeeds. user_pwd_gen will be called multiply times until login succeeds.
''' """
not_succ = True not_succ = True
while not_succ: while not_succ:
(user, pwd) = user_pwd_gen() (user, pwd) = user_pwd_gen()
@@ -139,10 +153,10 @@ class Loginer:
# 检测网络联通性 # 检测网络联通性
if self.check_connectivity(): if self.check_connectivity():
exit() return
with open(file, 'r') as f: with open(file, 'r') as f:
user_pwd = [i.rstrip('\n\r').split() for i in f.readlines()] user_pwd = [i.rstrip("\r\n").split(' ') for i in f.readlines()]
# login # login
user_pwd_error_or_not_unlimit_combo_idx = set() user_pwd_error_or_not_unlimit_combo_idx = set()