diff --git a/get_xha_user_pwd.py b/get_xha_user_pwd.py index 66888b0..6f724a9 100644 --- a/get_xha_user_pwd.py +++ b/get_xha_user_pwd.py @@ -1,35 +1,21 @@ + +from enum import IntEnum, auto from urllib import request import json import random import subprocess -import syslog -import warnings - -def print(*a, sep=' '): syslog.syslog(sep.join(map(str, a))) - -import argparse -parser = argparse.ArgumentParser() -parser.add_argument("-i", "--interface", help="网卡接口名称 interface") -parser.add_argument("-f", "--file", help="用户密码文件路径", default="user_pwd.txt") -args = parser.parse_args() -interface = args.interface -file = args.file - -interface_def = interface is None def request_get_text(url, headers={}): req = request.Request(url, headers=headers) with request.urlopen(req) as response: return response.read().decode('utf-8') -def append_interface(command, pre, interface): - if not interface_def: - command += [pre, interface] - def check_connectivity(interface): + interface_def = interface is None if not interface_def: command = ['ping'] - append_interface(command, '-I', interface) + if not interface_def: + command += ["-I", interface] host = "223.5.5.5" command += ['-c', '1', host] # -n 1 on windows, -c 1 on linux try: @@ -50,28 +36,15 @@ def check_connectivity(interface): except: return False +class LoginStatus(IntEnum): + unknown = -3 + not_unlimit = -2 + bad_pwd = -1 + succ = auto() + used_online = auto() -device = 0 -if interface_def: - warnings.warn("no interface given by -i or --interface, use default route") - -# 检测网络联通性 -if check_connectivity(interface): - exit() - -with open(file, 'r') as f: - user_pwd = [i.rstrip('\n\r').split() for i in f.readlines()] - -user_pwd_error_or_not_unlimit_combo_idx = set() -# 检查是否付费 -not_succ = True -while not_succ: - # 随机选择,防止前面有密码错误的用户卡死 - user_index = random.randrange(len(user_pwd)) - if user_index in user_pwd_error_or_not_unlimit_combo_idx: - continue - - (user, pwd) = user_pwd[user_index] +def login(user, pwd, interface, log, device): + interface_def = interface is None url = f'http://192.168.101.201:801/eportal/portal/page/loadUserInfo?callback=dr1004&lang=zh-CN&program_index=ctshNw1713845951&page_index=V5fmKw1713845966&user_account={user}&wlan_user_ip=0.0.0.0&wlan_user_mac=000000000000&jsVersion=4.1&v=599&lang=zh' t = request_get_text(url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33', @@ -105,29 +78,89 @@ while not_succ: """ msg = "" if not interface_def: - print(f"使用接口 {interface} 进行请求") + log(f"使用接口 {interface} 进行请求") msg += f"接口 {interface}: " if '密码错误' in result['msg']: - print(f"{user} {pwd} 密码错误") - user_pwd_error_or_not_unlimit_combo_idx.add(user_index) + log(f"{user} {pwd} 密码错误") + return LoginStatus.bad_pwd elif '已经在线' in result['msg']: msg += "正常在线!" - not_succ = False + return LoginStatus.used_online elif '认证成功' in result['msg']: msg += f"使用账号{user}登录成功!" - not_succ = False - print(msg) + return LoginStatus.succ + log(msg) + return LoginStatus.unknown else: - user_pwd_error_or_not_unlimit_combo_idx.add(user_index) -# 删除密码错误或不满足要求的用户 + return LoginStatus.not_unlimit -with open(file, 'w') as f: - for i in range(len(user_pwd)): - if i in user_pwd_error_or_not_unlimit_combo_idx: - user_pwd_error_or_not_unlimit_combo_idx.remove(i) - else: - t = user_pwd[i] - f.write(t[0]) - f.write(' ') - f.write(t[1]) - f.write('\n') +def login_till_succ(user_pwd_gen, bad_user_callback=lambda u: None, interface=None, log=print): + ''' + user_pwd_gen will be called multiply times until login succeeds. + ''' + device = 0 # Final[int] + # 检查是否付费 + not_succ = True + while not_succ: + (user, pwd) = user_pwd_gen() + ret = login(user, pwd, interface, log, device) + not_succ = ret < 0 + + if ret in {LoginStatus.bad_pwd, LoginStatus.not_unlimit}: + bad_user_callback(user) + + +def main(args, warn, log): + interface = args.interface + file = args.file + + interface_def = interface is None + if interface_def: + warn("no interface given by -i or --interface, use default route") + + # 检测网络联通性 + if check_connectivity(interface): + exit() + + with open(file, 'r') as f: + user_pwd = [i.rstrip('\n\r').split() for i in f.readlines()] + + # login + user_pwd_error_or_not_unlimit_combo_idx = set() + cur_lku_user_idx = 0 + def user_pwd_getter(): + # 随机选择,防止前面有密码错误的用户卡死 + user_index = random.randrange(len(user_pwd)) + if user_index in user_pwd_error_or_not_unlimit_combo_idx: + return user_pwd_getter() + (user, pwd) = user_pwd[user_index] + return (user, pwd) + + def bad_user_callback(_): + user_pwd_error_or_not_unlimit_combo_idx.add(cur_lku_user_idx) + + login_till_succ(user_pwd_getter, bad_user_callback, interface=interface, log=log) + + + # 删除密码错误或不满足要求的用户 + with open(file, 'w') as f: + for i in range(len(user_pwd)): + if i in user_pwd_error_or_not_unlimit_combo_idx: + user_pwd_error_or_not_unlimit_combo_idx.remove(i) + else: + t = user_pwd[i] + f.write(t[0]) + f.write(' ') + f.write(t[1]) + f.write('\n') + +if __name__ == "__main__": + import argparse + import warnings + import syslog + def log(*a, sep=' '): syslog.syslog(sep.join(map(str, a))) + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--interface", help="网卡接口名称 interface") + parser.add_argument("-f", "--file", help="用户密码文件路径", default="user_pwd.txt") + args = parser.parse_args() + main(args, warnings.warn, log)