# backend_singleton.py # 共享后端单连接客户端实例和WebSocket连接管理 from typing import Optional, Callable import threading import asyncio from threading import Thread # 创建新的后端客户端 from WebSocket.BackendClient import BackendClient from Utils.JD.JdUtils import JDListenerForGUI as JDListenerForGUI_WS from Utils.Dy.DyUtils import DouYinListenerForGUI as DYListenerForGUI_WS from Utils.Pdd.PddUtils import PddListenerForGUI as PDDListenerForGUI_WS from Utils.QianNiu.QianNiuUtils import QianNiuListenerForGUI as QNListenerForGUI_WS _backend_client = None def set_backend_client(client: BackendClient) -> None: global _backend_client _backend_client = client def get_backend_client() -> Optional[BackendClient]: return _backend_client class WebSocketManager: """WebSocket连接管理器,统一处理后端连接和平台监听""" def __init__(self): # 后端客户端 self.backend_client = None self.is_connected = False # 版本检查器 self.version_checker = None self.gui_update_callback = None self.platform_listeners = {} # 存储各平台的监听器 self.connected_platforms = [] # 存储已连接的平台列表 # <- 新增 self.callbacks = { 'log': None, 'success': None, 'error': None, 'platform_connected': None, 'token_error': None, } def set_callbacks(self, log: Callable = None, success: Callable = None, error: Callable = None, platform_connected: Callable = None, token_error: Callable = None): """设置回调函数""" if log: self.callbacks['log'] = log if success: self.callbacks['success'] = success if error: self.callbacks['error'] = error if platform_connected: # ← 新增 self.callbacks['platform_connected'] = platform_connected if token_error: self.callbacks['token_error'] = token_error def _log(self, message: str, level: str = "INFO"): """内部日志方法""" if self.callbacks['log']: self.callbacks['log'](message, level) else: print(f"[{level}] {message}") def _notify_platform_connected(self, platform_name: str): """通知GUI平台连接成功""" try: if platform_name not in self.connected_platforms: self.connected_platforms.append(platform_name) if self.callbacks['platform_connected']: self.callbacks['platform_connected'](platform_name, self.connected_platforms.copy()) self._log(f"已通知GUI平台连接: {platform_name}", "INFO") except Exception as e: self._log(f"通知平台连接失败: {e}", "ERROR") def connect_backend(self, token: str) -> bool: """连接后端WebSocket""" try: # 1 保存token到配置 try: from config import set_saved_token set_saved_token(token) except Exception: pass # 2 获取或创建后端客户端 backend = get_backend_client() if backend: # 检查现有客户端是否因token错误而停止 if backend.should_stop: self._log("检测到客户端因token错误已停止,创建新的客户端", "INFO") # 断开旧客户端 backend.disconnect() # 清除旧客户端引用 set_backend_client(None) backend = None else: # 3 如果有客户端更新token并重连 backend.set_token(token) # 设置回调函数 def _on_backend_success(): try: self._log("连接服务成功", "SUCCESS") if self.callbacks['success']: self.callbacks['success']() # 启动版本检查器 self._start_version_checker() except Exception as e: self._log(f"成功回调执行失败: {e}", "ERROR") def _on_backend_login(platform_name: str, store_id: str, cookies: str): self._log( f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}", "INFO") self._handle_platform_login(platform_name, store_id, cookies) def _on_token_error(error_content: str): self._log(f"Token验证失败: {error_content}", "ERROR") if self.callbacks['token_error']: self.callbacks['token_error'](error_content) backend.set_callbacks(success=_on_backend_success, login=_on_backend_login, token_error=_on_token_error) if not backend.is_connected: backend.connect() self.backend_client = backend self._log("令牌已提交,已连接后端。等待后端下发平台cookies后自动连接平台...", "SUCCESS") return True # 如果没有现有客户端或客户端被重置,创建新的 if not backend: backend = BackendClient.from_exe_token(token) def _on_backend_login(platform_name: str, store_id: str, cookies: str): self._log( f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}", "INFO") self._handle_platform_login(platform_name, store_id, cookies) def _on_backend_success(): try: self._log("连接服务成功", "SUCCESS") if self.callbacks['success']: self.callbacks['success']() # 启动版本检查器 self._start_version_checker() except Exception as e: self._log(f"成功回调执行失败: {e}", "ERROR") def _on_token_error(error_content: str): self._log(f"Token验证失败: {error_content}", "ERROR") if self.callbacks['token_error']: self.callbacks['token_error'](error_content) backend.set_callbacks(login=_on_backend_login, success=_on_backend_success, token_error=_on_token_error) backend.connect() set_backend_client(backend) self.backend_client = backend self._log("已创建后端客户端并连接。等待后端下发平台cookies...", "SUCCESS") return True except Exception as e: self._log(f"连接后端失败: {e}", "ERROR") if self.callbacks['error']: self.callbacks['error'](str(e)) return False def _handle_platform_login(self, platform_name: str, store_id: str, cookies: str): """处理平台登录请求""" try: # 🔥 检查并清理当前店铺的旧连接 store_key_pattern = f":{store_id}" # 匹配 "平台名:store_id" 格式 keys_to_remove = [key for key in self.platform_listeners.keys() if key.endswith(store_key_pattern)] if keys_to_remove: self._log(f"🔄 检测到店铺 {store_id} 重连,清理 {len(keys_to_remove)} 个旧连接", "INFO") for key in keys_to_remove: self.platform_listeners.pop(key, None) # 平台名称映射 platform_map = { "淘宝": "千牛", "QIANNIU": "千牛", "京东": "京东", "JD": "京东", "抖音": "抖音", "DY": "抖音", "DOUYIN": "抖音", "拼多多": "拼多多", "PDD": "拼多多", "PINDUODUO": "拼多多" } # 标准化平台名称 normalized_platform = platform_map.get(platform_name.upper(), platform_name) self._log(f"处理平台登录: {platform_name} -> {normalized_platform}, 店铺={store_id}", "INFO") # 🔧 关键修改:确保每个平台都能独立处理自己的登录请求 if normalized_platform == "千牛": if cookies == "login_success": self._log("⚠️ 千牛平台收到空cookies,但允许启动监听器", "WARNING") cookies = "" # 清空cookies,千牛不需要真实cookies self._start_qianniu_listener(store_id, cookies) elif normalized_platform == "京东": self._start_jd_listener(store_id, cookies) elif normalized_platform == "抖音": self._start_douyin_listener(store_id, cookies) elif normalized_platform == "拼多多": self._start_pdd_listener(store_id, cookies) else: self._log(f"❌ 不支持的平台: {platform_name}", "ERROR") except Exception as e: self._log(f"处理平台登录失败: {e}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") def _start_version_checker(self): """启动版本检查器""" try: from version_checker import VersionChecker self.version_checker = VersionChecker( backend_client=self.backend_client, update_callback=self._on_update_available ) self.backend_client.version_callback = self.version_checker.handle_version_response self.version_checker.start() self._log("✅ 版本检查器已启动,将每10分钟检查一次更新", "SUCCESS") except Exception as e: self._log(f"❌ 启动版本检查器失败: {e}", "ERROR") def _on_update_available(self, latest_version, download_url): """发现新版本时的处理(在子线程中调用)""" self._log(f"🔔 发现新版本 {latest_version}", "INFO") # 通知主GUI显示更新提醒(通过 Qt 信号机制,线程安全) if hasattr(self, 'gui_update_callback') and self.gui_update_callback: try: # 直接调用回调(回调内部使用信号机制调度到主线程) self.gui_update_callback(latest_version, download_url) self._log(f"✅ 已调用更新回调", "DEBUG") except Exception as e: self._log(f"❌ 调用更新回调失败: {e}", "ERROR") import traceback self._log(f"详细错误: {traceback.format_exc()}", "ERROR") def _start_jd_listener(self, store_id: str, cookies: str): """启动京东平台监听""" try: def _runner(): try: listener = JDListenerForGUI_WS() asyncio.run(listener.start_with_cookies(store_id, cookies)) except Exception as e: self._log(f"京东监听器运行异常: {e}", "ERROR") # 在新线程中启动监听器 thread = Thread(target=_runner, daemon=True) thread.start() # 保存监听器引用 self.platform_listeners[f"京东:{store_id}"] = { 'thread': thread, 'platform': '京东', 'store_id': store_id } # 上报连接状态给后端 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": True }) except Exception as e: self._log(f"上报连接状态失败: {e}", "WARNING") self._log("已启动京东平台监听", "SUCCESS") self._notify_platform_connected("京东") # ← 新增 except Exception as e: self._log(f"启动京东平台监听失败: {e}", "ERROR") # 确保失败时也上报状态 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": False }) except Exception as send_e: self._log(f"失败状态下报连接状态也失败: {send_e}", "ERROR") def _start_douyin_listener(self, store_id: str, cookies: str): """启动抖音平台监听""" try: def _runner(): try: import json self._log("🚀 开始创建抖音监听器实例", "DEBUG") listener = DYListenerForGUI_WS() self._log("✅ 抖音监听器实例创建成功", "DEBUG") # 🔥 检查是否为登录参数模式(与拼多多保持一致) if cookies and ('"login_flow"' in cookies or '"phone_number"' in cookies): # 使用登录参数模式 self._log("📋 使用登录参数启动抖音监听器", "INFO") self._log("🔄 开始执行 start_with_login_params", "DEBUG") result = asyncio.run(listener.start_with_login_params(store_id=store_id, login_params=cookies)) self._log(f"📊 start_with_login_params 执行结果: {result}", "DEBUG") # 🔥 详细的结果分析(与拼多多完全一致) if result == "need_verification_code": self._log("✅ [DY] 登录流程正常,已发送验证码需求通知给后端", "SUCCESS") elif result == "verification_code_error": self._log("⚠️ [DY] 验证码错误,已发送错误通知给后端", "WARNING") elif result: self._log("✅ [DY] 登录成功,平台连接已建立", "SUCCESS") self._notify_platform_connected("抖音") else: self._log("❌ [DY] 登录失败", "ERROR") else: # 传统cookie模式(保留兼容性) self._log("🍪 使用Cookie启动抖音监听器", "INFO") self._log("🔄 开始执行 start_with_cookies", "DEBUG") try: # 🔥 修复:尝试JSON解析,失败时用ast.literal_eval解析Python字典字符串 if isinstance(cookies, str): try: cookie_dict = json.loads(cookies) except json.JSONDecodeError: # 后端发送的是Python字典字符串格式,使用ast.literal_eval import ast cookie_dict = ast.literal_eval(cookies) self._log("✅ 使用ast.literal_eval成功解析cookies", "DEBUG") else: cookie_dict = cookies except (json.JSONDecodeError, ValueError, SyntaxError) as e: self._log(f"❌ Cookie解析失败: {e}", "ERROR") return False result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookie_dict=cookie_dict)) self._log(f"📊 start_with_cookies 执行结果: {result}", "DEBUG") # Cookie启动成功时也要通知GUI if result: self._log("✅ [DY] Cookie启动成功,平台连接已建立", "SUCCESS") self._notify_platform_connected("抖音") # 🔥 移除:不再在backend_singleton中发送connect_message # 抖音的连接状态报告应该在DyUtils中的DyLogin类中发送,与拼多多保持一致 # 所有特殊状态通知都已经在DyLogin中发送过了,这里不需要重复发送 return result except Exception as e: self._log(f"抖音监听器运行异常: {e}", "ERROR") return False # 在新线程中启动监听器 thread = Thread(target=_runner, daemon=True) thread.start() # 保存监听器引用 self.platform_listeners[f"抖音:{store_id}"] = { 'thread': thread, 'platform': '抖音', 'store_id': store_id, } # 更新监听器状态 if f"抖音:{store_id}" in self.platform_listeners: self.platform_listeners[f"抖音:{store_id}"]['status'] = 'success' self._log("已启动抖音平台监听", "SUCCESS") except Exception as e: self._log(f"启动抖音平台监听失败: {e}", "ERROR") # 🔥 移除:确保失败时也不在这里上报状态 # 失败状态应该在DyLogin中处理,与拼多多保持一致 def _start_qianniu_listener(self, store_id: str, cookies: str): """启动千牛平台监听(单连接多店铺架构)""" try: # 获取用户token(从后端客户端获取) exe_token = None if self.backend_client: exe_token = getattr(self.backend_client, 'token', None) if not exe_token: self._log("❌ 缺少exe_token,无法启动千牛监听器", "ERROR") return def _runner(): try: listener = QNListenerForGUI_WS() # 千牛平台使用新架构:传递store_id和exe_token asyncio.run(listener.start_listening_with_store(store_id, exe_token)) except Exception as e: self._log(f"千牛监听器运行异常: {e}", "ERROR") # 在新线程中启动监听器 thread = Thread(target=_runner, daemon=True) thread.start() # 保存监听器引用 self.platform_listeners[f"千牛:{store_id}"] = { 'thread': thread, 'platform': '千牛', 'store_id': store_id, 'exe_token': exe_token } # 上报连接状态给后端 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": True }) except Exception as e: self._log(f"上报连接状态失败: {e}", "WARNING") self._log("已启动千牛平台监听(单连接多店铺架构)", "SUCCESS") self._notify_platform_connected("千牛") # ← 新增 except Exception as e: self._log(f"启动千牛平台监听失败: {e}", "ERROR") # 确保失败时也上报状态 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": False }) except Exception as send_e: self._log(f"失败状态下报千牛平台连接状态也失败: {send_e}", "ERROR") def _start_pdd_listener(self, store_id: str, data: str): """启动拼多多平台监听""" try: def _runner(): try: self._log("🚀 开始创建拼多多监听器实例", "DEBUG") listener = PDDListenerForGUI_WS(log_callback=self._log) self._log("✅ 拼多多监听器实例创建成功", "DEBUG") # 判断是登录参数还是Cookie if self._is_pdd_login_params(data): # 使用登录参数启动 self._log("📋 使用登录参数启动拼多多监听器", "INFO") self._log("🔄 开始执行 start_with_login_params", "DEBUG") result = asyncio.run(listener.start_with_login_params(store_id=store_id, login_params=data)) self._log(f"📊 start_with_login_params 执行结果: {result}", "DEBUG") # 详细的结果分析 if result == "need_verification_code": self._log("✅ [PDD] 登录流程正常,已发送验证码需求通知给后端", "SUCCESS") elif result == "verification_code_error": self._log("⚠️ [PDD] 验证码错误,已发送错误通知给后端", "WARNING") elif result: self._log("✅ [PDD] 登录成功,平台连接已建立", "SUCCESS") self._notify_platform_connected("拼多多") else: self._log("❌ [PDD] 登录失败", "ERROR") else: # 使用Cookie启动(兼容旧方式) self._log("🍪 使用Cookie启动拼多多监听器", "INFO") self._log("🔄 开始执行 start_with_cookies", "DEBUG") result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookies=data)) self._log(f"📊 start_with_cookies 执行结果: {result}", "DEBUG") # Cookie启动成功时也要通知GUI if result: self._log("✅ [PDD] Cookie启动成功,平台连接已建立", "SUCCESS") self._notify_platform_connected("拼多多") # 根据实际登录结果上报状态给后端 if self.backend_client and result not in ["need_verification_code", "verification_code_error", "login_failure"]: # 如果是特殊状态,说明通知已经在PddLogin中发送了,不需要重复发送 try: message = { "type": "connect_message", "store_id": store_id, "status": bool(result) } self.backend_client.send_message(message) status_text = "成功" if result else "失败" self._log(f"上报拼多多平台连接状态{status_text}: {message}", "SUCCESS" if result else "ERROR") except Exception as send_e: self._log(f"上报拼多多平台连接状态失败: {send_e}", "ERROR") elif result == "need_verification_code": self._log("需要验证码,验证码通知已由PddLogin发送,等待后端重新下发登录参数", "INFO") elif result == "verification_code_error": self._log("验证码错误,错误通知已由PddLogin发送,等待后端处理", "INFO") elif result == "login_failure": self._log("登录失败,失败通知已由PddLogin发送,等待后端处理", "INFO") return result except Exception as e: self._log(f"拼多多监听器运行异常: {e}", "ERROR") import traceback self._log(f"异常详情: {traceback.format_exc()}", "DEBUG") # 异常情况下上报失败状态 if self.backend_client: try: # 截取异常信息前100个字符,避免消息过长 error_msg = str(e)[:100] if len(str(e)) > 100 else str(e) message = { "type": "connect_message", "store_id": store_id, "status": False, "content": f"登录异常: {error_msg}" } self.backend_client.send_message(message) self._log(f"异常情况下上报拼多多平台连接失败状态: {message}", "ERROR") except Exception as send_e: self._log(f"异常情况下上报状态也失败: {send_e}", "ERROR") return False # 在新线程中启动监听器 thread = Thread(target=_runner, daemon=True) thread.start() # 保存监听器引用 self.platform_listeners[f"拼多多:{store_id}"] = { 'thread': thread, 'platform': '拼多多', 'store_id': store_id, } self._log("拼多多平台监听线程已启动,等待登录结果...", "INFO") except Exception as e: self._log(f"启动拼多多平台监听失败: {e}", "ERROR") # 确保失败时也上报状态 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": False }) self._log(f"启动失败情况下上报拼多多平台连接状态失败", "ERROR") except Exception as send_e: self._log(f"启动失败情况下上报状态也失败: {send_e}", "ERROR") def _is_pdd_login_params(self, data: str) -> bool: """判断是否为拼多多登录参数""" try: self._log(f"🔍 [DEBUG] 检查是否为登录参数,数据长度: {len(data)}", "DEBUG") self._log(f"🔍 [DEBUG] 数据前100字符: {data[:100]}", "DEBUG") import json parsed_data = json.loads(data) self._log(f"🔍 [DEBUG] JSON解析成功,键: {list(parsed_data.keys())}", "DEBUG") login_params = parsed_data.get("data", {}).get("login_params", {}) self._log(f"🔍 [DEBUG] login_params存在: {bool(login_params)}", "DEBUG") if not login_params: self._log("🔍 [DEBUG] login_params为空,返回False", "DEBUG") return False # 检查必需的登录参数字段 required_fields = ["username", "password", "anti_content", "risk_sign", "timestamp"] has_all_fields = all(field in login_params for field in required_fields) self._log(f"🔍 [DEBUG] 包含所有必需字段: {has_all_fields}", "DEBUG") self._log(f"🔍 [DEBUG] 现有字段: {list(login_params.keys())}", "DEBUG") return has_all_fields except Exception as e: self._log(f"🔍 [DEBUG] 解析失败: {e}", "DEBUG") return False def send_message(self, message: dict): """发送消息到后端""" if self.backend_client and self.backend_client.is_connected: return self.backend_client.send_message(message) else: raise Exception("后端未连接") def disconnect_all(self): """断开所有连接""" try: # 断开后端连接 if self.backend_client: self.backend_client.disconnect() self.backend_client = None # 清理平台监听器 self.platform_listeners.clear() self._log("所有连接已断开", "INFO") except Exception as e: self._log(f"断开连接时发生错误: {e}", "ERROR") # 全局WebSocket管理器实例 _websocket_manager = None def get_websocket_manager() -> WebSocketManager: """获取全局WebSocket管理器实例""" global _websocket_manager if _websocket_manager is None: _websocket_manager = WebSocketManager() return _websocket_manager