# backend_singleton.py # 共享后端单连接客户端实例和WebSocket连接管理 from typing import Optional, Callable import threading import asyncio from threading import Thread # 创建新的后端客户端 from WebSocket.BackendClient import BackendClient from Utils.JD.JdUtils import JDListenerForGUI as JDListenerForGUI_WS from Utils.Dy.DyUtils import DouYinListenerForGUI as DYListenerForGUI_WS from Utils.Pdd.PddUtils import PddListenerForGUI as PDDListenerForGUI_WS _backend_client = None def set_backend_client(client: BackendClient) -> None: global _backend_client _backend_client = client def get_backend_client() -> Optional[BackendClient]: return _backend_client class WebSocketManager: """WebSocket连接管理器,统一处理后端连接和平台监听""" def __init__(self): self.backend_client = None self.platform_listeners = {} # 存储各平台的监听器 self.callbacks = { 'log': None, 'success': None, 'error': None } def set_callbacks(self, log: Callable = None, success: Callable = None, error: Callable = None): """设置回调函数""" if log: self.callbacks['log'] = log if success: self.callbacks['success'] = success if error: self.callbacks['error'] = error def _log(self, message: str, level: str = "INFO"): """内部日志方法""" if self.callbacks['log']: self.callbacks['log'](message, level) else: print(f"[{level}] {message}") def connect_backend(self, token: str) -> bool: """连接后端WebSocket""" try: # 1 保存token到配置 try: from config import set_saved_token set_saved_token(token) except Exception: pass # 2 获取或创建后端客户端 backend = get_backend_client() if backend: # 3 如果有客户端更新token并重连 backend.set_token(token) # 设置回调函数 def _on_backend_success(): try: self._log("连接服务成功", "SUCCESS") if self.callbacks['success']: self.callbacks['success']() except Exception as e: self._log(f"成功回调执行失败: {e}", "ERROR") def _on_backend_login(platform_name: str, store_id: str, cookies: str): self._log( f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}", "INFO") self._handle_platform_login(platform_name, store_id, cookies) backend.set_callbacks(success=_on_backend_success, login=_on_backend_login) if not backend.is_connected: backend.connect() self.backend_client = backend self._log("令牌已提交,已连接后端。等待后端下发平台cookies后自动连接平台...", "SUCCESS") return True else: backend = BackendClient.from_exe_token(token) def _on_backend_login(platform_name: str, store_id: str, cookies: str): self._log( f"收到后端登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}", "INFO") self._handle_platform_login(platform_name, store_id, cookies) def _on_backend_success(): try: self._log("连接服务成功", "SUCCESS") if self.callbacks['success']: self.callbacks['success']() except Exception as e: self._log(f"成功回调执行失败: {e}", "ERROR") backend.set_callbacks(login=_on_backend_login, success=_on_backend_success) backend.connect() set_backend_client(backend) self.backend_client = backend self._log("已创建后端客户端并连接。等待后端下发平台cookies...", "SUCCESS") return True except Exception as e: self._log(f"连接后端失败: {e}", "ERROR") if self.callbacks['error']: self.callbacks['error'](str(e)) return False def _handle_platform_login(self, platform_name: str, store_id: str, cookies: str): """处理平台登录逻辑""" try: if platform_name == "京东" and store_id and cookies: self._start_jd_listener(store_id, cookies) elif platform_name == "抖音" and store_id and cookies: self._start_douyin_listener(store_id, cookies) elif platform_name == "千牛" and store_id and cookies: self._start_qianniu_listener(store_id, cookies) elif platform_name == "拼多多" and store_id and cookies: self._start_pdd_listener(store_id, cookies) else: self._log(f"不支持的平台或参数不全: {platform_name}", "WARNING") except Exception as e: self._log(f"启动平台监听失败: {e}", "ERROR") def _start_jd_listener(self, store_id: str, cookies: str): """启动京东平台监听""" try: def _runner(): try: listener = JDListenerForGUI_WS() asyncio.run(listener.start_with_cookies(store_id, cookies)) except Exception as e: self._log(f"京东监听器运行异常: {e}", "ERROR") # 在新线程中启动监听器 thread = Thread(target=_runner, daemon=True) thread.start() # 保存监听器引用 self.platform_listeners[f"京东:{store_id}"] = { 'thread': thread, 'platform': '京东', 'store_id': store_id } # 上报连接状态给后端 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": True }) except Exception as e: self._log(f"上报连接状态失败: {e}", "WARNING") self._log("已启动京东平台监听", "SUCCESS") except Exception as e: self._log(f"启动京东平台监听失败: {e}", "ERROR") # 确保失败时也上报状态 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": False }) except Exception as send_e: self._log(f"失败状态下报连接状态也失败: {send_e}", "ERROR") def _start_douyin_listener(self, store_id: str, cookies: str): """启动抖音平台监听""" try: def _runner(): try: import json listener = DYListenerForGUI_WS() # 将JSON字符串格式的cookies解析为字典 try: cookie_dict = json.loads(cookies) if isinstance(cookies, str) else cookies except json.JSONDecodeError as e: self._log(f"❌ Cookie JSON解析失败: {e}", "ERROR") return False result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookie_dict=cookie_dict)) return result except Exception as e: self._log(f"抖音监听器运行异常: {e}", "ERROR") return False # 在新线程中启动监听器 thread = Thread(target=_runner, daemon=True) thread.start() # 保存监听器引用 self.platform_listeners[f"抖音:{store_id}"] = { 'thread': thread, 'platform': '抖音', 'store_id': store_id, } # 更新监听器状态 if f"抖音:{store_id}" in self.platform_listeners: self.platform_listeners[f"抖音:{store_id}"]['status'] = 'success' # 上报连接状态给后端 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": True }) self._log("已上报抖音平台连接状态: 成功", "INFO") except Exception as e: self._log(f"上报抖音平台连接状态失败: {e}", "WARNING") self._log("已启动抖音平台监听", "SUCCESS") except Exception as e: self._log(f"启动抖音平台监听失败: {e}", "ERROR") # 确保失败时也上报状态 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": False }) except Exception as send_e: self._log(f"失败状态下报抖音平台连接状态也失败: {send_e}", "ERROR") def _start_qianniu_listener(self, store_id: str, cookies: str): """启动千牛平台监听""" try: # 这里可以添加千牛监听逻辑 self._log("千牛平台监听功能待实现", "INFO") except Exception as e: self._log(f"启动千牛平台监听失败: {e}", "ERROR") def _start_pdd_listener(self, store_id: str, data: str): """启动拼多多平台监听""" try: def _runner(): try: listener = PDDListenerForGUI_WS(log_callback=self._log) # 判断是登录参数还是Cookie if self._is_pdd_login_params(data): # 使用登录参数启动 result = asyncio.run(listener.start_with_login_params(store_id=store_id, login_params=data)) else: # 使用Cookie启动(兼容旧方式) result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookies=data)) return result except Exception as e: self._log(f"拼多多监听器运行异常: {e}", "ERROR") return False # 在新线程中启动监听器 thread = Thread(target=_runner, daemon=True) thread.start() # 保存监听器引用 self.platform_listeners[f"拼多多:{store_id}"] = { 'thread': thread, 'platform': '拼多多', 'store_id': store_id, } # 更新监听器状态 if f"拼多多:{store_id}" in self.platform_listeners: self.platform_listeners[f"拼多多:{store_id}"]['status'] = 'success' # 上报连接状态给后端 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": True }) self._log("已上报拼多多平台连接状态: 成功", "INFO") except Exception as e: self._log(f"上报拼多多平台连接状态失败: {e}", "WARNING") self._log("已启动拼多多平台监听", "SUCCESS") except Exception as e: self._log(f"启动拼多多平台监听失败: {e}", "ERROR") # 确保失败时也上报状态 if self.backend_client: try: self.backend_client.send_message({ "type": "connect_message", "store_id": store_id, "status": False }) except Exception as send_e: self._log(f"失败状态下报拼多多平台连接状态也失败: {send_e}", "ERROR") def _is_pdd_login_params(self, data: str) -> bool: """判断是否为拼多多登录参数""" try: import json parsed_data = json.loads(data) return (parsed_data.get("data", {}).get("login_params") is not None) except: return False def send_message(self, message: dict): """发送消息到后端""" if self.backend_client and self.backend_client.is_connected: return self.backend_client.send_message(message) else: raise Exception("后端未连接") def disconnect_all(self): """断开所有连接""" try: # 断开后端连接 if self.backend_client: self.backend_client.disconnect() self.backend_client = None # 清理平台监听器 self.platform_listeners.clear() self._log("所有连接已断开", "INFO") except Exception as e: self._log(f"断开连接时发生错误: {e}", "ERROR") # 全局WebSocket管理器实例 _websocket_manager = None def get_websocket_manager() -> WebSocketManager: """获取全局WebSocket管理器实例""" global _websocket_manager if _websocket_manager is None: _websocket_manager = WebSocketManager() return _websocket_manager