1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import base64
import json
import time
import uuid
from urllib.parse import urlencode, urljoin
import requests
import ddddocr
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from loguru import logger
def ocr_by_bytes(content: bytes) -> str:
    """Recognise the captcha text contained in raw image bytes.

    Args:
        content: The encoded captcha image as a byte string.

    Returns:
        Whatever text ``ddddocr`` classifies the image as.
    """
    # A fresh recogniser is built on every call; ``show_ad=False`` is passed
    # through to the ddddocr constructor unchanged.
    recognizer = ddddocr.DdddOcr(show_ad=False)
    recognised_text = recognizer.classification(content)
    return recognised_text
def ocr_by_url(url: str) -> str:
    """Download a captcha image from *url* and return the recognised text.

    Args:
        url: Address of the captcha image.

    Returns:
        The text recognised by ``ocr_by_bytes`` for the downloaded image.

    Raises:
        requests.RequestException: If the download fails, times out (5 s),
            or the server answers with a 4xx/5xx status.
    """
    response = requests.get(url, timeout=5)
    # Fail fast on an HTTP error instead of handing an error page to the OCR
    # model as if it were an image.
    response.raise_for_status()
    return ocr_by_bytes(response.content)
class Hisense:
    """Session-based client that logs in to a Hisense gateway server.

    The login flow implemented by ``login`` is: fetch the AES key pair from
    the server, download and OCR the captcha bound to this client's UUID,
    then POST the user name together with the AES-encrypted password. On
    success the returned token is stored in the session headers.
    """

    def __init__(self) -> None:
        # Credentials; the caller must assign both before calling login().
        self.username = None
        self.password = None
        # Filled in by get_aes_key() and get_captcha(). Initialised here so
        # the attributes always exist, even before those calls succeed.
        self.token_key = None
        self.user_key = None
        self.captcha_code = None
        # Random identifier sent with the captcha request and again as "key"
        # in the login payload.
        self.uuid = str(uuid.uuid4())
        # NOTE(review): the host looks like a redacted placeholder - set the
        # real address before use.
        self.host = "http://x.x.x.x:x/"
        self.captcha_api = urljoin(self.host, "/gatewayserver/authcenter/code/create")
        self.aes_key_api = urljoin(self.host, "/gatewayserver/authcenter/key/get")
        self.login_api = urljoin(self.host, "/gatewayserver/authcenter/token/login")
        self.session = requests.Session()
        # Timeout in seconds applied to every request made through the session.
        self.timeout = 30
        self.session.headers.update(
            {
                "Accept": "application/json, text/plain, */*",
                "Accept-Encoding": "gzip, deflate",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
                "appName": "undefined",
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Cookie": "",
                "funcCode": "home",
                "Pragma": "no-cache",
                # Millisecond timestamp computed once, at construction time;
                # it is not refreshed per request.
                "timestamp": str(int(time.time() * 1000)),
                "token": "null",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
            }
        )

    def aes_encrypt(self, plain_text: str, key: bytes) -> str:
        """Encrypt *plain_text* with AES-CBC; used to encrypt the password.

        Args:
            plain_text: Text to encrypt.
            key: AES key. A ``str`` is also accepted and is encoded to bytes
                first. The key doubles as the IV.

        Returns:
            The ciphertext, base64-encoded, as text.
        """
        if isinstance(key, str):
            key = key.encode()
        # The key is reused as the initialisation vector.
        iv = key
        cipher = AES.new(key, AES.MODE_CBC, iv)
        padded_text = pad(plain_text.encode(), AES.block_size)
        cipher_text = cipher.encrypt(padded_text)
        return base64.b64encode(cipher_text).decode()

    def get_aes_key(self) -> None:
        """Fetch the AES key pair from the server.

        Stores ``meta.tokenKey`` in ``self.token_key`` and ``meta.userKey``
        in ``self.user_key``.

        Raises:
            Exception: If the request fails, the response cannot be parsed,
                or the response code is not 0.
        """
        try:
            aes_info = self.session.get(self.aes_key_api, timeout=self.timeout).json()
            succeeded = aes_info.get("code") == 0
            if succeeded:
                self.token_key = aes_info["meta"]["tokenKey"]
                self.user_key = aes_info["meta"]["userKey"]
        except Exception as e:
            raise Exception("获取aes-key失败") from e
        if not succeeded:
            logger.error(f"aes响应码不为0: {aes_info}")
            # Without keys the login cannot proceed, so report the failure
            # here instead of letting a later step trip over missing keys.
            raise Exception("获取aes-key失败")
        # NOTE(review): this writes the key material to the log.
        logger.info(f"token_key:{self.token_key}, user_key:{self.user_key}")

    def get_captcha(self) -> None:
        """Download the captcha for this client's UUID and OCR it.

        Stores the recognised text in ``self.captcha_code``.

        Raises:
            Exception: If downloading or recognising the captcha fails.
        """
        params = {
            "uuid": self.uuid,
        }
        query_string = urlencode(params)
        captcha_url = f"{self.captcha_api}?{query_string}"
        try:
            # The image is fetched by ocr_by_url, i.e. outside self.session.
            captcha_code = ocr_by_url(captcha_url)
            self.captcha_code = captcha_code
            logger.info(f"已识别验证码: {captcha_code}")
        except Exception as e:
            raise Exception("验证码识别失败") from e

    def login(self) -> bool:
        """Log in; ``self.username`` and ``self.password`` must be set first.

        On success the ``appid`` and ``token`` session headers are updated
        from the server response.

        Returns:
            True if the server answered with code 0, otherwise False.

        Raises:
            Exception: If the credentials are missing, if fetching the AES
                key or the captcha fails, or if the login request or the
                parsing of its response fails.
        """
        if not (self.username and self.password):
            raise Exception("用户名或密码为空,请先使用 `Hisense.username=xxx` 和 `Hisense.password=xxx` 进行设置")
        self.get_aes_key()
        self.get_captcha()
        encrypted_password = self.aes_encrypt(self.password, self.token_key)
        payload = {
            "code": self.captcha_code,
            "key": self.uuid,
            "loginInfo": self.username,
            "pwd": encrypted_password,
            "tenantCode": "",
            "userKey": self.user_key,
        }
        try:
            login_info = self.session.post(self.login_api, json=payload, timeout=self.timeout).json()
            if login_info.get("code") == 0:
                login_data = login_info["meta"]["data"]
                login_token = login_info["meta"]["token"]
                # meta.data is itself a JSON-encoded string.
                user_info = json.loads(login_data)
                org_name = user_info["orgName"]
                user_code = user_info["userCode"]
                username = user_info["userName"]
                self.session.headers.update({"appid": user_code})
                self.session.headers.update({"token": login_token})
                logger.info(f"登录成功,[{org_name}][{username}][{user_code}]")
                return True
            # Record why the server rejected the login before returning False.
            logger.error(f"登录响应码不为0: {login_info}")
        except Exception as e:
            raise Exception("登录失败") from e
        return False
if __name__ == "__main__":
    # Script entry point: build a client, assign the credentials and attempt
    # a single login. The return value of login() is not inspected.
    client = Hisense()
    # NOTE(review): "1234" looks like a dummy credential - replace before use.
    client.username, client.password = "1234", "1234"
    client.login()
|