Files
shuidrop_gui/WebSocket/backend_singleton.py

508 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# backend_singleton.py
# 共享后端单连接客户端实例和WebSocket连接管理
from typing import Optional, Callable
import threading
import asyncio
from threading import Thread
# 创建新的后端客户端
from WebSocket.BackendClient import BackendClient
from Utils.JD.JdUtils import JDListenerForGUI as JDListenerForGUI_WS
from Utils.Dy.DyUtils import DouYinListenerForGUI as DYListenerForGUI_WS
from Utils.Pdd.PddUtils import PddListenerForGUI as PDDListenerForGUI_WS
from Utils.QianNiu.QianNiuUtils import QianNiuListenerForGUI as QNListenerForGUI_WS
_backend_client = None
def set_backend_client(client: BackendClient) -> None:
global _backend_client
_backend_client = client
def get_backend_client() -> Optional[BackendClient]:
return _backend_client
class WebSocketManager:
"""WebSocket连接管理器统一处理后端连接和平台监听"""
def __init__(self):
self.backend_client = None
self.platform_listeners = {} # 存储各平台的监听器
self.callbacks = {
'log': None,
'success': None,
'error': None
}
def set_callbacks(self, log: Callable = None, success: Callable = None, error: Callable = None):
"""设置回调函数"""
if log:
self.callbacks['log'] = log
if success:
self.callbacks['success'] = success
if error:
self.callbacks['error'] = error
def _log(self, message: str, level: str = "INFO"):
"""内部日志方法"""
if self.callbacks['log']:
self.callbacks['log'](message, level)
else:
print(f"[{level}] {message}")
def connect_backend(self, token: str) -> bool:
"""连接后端WebSocket"""
try:
# 1 保存token到配置
try:
from config import set_saved_token
set_saved_token(token)
except Exception:
pass
# 2 获取或创建后端客户端
backend = get_backend_client()
if backend:
# 3 如果有客户端更新token并重连
backend.set_token(token)
# 设置回调函数
def _on_backend_success():
try:
self._log("连接服务成功", "SUCCESS")
if self.callbacks['success']:
self.callbacks['success']()
except Exception as e:
self._log(f"成功回调执行失败: {e}", "ERROR")
def _on_backend_login(platform_name: str, store_id: str, cookies: str):
self._log(
f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}",
"INFO")
self._handle_platform_login(platform_name, store_id, cookies)
backend.set_callbacks(success=_on_backend_success, login=_on_backend_login)
if not backend.is_connected:
backend.connect()
self.backend_client = backend
self._log("令牌已提交已连接后端。等待后端下发平台cookies后自动连接平台...", "SUCCESS")
return True
else:
backend = BackendClient.from_exe_token(token)
def _on_backend_login(platform_name: str, store_id: str, cookies: str):
self._log(
f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}",
"INFO")
self._handle_platform_login(platform_name, store_id, cookies)
def _on_backend_success():
try:
self._log("连接服务成功", "SUCCESS")
if self.callbacks['success']:
self.callbacks['success']()
except Exception as e:
self._log(f"成功回调执行失败: {e}", "ERROR")
backend.set_callbacks(login=_on_backend_login, success=_on_backend_success)
backend.connect()
set_backend_client(backend)
self.backend_client = backend
self._log("已创建后端客户端并连接。等待后端下发平台cookies...", "SUCCESS")
return True
except Exception as e:
self._log(f"连接后端失败: {e}", "ERROR")
if self.callbacks['error']:
self.callbacks['error'](str(e))
return False
def _handle_platform_login(self, platform_name: str, store_id: str, cookies: str):
"""处理平台登录请求"""
try:
# 平台名称映射
platform_map = {
"淘宝": "千牛",
"QIANNIU": "千牛",
"京东": "京东",
"JD": "京东",
"抖音": "抖音",
"DY": "抖音",
"DOUYIN": "抖音",
"拼多多": "拼多多",
"PDD": "拼多多",
"PINDUODUO": "拼多多"
}
# 标准化平台名称
normalized_platform = platform_map.get(platform_name.upper(), platform_name)
self._log(f"处理平台登录: {platform_name} -> {normalized_platform}, 店铺={store_id}", "INFO")
# 🔧 关键修改:确保每个平台都能独立处理自己的登录请求
if normalized_platform == "千牛":
if cookies == "login_success":
self._log("⚠️ 千牛平台收到空cookies但允许启动监听器", "WARNING")
cookies = "" # 清空cookies千牛不需要真实cookies
self._start_qianniu_listener(store_id, cookies)
elif normalized_platform == "京东":
self._start_jd_listener(store_id, cookies)
elif normalized_platform == "抖音":
self._start_douyin_listener(store_id, cookies)
elif normalized_platform == "拼多多":
self._start_pdd_listener(store_id, cookies)
else:
self._log(f"❌ 不支持的平台: {platform_name}", "ERROR")
except Exception as e:
self._log(f"处理平台登录失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _start_jd_listener(self, store_id: str, cookies: str):
"""启动京东平台监听"""
try:
def _runner():
try:
listener = JDListenerForGUI_WS()
asyncio.run(listener.start_with_cookies(store_id, cookies))
except Exception as e:
self._log(f"京东监听器运行异常: {e}", "ERROR")
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"京东:{store_id}"] = {
'thread': thread,
'platform': '京东',
'store_id': store_id
}
# 上报连接状态给后端
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": True
})
except Exception as e:
self._log(f"上报连接状态失败: {e}", "WARNING")
self._log("已启动京东平台监听", "SUCCESS")
except Exception as e:
self._log(f"启动京东平台监听失败: {e}", "ERROR")
# 确保失败时也上报状态
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": False
})
except Exception as send_e:
self._log(f"失败状态下报连接状态也失败: {send_e}", "ERROR")
def _start_douyin_listener(self, store_id: str, cookies: str):
"""启动抖音平台监听"""
try:
def _runner():
try:
import json
listener = DYListenerForGUI_WS()
# 将JSON字符串格式的cookies解析为字典
try:
cookie_dict = json.loads(cookies) if isinstance(cookies, str) else cookies
except json.JSONDecodeError as e:
self._log(f"❌ Cookie JSON解析失败: {e}", "ERROR")
return False
result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookie_dict=cookie_dict))
return result
except Exception as e:
self._log(f"抖音监听器运行异常: {e}", "ERROR")
return False
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"抖音:{store_id}"] = {
'thread': thread,
'platform': '抖音',
'store_id': store_id,
}
# 更新监听器状态
if f"抖音:{store_id}" in self.platform_listeners:
self.platform_listeners[f"抖音:{store_id}"]['status'] = 'success'
# 上报连接状态给后端
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": True
})
self._log("已上报抖音平台连接状态: 成功", "INFO")
except Exception as e:
self._log(f"上报抖音平台连接状态失败: {e}", "WARNING")
self._log("已启动抖音平台监听", "SUCCESS")
except Exception as e:
self._log(f"启动抖音平台监听失败: {e}", "ERROR")
# 确保失败时也上报状态
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": False
})
except Exception as send_e:
self._log(f"失败状态下报抖音平台连接状态也失败: {send_e}", "ERROR")
def _start_qianniu_listener(self, store_id: str, cookies: str):
"""启动千牛平台监听(单连接多店铺架构)"""
try:
# 获取用户token从后端客户端获取
exe_token = None
if self.backend_client:
exe_token = getattr(self.backend_client, 'token', None)
if not exe_token:
self._log("❌ 缺少exe_token无法启动千牛监听器", "ERROR")
return
def _runner():
try:
listener = QNListenerForGUI_WS()
# 千牛平台使用新架构传递store_id和exe_token
asyncio.run(listener.start_listening_with_store(store_id, exe_token))
except Exception as e:
self._log(f"千牛监听器运行异常: {e}", "ERROR")
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"千牛:{store_id}"] = {
'thread': thread,
'platform': '千牛',
'store_id': store_id,
'exe_token': exe_token
}
# 上报连接状态给后端
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": True
})
except Exception as e:
self._log(f"上报连接状态失败: {e}", "WARNING")
self._log("已启动千牛平台监听(单连接多店铺架构)", "SUCCESS")
except Exception as e:
self._log(f"启动千牛平台监听失败: {e}", "ERROR")
# 确保失败时也上报状态
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": False
})
except Exception as send_e:
self._log(f"失败状态下报千牛平台连接状态也失败: {send_e}", "ERROR")
def _start_pdd_listener(self, store_id: str, data: str):
"""启动拼多多平台监听"""
try:
def _runner():
try:
self._log("🚀 开始创建拼多多监听器实例", "DEBUG")
listener = PDDListenerForGUI_WS(log_callback=self._log)
self._log("✅ 拼多多监听器实例创建成功", "DEBUG")
# 判断是登录参数还是Cookie
if self._is_pdd_login_params(data):
# 使用登录参数启动
self._log("📋 使用登录参数启动拼多多监听器", "INFO")
self._log("🔄 开始执行 start_with_login_params", "DEBUG")
result = asyncio.run(listener.start_with_login_params(store_id=store_id, login_params=data))
self._log(f"📊 start_with_login_params 执行结果: {result}", "DEBUG")
# 详细的结果分析
if result == "need_verification_code":
self._log("✅ [PDD] 登录流程正常,已发送验证码需求通知给后端", "SUCCESS")
elif result == "verification_code_error":
self._log("⚠️ [PDD] 验证码错误,已发送错误通知给后端", "WARNING")
elif result:
self._log("✅ [PDD] 登录成功,平台连接已建立", "SUCCESS")
else:
self._log("❌ [PDD] 登录失败", "ERROR")
else:
# 使用Cookie启动兼容旧方式
self._log("🍪 使用Cookie启动拼多多监听器", "INFO")
self._log("🔄 开始执行 start_with_cookies", "DEBUG")
result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookies=data))
self._log(f"📊 start_with_cookies 执行结果: {result}", "DEBUG")
# 根据实际登录结果上报状态给后端
if self.backend_client and result not in ["need_verification_code", "verification_code_error", "login_failure"]:
# 如果是特殊状态说明通知已经在PddLogin中发送了不需要重复发送
try:
message = {
"type": "connect_message",
"store_id": store_id,
"status": bool(result)
}
self.backend_client.send_message(message)
status_text = "成功" if result else "失败"
self._log(f"上报拼多多平台连接状态{status_text}: {message}", "SUCCESS" if result else "ERROR")
except Exception as send_e:
self._log(f"上报拼多多平台连接状态失败: {send_e}", "ERROR")
elif result == "need_verification_code":
self._log("需要验证码验证码通知已由PddLogin发送等待后端重新下发登录参数", "INFO")
elif result == "verification_code_error":
self._log("验证码错误错误通知已由PddLogin发送等待后端处理", "INFO")
elif result == "login_failure":
self._log("登录失败失败通知已由PddLogin发送等待后端处理", "INFO")
return result
except Exception as e:
self._log(f"拼多多监听器运行异常: {e}", "ERROR")
import traceback
self._log(f"异常详情: {traceback.format_exc()}", "DEBUG")
# 异常情况下上报失败状态
if self.backend_client:
try:
# 截取异常信息前100个字符避免消息过长
error_msg = str(e)[:100] if len(str(e)) > 100 else str(e)
message = {
"type": "connect_message",
"store_id": store_id,
"status": False,
"content": f"登录异常: {error_msg}"
}
self.backend_client.send_message(message)
self._log(f"异常情况下上报拼多多平台连接失败状态: {message}", "ERROR")
except Exception as send_e:
self._log(f"异常情况下上报状态也失败: {send_e}", "ERROR")
return False
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"拼多多:{store_id}"] = {
'thread': thread,
'platform': '拼多多',
'store_id': store_id,
}
self._log("拼多多平台监听线程已启动,等待登录结果...", "INFO")
except Exception as e:
self._log(f"启动拼多多平台监听失败: {e}", "ERROR")
# 确保失败时也上报状态
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": False
})
self._log(f"启动失败情况下上报拼多多平台连接状态失败", "ERROR")
except Exception as send_e:
self._log(f"启动失败情况下上报状态也失败: {send_e}", "ERROR")
def _is_pdd_login_params(self, data: str) -> bool:
"""判断是否为拼多多登录参数"""
try:
self._log(f"🔍 [DEBUG] 检查是否为登录参数,数据长度: {len(data)}", "DEBUG")
self._log(f"🔍 [DEBUG] 数据前100字符: {data[:100]}", "DEBUG")
import json
parsed_data = json.loads(data)
self._log(f"🔍 [DEBUG] JSON解析成功键: {list(parsed_data.keys())}", "DEBUG")
login_params = parsed_data.get("data", {}).get("login_params", {})
self._log(f"🔍 [DEBUG] login_params存在: {bool(login_params)}", "DEBUG")
if not login_params:
self._log("🔍 [DEBUG] login_params为空返回False", "DEBUG")
return False
# 检查必需的登录参数字段
required_fields = ["username", "password", "anti_content", "risk_sign", "timestamp"]
has_all_fields = all(field in login_params for field in required_fields)
self._log(f"🔍 [DEBUG] 包含所有必需字段: {has_all_fields}", "DEBUG")
self._log(f"🔍 [DEBUG] 现有字段: {list(login_params.keys())}", "DEBUG")
return has_all_fields
except Exception as e:
self._log(f"🔍 [DEBUG] 解析失败: {e}", "DEBUG")
return False
def send_message(self, message: dict):
"""发送消息到后端"""
if self.backend_client and self.backend_client.is_connected:
return self.backend_client.send_message(message)
else:
raise Exception("后端未连接")
def disconnect_all(self):
"""断开所有连接"""
try:
# 断开后端连接
if self.backend_client:
self.backend_client.disconnect()
self.backend_client = None
# 清理平台监听器
self.platform_listeners.clear()
self._log("所有连接已断开", "INFO")
except Exception as e:
self._log(f"断开连接时发生错误: {e}", "ERROR")
# 全局WebSocket管理器实例
_websocket_manager = None
def get_websocket_manager() -> WebSocketManager:
"""获取全局WebSocket管理器实例"""
global _websocket_manager
if _websocket_manager is None:
_websocket_manager = WebSocketManager()
return _websocket_manager