refact: extract func login_till_succ, login
This commit is contained in:
+82
-49
@@ -1,35 +1,21 @@
|
||||
|
||||
from enum import IntEnum, auto
|
||||
from urllib import request
|
||||
import json
|
||||
import random
|
||||
import subprocess
|
||||
import syslog
|
||||
import warnings
|
||||
|
||||
def print(*a, sep=' '): syslog.syslog(sep.join(map(str, a)))
|
||||
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-i", "--interface", help="网卡接口名称 interface")
|
||||
parser.add_argument("-f", "--file", help="用户密码文件路径", default="user_pwd.txt")
|
||||
args = parser.parse_args()
|
||||
interface = args.interface
|
||||
file = args.file
|
||||
|
||||
interface_def = interface is None
|
||||
|
||||
def request_get_text(url, headers={}):
|
||||
req = request.Request(url, headers=headers)
|
||||
with request.urlopen(req) as response:
|
||||
return response.read().decode('utf-8')
|
||||
|
||||
def append_interface(command, pre, interface):
|
||||
if not interface_def:
|
||||
command += [pre, interface]
|
||||
|
||||
def check_connectivity(interface):
|
||||
interface_def = interface is None
|
||||
if not interface_def:
|
||||
command = ['ping']
|
||||
append_interface(command, '-I', interface)
|
||||
if not interface_def:
|
||||
command += ["-I", interface]
|
||||
host = "223.5.5.5"
|
||||
command += ['-c', '1', host] # -n 1 on windows, -c 1 on linux
|
||||
try:
|
||||
@@ -50,28 +36,15 @@ def check_connectivity(interface):
|
||||
except:
|
||||
return False
|
||||
|
||||
class LoginStatus(IntEnum):
|
||||
unknown = -3
|
||||
not_unlimit = -2
|
||||
bad_pwd = -1
|
||||
succ = auto()
|
||||
used_online = auto()
|
||||
|
||||
device = 0
|
||||
if interface_def:
|
||||
warnings.warn("no interface given by -i or --interface, use default route")
|
||||
|
||||
# 检测网络联通性
|
||||
if check_connectivity(interface):
|
||||
exit()
|
||||
|
||||
with open(file, 'r') as f:
|
||||
user_pwd = [i.rstrip('\n\r').split() for i in f.readlines()]
|
||||
|
||||
user_pwd_error_or_not_unlimit_combo_idx = set()
|
||||
# 检查是否付费
|
||||
not_succ = True
|
||||
while not_succ:
|
||||
# 随机选择,防止前面有密码错误的用户卡死
|
||||
user_index = random.randrange(len(user_pwd))
|
||||
if user_index in user_pwd_error_or_not_unlimit_combo_idx:
|
||||
continue
|
||||
|
||||
(user, pwd) = user_pwd[user_index]
|
||||
def login(user, pwd, interface, log, device):
|
||||
interface_def = interface is None
|
||||
url = f'http://192.168.101.201:801/eportal/portal/page/loadUserInfo?callback=dr1004&lang=zh-CN&program_index=ctshNw1713845951&page_index=V5fmKw1713845966&user_account={user}&wlan_user_ip=0.0.0.0&wlan_user_mac=000000000000&jsVersion=4.1&v=599&lang=zh'
|
||||
t = request_get_text(url, headers={
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33',
|
||||
@@ -105,23 +78,72 @@ while not_succ:
|
||||
"""
|
||||
msg = ""
|
||||
if not interface_def:
|
||||
print(f"使用接口 {interface} 进行请求")
|
||||
log(f"使用接口 {interface} 进行请求")
|
||||
msg += f"接口 {interface}: "
|
||||
if '密码错误' in result['msg']:
|
||||
print(f"{user} {pwd} 密码错误")
|
||||
user_pwd_error_or_not_unlimit_combo_idx.add(user_index)
|
||||
log(f"{user} {pwd} 密码错误")
|
||||
return LoginStatus.bad_pwd
|
||||
elif '已经在线' in result['msg']:
|
||||
msg += "正常在线!"
|
||||
not_succ = False
|
||||
return LoginStatus.used_online
|
||||
elif '认证成功' in result['msg']:
|
||||
msg += f"使用账号{user}登录成功!"
|
||||
not_succ = False
|
||||
print(msg)
|
||||
return LoginStatus.succ
|
||||
log(msg)
|
||||
return LoginStatus.unknown
|
||||
else:
|
||||
user_pwd_error_or_not_unlimit_combo_idx.add(user_index)
|
||||
# 删除密码错误或不满足要求的用户
|
||||
return LoginStatus.not_unlimit
|
||||
|
||||
with open(file, 'w') as f:
|
||||
def login_till_succ(user_pwd_gen, bad_user_callback=lambda u: None, interface=None, log=print):
|
||||
'''
|
||||
user_pwd_gen will be called multiply times until login succeeds.
|
||||
'''
|
||||
device = 0 # Final[int]
|
||||
# 检查是否付费
|
||||
not_succ = True
|
||||
while not_succ:
|
||||
(user, pwd) = user_pwd_gen()
|
||||
ret = login(user, pwd, interface, log, device)
|
||||
not_succ = ret < 0
|
||||
|
||||
if ret in {LoginStatus.bad_pwd, LoginStatus.not_unlimit}:
|
||||
bad_user_callback(user)
|
||||
|
||||
|
||||
def main(args, warn, log):
|
||||
interface = args.interface
|
||||
file = args.file
|
||||
|
||||
interface_def = interface is None
|
||||
if interface_def:
|
||||
warn("no interface given by -i or --interface, use default route")
|
||||
|
||||
# 检测网络联通性
|
||||
if check_connectivity(interface):
|
||||
exit()
|
||||
|
||||
with open(file, 'r') as f:
|
||||
user_pwd = [i.rstrip('\n\r').split() for i in f.readlines()]
|
||||
|
||||
# login
|
||||
user_pwd_error_or_not_unlimit_combo_idx = set()
|
||||
cur_lku_user_idx = 0
|
||||
def user_pwd_getter():
|
||||
# 随机选择,防止前面有密码错误的用户卡死
|
||||
user_index = random.randrange(len(user_pwd))
|
||||
if user_index in user_pwd_error_or_not_unlimit_combo_idx:
|
||||
return user_pwd_getter()
|
||||
(user, pwd) = user_pwd[user_index]
|
||||
return (user, pwd)
|
||||
|
||||
def bad_user_callback(_):
|
||||
user_pwd_error_or_not_unlimit_combo_idx.add(cur_lku_user_idx)
|
||||
|
||||
login_till_succ(user_pwd_getter, bad_user_callback, interface=interface, log=log)
|
||||
|
||||
|
||||
# 删除密码错误或不满足要求的用户
|
||||
with open(file, 'w') as f:
|
||||
for i in range(len(user_pwd)):
|
||||
if i in user_pwd_error_or_not_unlimit_combo_idx:
|
||||
user_pwd_error_or_not_unlimit_combo_idx.remove(i)
|
||||
@@ -131,3 +153,14 @@ with open(file, 'w') as f:
|
||||
f.write(' ')
|
||||
f.write(t[1])
|
||||
f.write('\n')
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
import warnings
|
||||
import syslog
|
||||
def log(*a, sep=' '): syslog.syslog(sep.join(map(str, a)))
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-i", "--interface", help="网卡接口名称 interface")
|
||||
parser.add_argument("-f", "--file", help="用户密码文件路径", default="user_pwd.txt")
|
||||
args = parser.parse_args()
|
||||
main(args, warnings.warn, log)
|
||||
|
||||
Reference in New Issue
Block a user