[patch] 优化心跳检测逻辑 优化与后端交互连接处理逻辑 新增自动登录与手动下发冲突问题解决方法块
This commit is contained in:
@@ -32,6 +32,7 @@ class BackendClient:
|
||||
self.token_error_callback: Optional[Callable] = None # 新增:token错误回调
|
||||
self.version_callback: Optional[Callable] = None # 新增:版本检查回调
|
||||
self.disconnect_callback: Optional[Callable] = None # 新增:被踢下线回调
|
||||
self.log_callback: Optional[Callable] = None # 新增:日志回调
|
||||
|
||||
self.is_connected = False
|
||||
|
||||
@@ -47,6 +48,18 @@ class BackendClient:
|
||||
self.loop = None
|
||||
self.thread = None
|
||||
|
||||
def _log(self, message: str, level: str = "INFO"):
|
||||
"""Unified logging method that works in both dev and packaged environments"""
|
||||
# Always print to console (visible in dev mode)
|
||||
print(f"[{level}] {message}")
|
||||
|
||||
# Also call the log callback if available (for file logging)
|
||||
if self.log_callback:
|
||||
try:
|
||||
self.log_callback(message, level)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def connect(self):
|
||||
"""连接到WebSocket服务器"""
|
||||
if self.is_connected:
|
||||
@@ -90,7 +103,7 @@ class BackendClient:
|
||||
"""连接并监听消息 - 带重连机制"""
|
||||
while not self.should_stop:
|
||||
try:
|
||||
print(f"正在连接后端WebSocket: {self.url}")
|
||||
self._log(f"正在连接后端WebSocket: {self.url}")
|
||||
|
||||
# 建立连接(可配置的ping设置)
|
||||
from config import WS_PING_INTERVAL, WS_PING_TIMEOUT, WS_ENABLE_PING
|
||||
@@ -106,7 +119,7 @@ class BackendClient:
|
||||
max_queue=32, # 最大队列大小
|
||||
compression=None # 禁用压缩以提高性能
|
||||
)
|
||||
print(f"[连接] 已启用心跳:ping_interval={WS_PING_INTERVAL}s, ping_timeout={WS_PING_TIMEOUT}s")
|
||||
self._log(f"已启用心跳:ping_interval={WS_PING_INTERVAL}s, ping_timeout={WS_PING_TIMEOUT}s")
|
||||
else:
|
||||
self.websocket = await websockets.connect(
|
||||
self.url,
|
||||
@@ -114,12 +127,12 @@ class BackendClient:
|
||||
max_queue=32,
|
||||
compression=None
|
||||
)
|
||||
print("[连接] 已禁用心跳机制")
|
||||
self._log("已禁用心跳机制", "WARNING")
|
||||
|
||||
self.is_connected = True
|
||||
self.reconnect_attempts = 0 # 重置重连计数
|
||||
self.is_reconnecting = False
|
||||
print("后端WebSocket连接成功")
|
||||
self._log("后端WebSocket连接成功", "SUCCESS")
|
||||
|
||||
# 等待连接稳定后再发送状态通知
|
||||
await asyncio.sleep(0.5)
|
||||
@@ -149,13 +162,13 @@ class BackendClient:
|
||||
|
||||
# 详细分析断开原因
|
||||
if e.code == 1006:
|
||||
print(f"[重连] WebSocket异常关闭 (1006): 可能是心跳超时或网络问题")
|
||||
self._log("WebSocket异常关闭 (1006): 可能是心跳超时或网络问题", "WARNING")
|
||||
elif e.code == 1000:
|
||||
print(f"[重连] WebSocket正常关闭 (1000): 服务端主动断开")
|
||||
self._log("WebSocket正常关闭 (1000): 服务端主动断开", "INFO")
|
||||
elif e.code == 1001:
|
||||
print(f"[重连] WebSocket关闭 (1001): 端点离开")
|
||||
self._log("WebSocket关闭 (1001): 端点离开", "INFO")
|
||||
else:
|
||||
print(f"[重连] WebSocket关闭 ({e.code}): {e.reason}")
|
||||
self._log(f"WebSocket关闭 ({e.code}): {e.reason}", "WARNING")
|
||||
|
||||
self._handle_connection_closed(e)
|
||||
if not await self._should_reconnect():
|
||||
@@ -185,14 +198,14 @@ class BackendClient:
|
||||
def _handle_connection_closed(self, error):
|
||||
"""处理连接关闭"""
|
||||
error_msg = f"WebSocket连接已关闭: {error.code} {error.reason if hasattr(error, 'reason') else ''}"
|
||||
print(f"[重连] {error_msg}")
|
||||
self._log(error_msg, "WARNING")
|
||||
|
||||
# 特殊处理ping超时等情况
|
||||
if hasattr(error, 'code'):
|
||||
if error.code == 1011: # Internal error (ping timeout)
|
||||
print("[重连] 检测到ping超时,这是常见的网络问题")
|
||||
self._log("检测到ping超时,这是常见的网络问题", "WARNING")
|
||||
elif error.code == 1006: # Abnormal closure
|
||||
print("[重连] 检测到异常关闭,可能是网络中断")
|
||||
self._log("检测到异常关闭,可能是网络中断或心跳超时", "WARNING")
|
||||
|
||||
if not self.is_reconnecting:
|
||||
self.on_error(error_msg)
|
||||
@@ -200,25 +213,25 @@ class BackendClient:
|
||||
def _handle_network_error(self, error):
|
||||
"""处理网络错误"""
|
||||
error_msg = f"网络连接错误: {type(error).__name__} - {str(error)}"
|
||||
print(f"[重连] {error_msg}")
|
||||
self._log(error_msg, "ERROR")
|
||||
if not self.is_reconnecting:
|
||||
self.on_error(error_msg)
|
||||
|
||||
def _handle_general_error(self, error):
|
||||
"""处理一般错误"""
|
||||
error_msg = f"WebSocket连接异常: {type(error).__name__} - {str(error)}"
|
||||
print(f"[重连] {error_msg}")
|
||||
self._log(error_msg, "ERROR")
|
||||
if not self.is_reconnecting:
|
||||
self.on_error(error_msg)
|
||||
|
||||
async def _should_reconnect(self) -> bool:
|
||||
"""判断是否应该重连"""
|
||||
if self.should_stop:
|
||||
print("[重连] 程序正在关闭,停止重连")
|
||||
self._log("程序正在关闭,停止重连", "INFO")
|
||||
return False
|
||||
|
||||
if self.reconnect_attempts >= self.max_reconnect_attempts:
|
||||
print(f"[重连] 已达到最大重连次数({self.max_reconnect_attempts}),停止重连")
|
||||
self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR")
|
||||
# 通知上层重连失败
|
||||
if not self.is_reconnecting:
|
||||
self.on_error(f"重连失败:已达到最大重连次数({self.max_reconnect_attempts})")
|
||||
@@ -236,7 +249,7 @@ class BackendClient:
|
||||
self.reconnect_attempts += 1
|
||||
self.is_reconnecting = True
|
||||
|
||||
print(f"[重连] 第{self.reconnect_attempts}次重连尝试,等待{delay:.1f}秒...")
|
||||
self._log(f"第{self.reconnect_attempts}次重连尝试,等待{delay:.1f}秒...", "INFO")
|
||||
|
||||
# 分割等待时间,支持快速退出
|
||||
wait_steps = max(1, int(delay))
|
||||
@@ -267,7 +280,8 @@ class BackendClient:
|
||||
success: Callable = None,
|
||||
token_error: Callable = None,
|
||||
version: Callable = None,
|
||||
disconnect: Callable = None):
|
||||
disconnect: Callable = None,
|
||||
log: Callable = None):
|
||||
"""设置各种消息类型的回调函数"""
|
||||
if store_list:
|
||||
self.store_list_callback = store_list
|
||||
@@ -291,13 +305,15 @@ class BackendClient:
|
||||
self.version_callback = version
|
||||
if disconnect:
|
||||
self.disconnect_callback = disconnect
|
||||
if log:
|
||||
self.log_callback = log
|
||||
|
||||
def on_connected(self):
|
||||
"""连接成功时的处理"""
|
||||
if self.reconnect_attempts > 0:
|
||||
print(f"[重连] 后端WebSocket重连成功!(第{self.reconnect_attempts}次尝试)")
|
||||
self._log(f"后端WebSocket重连成功!(第{self.reconnect_attempts}次尝试)", "SUCCESS")
|
||||
else:
|
||||
print("后端WebSocket连接成功")
|
||||
self._log("后端WebSocket连接成功", "SUCCESS")
|
||||
|
||||
# 重连成功后可选择上报状态给后端
|
||||
if self.reconnect_attempts > 0:
|
||||
|
||||
@@ -155,8 +155,13 @@ class WebSocketManager:
|
||||
if self.callbacks['disconnect']:
|
||||
self.callbacks['disconnect'](disconnect_msg)
|
||||
|
||||
def _on_log(message: str, level: str = "INFO"):
|
||||
"""Backend client log callback"""
|
||||
self._log(message, level)
|
||||
|
||||
backend.set_callbacks(success=_on_backend_success, login=_on_backend_login,
|
||||
token_error=_on_token_error, disconnect=_on_disconnect)
|
||||
token_error=_on_token_error, disconnect=_on_disconnect,
|
||||
log=_on_log)
|
||||
|
||||
if not backend.is_connected:
|
||||
backend.connect()
|
||||
@@ -196,8 +201,13 @@ class WebSocketManager:
|
||||
if self.callbacks['disconnect']:
|
||||
self.callbacks['disconnect'](disconnect_msg)
|
||||
|
||||
def _on_log(message: str, level: str = "INFO"):
|
||||
"""Backend client log callback"""
|
||||
self._log(message, level)
|
||||
|
||||
backend.set_callbacks(login=_on_backend_login, success=_on_backend_success,
|
||||
token_error=_on_token_error, disconnect=_on_disconnect)
|
||||
token_error=_on_token_error, disconnect=_on_disconnect,
|
||||
log=_on_log)
|
||||
backend.connect()
|
||||
|
||||
set_backend_client(backend)
|
||||
@@ -214,13 +224,92 @@ class WebSocketManager:
|
||||
def _handle_platform_login(self, platform_name: str, store_id: str, cookies: str):
|
||||
"""处理平台登录请求"""
|
||||
try:
|
||||
# 🔥 检查并清理当前店铺的旧连接
|
||||
# 🔥 检查并断开当前店铺的旧连接(策略B:先断开旧连接,再建立新连接)
|
||||
store_key_pattern = f":{store_id}" # 匹配 "平台名:store_id" 格式
|
||||
keys_to_remove = [key for key in self.platform_listeners.keys() if key.endswith(store_key_pattern)]
|
||||
|
||||
if keys_to_remove:
|
||||
self._log(f"🔄 检测到店铺 {store_id} 重连,清理 {len(keys_to_remove)} 个旧连接", "INFO")
|
||||
self._log(f"🔄 检测到店铺 {store_id} 重复登录,断开 {len(keys_to_remove)} 个旧连接", "INFO")
|
||||
|
||||
for key in keys_to_remove:
|
||||
listener_info = self.platform_listeners.get(key)
|
||||
if listener_info:
|
||||
platform_type = listener_info.get('platform', '')
|
||||
|
||||
# 从各平台的 WebsocketManager 中获取连接并关闭WebSocket
|
||||
try:
|
||||
if platform_type == "京东":
|
||||
from Utils.JD.JdUtils import WebsocketManager as JDWSManager
|
||||
jd_mgr = JDWSManager()
|
||||
conn_info = jd_mgr.get_connection(key)
|
||||
if conn_info and conn_info.get('platform'):
|
||||
ws = conn_info['platform'].get('ws')
|
||||
if ws and hasattr(ws, 'close'):
|
||||
try:
|
||||
import asyncio
|
||||
loop = conn_info['platform'].get('loop')
|
||||
if loop and not loop.is_closed():
|
||||
asyncio.run_coroutine_threadsafe(ws.close(), loop)
|
||||
self._log(f"✅ 已关闭京东WebSocket连接: {key}", "DEBUG")
|
||||
except Exception:
|
||||
pass
|
||||
jd_mgr.remove_connection(key)
|
||||
self._log(f"✅ 已从京东管理器移除连接: {key}", "DEBUG")
|
||||
|
||||
elif platform_type == "抖音":
|
||||
from Utils.Dy.DyUtils import DouYinWebsocketManager as DYWSManager
|
||||
dy_mgr = DYWSManager()
|
||||
conn_info = dy_mgr.get_connection(key)
|
||||
if conn_info and conn_info.get('platform'):
|
||||
ws = conn_info['platform'].get('ws')
|
||||
if ws and hasattr(ws, 'close'):
|
||||
try:
|
||||
import asyncio
|
||||
loop = conn_info['platform'].get('loop')
|
||||
if loop and not loop.is_closed():
|
||||
asyncio.run_coroutine_threadsafe(ws.close(), loop)
|
||||
self._log(f"✅ 已关闭抖音WebSocket连接: {key}", "DEBUG")
|
||||
except Exception:
|
||||
pass
|
||||
dy_mgr.remove_connection(key)
|
||||
self._log(f"✅ 已从抖音管理器移除连接: {key}", "DEBUG")
|
||||
|
||||
elif platform_type == "千牛":
|
||||
from Utils.QianNiu.QianNiuUtils import QianNiuWebsocketManager as QNWSManager
|
||||
qn_mgr = QNWSManager()
|
||||
qn_mgr.remove_connection(key)
|
||||
self._log(f"✅ 已从千牛管理器移除连接: {key}", "DEBUG")
|
||||
|
||||
elif platform_type == "拼多多":
|
||||
from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager
|
||||
pdd_mgr = PDDWSManager()
|
||||
conn_info = pdd_mgr.get_connection(key)
|
||||
if conn_info and conn_info.get('platform'):
|
||||
# 关闭WebSocket连接
|
||||
ws = conn_info['platform'].get('ws')
|
||||
if ws and hasattr(ws, 'close'):
|
||||
try:
|
||||
import asyncio
|
||||
loop = conn_info['platform'].get('loop')
|
||||
if loop and not loop.is_closed():
|
||||
asyncio.run_coroutine_threadsafe(ws.close(), loop)
|
||||
self._log(f"✅ 已关闭拼多多WebSocket连接: {key}", "DEBUG")
|
||||
except Exception as ws_e:
|
||||
self._log(f"⚠️ 关闭WebSocket时出错: {ws_e}", "DEBUG")
|
||||
pdd_mgr.remove_connection(key)
|
||||
self._log(f"✅ 已从拼多多管理器移除连接: {key}", "DEBUG")
|
||||
|
||||
except Exception as e:
|
||||
self._log(f"⚠️ 移除{platform_type}连接时出错: {e}", "WARNING")
|
||||
|
||||
# 从监听器字典中移除
|
||||
self.platform_listeners.pop(key, None)
|
||||
|
||||
# 给WebSocket一点时间完全关闭
|
||||
import time
|
||||
time.sleep(0.5)
|
||||
|
||||
self._log(f"✅ 旧连接已全部断开,准备建立新连接", "INFO")
|
||||
|
||||
# 平台名称映射
|
||||
platform_map = {
|
||||
|
||||
@@ -8,8 +8,9 @@ import os # 用于路径与目录操作(写入用户配置目录)
|
||||
import json # 用于将令牌保存为 JSON 格式
|
||||
|
||||
# 后端服务器配置
|
||||
BACKEND_HOST = "192.168.5.103"
|
||||
# BACKEND_HOST = "192.168.5.233"
|
||||
BACKEND_HOST = "192.168.5.106"
|
||||
# BACKEND_HOST = "192.168.5.12"
|
||||
# BACKEND_HOST = "shuidrop.com"
|
||||
# BACKEND_HOST = "test.shuidrop.com"
|
||||
BACKEND_PORT = "8000"
|
||||
@@ -20,8 +21,8 @@ BACKEND_WS_URL = f"ws://{BACKEND_HOST}:{BACKEND_PORT}"
|
||||
# WebSocket配置
|
||||
WS_CONNECT_TIMEOUT = 16.0
|
||||
WS_MESSAGE_TIMEOUT = 30.0
|
||||
WS_PING_INTERVAL = 10 # 10秒ping间隔(提高检测频率)
|
||||
WS_PING_TIMEOUT = 5 # 5秒ping超时(更快检测断线)
|
||||
WS_PING_INTERVAL = 20 # 20秒ping间隔(平衡稳定性和响应速度)
|
||||
WS_PING_TIMEOUT = 10 # 10秒ping超时(给打包环境更多时间响应)
|
||||
WS_ENABLE_PING = True # 是否启用WebSocket原生ping心跳
|
||||
WS_ENABLE_APP_PING = False # 禁用应用层ping心跳(避免重复)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user