refact: class Loginer

This commit is contained in:
lit
2025-06-24 18:53:48 +08:00
parent b53576a273
commit 6a82d46a6b
+39 -32
View File
@@ -1,4 +1,5 @@
from enum import IntEnum, auto
from urllib import request
import json
@@ -10,11 +11,28 @@ def request_get_text(url, headers={}):
with request.urlopen(req) as response:
return response.read().decode('utf-8')
def check_connectivity(interface):
interface_def = interface is None
if not interface_def:
class LoginStatus(IntEnum):
unknown = -3
not_unlimit = -2
bad_pwd = -1
succ = auto()
used_online = auto()
class Loginer:
def __init__(self,
interface = None,
device = 0,
log = print,
):
self.interface = interface
self.device = device
self.log = log
self.interface_def = interface is None
def check_connectivity(self):
interface = self.interface
if not self.interface_def:
command = ['ping']
if not interface_def:
if not self.interface_def:
command += ["-I", interface]
host = "223.5.5.5"
command += ['-c', '1', host] # -n 1 on windows, -c 1 on linux
@@ -22,12 +40,12 @@ def check_connectivity(interface):
result = subprocess.check_call(command, timeout=2)
return result == 0
except subprocess.TimeoutExpired:
print(f"接口 {interface}Ping to {host} timed out.")
self.log(f"接口 {interface}Ping to {host} timed out.")
return False
except subprocess.CalledProcessError:
return False
except Exception as e:
print(f"接口 {interface}Error during ping: {e}")
self.log(f"接口 {interface}Error during ping: {e}")
return False
else:
try:
@@ -36,15 +54,9 @@ def check_connectivity(interface):
except:
return False
class LoginStatus(IntEnum):
unknown = -3
not_unlimit = -2
bad_pwd = -1
succ = auto()
used_online = auto()
def login(user, pwd, interface, log, device):
interface_def = interface is None
def login(self, user, pwd):
interface = self.interface
interface_def = self.interface_def
url = f'http://192.168.101.201:801/eportal/portal/page/loadUserInfo?callback=dr1004&lang=zh-CN&program_index=ctshNw1713845951&page_index=V5fmKw1713845966&user_account={user}&wlan_user_ip=0.0.0.0&wlan_user_mac=000000000000&jsVersion=4.1&v=599&lang=zh'
t = request_get_text(url, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33',
@@ -52,6 +64,7 @@ def login(user, pwd, interface, log, device):
})
text = t.replace('dr1004(', '').replace(')', '').replace(';', '')
j = json.loads(text)
# 检查是否付费
if j['user_info']['user_state'] == "正常" and j['user_info']['available_flow'] in ("0MB", "无限制"):
url = f"http://192.168.101.201:801/eportal/portal/page/loadOnlineRecord?callback=dr1006&lang=zh-CN&program_index=ctshNw1713845951&page_index=V5fmKw1713845966&user_account={user}&wlan_user_ip=10.169.0.241&wlan_user_mac=000000000000&start_time=2010-01-01&end_time=2100-01-01&start_rn=1&end_rn=5&jsVersion=4.1&v=2399&lang=zh"
# 获取在线设备
@@ -61,7 +74,7 @@ def login(user, pwd, interface, log, device):
})
text = t.replace('dr1006(', '').replace(')', '').replace(';', '')
j1 = json.loads(text)
if int(j1['count']) == device:
if int(j1['count']) == self.device:
# 判断密码是否正确
url = f"https://xha.ouc.edu.cn:802/eportal/portal/login?callback=dr1003&login_method=1&user_account={user}&user_password={pwd}&wlan_user_ip=0.0.0.0&wlan_user_ipv6=&wlan_user_mac=000000000000&wlan_ac_ip=&wlan_ac_name=&jsVersion=4.1&terminal_type=1&lang=zh-cn&v=2425&lang=zh"
if interface_def:
@@ -78,10 +91,10 @@ def login(user, pwd, interface, log, device):
"""
msg = ""
if not interface_def:
log(f"使用接口 {interface} 进行请求")
self.log(f"使用接口 {interface} 进行请求")
msg += f"接口 {interface} "
if '密码错误' in result['msg']:
log(f"{user} {pwd} 密码错误")
self.log(f"{user} {pwd} 密码错误")
return LoginStatus.bad_pwd
elif '已经在线' in result['msg']:
msg += "正常在线!"
@@ -89,37 +102,32 @@ def login(user, pwd, interface, log, device):
elif '认证成功' in result['msg']:
msg += f"使用账号{user}登录成功!"
return LoginStatus.succ
log(msg)
self.log(msg)
return LoginStatus.unknown
else:
return LoginStatus.not_unlimit
def login_till_succ(user_pwd_gen, bad_user_callback=lambda u: None, interface=None, log=print):
def login_till_succ(self, user_pwd_gen, bad_user_callback=lambda u: None):
'''
user_pwd_gen will be called multiply times until login succeeds.
'''
device = 0 # Final[int]
# 检查是否付费
not_succ = True
while not_succ:
(user, pwd) = user_pwd_gen()
ret = login(user, pwd, interface, log, device)
ret = self.login(user, pwd)
not_succ = ret < 0
if ret in {LoginStatus.bad_pwd, LoginStatus.not_unlimit}:
bad_user_callback(user)
def main(args, warn, log):
interface = args.interface
file = args.file
interface_def = interface is None
def main(self, file, warn):
interface_def = self.interface_def
if interface_def:
warn("no interface given by -i or --interface, use default route")
# 检测网络联通性
if check_connectivity(interface):
if self.check_connectivity():
exit()
with open(file, 'r') as f:
@@ -139,8 +147,7 @@ def main(args, warn, log):
def bad_user_callback(_):
user_pwd_error_or_not_unlimit_combo_idx.add(cur_lku_user_idx)
login_till_succ(user_pwd_getter, bad_user_callback, interface=interface, log=log)
self.login_till_succ(user_pwd_getter, bad_user_callback)
# 删除密码错误或不满足要求的用户
with open(file, 'w') as f:
@@ -163,4 +170,4 @@ if __name__ == "__main__":
parser.add_argument("-i", "--interface", help="网卡接口名称 interface")
parser.add_argument("-f", "--file", help="用户密码文件路径", default="user_pwd.txt")
args = parser.parse_args()
main(args, warnings.warn, log)
Loginer(args.interface, log=log).main(args.file, warnings.warn)