diff --git a/WebSocket/BackendClient.py b/WebSocket/BackendClient.py index 23d5c6e..ec39c5a 100644 --- a/WebSocket/BackendClient.py +++ b/WebSocket/BackendClient.py @@ -32,6 +32,7 @@ class BackendClient: self.token_error_callback: Optional[Callable] = None # 新增:token错误回调 self.version_callback: Optional[Callable] = None # 新增:版本检查回调 self.disconnect_callback: Optional[Callable] = None # 新增:被踢下线回调 + self.log_callback: Optional[Callable] = None # 新增:日志回调 self.is_connected = False @@ -47,6 +48,18 @@ class BackendClient: self.loop = None self.thread = None + def _log(self, message: str, level: str = "INFO"): + """Unified logging method that works in both dev and packaged environments""" + # Always print to console (visible in dev mode) + print(f"[{level}] {message}") + + # Also call the log callback if available (for file logging) + if self.log_callback: + try: + self.log_callback(message, level) + except Exception: + pass + def connect(self): """连接到WebSocket服务器""" if self.is_connected: @@ -90,7 +103,7 @@ class BackendClient: """连接并监听消息 - 带重连机制""" while not self.should_stop: try: - print(f"正在连接后端WebSocket: {self.url}") + self._log(f"正在连接后端WebSocket: {self.url}") # 建立连接(可配置的ping设置) from config import WS_PING_INTERVAL, WS_PING_TIMEOUT, WS_ENABLE_PING @@ -106,7 +119,7 @@ class BackendClient: max_queue=32, # 最大队列大小 compression=None # 禁用压缩以提高性能 ) - print(f"[连接] 已启用心跳:ping_interval={WS_PING_INTERVAL}s, ping_timeout={WS_PING_TIMEOUT}s") + self._log(f"已启用心跳:ping_interval={WS_PING_INTERVAL}s, ping_timeout={WS_PING_TIMEOUT}s") else: self.websocket = await websockets.connect( self.url, @@ -114,12 +127,12 @@ class BackendClient: max_queue=32, compression=None ) - print("[连接] 已禁用心跳机制") + self._log("已禁用心跳机制", "WARNING") self.is_connected = True self.reconnect_attempts = 0 # 重置重连计数 self.is_reconnecting = False - print("后端WebSocket连接成功") + self._log("后端WebSocket连接成功", "SUCCESS") # 等待连接稳定后再发送状态通知 await asyncio.sleep(0.5) @@ -149,13 +162,13 @@ class BackendClient: # 详细分析断开原因 if e.code == 1006: - print(f"[重连] WebSocket异常关闭 (1006): 可能是心跳超时或网络问题") + self._log("WebSocket异常关闭 (1006): 可能是心跳超时或网络问题", "WARNING") elif e.code == 1000: - print(f"[重连] WebSocket正常关闭 (1000): 服务端主动断开") + self._log("WebSocket正常关闭 (1000): 服务端主动断开", "INFO") elif e.code == 1001: - print(f"[重连] WebSocket关闭 (1001): 端点离开") + self._log("WebSocket关闭 (1001): 端点离开", "INFO") else: - print(f"[重连] WebSocket关闭 ({e.code}): {e.reason}") + self._log(f"WebSocket关闭 ({e.code}): {e.reason}", "WARNING") self._handle_connection_closed(e) if not await self._should_reconnect(): @@ -185,14 +198,14 @@ class BackendClient: def _handle_connection_closed(self, error): """处理连接关闭""" error_msg = f"WebSocket连接已关闭: {error.code} {error.reason if hasattr(error, 'reason') else ''}" - print(f"[重连] {error_msg}") + self._log(error_msg, "WARNING") # 特殊处理ping超时等情况 if hasattr(error, 'code'): if error.code == 1011: # Internal error (ping timeout) - print("[重连] 检测到ping超时,这是常见的网络问题") + self._log("检测到ping超时,这是常见的网络问题", "WARNING") elif error.code == 1006: # Abnormal closure - print("[重连] 检测到异常关闭,可能是网络中断") + self._log("检测到异常关闭,可能是网络中断或心跳超时", "WARNING") if not self.is_reconnecting: self.on_error(error_msg) @@ -200,25 +213,25 @@ class BackendClient: def _handle_network_error(self, error): """处理网络错误""" error_msg = f"网络连接错误: {type(error).__name__} - {str(error)}" - print(f"[重连] {error_msg}") + self._log(error_msg, "ERROR") if not self.is_reconnecting: self.on_error(error_msg) def _handle_general_error(self, error): """处理一般错误""" error_msg = f"WebSocket连接异常: {type(error).__name__} - {str(error)}" - print(f"[重连] {error_msg}") + self._log(error_msg, "ERROR") if not self.is_reconnecting: self.on_error(error_msg) async def _should_reconnect(self) -> bool: """判断是否应该重连""" if self.should_stop: - print("[重连] 程序正在关闭,停止重连") + self._log("程序正在关闭,停止重连", "INFO") return False if self.reconnect_attempts >= self.max_reconnect_attempts: - print(f"[重连] 已达到最大重连次数({self.max_reconnect_attempts}),停止重连") + self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR") # 通知上层重连失败 if not self.is_reconnecting: self.on_error(f"重连失败:已达到最大重连次数({self.max_reconnect_attempts})") @@ -236,7 +249,7 @@ class BackendClient: self.reconnect_attempts += 1 self.is_reconnecting = True - print(f"[重连] 第{self.reconnect_attempts}次重连尝试,等待{delay:.1f}秒...") + self._log(f"第{self.reconnect_attempts}次重连尝试,等待{delay:.1f}秒...", "INFO") # 分割等待时间,支持快速退出 wait_steps = max(1, int(delay)) @@ -267,7 +280,8 @@ class BackendClient: success: Callable = None, token_error: Callable = None, version: Callable = None, - disconnect: Callable = None): + disconnect: Callable = None, + log: Callable = None): """设置各种消息类型的回调函数""" if store_list: self.store_list_callback = store_list @@ -291,13 +305,15 @@ class BackendClient: self.version_callback = version if disconnect: self.disconnect_callback = disconnect + if log: + self.log_callback = log def on_connected(self): """连接成功时的处理""" if self.reconnect_attempts > 0: - print(f"[重连] 后端WebSocket重连成功!(第{self.reconnect_attempts}次尝试)") + self._log(f"后端WebSocket重连成功!(第{self.reconnect_attempts}次尝试)", "SUCCESS") else: - print("后端WebSocket连接成功") + self._log("后端WebSocket连接成功", "SUCCESS") # 重连成功后可选择上报状态给后端 if self.reconnect_attempts > 0: diff --git a/WebSocket/backend_singleton.py b/WebSocket/backend_singleton.py index bd96581..e1202a0 100644 --- a/WebSocket/backend_singleton.py +++ b/WebSocket/backend_singleton.py @@ -155,8 +155,13 @@ class WebSocketManager: if self.callbacks['disconnect']: self.callbacks['disconnect'](disconnect_msg) + def _on_log(message: str, level: str = "INFO"): + """Backend client log callback""" + self._log(message, level) + backend.set_callbacks(success=_on_backend_success, login=_on_backend_login, - token_error=_on_token_error, disconnect=_on_disconnect) + token_error=_on_token_error, disconnect=_on_disconnect, + log=_on_log) if not backend.is_connected: backend.connect() @@ -196,8 +201,13 @@ class WebSocketManager: if self.callbacks['disconnect']: self.callbacks['disconnect'](disconnect_msg) + def _on_log(message: str, level: str = "INFO"): + """Backend client log callback""" + self._log(message, level) + backend.set_callbacks(login=_on_backend_login, success=_on_backend_success, - token_error=_on_token_error, disconnect=_on_disconnect) + token_error=_on_token_error, disconnect=_on_disconnect, + log=_on_log) backend.connect() set_backend_client(backend) @@ -214,13 +224,92 @@ class WebSocketManager: def _handle_platform_login(self, platform_name: str, store_id: str, cookies: str): """处理平台登录请求""" try: - # 🔥 检查并清理当前店铺的旧连接 + # 🔥 检查并断开当前店铺的旧连接(策略B:先断开旧连接,再建立新连接) store_key_pattern = f":{store_id}" # 匹配 "平台名:store_id" 格式 keys_to_remove = [key for key in self.platform_listeners.keys() if key.endswith(store_key_pattern)] + if keys_to_remove: - self._log(f"🔄 检测到店铺 {store_id} 重连,清理 {len(keys_to_remove)} 个旧连接", "INFO") + self._log(f"🔄 检测到店铺 {store_id} 重复登录,断开 {len(keys_to_remove)} 个旧连接", "INFO") + for key in keys_to_remove: + listener_info = self.platform_listeners.get(key) + if listener_info: + platform_type = listener_info.get('platform', '') + + # 从各平台的 WebsocketManager 中获取连接并关闭WebSocket + try: + if platform_type == "京东": + from Utils.JD.JdUtils import WebsocketManager as JDWSManager + jd_mgr = JDWSManager() + conn_info = jd_mgr.get_connection(key) + if conn_info and conn_info.get('platform'): + ws = conn_info['platform'].get('ws') + if ws and hasattr(ws, 'close'): + try: + import asyncio + loop = conn_info['platform'].get('loop') + if loop and not loop.is_closed(): + asyncio.run_coroutine_threadsafe(ws.close(), loop) + self._log(f"✅ 已关闭京东WebSocket连接: {key}", "DEBUG") + except Exception: + pass + jd_mgr.remove_connection(key) + self._log(f"✅ 已从京东管理器移除连接: {key}", "DEBUG") + + elif platform_type == "抖音": + from Utils.Dy.DyUtils import DouYinWebsocketManager as DYWSManager + dy_mgr = DYWSManager() + conn_info = dy_mgr.get_connection(key) + if conn_info and conn_info.get('platform'): + ws = conn_info['platform'].get('ws') + if ws and hasattr(ws, 'close'): + try: + import asyncio + loop = conn_info['platform'].get('loop') + if loop and not loop.is_closed(): + asyncio.run_coroutine_threadsafe(ws.close(), loop) + self._log(f"✅ 已关闭抖音WebSocket连接: {key}", "DEBUG") + except Exception: + pass + dy_mgr.remove_connection(key) + self._log(f"✅ 已从抖音管理器移除连接: {key}", "DEBUG") + + elif platform_type == "千牛": + from Utils.QianNiu.QianNiuUtils import QianNiuWebsocketManager as QNWSManager + qn_mgr = QNWSManager() + qn_mgr.remove_connection(key) + self._log(f"✅ 已从千牛管理器移除连接: {key}", "DEBUG") + + elif platform_type == "拼多多": + from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager + pdd_mgr = PDDWSManager() + conn_info = pdd_mgr.get_connection(key) + if conn_info and conn_info.get('platform'): + # 关闭WebSocket连接 + ws = conn_info['platform'].get('ws') + if ws and hasattr(ws, 'close'): + try: + import asyncio + loop = conn_info['platform'].get('loop') + if loop and not loop.is_closed(): + asyncio.run_coroutine_threadsafe(ws.close(), loop) + self._log(f"✅ 已关闭拼多多WebSocket连接: {key}", "DEBUG") + except Exception as ws_e: + self._log(f"⚠️ 关闭WebSocket时出错: {ws_e}", "DEBUG") + pdd_mgr.remove_connection(key) + self._log(f"✅ 已从拼多多管理器移除连接: {key}", "DEBUG") + + except Exception as e: + self._log(f"⚠️ 移除{platform_type}连接时出错: {e}", "WARNING") + + # 从监听器字典中移除 self.platform_listeners.pop(key, None) + + # 给WebSocket一点时间完全关闭 + import time + time.sleep(0.5) + + self._log(f"✅ 旧连接已全部断开,准备建立新连接", "INFO") # 平台名称映射 platform_map = { diff --git a/config.py b/config.py index 3c03367..ea91114 100644 --- a/config.py +++ b/config.py @@ -8,8 +8,9 @@ import os # 用于路径与目录操作(写入用户配置目录) import json # 用于将令牌保存为 JSON 格式 # 后端服务器配置 +BACKEND_HOST = "192.168.5.103" # BACKEND_HOST = "192.168.5.233" -BACKEND_HOST = "192.168.5.106" +# BACKEND_HOST = "192.168.5.12" # BACKEND_HOST = "shuidrop.com" # BACKEND_HOST = "test.shuidrop.com" BACKEND_PORT = "8000" @@ -20,8 +21,8 @@ BACKEND_WS_URL = f"ws://{BACKEND_HOST}:{BACKEND_PORT}" # WebSocket配置 WS_CONNECT_TIMEOUT = 16.0 WS_MESSAGE_TIMEOUT = 30.0 -WS_PING_INTERVAL = 10 # 10秒ping间隔(提高检测频率) -WS_PING_TIMEOUT = 5 # 5秒ping超时(更快检测断线) +WS_PING_INTERVAL = 20 # 20秒ping间隔(平衡稳定性和响应速度) +WS_PING_TIMEOUT = 10 # 10秒ping超时(给打包环境更多时间响应) WS_ENABLE_PING = True # 是否启用WebSocket原生ping心跳 WS_ENABLE_APP_PING = False # 禁用应用层ping心跳(避免重复)