Todo: 集成多平台 解决PDD打包后js文件位置检索问题

Todo: 集成多平台 解决打包日志规划问题
Todo: 集成多平台 解决后端连接心跳与重连管理问题
This commit is contained in:
2025-09-20 16:13:23 +08:00
parent 7cfc0c22b7
commit 8ad02b4416
6 changed files with 447 additions and 64 deletions

View File

@@ -1,6 +1,7 @@
# WebSocket/BackendClient.py
import json
import threading
import time
import websockets
@@ -31,16 +32,31 @@ class BackendClient:
self.is_connected = False
# 新增:重连机制相关属性
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10
self.base_reconnect_delay = 2.0
self.max_reconnect_delay = 60.0
self.reconnect_backoff = 1.5
self.is_reconnecting = False
self.should_stop = False
self.websocket = None
self.loop = None
self.thread = None
def connect(self):
"""连接到WebSocket服务器"""
if self.is_connected:
return
self.should_stop = False
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
def disconnect(self):
"""断开WebSocket连接"""
self.should_stop = True
if self.loop and self.loop.is_running():
asyncio.run_coroutine_threadsafe(self._close(), self.loop)
@@ -68,28 +84,168 @@ class BackendClient:
self.loop.close()
async def _connect_and_listen(self):
"""连接并监听消息"""
try:
self.websocket = await websockets.connect(self.url)
self.is_connected = True
self.on_connected()
"""连接并监听消息 - 带重连机制"""
while not self.should_stop:
try:
print(f"正在连接后端WebSocket: {self.url}")
async for message in self.websocket:
try:
# 打印原始文本帧与长度
# 建立连接可配置的ping设置
from config import WS_PING_INTERVAL, WS_PING_TIMEOUT, WS_ENABLE_PING
if WS_ENABLE_PING:
self.websocket = await websockets.connect(
self.url,
ping_interval=WS_PING_INTERVAL, # 可配置ping间隔
ping_timeout=WS_PING_TIMEOUT, # 可配置ping超时
close_timeout=10, # 10秒关闭超时
# 增加TCP keepalive配置
max_size=2**20, # 1MB最大消息大小
max_queue=32, # 最大队列大小
compression=None # 禁用压缩以提高性能
)
print(f"[连接] 已启用心跳ping_interval={WS_PING_INTERVAL}s, ping_timeout={WS_PING_TIMEOUT}s")
else:
self.websocket = await websockets.connect(
self.url,
max_size=2**20,
max_queue=32,
compression=None
)
print("[连接] 已禁用心跳机制")
self.is_connected = True
self.reconnect_attempts = 0 # 重置重连计数
self.is_reconnecting = False
print("后端WebSocket连接成功")
# 等待连接稳定后再发送状态通知
await asyncio.sleep(0.5)
# 发送连接状态通知给后端
self._notify_connection_status(True)
self.on_connected()
# 消息循环
async for message in self.websocket:
try:
raw_len = len(message.encode('utf-8')) if isinstance(message, str) else len(message)
print(f"后端发送消息体内容:{message}")
except Exception:
pass
data = json.loads(message)
self.on_message_received(data)
except json.JSONDecodeError:
print(f"JSON解析错误: {message}")
# 打印原始文本帧与长度
try:
raw_len = len(message.encode('utf-8')) if isinstance(message, str) else len(message)
print(f"后端发送消息体内容:{message}")
except Exception:
pass
data = json.loads(message)
self.on_message_received(data)
except json.JSONDecodeError:
print(f"JSON解析错误: {message}")
except Exception as e:
self.is_connected = False
self.on_error(str(e))
except websockets.ConnectionClosed as e:
self.is_connected = False
self._notify_connection_status(False) # 通知断开
# 详细分析断开原因
if e.code == 1006:
print(f"[重连] WebSocket异常关闭 (1006): 可能是心跳超时或网络问题")
elif e.code == 1000:
print(f"[重连] WebSocket正常关闭 (1000): 服务端主动断开")
elif e.code == 1001:
print(f"[重连] WebSocket关闭 (1001): 端点离开")
else:
print(f"[重连] WebSocket关闭 ({e.code}): {e.reason}")
self._handle_connection_closed(e)
if not await self._should_reconnect():
break
await self._wait_before_reconnect()
except websockets.InvalidURI as e:
self.is_connected = False
print(f"无效的WebSocket URI: {e}")
self.on_error(f"无效的WebSocket URI: {e}")
break
except (websockets.WebSocketException, OSError, ConnectionError) as e:
self.is_connected = False
self._handle_network_error(e)
if not await self._should_reconnect():
break
await self._wait_before_reconnect()
except Exception as e:
self.is_connected = False
self._handle_general_error(e)
if not await self._should_reconnect():
break
await self._wait_before_reconnect()
def _handle_connection_closed(self, error):
"""处理连接关闭"""
error_msg = f"WebSocket连接已关闭: {error.code} {error.reason if hasattr(error, 'reason') else ''}"
print(f"[重连] {error_msg}")
# 特殊处理ping超时等情况
if hasattr(error, 'code'):
if error.code == 1011: # Internal error (ping timeout)
print("[重连] 检测到ping超时这是常见的网络问题")
elif error.code == 1006: # Abnormal closure
print("[重连] 检测到异常关闭,可能是网络中断")
if not self.is_reconnecting:
self.on_error(error_msg)
def _handle_network_error(self, error):
"""处理网络错误"""
error_msg = f"网络连接错误: {type(error).__name__} - {str(error)}"
print(f"[重连] {error_msg}")
if not self.is_reconnecting:
self.on_error(error_msg)
def _handle_general_error(self, error):
"""处理一般错误"""
error_msg = f"WebSocket连接异常: {type(error).__name__} - {str(error)}"
print(f"[重连] {error_msg}")
if not self.is_reconnecting:
self.on_error(error_msg)
async def _should_reconnect(self) -> bool:
"""判断是否应该重连"""
if self.should_stop:
print("[重连] 程序正在关闭,停止重连")
return False
if self.reconnect_attempts >= self.max_reconnect_attempts:
print(f"[重连] 已达到最大重连次数({self.max_reconnect_attempts}),停止重连")
# 通知上层重连失败
if not self.is_reconnecting:
self.on_error(f"重连失败:已达到最大重连次数({self.max_reconnect_attempts})")
return False
return True
async def _wait_before_reconnect(self):
"""重连前等待(指数退避)"""
delay = min(
self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts),
self.max_reconnect_delay
)
self.reconnect_attempts += 1
self.is_reconnecting = True
print(f"[重连] 第{self.reconnect_attempts}次重连尝试,等待{delay:.1f}秒...")
# 分割等待时间,支持快速退出
wait_steps = max(1, int(delay))
for i in range(wait_steps):
if self.should_stop:
return
await asyncio.sleep(1)
# 处理小数部分
remaining = delay - wait_steps
if remaining > 0 and not self.should_stop:
await asyncio.sleep(remaining)
@classmethod
def from_exe_token(cls, exe_token: str):
@@ -126,9 +282,114 @@ class BackendClient:
def on_connected(self):
"""连接成功时的处理"""
print("后端WebSocket连接成功")
if self.reconnect_attempts > 0:
print(f"[重连] 后端WebSocket重连成功(第{self.reconnect_attempts}次尝试)")
else:
print("后端WebSocket连接成功")
# 重连成功后可选择上报状态给后端
if self.reconnect_attempts > 0:
self._report_reconnect_status()
# 不再主动请求 get_store避免与后端不兼容导致协程未完成
def _report_reconnect_status(self):
"""重连成功后上报当前状态(可选)"""
try:
# 获取当前已连接的平台列表
from WebSocket.backend_singleton import get_websocket_manager
manager = get_websocket_manager()
if hasattr(manager, 'platform_listeners') and manager.platform_listeners:
for platform_key, listener_info in manager.platform_listeners.items():
store_id = listener_info.get('store_id')
platform = listener_info.get('platform')
if store_id and platform:
# 上报平台仍在连接状态
try:
reconnect_message = {
"type": "connect_message",
"store_id": store_id,
"status": True,
"content": f"GUI重连成功{platform}平台状态正常"
}
# 异步发送,不阻塞连接过程
asyncio.run_coroutine_threadsafe(
self._send_to_backend(reconnect_message),
self.loop
)
print(f"[重连] 已上报{platform}平台状态")
except Exception as e:
print(f"[重连] 上报{platform}平台状态失败: {e}")
else:
print("[重连] 当前无活跃平台连接,跳过状态上报")
except Exception as e:
print(f"[重连] 状态上报过程异常: {e}")
def _notify_connection_status(self, connected: bool):
"""通知后端连接状态变化"""
try:
if not self.loop:
return
# 获取当前活跃平台的store_id
active_store_id = None
try:
from Utils.JD.JdUtils import WebsocketManager as JdManager
from Utils.Dy.DyUtils import WebsocketManager as DyManager
from Utils.Pdd.PddUtils import WebsocketManager as PddManager
# 检查各平台是否有活跃连接
for mgr_class, platform_name in [(JdManager, "京东"), (DyManager, "抖音"), (PddManager, "拼多多")]:
try:
mgr = mgr_class()
if hasattr(mgr, '_store') and mgr._store:
for shop_key, entry in mgr._store.items():
if entry and entry.get('platform'):
# 从shop_key中提取store_id格式平台:store_id
if ':' in shop_key:
_, store_id = shop_key.split(':', 1)
active_store_id = store_id
print(f"[状态] 检测到活跃{platform_name}平台: {store_id}")
break
if active_store_id:
break
except Exception as e:
print(f"[状态] 检查{platform_name}平台失败: {e}")
continue
except Exception as e:
print(f"[状态] 获取活跃平台信息失败: {e}")
status_message = {
"type": "connection_status",
"status": connected,
"timestamp": int(time.time()),
"client_uuid": self.uuid
}
# 如果有活跃平台添加store_id
if active_store_id:
status_message["store_id"] = active_store_id
print(f"[状态] 添加store_id到状态消息: {active_store_id}")
else:
print(f"[状态] 未检测到活跃平台不添加store_id")
# 异步发送状态通知
asyncio.run_coroutine_threadsafe(
self._send_to_backend(status_message),
self.loop
)
status_text = "连接" if connected else "断开"
print(f"[状态] 已通知后端GUI客户端{status_text}")
except Exception as e:
print(f"[状态] 发送状态通知失败: {e}")
import traceback
print(f"[状态] 详细错误: {traceback.format_exc()}")
def on_message_received(self, message: Dict[str, Any]):
"""处理接收到的消息 - 根据WebSocket文档v2更新"""
# 统一打印后端下发的完整消息结构体
@@ -195,12 +456,21 @@ class BackendClient:
message: 要发送的消息字典
"""
if not self.is_connected or not self.loop:
raise Exception("WebSocket未连接")
error_msg = "WebSocket未连接"
if self.is_reconnecting:
error_msg += "(正在重连中..."
raise Exception(error_msg)
future = asyncio.run_coroutine_threadsafe(
self._send_to_backend(message), self.loop
)
return future.result(timeout=8)
try:
future = asyncio.run_coroutine_threadsafe(
self._send_to_backend(message), self.loop
)
return future.result(timeout=8)
except Exception as e:
# 发送失败时检查连接状态
if not self.is_connected:
print(f"[重连] 消息发送失败,连接已断开: {e}")
raise e
async def _send_to_backend(self, message: Dict[str, Any]):
"""异步发送消息到后端"""
@@ -212,20 +482,18 @@ class BackendClient:
await self.websocket.send(message_str)
print(f"发送消息到后端: {message}")
def send_ping(self, custom_uuid: str = None, custom_token: str = None):
def send_ping(self, custom_uuid: str = None):
"""
发送心跳包
如果接收到关闭的消息后心跳包要带上token
"""
# 生成简单的ping UUID
ping_uuid = custom_uuid or f"ping_{int(time.time())}"
ping_message = {
'type': 'ping',
'uuid': custom_uuid or self.uuid
'uuid': ping_uuid
}
token = custom_token or self.token
if token:
ping_message['token'] = token
return self.send_message(ping_message)
def get_store(self):
@@ -686,10 +954,10 @@ class BackendClient:
try:
# 导入京东工具类
from Utils.JD.JdUtils import FixJdCookie
# 创建临时实例用于转接
jd_instance = FixJdCookie()
# 执行转接操作
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@@ -699,14 +967,14 @@ class BackendClient:
websocket, aid, user_id, pin_zj, customer_service_id
)
)
if result:
print(f"[JD Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
else:
print(f"[JD Transfer] ❌ 转接失败: user_id={user_id}")
finally:
loop.close()
except Exception as e:
print(f"[JD Transfer] 转接过程异常: {e}")
@@ -748,8 +1016,9 @@ class BackendClient:
# 获取实际的抖音店铺ID从cookie中获取
shop_id = cookie_dict.get('SHOP_ID', store_id) # 优先使用cookie中的SHOP_ID
print(f"[DY Transfer] 使用shop_id: {shop_id}")
print(f"[DY Transfer] 转接参数: receiver_id={user_id}, shop_id={shop_id}, staff_id={customer_service_id}")
print(
f"[DY Transfer] 转接参数: receiver_id={user_id}, shop_id={shop_id}, staff_id={customer_service_id}")
# 检查是否是自己转给自己的情况
try:
# 获取可用客服列表来验证
@@ -758,39 +1027,42 @@ class BackendClient:
print(f"[DY Transfer] 当前可用客服数量: {len(staff_list)}")
if len(staff_list) <= 1:
print(f"[DY Transfer] ⚠️ 只有一个客服在线,可能无法转接")
# 查找目标客服信息
target_staff = None
for staff in staff_list:
if str(staff.get('staffId', '')) == str(customer_service_id):
target_staff = staff
break
if target_staff:
print(f"[DY Transfer] 找到目标客服: {target_staff.get('staffName', 'Unknown')} (ID: {customer_service_id})")
print(
f"[DY Transfer] 找到目标客服: {target_staff.get('staffName', 'Unknown')} (ID: {customer_service_id})")
else:
print(f"[DY Transfer] ⚠️ 未找到目标客服ID: {customer_service_id}")
print(f"[DY Transfer] 可用客服列表: {[{'id': s.get('staffId'), 'name': s.get('staffName')} for s in staff_list]}")
print(
f"[DY Transfer] 可用客服列表: {[{'id': s.get('staffId'), 'name': s.get('staffName')} for s in staff_list]}")
else:
print(f"[DY Transfer] ⚠️ 无法获取客服列表")
except Exception as e:
print(f"[DY Transfer] 获取客服列表时出错: {e}")
# 执行同步转接操作
result = dy_instance.message_handler.transfer_conversation(
receiver_id=user_id,
shop_id=shop_id,
staff_id=customer_service_id
)
if result:
print(f"[DY Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
else:
print(f"[DY Transfer] ❌ 转接失败: user_id={user_id}")
print(f"[DY Transfer] 💡 可能原因1) 只有一个客服无法转接 2) 客服ID不存在 3) 权限不足 4) 会话状态不允许转接")
print(
f"[DY Transfer] 💡 可能原因1) 只有一个客服无法转接 2) 客服ID不存在 3) 权限不足 4) 会话状态不允许转接")
else:
print(f"[DY Transfer] ⚠️ 抖音实例或message_handler不可用")
except Exception as e:
print(f"[DY Transfer] 转接过程异常: {e}")