Files
shuidrop_gui/WebSocket/backend_singleton.py

657 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# backend_singleton.py
# 共享后端单连接客户端实例和WebSocket连接管理
from typing import Optional, Callable
import threading
import asyncio
from threading import Thread
from PyQt5.QtCore import QObject, pyqtSignal
# 创建新的后端客户端
from WebSocket.BackendClient import BackendClient
from Utils.JD.JdUtils import JDListenerForGUI as JDListenerForGUI_WS
from Utils.Dy.DyUtils import DouYinListenerForGUI as DYListenerForGUI_WS
from Utils.Pdd.PddUtils import PddListenerForGUI as PDDListenerForGUI_WS
from Utils.QianNiu.QianNiuUtils import QianNiuListenerForGUI as QNListenerForGUI_WS
class PlatformConnectionSignals(QObject):
"""平台连接信号(线程安全)"""
platform_connected = pyqtSignal(str, str) # (platform_name, store_id)
_backend_client = None
def set_backend_client(client: BackendClient) -> None:
global _backend_client
_backend_client = client
def get_backend_client() -> Optional[BackendClient]:
return _backend_client
class WebSocketManager:
"""WebSocket连接管理器统一处理后端连接和平台监听"""
def __init__(self):
# 后端客户端
self.backend_client = None
self.is_connected = False
# 版本检查器
self.version_checker = None
self.gui_update_callback = None
self.platform_listeners = {} # 存储各平台的监听器
self.connected_platforms = [] # 存储已连接的平台列表 # <- 新增
# 平台连接信号(线程安全)
self.platform_signals = PlatformConnectionSignals()
self.platform_signals.platform_connected.connect(self._on_platform_signal_received)
self.callbacks = {
'log': None,
'success': None,
'error': None,
'platform_connected': None,
'token_error': None,
}
def set_callbacks(self, log: Callable = None, success: Callable = None, error: Callable = None,
platform_connected: Callable = None, token_error: Callable = None):
"""设置回调函数"""
if log:
self.callbacks['log'] = log
if success:
self.callbacks['success'] = success
if error:
self.callbacks['error'] = error
if platform_connected: # ← 新增
self.callbacks['platform_connected'] = platform_connected
if token_error:
self.callbacks['token_error'] = token_error
def _log(self, message: str, level: str = "INFO"):
"""内部日志方法"""
if self.callbacks['log']:
self.callbacks['log'](message, level)
else:
print(f"[{level}] {message}")
def _on_platform_signal_received(self, platform_name: str, store_id: str):
"""接收平台连接信号(在主线程中执行)"""
try:
self._log(f"📡 收到平台连接信号: {platform_name} (店铺:{store_id})", "INFO")
self._notify_platform_connected(platform_name)
except Exception as e:
self._log(f"处理平台连接信号失败: {e}", "ERROR")
def _notify_platform_connected(self, platform_name: str):
"""通知GUI平台连接成功仅在主线程中调用"""
try:
if platform_name not in self.connected_platforms:
self.connected_platforms.append(platform_name)
if self.callbacks['platform_connected']:
self.callbacks['platform_connected'](platform_name, self.connected_platforms.copy())
self._log(f"已通知GUI平台连接: {platform_name}", "INFO")
except Exception as e:
self._log(f"通知平台连接失败: {e}", "ERROR")
def connect_backend(self, token: str) -> bool:
"""连接后端WebSocket"""
try:
# 1 保存token到配置
try:
from config import set_saved_token
set_saved_token(token)
except Exception:
pass
# 2 获取或创建后端客户端
backend = get_backend_client()
if backend:
# 检查现有客户端是否因token错误而停止
if backend.should_stop:
self._log("检测到客户端因token错误已停止创建新的客户端", "INFO")
# 断开旧客户端
backend.disconnect()
# 清除旧客户端引用
set_backend_client(None)
backend = None
else:
# 3 如果有客户端更新token并重连
backend.set_token(token)
# 设置回调函数
def _on_backend_success():
try:
self._log("连接服务成功", "SUCCESS")
if self.callbacks['success']:
self.callbacks['success']()
# 启动版本检查器
self._start_version_checker()
except Exception as e:
self._log(f"成功回调执行失败: {e}", "ERROR")
def _on_backend_login(platform_name: str, store_id: str, cookies: str):
self._log(
f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}",
"INFO")
self._handle_platform_login(platform_name, store_id, cookies)
def _on_token_error(error_content: str):
self._log(f"Token验证失败: {error_content}", "ERROR")
if self.callbacks['token_error']:
self.callbacks['token_error'](error_content)
backend.set_callbacks(success=_on_backend_success, login=_on_backend_login,
token_error=_on_token_error)
if not backend.is_connected:
backend.connect()
self.backend_client = backend
self._log("令牌已提交已连接后端。等待后端下发平台cookies后自动连接平台...", "SUCCESS")
return True
# 如果没有现有客户端或客户端被重置,创建新的
if not backend:
backend = BackendClient.from_exe_token(token)
def _on_backend_login(platform_name: str, store_id: str, cookies: str):
self._log(
f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}",
"INFO")
self._handle_platform_login(platform_name, store_id, cookies)
def _on_backend_success():
try:
self._log("连接服务成功", "SUCCESS")
if self.callbacks['success']:
self.callbacks['success']()
# 启动版本检查器
self._start_version_checker()
except Exception as e:
self._log(f"成功回调执行失败: {e}", "ERROR")
def _on_token_error(error_content: str):
self._log(f"Token验证失败: {error_content}", "ERROR")
if self.callbacks['token_error']:
self.callbacks['token_error'](error_content)
backend.set_callbacks(login=_on_backend_login, success=_on_backend_success, token_error=_on_token_error)
backend.connect()
set_backend_client(backend)
self.backend_client = backend
self._log("已创建后端客户端并连接。等待后端下发平台cookies...", "SUCCESS")
return True
except Exception as e:
self._log(f"连接后端失败: {e}", "ERROR")
if self.callbacks['error']:
self.callbacks['error'](str(e))
return False
def _handle_platform_login(self, platform_name: str, store_id: str, cookies: str):
"""处理平台登录请求"""
try:
# 🔥 检查并清理当前店铺的旧连接
store_key_pattern = f":{store_id}" # 匹配 "平台名:store_id" 格式
keys_to_remove = [key for key in self.platform_listeners.keys() if key.endswith(store_key_pattern)]
if keys_to_remove:
self._log(f"🔄 检测到店铺 {store_id} 重连,清理 {len(keys_to_remove)} 个旧连接", "INFO")
for key in keys_to_remove:
self.platform_listeners.pop(key, None)
# 平台名称映射
platform_map = {
"淘宝": "千牛",
"QIANNIU": "千牛",
"京东": "京东",
"JD": "京东",
"抖音": "抖音",
"DY": "抖音",
"DOUYIN": "抖音",
"拼多多": "拼多多",
"PDD": "拼多多",
"PINDUODUO": "拼多多"
}
# 标准化平台名称
normalized_platform = platform_map.get(platform_name.upper(), platform_name)
self._log(f"处理平台登录: {platform_name} -> {normalized_platform}, 店铺={store_id}", "INFO")
# 🔧 关键修改:确保每个平台都能独立处理自己的登录请求
if normalized_platform == "千牛":
if cookies == "login_success":
self._log("⚠️ 千牛平台收到空cookies但允许启动监听器", "WARNING")
cookies = "" # 清空cookies千牛不需要真实cookies
self._start_qianniu_listener(store_id, cookies)
elif normalized_platform == "京东":
self._start_jd_listener(store_id, cookies)
elif normalized_platform == "抖音":
self._start_douyin_listener(store_id, cookies)
elif normalized_platform == "拼多多":
self._start_pdd_listener(store_id, cookies)
else:
self._log(f"❌ 不支持的平台: {platform_name}", "ERROR")
except Exception as e:
self._log(f"处理平台登录失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _start_version_checker(self):
"""启动版本检查器"""
try:
from version_checker import VersionChecker
self.version_checker = VersionChecker(
backend_client=self.backend_client,
update_callback=self._on_update_available
)
self.backend_client.version_callback = self.version_checker.handle_version_response
self.version_checker.start()
self._log("✅ 版本检查器已启动将每10分钟检查一次更新", "SUCCESS")
except Exception as e:
self._log(f"❌ 启动版本检查器失败: {e}", "ERROR")
def _on_update_available(self, latest_version, download_url):
"""发现新版本时的处理(在子线程中调用)"""
self._log(f"🔔 发现新版本 {latest_version}", "INFO")
# 通知主GUI显示更新提醒通过 Qt 信号机制,线程安全)
if hasattr(self, 'gui_update_callback') and self.gui_update_callback:
try:
# 直接调用回调(回调内部使用信号机制调度到主线程)
self.gui_update_callback(latest_version, download_url)
self._log(f"✅ 已调用更新回调", "DEBUG")
except Exception as e:
self._log(f"❌ 调用更新回调失败: {e}", "ERROR")
import traceback
self._log(f"详细错误: {traceback.format_exc()}", "ERROR")
def _start_jd_listener(self, store_id: str, cookies: str):
"""启动京东平台监听"""
try:
def _runner():
try:
listener = JDListenerForGUI_WS()
asyncio.run(listener.start_with_cookies(store_id, cookies))
except Exception as e:
self._log(f"京东监听器运行异常: {e}", "ERROR")
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"京东:{store_id}"] = {
'thread': thread,
'platform': '京东',
'store_id': store_id
}
# 上报连接状态给后端
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": True
})
except Exception as e:
self._log(f"上报连接状态失败: {e}", "WARNING")
self._log("已启动京东平台监听", "SUCCESS")
self._notify_platform_connected("京东") # ← 新增
except Exception as e:
self._log(f"启动京东平台监听失败: {e}", "ERROR")
# 确保失败时也上报状态
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": False
})
except Exception as send_e:
self._log(f"失败状态下报连接状态也失败: {send_e}", "ERROR")
def _start_douyin_listener(self, store_id: str, cookies: str):
"""启动抖音平台监听"""
try:
def _runner():
try:
import json
self._log("🚀 开始创建抖音监听器实例", "DEBUG")
listener = DYListenerForGUI_WS()
self._log("✅ 抖音监听器实例创建成功", "DEBUG")
# 🔥 检查是否为登录参数模式(与拼多多保持一致)
if cookies and ('"login_flow"' in cookies or '"phone_number"' in cookies):
# 使用登录参数模式
self._log("📋 使用登录参数启动抖音监听器", "INFO")
self._log("🔄 开始执行 start_with_login_params", "DEBUG")
result = asyncio.run(listener.start_with_login_params(store_id=store_id, login_params=cookies))
self._log(f"📊 start_with_login_params 执行结果: {result}", "DEBUG")
# 详细的结果分析仅日志记录GUI 已在主线程中通知)
if result == "need_verification_code":
self._log("✅ [DY] 登录流程正常,已发送验证码需求通知给后端", "SUCCESS")
elif result == "verification_code_error":
self._log("⚠️ [DY] 验证码错误,已发送错误通知给后端", "WARNING")
elif result:
self._log("✅ [DY] 登录成功,平台连接已建立", "SUCCESS")
else:
self._log("❌ [DY] 登录失败", "ERROR")
else:
# 传统cookie模式保留兼容性
self._log("🍪 使用Cookie启动抖音监听器", "INFO")
self._log("🔄 开始执行 start_with_cookies", "DEBUG")
try:
# 🔥 修复尝试JSON解析失败时用ast.literal_eval解析Python字典字符串
if isinstance(cookies, str):
try:
cookie_dict = json.loads(cookies)
except json.JSONDecodeError:
# 后端发送的是Python字典字符串格式使用ast.literal_eval
import ast
cookie_dict = ast.literal_eval(cookies)
self._log("✅ 使用ast.literal_eval成功解析cookies", "DEBUG")
else:
cookie_dict = cookies
except (json.JSONDecodeError, ValueError, SyntaxError) as e:
self._log(f"❌ Cookie解析失败: {e}", "ERROR")
return False
result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookie_dict=cookie_dict))
self._log(f"📊 start_with_cookies 执行结果: {result}", "DEBUG")
# Cookie启动成功时记录日志GUI 已在主线程中通知)
if result:
self._log("✅ [DY] Cookie启动成功平台连接已建立", "SUCCESS")
# 🔥 移除不再在backend_singleton中发送connect_message
# 抖音的连接状态报告应该在DyUtils中的DyLogin类中发送与拼多多保持一致
# 所有特殊状态通知都已经在DyLogin中发送过了这里不需要重复发送
return result
except Exception as e:
self._log(f"抖音监听器运行异常: {e}", "ERROR")
return False
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"抖音:{store_id}"] = {
'thread': thread,
'platform': '抖音',
'store_id': store_id,
}
# 更新监听器状态
if f"抖音:{store_id}" in self.platform_listeners:
self.platform_listeners[f"抖音:{store_id}"]['status'] = 'success'
# ✅ 临时方案:启动后立即通知(因为 DY 监听器也会阻塞)
# DY 内部会处理验证码流程,失败时会向后端上报相应状态
self._log("已启动抖音平台监听", "SUCCESS")
self._notify_platform_connected("抖音") # ← 立即通知
except Exception as e:
self._log(f"启动抖音平台监听失败: {e}", "ERROR")
# 🔥 移除:确保失败时也不在这里上报状态
# 失败状态应该在DyLogin中处理与拼多多保持一致
def _start_qianniu_listener(self, store_id: str, cookies: str):
"""启动千牛平台监听(单连接多店铺架构)"""
try:
# 获取用户token从后端客户端获取
exe_token = None
if self.backend_client:
exe_token = getattr(self.backend_client, 'token', None)
if not exe_token:
self._log("❌ 缺少exe_token无法启动千牛监听器", "ERROR")
return
def _runner():
try:
listener = QNListenerForGUI_WS()
# 千牛平台使用新架构传递store_id和exe_token
asyncio.run(listener.start_listening_with_store(store_id, exe_token))
except Exception as e:
self._log(f"千牛监听器运行异常: {e}", "ERROR")
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"千牛:{store_id}"] = {
'thread': thread,
'platform': '千牛',
'store_id': store_id,
'exe_token': exe_token
}
# 上报连接状态给后端
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": True
})
except Exception as e:
self._log(f"上报连接状态失败: {e}", "WARNING")
self._log("已启动千牛平台监听(单连接多店铺架构)", "SUCCESS")
self._notify_platform_connected("千牛") # ← 新增
except Exception as e:
self._log(f"启动千牛平台监听失败: {e}", "ERROR")
# 确保失败时也上报状态
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": False
})
except Exception as send_e:
self._log(f"失败状态下报千牛平台连接状态也失败: {send_e}", "ERROR")
def _start_pdd_listener(self, store_id: str, data: str):
"""启动拼多多平台监听"""
try:
def _runner():
try:
self._log("🚀 开始创建拼多多监听器实例", "DEBUG")
listener = PDDListenerForGUI_WS(log_callback=self._log)
self._log("✅ 拼多多监听器实例创建成功", "DEBUG")
# 判断是登录参数还是Cookie
if self._is_pdd_login_params(data):
# 使用登录参数启动
self._log("📋 使用登录参数启动拼多多监听器", "INFO")
self._log("🔄 开始执行 start_with_login_params", "DEBUG")
result = asyncio.run(listener.start_with_login_params(store_id=store_id, login_params=data))
self._log(f"📊 start_with_login_params 执行结果: {result}", "DEBUG")
# 详细的结果分析仅日志记录GUI 已在主线程中通知)
if result == "need_verification_code":
self._log("✅ [PDD] 登录流程正常,已发送验证码需求通知给后端", "SUCCESS")
elif result == "verification_code_error":
self._log("⚠️ [PDD] 验证码错误,已发送错误通知给后端", "WARNING")
elif result:
self._log("✅ [PDD] 登录成功,平台连接已建立", "SUCCESS")
else:
self._log("❌ [PDD] 登录失败", "ERROR")
else:
# 使用Cookie启动兼容旧方式
self._log("🍪 使用Cookie启动拼多多监听器", "INFO")
self._log("🔄 开始执行 start_with_cookies", "DEBUG")
result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookies=data))
self._log(f"📊 start_with_cookies 执行结果: {result}", "DEBUG")
# Cookie启动成功时记录日志GUI 已在主线程中通知)
if result:
self._log("✅ [PDD] Cookie启动成功平台连接已建立", "SUCCESS")
# 根据实际登录结果上报状态给后端
if self.backend_client and result not in ["need_verification_code", "verification_code_error",
"login_failure"]:
# 如果是特殊状态说明通知已经在PddLogin中发送了不需要重复发送
try:
message = {
"type": "connect_message",
"store_id": store_id,
"status": bool(result)
}
self.backend_client.send_message(message)
status_text = "成功" if result else "失败"
self._log(f"上报拼多多平台连接状态{status_text}: {message}",
"SUCCESS" if result else "ERROR")
except Exception as send_e:
self._log(f"上报拼多多平台连接状态失败: {send_e}", "ERROR")
elif result == "need_verification_code":
self._log("需要验证码验证码通知已由PddLogin发送等待后端重新下发登录参数", "INFO")
elif result == "verification_code_error":
self._log("验证码错误错误通知已由PddLogin发送等待后端处理", "INFO")
elif result == "login_failure":
self._log("登录失败失败通知已由PddLogin发送等待后端处理", "INFO")
return result
except Exception as e:
self._log(f"拼多多监听器运行异常: {e}", "ERROR")
import traceback
self._log(f"异常详情: {traceback.format_exc()}", "DEBUG")
# 异常情况下上报失败状态
if self.backend_client:
try:
# 截取异常信息前100个字符避免消息过长
error_msg = str(e)[:100] if len(str(e)) > 100 else str(e)
message = {
"type": "connect_message",
"store_id": store_id,
"status": False,
"content": f"登录异常: {error_msg}"
}
self.backend_client.send_message(message)
self._log(f"异常情况下上报拼多多平台连接失败状态: {message}", "ERROR")
except Exception as send_e:
self._log(f"异常情况下上报状态也失败: {send_e}", "ERROR")
return False
# 在新线程中启动监听器
thread = Thread(target=_runner, daemon=True)
thread.start()
# 保存监听器引用
self.platform_listeners[f"拼多多:{store_id}"] = {
'thread': thread,
'platform': '拼多多',
'store_id': store_id,
}
# ✅ 临时方案:启动后立即通知(因为 PDD 监听器会阻塞,无法通过返回值判断)
# PDD 内部会处理验证码流程,失败时会向后端上报 status=False
self._log("已启动拼多多平台监听", "SUCCESS")
self._notify_platform_connected("拼多多") # ← 立即通知
except Exception as e:
self._log(f"启动拼多多平台监听失败: {e}", "ERROR")
# 确保失败时也上报状态
if self.backend_client:
try:
self.backend_client.send_message({
"type": "connect_message",
"store_id": store_id,
"status": False
})
self._log(f"启动失败情况下上报拼多多平台连接状态失败", "ERROR")
except Exception as send_e:
self._log(f"启动失败情况下上报状态也失败: {send_e}", "ERROR")
def _is_pdd_login_params(self, data: str) -> bool:
"""判断是否为拼多多登录参数"""
try:
self._log(f"🔍 [DEBUG] 检查是否为登录参数,数据长度: {len(data)}", "DEBUG")
self._log(f"🔍 [DEBUG] 数据前100字符: {data[:100]}", "DEBUG")
import json
parsed_data = json.loads(data)
self._log(f"🔍 [DEBUG] JSON解析成功键: {list(parsed_data.keys())}", "DEBUG")
login_params = parsed_data.get("data", {}).get("login_params", {})
self._log(f"🔍 [DEBUG] login_params存在: {bool(login_params)}", "DEBUG")
if not login_params:
self._log("🔍 [DEBUG] login_params为空返回False", "DEBUG")
return False
# 检查必需的登录参数字段
required_fields = ["username", "password", "anti_content", "risk_sign", "timestamp"]
has_all_fields = all(field in login_params for field in required_fields)
self._log(f"🔍 [DEBUG] 包含所有必需字段: {has_all_fields}", "DEBUG")
self._log(f"🔍 [DEBUG] 现有字段: {list(login_params.keys())}", "DEBUG")
return has_all_fields
except Exception as e:
self._log(f"🔍 [DEBUG] 解析失败: {e}", "DEBUG")
return False
def send_message(self, message: dict):
"""发送消息到后端"""
if self.backend_client and self.backend_client.is_connected:
return self.backend_client.send_message(message)
else:
raise Exception("后端未连接")
def disconnect_all(self):
"""断开所有连接"""
try:
# 断开后端连接
if self.backend_client:
self.backend_client.disconnect()
self.backend_client = None
# 清理平台监听器
self.platform_listeners.clear()
self._log("所有连接已断开", "INFO")
except Exception as e:
self._log(f"断开连接时发生错误: {e}", "ERROR")
# 全局WebSocket管理器实例
_websocket_manager = None
def get_websocket_manager() -> WebSocketManager:
"""获取全局WebSocket管理器实例"""
global _websocket_manager
if _websocket_manager is None:
_websocket_manager = WebSocketManager()
return _websocket_manager
# 测试git