2025-09-12 20:42:00 +08:00
|
|
|
|
# WebSocket/BackendClient.py
|
|
|
|
|
|
import json
|
|
|
|
|
|
import threading
|
2025-09-20 16:13:23 +08:00
|
|
|
|
import time
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
|
|
|
|
|
import websockets
|
|
|
|
|
|
|
|
|
|
|
|
import uuid
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
from typing import List, Dict, Any, Optional, Callable
|
|
|
|
|
|
from config import get_gui_ws_url
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BackendClient:
|
|
|
|
|
|
"""后端WebSocket客户端"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, url: str, token: str = None):
|
|
|
|
|
|
|
|
|
|
|
|
self.token = token
|
|
|
|
|
|
self.uuid = str(uuid.uuid4())
|
|
|
|
|
|
self.url = url
|
|
|
|
|
|
|
|
|
|
|
|
# 消息处理回调函数
|
|
|
|
|
|
self.store_list_callback: Optional[Callable] = None
|
|
|
|
|
|
self.customers_callback: Optional[Callable] = None
|
|
|
|
|
|
self.message_callback: Optional[Callable] = None
|
|
|
|
|
|
self.transfer_callback: Optional[Callable] = None
|
|
|
|
|
|
self.close_callback: Optional[Callable] = None
|
|
|
|
|
|
self.error_callback: Optional[Callable] = None
|
|
|
|
|
|
self.login_callback: Optional[Callable] = None # 新增:平台登录(下发cookies)回调
|
|
|
|
|
|
self.success_callback: Optional[Callable] = None # 新增:后端连接成功回调
|
2025-09-28 17:00:02 +08:00
|
|
|
|
self.token_error_callback: Optional[Callable] = None # 新增:token错误回调
|
2025-09-29 15:38:34 +08:00
|
|
|
|
self.version_callback: Optional[Callable] = None # 新增:版本检查回调
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
|
|
|
|
|
self.is_connected = False
|
|
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 新增:重连机制相关属性
|
|
|
|
|
|
self.reconnect_attempts = 0
|
|
|
|
|
|
self.max_reconnect_attempts = 10
|
|
|
|
|
|
self.base_reconnect_delay = 2.0
|
|
|
|
|
|
self.max_reconnect_delay = 60.0
|
|
|
|
|
|
self.reconnect_backoff = 1.5
|
|
|
|
|
|
self.is_reconnecting = False
|
|
|
|
|
|
self.should_stop = False
|
|
|
|
|
|
self.websocket = None
|
|
|
|
|
|
self.loop = None
|
|
|
|
|
|
self.thread = None
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
def connect(self):
|
|
|
|
|
|
"""连接到WebSocket服务器"""
|
|
|
|
|
|
if self.is_connected:
|
|
|
|
|
|
return
|
|
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
self.should_stop = False
|
2025-09-12 20:42:00 +08:00
|
|
|
|
self.thread = threading.Thread(target=self._run_loop, daemon=True)
|
|
|
|
|
|
self.thread.start()
|
|
|
|
|
|
|
|
|
|
|
|
def disconnect(self):
|
|
|
|
|
|
"""断开WebSocket连接"""
|
2025-09-20 16:13:23 +08:00
|
|
|
|
self.should_stop = True
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if self.loop and self.loop.is_running():
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe(self._close(), self.loop)
|
|
|
|
|
|
|
|
|
|
|
|
if self.thread and self.thread.is_alive():
|
|
|
|
|
|
self.thread.join(timeout=3)
|
|
|
|
|
|
|
|
|
|
|
|
self.is_connected = False
|
|
|
|
|
|
|
|
|
|
|
|
async def _close(self):
|
|
|
|
|
|
"""异步关闭连接"""
|
|
|
|
|
|
if self.websocket:
|
|
|
|
|
|
await self.websocket.close()
|
|
|
|
|
|
self.is_connected = False
|
|
|
|
|
|
|
|
|
|
|
|
def _run_loop(self):
|
|
|
|
|
|
"""在新线程中运行事件循环"""
|
|
|
|
|
|
self.loop = asyncio.new_event_loop()
|
|
|
|
|
|
asyncio.set_event_loop(self.loop)
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.loop.run_until_complete(self._connect_and_listen())
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"WebSocket异常: {e}")
|
|
|
|
|
|
finally:
|
|
|
|
|
|
self.loop.close()
|
|
|
|
|
|
|
|
|
|
|
|
async def _connect_and_listen(self):
|
2025-09-20 16:13:23 +08:00
|
|
|
|
"""连接并监听消息 - 带重连机制"""
|
|
|
|
|
|
while not self.should_stop:
|
|
|
|
|
|
try:
|
|
|
|
|
|
print(f"正在连接后端WebSocket: {self.url}")
|
|
|
|
|
|
|
|
|
|
|
|
# 建立连接(可配置的ping设置)
|
|
|
|
|
|
from config import WS_PING_INTERVAL, WS_PING_TIMEOUT, WS_ENABLE_PING
|
|
|
|
|
|
|
|
|
|
|
|
if WS_ENABLE_PING:
|
|
|
|
|
|
self.websocket = await websockets.connect(
|
|
|
|
|
|
self.url,
|
|
|
|
|
|
ping_interval=WS_PING_INTERVAL, # 可配置ping间隔
|
|
|
|
|
|
ping_timeout=WS_PING_TIMEOUT, # 可配置ping超时
|
|
|
|
|
|
close_timeout=10, # 10秒关闭超时
|
|
|
|
|
|
# 增加TCP keepalive配置
|
2025-09-29 15:38:34 +08:00
|
|
|
|
max_size=2 ** 20, # 1MB最大消息大小
|
|
|
|
|
|
max_queue=32, # 最大队列大小
|
2025-09-20 16:13:23 +08:00
|
|
|
|
compression=None # 禁用压缩以提高性能
|
|
|
|
|
|
)
|
|
|
|
|
|
print(f"[连接] 已启用心跳:ping_interval={WS_PING_INTERVAL}s, ping_timeout={WS_PING_TIMEOUT}s")
|
|
|
|
|
|
else:
|
|
|
|
|
|
self.websocket = await websockets.connect(
|
|
|
|
|
|
self.url,
|
2025-09-29 15:38:34 +08:00
|
|
|
|
max_size=2 ** 20,
|
2025-09-20 16:13:23 +08:00
|
|
|
|
max_queue=32,
|
|
|
|
|
|
compression=None
|
|
|
|
|
|
)
|
|
|
|
|
|
print("[连接] 已禁用心跳机制")
|
|
|
|
|
|
|
|
|
|
|
|
self.is_connected = True
|
|
|
|
|
|
self.reconnect_attempts = 0 # 重置重连计数
|
|
|
|
|
|
self.is_reconnecting = False
|
|
|
|
|
|
print("后端WebSocket连接成功")
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 等待连接稳定后再发送状态通知
|
|
|
|
|
|
await asyncio.sleep(0.5)
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 发送连接状态通知给后端
|
|
|
|
|
|
self._notify_connection_status(True)
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
self.on_connected()
|
|
|
|
|
|
|
|
|
|
|
|
# 消息循环
|
|
|
|
|
|
async for message in self.websocket:
|
2025-09-12 20:42:00 +08:00
|
|
|
|
try:
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 打印原始文本帧与长度
|
|
|
|
|
|
try:
|
|
|
|
|
|
raw_len = len(message.encode('utf-8')) if isinstance(message, str) else len(message)
|
|
|
|
|
|
print(f"后端发送消息体内容:{message}")
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
data = json.loads(message)
|
|
|
|
|
|
self.on_message_received(data)
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
|
print(f"JSON解析错误: {message}")
|
|
|
|
|
|
|
|
|
|
|
|
except websockets.ConnectionClosed as e:
|
|
|
|
|
|
self.is_connected = False
|
|
|
|
|
|
self._notify_connection_status(False) # 通知断开
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 详细分析断开原因
|
|
|
|
|
|
if e.code == 1006:
|
|
|
|
|
|
print(f"[重连] WebSocket异常关闭 (1006): 可能是心跳超时或网络问题")
|
|
|
|
|
|
elif e.code == 1000:
|
|
|
|
|
|
print(f"[重连] WebSocket正常关闭 (1000): 服务端主动断开")
|
|
|
|
|
|
elif e.code == 1001:
|
|
|
|
|
|
print(f"[重连] WebSocket关闭 (1001): 端点离开")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[重连] WebSocket关闭 ({e.code}): {e.reason}")
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
self._handle_connection_closed(e)
|
|
|
|
|
|
if not await self._should_reconnect():
|
|
|
|
|
|
break
|
|
|
|
|
|
await self._wait_before_reconnect()
|
|
|
|
|
|
|
|
|
|
|
|
except websockets.InvalidURI as e:
|
|
|
|
|
|
self.is_connected = False
|
|
|
|
|
|
print(f"无效的WebSocket URI: {e}")
|
|
|
|
|
|
self.on_error(f"无效的WebSocket URI: {e}")
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
except (websockets.WebSocketException, OSError, ConnectionError) as e:
|
|
|
|
|
|
self.is_connected = False
|
|
|
|
|
|
self._handle_network_error(e)
|
|
|
|
|
|
if not await self._should_reconnect():
|
|
|
|
|
|
break
|
|
|
|
|
|
await self._wait_before_reconnect()
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
self.is_connected = False
|
|
|
|
|
|
self._handle_general_error(e)
|
|
|
|
|
|
if not await self._should_reconnect():
|
|
|
|
|
|
break
|
|
|
|
|
|
await self._wait_before_reconnect()
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_connection_closed(self, error):
|
|
|
|
|
|
"""处理连接关闭"""
|
|
|
|
|
|
error_msg = f"WebSocket连接已关闭: {error.code} {error.reason if hasattr(error, 'reason') else ''}"
|
|
|
|
|
|
print(f"[重连] {error_msg}")
|
|
|
|
|
|
|
|
|
|
|
|
# 特殊处理ping超时等情况
|
|
|
|
|
|
if hasattr(error, 'code'):
|
|
|
|
|
|
if error.code == 1011: # Internal error (ping timeout)
|
|
|
|
|
|
print("[重连] 检测到ping超时,这是常见的网络问题")
|
|
|
|
|
|
elif error.code == 1006: # Abnormal closure
|
|
|
|
|
|
print("[重连] 检测到异常关闭,可能是网络中断")
|
|
|
|
|
|
|
|
|
|
|
|
if not self.is_reconnecting:
|
|
|
|
|
|
self.on_error(error_msg)
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_network_error(self, error):
|
|
|
|
|
|
"""处理网络错误"""
|
|
|
|
|
|
error_msg = f"网络连接错误: {type(error).__name__} - {str(error)}"
|
|
|
|
|
|
print(f"[重连] {error_msg}")
|
|
|
|
|
|
if not self.is_reconnecting:
|
|
|
|
|
|
self.on_error(error_msg)
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_general_error(self, error):
|
|
|
|
|
|
"""处理一般错误"""
|
|
|
|
|
|
error_msg = f"WebSocket连接异常: {type(error).__name__} - {str(error)}"
|
|
|
|
|
|
print(f"[重连] {error_msg}")
|
|
|
|
|
|
if not self.is_reconnecting:
|
|
|
|
|
|
self.on_error(error_msg)
|
|
|
|
|
|
|
|
|
|
|
|
async def _should_reconnect(self) -> bool:
|
|
|
|
|
|
"""判断是否应该重连"""
|
|
|
|
|
|
if self.should_stop:
|
|
|
|
|
|
print("[重连] 程序正在关闭,停止重连")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
if self.reconnect_attempts >= self.max_reconnect_attempts:
|
|
|
|
|
|
print(f"[重连] 已达到最大重连次数({self.max_reconnect_attempts}),停止重连")
|
|
|
|
|
|
# 通知上层重连失败
|
|
|
|
|
|
if not self.is_reconnecting:
|
|
|
|
|
|
self.on_error(f"重连失败:已达到最大重连次数({self.max_reconnect_attempts})")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
async def _wait_before_reconnect(self):
|
|
|
|
|
|
"""重连前等待(指数退避)"""
|
|
|
|
|
|
delay = min(
|
|
|
|
|
|
self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts),
|
|
|
|
|
|
self.max_reconnect_delay
|
|
|
|
|
|
)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
self.reconnect_attempts += 1
|
|
|
|
|
|
self.is_reconnecting = True
|
|
|
|
|
|
|
|
|
|
|
|
print(f"[重连] 第{self.reconnect_attempts}次重连尝试,等待{delay:.1f}秒...")
|
|
|
|
|
|
|
|
|
|
|
|
# 分割等待时间,支持快速退出
|
|
|
|
|
|
wait_steps = max(1, int(delay))
|
|
|
|
|
|
for i in range(wait_steps):
|
|
|
|
|
|
if self.should_stop:
|
|
|
|
|
|
return
|
|
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
|
|
|
|
|
|
# 处理小数部分
|
|
|
|
|
|
remaining = delay - wait_steps
|
|
|
|
|
|
if remaining > 0 and not self.should_stop:
|
|
|
|
|
|
await asyncio.sleep(remaining)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
|
def from_exe_token(cls, exe_token: str):
|
|
|
|
|
|
"""使用 exe_token 构造单连接客户端(ws/gui/<token>/)"""
|
|
|
|
|
|
url = get_gui_ws_url(exe_token)
|
|
|
|
|
|
return cls(url=url, token=exe_token)
|
|
|
|
|
|
|
|
|
|
|
|
def set_callbacks(self,
|
|
|
|
|
|
store_list: Callable = None,
|
|
|
|
|
|
customers: Callable = None,
|
|
|
|
|
|
message: Callable = None,
|
|
|
|
|
|
transfer: Callable = None,
|
|
|
|
|
|
close: Callable = None,
|
|
|
|
|
|
error: Callable = None,
|
|
|
|
|
|
login: Callable = None,
|
2025-09-28 17:00:02 +08:00
|
|
|
|
success: Callable = None,
|
2025-09-29 15:38:34 +08:00
|
|
|
|
token_error: Callable = None,
|
|
|
|
|
|
version: Callable = None):
|
2025-09-12 20:42:00 +08:00
|
|
|
|
"""设置各种消息类型的回调函数"""
|
|
|
|
|
|
if store_list:
|
|
|
|
|
|
self.store_list_callback = store_list
|
|
|
|
|
|
if customers:
|
|
|
|
|
|
self.customers_callback = customers
|
|
|
|
|
|
if message:
|
|
|
|
|
|
self.message_callback = message
|
|
|
|
|
|
if transfer:
|
|
|
|
|
|
self.transfer_callback = transfer
|
|
|
|
|
|
if close:
|
|
|
|
|
|
self.close_callback = close
|
|
|
|
|
|
if error:
|
|
|
|
|
|
self.error_callback = error
|
|
|
|
|
|
if login:
|
|
|
|
|
|
self.login_callback = login
|
|
|
|
|
|
if success:
|
|
|
|
|
|
self.success_callback = success
|
2025-09-28 17:00:02 +08:00
|
|
|
|
if token_error:
|
|
|
|
|
|
self.token_error_callback = token_error
|
2025-09-29 15:38:34 +08:00
|
|
|
|
if version:
|
|
|
|
|
|
self.version_callback = version
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
|
|
|
|
|
def on_connected(self):
|
|
|
|
|
|
"""连接成功时的处理"""
|
2025-09-20 16:13:23 +08:00
|
|
|
|
if self.reconnect_attempts > 0:
|
|
|
|
|
|
print(f"[重连] 后端WebSocket重连成功!(第{self.reconnect_attempts}次尝试)")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("后端WebSocket连接成功")
|
|
|
|
|
|
|
|
|
|
|
|
# 重连成功后可选择上报状态给后端
|
|
|
|
|
|
if self.reconnect_attempts > 0:
|
|
|
|
|
|
self._report_reconnect_status()
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
# 不再主动请求 get_store,避免与后端不兼容导致协程未完成
|
|
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
def _report_reconnect_status(self):
|
|
|
|
|
|
"""重连成功后上报当前状态(可选)"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 获取当前已连接的平台列表
|
|
|
|
|
|
from WebSocket.backend_singleton import get_websocket_manager
|
|
|
|
|
|
manager = get_websocket_manager()
|
|
|
|
|
|
|
|
|
|
|
|
if hasattr(manager, 'platform_listeners') and manager.platform_listeners:
|
|
|
|
|
|
for platform_key, listener_info in manager.platform_listeners.items():
|
|
|
|
|
|
store_id = listener_info.get('store_id')
|
|
|
|
|
|
platform = listener_info.get('platform')
|
|
|
|
|
|
|
|
|
|
|
|
if store_id and platform:
|
|
|
|
|
|
# 上报平台仍在连接状态
|
|
|
|
|
|
try:
|
|
|
|
|
|
reconnect_message = {
|
|
|
|
|
|
"type": "connect_message",
|
|
|
|
|
|
"store_id": store_id,
|
|
|
|
|
|
"status": True,
|
|
|
|
|
|
"content": f"GUI重连成功,{platform}平台状态正常"
|
|
|
|
|
|
}
|
|
|
|
|
|
# 异步发送,不阻塞连接过程
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe(
|
|
|
|
|
|
self._send_to_backend(reconnect_message),
|
|
|
|
|
|
self.loop
|
|
|
|
|
|
)
|
|
|
|
|
|
print(f"[重连] 已上报{platform}平台状态")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[重连] 上报{platform}平台状态失败: {e}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("[重连] 当前无活跃平台连接,跳过状态上报")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[重连] 状态上报过程异常: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _notify_connection_status(self, connected: bool):
|
|
|
|
|
|
"""通知后端连接状态变化"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
if not self.loop:
|
|
|
|
|
|
return
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 获取当前活跃平台的store_id
|
|
|
|
|
|
active_store_id = None
|
|
|
|
|
|
try:
|
|
|
|
|
|
from Utils.JD.JdUtils import WebsocketManager as JdManager
|
2025-09-29 15:38:34 +08:00
|
|
|
|
from Utils.Dy.DyUtils import DouYinWebsocketManager as DyManager
|
2025-09-20 16:13:23 +08:00
|
|
|
|
from Utils.Pdd.PddUtils import WebsocketManager as PddManager
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 检查各平台是否有活跃连接
|
|
|
|
|
|
for mgr_class, platform_name in [(JdManager, "京东"), (DyManager, "抖音"), (PddManager, "拼多多")]:
|
|
|
|
|
|
try:
|
|
|
|
|
|
mgr = mgr_class()
|
|
|
|
|
|
if hasattr(mgr, '_store') and mgr._store:
|
|
|
|
|
|
for shop_key, entry in mgr._store.items():
|
|
|
|
|
|
if entry and entry.get('platform'):
|
|
|
|
|
|
# 从shop_key中提取store_id(格式:平台:store_id)
|
|
|
|
|
|
if ':' in shop_key:
|
|
|
|
|
|
_, store_id = shop_key.split(':', 1)
|
|
|
|
|
|
active_store_id = store_id
|
|
|
|
|
|
print(f"[状态] 检测到活跃{platform_name}平台: {store_id}")
|
|
|
|
|
|
break
|
|
|
|
|
|
if active_store_id:
|
|
|
|
|
|
break
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[状态] 检查{platform_name}平台失败: {e}")
|
|
|
|
|
|
continue
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[状态] 获取活跃平台信息失败: {e}")
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
status_message = {
|
|
|
|
|
|
"type": "connection_status",
|
|
|
|
|
|
"status": connected,
|
|
|
|
|
|
"timestamp": int(time.time()),
|
|
|
|
|
|
"client_uuid": self.uuid
|
|
|
|
|
|
}
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 如果有活跃平台,添加store_id
|
|
|
|
|
|
if active_store_id:
|
|
|
|
|
|
status_message["store_id"] = active_store_id
|
|
|
|
|
|
print(f"[状态] 添加store_id到状态消息: {active_store_id}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[状态] 未检测到活跃平台,不添加store_id")
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 异步发送状态通知
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe(
|
|
|
|
|
|
self._send_to_backend(status_message),
|
|
|
|
|
|
self.loop
|
|
|
|
|
|
)
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
status_text = "连接" if connected else "断开"
|
|
|
|
|
|
print(f"[状态] 已通知后端GUI客户端{status_text}")
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[状态] 发送状态通知失败: {e}")
|
|
|
|
|
|
import traceback
|
|
|
|
|
|
print(f"[状态] 详细错误: {traceback.format_exc()}")
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
def on_message_received(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理接收到的消息 - 根据WebSocket文档v2更新"""
|
|
|
|
|
|
# 统一打印后端下发的完整消息结构体
|
|
|
|
|
|
try:
|
|
|
|
|
|
import json as _json
|
|
|
|
|
|
print("=== Backend -> GUI Message ===")
|
|
|
|
|
|
print(_json.dumps(message, ensure_ascii=False, indent=2))
|
|
|
|
|
|
print("=== End Message ===")
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
msg_type = message.get('type')
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
if msg_type == 'get_store':
|
|
|
|
|
|
self._handle_store_list(message)
|
|
|
|
|
|
elif msg_type == 'get_customers':
|
|
|
|
|
|
self._handle_customers_list(message)
|
|
|
|
|
|
elif msg_type == 'success':
|
|
|
|
|
|
print("后端连接服务成功")
|
|
|
|
|
|
# 可在此触发上层UI通知
|
|
|
|
|
|
if self.success_callback:
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.success_callback()
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
elif msg_type == 'message':
|
|
|
|
|
|
self._handle_message(message)
|
|
|
|
|
|
elif msg_type == 'transfer':
|
|
|
|
|
|
self._handle_transfer(message)
|
|
|
|
|
|
elif msg_type == 'close':
|
|
|
|
|
|
self._handle_close(message)
|
|
|
|
|
|
elif msg_type == 'pong':
|
|
|
|
|
|
self._handle_pong(message)
|
|
|
|
|
|
elif msg_type == 'connect_success': # 兼容旧版
|
|
|
|
|
|
self._handle_connect_success(message)
|
|
|
|
|
|
elif msg_type == 'login': # 新版:后台下发平台cookies
|
|
|
|
|
|
self._handle_login(message)
|
|
|
|
|
|
elif msg_type == 'error':
|
|
|
|
|
|
self._handle_error_message(message)
|
2025-09-28 17:00:02 +08:00
|
|
|
|
elif msg_type == 'error_token':
|
|
|
|
|
|
self._handle_token_error(message)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
elif msg_type == 'staff_list':
|
|
|
|
|
|
self._handle_staff_list(message)
|
2025-09-29 15:38:34 +08:00
|
|
|
|
elif msg_type == 'version_response': # 新增:版本检查响应
|
|
|
|
|
|
self._handle_version_response(message)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
else:
|
|
|
|
|
|
print(f"未知消息类型: {msg_type}")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
error_msg = f"处理消息异常: {e}"
|
|
|
|
|
|
print(error_msg)
|
|
|
|
|
|
if self.error_callback:
|
|
|
|
|
|
self.error_callback(error_msg, message)
|
|
|
|
|
|
|
|
|
|
|
|
def on_error(self, error: str):
|
|
|
|
|
|
"""错误处理"""
|
|
|
|
|
|
print(f"后端连接错误: {error}")
|
|
|
|
|
|
if self.error_callback:
|
|
|
|
|
|
self.error_callback(error, None)
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 发送消息方法 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def send_message(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""
|
|
|
|
|
|
发送消息到后端
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
message: 要发送的消息字典
|
|
|
|
|
|
"""
|
|
|
|
|
|
if not self.is_connected or not self.loop:
|
2025-09-20 16:13:23 +08:00
|
|
|
|
error_msg = "WebSocket未连接"
|
|
|
|
|
|
if self.is_reconnecting:
|
|
|
|
|
|
error_msg += "(正在重连中...)"
|
|
|
|
|
|
raise Exception(error_msg)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
try:
|
|
|
|
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
|
|
|
|
self._send_to_backend(message), self.loop
|
|
|
|
|
|
)
|
|
|
|
|
|
return future.result(timeout=8)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
# 发送失败时检查连接状态
|
|
|
|
|
|
if not self.is_connected:
|
|
|
|
|
|
print(f"[重连] 消息发送失败,连接已断开: {e}")
|
|
|
|
|
|
raise e
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
|
|
|
|
|
async def _send_to_backend(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""异步发送消息到后端"""
|
|
|
|
|
|
if not self.websocket:
|
|
|
|
|
|
raise Exception("WebSocket连接不存在")
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
message_str = json.dumps(message, ensure_ascii=False)
|
|
|
|
|
|
await self.websocket.send(message_str)
|
|
|
|
|
|
print(f"发送消息到后端: {message}")
|
|
|
|
|
|
|
2025-09-20 16:13:23 +08:00
|
|
|
|
def send_ping(self, custom_uuid: str = None):
|
2025-09-12 20:42:00 +08:00
|
|
|
|
"""
|
|
|
|
|
|
发送心跳包
|
|
|
|
|
|
"""
|
2025-09-20 16:13:23 +08:00
|
|
|
|
# 生成简单的ping UUID
|
|
|
|
|
|
ping_uuid = custom_uuid or f"ping_{int(time.time())}"
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
ping_message = {
|
|
|
|
|
|
'type': 'ping',
|
2025-09-20 16:13:23 +08:00
|
|
|
|
'uuid': ping_uuid
|
2025-09-12 20:42:00 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return self.send_message(ping_message)
|
|
|
|
|
|
|
|
|
|
|
|
def get_store(self):
|
|
|
|
|
|
"""获取店铺信息"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'get_store',
|
|
|
|
|
|
'token': self.token or ''
|
|
|
|
|
|
}
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
def send_text_message(self, content: str, sender_id: str, store_id: str, pin_image: str = None):
|
|
|
|
|
|
"""发送文本消息 - 根据WebSocket文档v2更新"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'message',
|
|
|
|
|
|
'content': content,
|
|
|
|
|
|
'msg_type': 'text',
|
|
|
|
|
|
'sender': {'id': sender_id},
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
if pin_image:
|
|
|
|
|
|
message['pin_image'] = pin_image
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
def send_image_message(self, image_url: str, sender_id: str, store_id: str, pin_image: str = None):
|
|
|
|
|
|
"""发送图片消息 - 根据WebSocket文档v2更新"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'message',
|
|
|
|
|
|
'content': image_url,
|
|
|
|
|
|
'msg_type': 'image',
|
|
|
|
|
|
'sender': {'id': sender_id},
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
if pin_image:
|
|
|
|
|
|
message['pin_image'] = pin_image
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
def send_video_message(self, video_url: str, sender_id: str, store_id: str, pin_image: str = None):
|
|
|
|
|
|
"""发送视频消息 - 根据WebSocket文档v2更新"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'message',
|
|
|
|
|
|
'content': video_url,
|
|
|
|
|
|
'msg_type': 'video',
|
|
|
|
|
|
'sender': {'id': sender_id},
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
if pin_image:
|
|
|
|
|
|
message['pin_image'] = pin_image
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
def send_order_card(self, product_id: str, order_number: str, sender_id: str, store_id: str, pin_image: str = None):
|
|
|
|
|
|
"""发送订单卡片 - 根据WebSocket文档v2更新"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'message',
|
|
|
|
|
|
'content': f'商品id:{product_id} 订单号:{order_number}',
|
|
|
|
|
|
'msg_type': 'order_card',
|
|
|
|
|
|
'sender': {'id': sender_id},
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
if pin_image:
|
|
|
|
|
|
message['pin_image'] = pin_image
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
def send_product_card(self, product_url: str, sender_id: str, store_id: str, pin_image: str = None):
|
|
|
|
|
|
"""发送商品卡片 - 根据WebSocket文档v2更新"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'message',
|
|
|
|
|
|
'content': product_url,
|
|
|
|
|
|
'msg_type': 'product_card',
|
|
|
|
|
|
'sender': {'id': sender_id},
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
if pin_image:
|
|
|
|
|
|
message['pin_image'] = pin_image
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
def send_staff_list(self, staff_list: List[Dict], store_id: str):
|
|
|
|
|
|
"""发送客服列表 - 根据WebSocket文档v2更新"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'staff_list',
|
|
|
|
|
|
'content': '客服列表更新',
|
|
|
|
|
|
'data': {'staff_list': staff_list},
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
# 保持向后兼容的旧方法(标记为已废弃)
|
|
|
|
|
|
def send_file_message(self, content: str, uid: str, pin: str, store_id: str):
|
|
|
|
|
|
"""发送文件消息 - 已废弃,请使用send_video_message或send_image_message"""
|
|
|
|
|
|
print("警告: send_file_message已废弃,请根据文件类型使用send_video_message或send_image_message")
|
|
|
|
|
|
# 尝试自动检测文件类型
|
|
|
|
|
|
content_lower = content.lower()
|
|
|
|
|
|
if any(ext in content_lower for ext in ['.mp4', '.avi', '.mov', '.wmv', '.flv']):
|
|
|
|
|
|
return self.send_video_message(content, uid, store_id)
|
|
|
|
|
|
elif any(ext in content_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']):
|
|
|
|
|
|
return self.send_image_message(content, uid, store_id)
|
|
|
|
|
|
else:
|
|
|
|
|
|
# 默认作为文本消息发送
|
|
|
|
|
|
return self.send_text_message(content, uid, store_id)
|
|
|
|
|
|
|
|
|
|
|
|
def send_transfer(self, customer: str, pin: str, store_id: str):
|
|
|
|
|
|
"""发送转接消息"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'transfer',
|
|
|
|
|
|
'customer': customer, # 客服名称
|
|
|
|
|
|
'pin': pin, # 顾客名称
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
def close_store(self, store_id: str):
|
|
|
|
|
|
"""关闭店铺"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
'type': 'close',
|
|
|
|
|
|
'store_id': store_id
|
|
|
|
|
|
}
|
|
|
|
|
|
return self.send_message(message)
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 消息处理方法 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_store_list(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理店铺列表"""
|
|
|
|
|
|
store_list = message.get('store_list', [])
|
|
|
|
|
|
print(f"获取到{len(store_list)}个店铺:")
|
|
|
|
|
|
|
|
|
|
|
|
for store in store_list:
|
|
|
|
|
|
merchant_name = store.get('merchant_name', '')
|
|
|
|
|
|
store_id = store.get('store_id', '')
|
|
|
|
|
|
store_platform = store.get('store_platform', '')
|
|
|
|
|
|
print(f" - {merchant_name} (ID: {store_id}, 平台: {store_platform})")
|
|
|
|
|
|
|
|
|
|
|
|
if self.store_list_callback:
|
|
|
|
|
|
self.store_list_callback(store_list)
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_customers_list(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理客服列表"""
|
|
|
|
|
|
customers = message.get('customers', [])
|
|
|
|
|
|
store_id = message.get('store_id', '')
|
|
|
|
|
|
|
|
|
|
|
|
print(f"店铺{store_id}的客服列表,共{len(customers)}个客服:")
|
|
|
|
|
|
for customer in customers:
|
|
|
|
|
|
pin = customer.get('pin', '')
|
|
|
|
|
|
nickname = customer.get('nickname', '')
|
|
|
|
|
|
print(f" - {nickname} ({pin})")
|
|
|
|
|
|
|
|
|
|
|
|
if self.customers_callback:
|
|
|
|
|
|
self.customers_callback(customers, store_id)
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_message(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理消息"""
|
|
|
|
|
|
store_id = message.get('store_id', '')
|
|
|
|
|
|
data = message.get('data')
|
|
|
|
|
|
content = message.get('content', '')
|
|
|
|
|
|
print(f"[{store_id}] [{message.get('msg_type', 'unknown')}] : {content}")
|
|
|
|
|
|
|
|
|
|
|
|
# 尝试将后端AI/客服回复转发到对应平台
|
|
|
|
|
|
try:
|
|
|
|
|
|
receiver = message.get('receiver') or (data.get('receiver') if isinstance(data, dict) else None) or {}
|
|
|
|
|
|
recv_pin = receiver.get('id')
|
|
|
|
|
|
if recv_pin and store_id:
|
|
|
|
|
|
# 根据store_id动态确定平台类型
|
|
|
|
|
|
platform_type = self._get_platform_by_store_id(store_id)
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if platform_type == "京东":
|
|
|
|
|
|
self._forward_to_jd(store_id, recv_pin, content)
|
|
|
|
|
|
elif platform_type == "抖音":
|
|
|
|
|
|
self._forward_to_douyin(store_id, recv_pin, content)
|
|
|
|
|
|
elif platform_type == "千牛":
|
|
|
|
|
|
self._forward_to_qianniu(store_id, recv_pin, content)
|
|
|
|
|
|
elif platform_type == "拼多多":
|
|
|
|
|
|
self._forward_to_pdd(store_id, recv_pin, content)
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[Forward] 未知平台类型或未找到店铺: {platform_type}, store_id={store_id}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"转发到平台失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
if self.message_callback:
|
|
|
|
|
|
self.message_callback(data if data else message, store_id)
|
|
|
|
|
|
|
|
|
|
|
|
def _get_platform_by_store_id(self, store_id: str) -> str:
|
|
|
|
|
|
"""根据店铺ID获取平台类型"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 从WebSocket管理器获取平台信息
|
|
|
|
|
|
from WebSocket.backend_singleton import get_websocket_manager
|
|
|
|
|
|
manager = get_websocket_manager()
|
|
|
|
|
|
if manager and hasattr(manager, 'platform_listeners'):
|
|
|
|
|
|
for key, listener_info in manager.platform_listeners.items():
|
|
|
|
|
|
if listener_info.get('store_id') == store_id:
|
|
|
|
|
|
return listener_info.get('platform', '')
|
|
|
|
|
|
return ""
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"获取平台类型失败: {e}")
|
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
def _forward_to_jd(self, store_id: str, recv_pin: str, content: str):
|
|
|
|
|
|
"""转发消息到京东平台"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
from Utils.JD.JdUtils import WebsocketManager as JDWSManager
|
|
|
|
|
|
jd_mgr = JDWSManager()
|
|
|
|
|
|
shop_key = f"京东:{store_id}"
|
|
|
|
|
|
entry = jd_mgr.get_connection(shop_key)
|
|
|
|
|
|
if not entry:
|
|
|
|
|
|
print(f"[JD Forward] 未找到连接: {shop_key}")
|
|
|
|
|
|
return
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
platform_info = (entry or {}).get('platform') or {}
|
|
|
|
|
|
ws = platform_info.get('ws')
|
|
|
|
|
|
aid = platform_info.get('aid')
|
|
|
|
|
|
pin_zj = platform_info.get('pin_zj')
|
|
|
|
|
|
vender_id = platform_info.get('vender_id')
|
|
|
|
|
|
loop = platform_info.get('loop')
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
|
|
|
|
|
print(
|
|
|
|
|
|
f"[JD Forward] shop_key={shop_key} has_ws={bool(ws)} aid={aid} pin_zj={pin_zj} vender_id={vender_id} has_loop={bool(loop)} recv_pin={recv_pin}")
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if ws and aid and pin_zj and vender_id and loop and content:
|
|
|
|
|
|
async def _send():
|
|
|
|
|
|
import hashlib as _hashlib
|
|
|
|
|
|
import time as _time
|
|
|
|
|
|
import json as _json
|
|
|
|
|
|
msg = {
|
|
|
|
|
|
"ver": "4.3",
|
|
|
|
|
|
"type": "chat_message",
|
|
|
|
|
|
"from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"},
|
|
|
|
|
|
"to": {"app": "im.customer", "pin": recv_pin},
|
|
|
|
|
|
"id": _hashlib.md5(str(int(_time.time() * 1000)).encode()).hexdigest(),
|
|
|
|
|
|
"lang": "zh_CN",
|
|
|
|
|
|
"aid": aid,
|
|
|
|
|
|
"timestamp": int(_time.time() * 1000),
|
|
|
|
|
|
"readFlag": 0,
|
|
|
|
|
|
"body": {
|
|
|
|
|
|
"content": content,
|
|
|
|
|
|
"translated": False,
|
|
|
|
|
|
"param": {"cusVenderId": vender_id},
|
|
|
|
|
|
"type": "text"
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
await ws.send(_json.dumps(msg))
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
import asyncio as _asyncio
|
|
|
|
|
|
_future = _asyncio.run_coroutine_threadsafe(_send(), loop)
|
|
|
|
|
|
try:
|
|
|
|
|
|
_future.result(timeout=2)
|
|
|
|
|
|
print(f"[JD Forward] 已转发到平台: pin={recv_pin}, content_len={len(content)}")
|
|
|
|
|
|
except Exception as fe:
|
|
|
|
|
|
print(f"[JD Forward] 转发提交失败: {fe}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("[JD Forward] 条件不足,未转发:",
|
|
|
|
|
|
{
|
|
|
|
|
|
'has_ws': bool(ws), 'has_aid': bool(aid), 'has_pin_zj': bool(pin_zj),
|
|
|
|
|
|
'has_vender_id': bool(vender_id), 'has_loop': bool(loop), 'has_content': bool(content)
|
|
|
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[JD Forward] 转发失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _forward_to_douyin(self, store_id: str, recv_pin: str, content: str):
|
|
|
|
|
|
"""转发消息到抖音平台"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
from Utils.Dy.DyUtils import DouYinWebsocketManager
|
|
|
|
|
|
dy_mgr = DouYinWebsocketManager()
|
|
|
|
|
|
shop_key = f"抖音:{store_id}"
|
|
|
|
|
|
entry = dy_mgr.get_connection(shop_key)
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if not entry:
|
|
|
|
|
|
print(f"[DY Forward] 未找到连接: {shop_key}")
|
|
|
|
|
|
return
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
platform_info = entry.get('platform', {})
|
|
|
|
|
|
douyin_bot = platform_info.get('douyin_bot')
|
|
|
|
|
|
message_handler = platform_info.get('message_handler')
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
|
|
|
|
|
print(
|
|
|
|
|
|
f"[DY Forward] shop_key={shop_key} has_bot={bool(douyin_bot)} has_handler={bool(message_handler)} recv_pin={recv_pin}")
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if douyin_bot and message_handler and content:
|
|
|
|
|
|
# 在消息处理器的事件循环中发送消息
|
|
|
|
|
|
def send_in_loop():
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 获取消息处理器的事件循环
|
|
|
|
|
|
loop = message_handler._loop
|
|
|
|
|
|
if loop and not loop.is_closed():
|
|
|
|
|
|
# 在事件循环中执行发送
|
|
|
|
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
|
|
|
|
message_handler.send_message_external(recv_pin, content),
|
|
|
|
|
|
loop
|
|
|
|
|
|
)
|
|
|
|
|
|
# 等待结果
|
|
|
|
|
|
try:
|
|
|
|
|
|
result = future.result(timeout=5)
|
|
|
|
|
|
if result:
|
|
|
|
|
|
print(f"[DY Forward] 已转发到平台: pin={recv_pin}, content_len={len(content)}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[DY Forward] 转发失败: 消息处理器返回False")
|
|
|
|
|
|
except Exception as fe:
|
|
|
|
|
|
print(f"[DY Forward] 转发执行失败: {fe}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[DY Forward] 事件循环不可用")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[DY Forward] 发送过程异常: {e}")
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
# 在新线程中执行发送操作
|
|
|
|
|
|
import threading
|
|
|
|
|
|
send_thread = threading.Thread(target=send_in_loop, daemon=True)
|
|
|
|
|
|
send_thread.start()
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
else:
|
|
|
|
|
|
print("[DY Forward] 条件不足,未转发:",
|
|
|
|
|
|
{
|
2025-09-13 19:54:30 +08:00
|
|
|
|
'has_bot': bool(douyin_bot),
|
|
|
|
|
|
'has_handler': bool(message_handler),
|
2025-09-12 20:42:00 +08:00
|
|
|
|
'has_content': bool(content)
|
|
|
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[DY Forward] 转发失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _forward_to_qianniu(self, store_id: str, recv_pin: str, content: str):
|
|
|
|
|
|
"""转发消息到千牛平台"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# TODO: 实现千牛平台的消息转发逻辑
|
2025-09-13 19:54:30 +08:00
|
|
|
|
print(
|
|
|
|
|
|
f"[QN Forward] 千牛平台消息转发功能待实现: store_id={store_id}, recv_pin={recv_pin}, content={content}")
|
2025-09-12 20:42:00 +08:00
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[QN Forward] 转发失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _forward_to_pdd(self, store_id: str, recv_pin: str, content: str):
|
|
|
|
|
|
"""转发消息到拼多多平台"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager
|
|
|
|
|
|
pdd_mgr = PDDWSManager()
|
|
|
|
|
|
shop_key = f"拼多多:{store_id}"
|
|
|
|
|
|
entry = pdd_mgr.get_connection(shop_key)
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if not entry:
|
|
|
|
|
|
print(f"[PDD Forward] 未找到连接: {shop_key}")
|
|
|
|
|
|
return
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
platform_info = entry.get('platform', {})
|
|
|
|
|
|
pdd_instance = platform_info.get('pdd_instance')
|
|
|
|
|
|
loop = platform_info.get('loop')
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
|
|
|
|
|
print(
|
|
|
|
|
|
f"[PDD Forward] shop_key={shop_key} has_pdd_instance={bool(pdd_instance)} has_loop={bool(loop)} recv_pin={recv_pin}")
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if pdd_instance and loop and content:
|
|
|
|
|
|
# 在拼多多实例的事件循环中发送消息
|
|
|
|
|
|
def send_in_loop():
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 在事件循环中执行发送
|
|
|
|
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
|
|
|
|
pdd_instance.send_message_external(recv_pin, content),
|
|
|
|
|
|
loop
|
|
|
|
|
|
)
|
|
|
|
|
|
# 等待结果
|
|
|
|
|
|
try:
|
|
|
|
|
|
result = future.result(timeout=10) # 拼多多可能需要更长时间
|
|
|
|
|
|
if result:
|
|
|
|
|
|
print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[PDD Forward] 转发失败: 拼多多实例返回False")
|
|
|
|
|
|
except Exception as fe:
|
|
|
|
|
|
print(f"[PDD Forward] 转发执行失败: {fe}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[PDD Forward] 发送过程异常: {e}")
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
# 在新线程中执行发送操作
|
|
|
|
|
|
import threading
|
|
|
|
|
|
send_thread = threading.Thread(target=send_in_loop, daemon=True)
|
|
|
|
|
|
send_thread.start()
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
else:
|
|
|
|
|
|
print("[PDD Forward] 条件不足,未转发:",
|
|
|
|
|
|
{
|
2025-09-13 19:54:30 +08:00
|
|
|
|
'has_pdd_instance': bool(pdd_instance),
|
|
|
|
|
|
'has_loop': bool(loop),
|
2025-09-12 20:42:00 +08:00
|
|
|
|
'has_content': bool(content)
|
|
|
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[PDD Forward] 转发失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _transfer_to_pdd(self, customer_service_id: str, user_id: str, store_id: str):
|
|
|
|
|
|
"""执行拼多多平台转接操作"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager
|
|
|
|
|
|
pdd_mgr = PDDWSManager()
|
|
|
|
|
|
shop_key = f"拼多多:{store_id}"
|
|
|
|
|
|
entry = pdd_mgr.get_connection(shop_key)
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if not entry:
|
|
|
|
|
|
print(f"[PDD Transfer] 未找到拼多多连接: {shop_key}")
|
|
|
|
|
|
return
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
platform_info = entry.get('platform', {})
|
|
|
|
|
|
pdd_instance = platform_info.get('pdd_instance')
|
|
|
|
|
|
loop = platform_info.get('loop')
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if pdd_instance and loop:
|
|
|
|
|
|
# 设置目标客服ID并执行转接
|
|
|
|
|
|
def transfer_in_loop():
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 设置目标客服ID
|
|
|
|
|
|
pdd_instance.csid = customer_service_id
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
# 在事件循环中执行转接
|
|
|
|
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
|
|
|
|
pdd_instance.handle_transfer_message({
|
|
|
|
|
|
"content": customer_service_id,
|
|
|
|
|
|
"receiver": {"id": user_id}
|
|
|
|
|
|
}),
|
|
|
|
|
|
loop
|
|
|
|
|
|
)
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
# 等待转接结果
|
|
|
|
|
|
try:
|
|
|
|
|
|
result = future.result(timeout=15) # 转接可能需要更长时间
|
|
|
|
|
|
if result:
|
|
|
|
|
|
print(f"[PDD Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[PDD Transfer] ❌ 转接失败: user_id={user_id}")
|
|
|
|
|
|
except Exception as fe:
|
|
|
|
|
|
print(f"[PDD Transfer] 转接执行失败: {fe}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[PDD Transfer] 转接过程异常: {e}")
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
# 在新线程中执行转接操作
|
|
|
|
|
|
import threading
|
|
|
|
|
|
transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True)
|
|
|
|
|
|
transfer_thread.start()
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
else:
|
|
|
|
|
|
print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[PDD Transfer] 拼多多转接失败: {e}")
|
|
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
def _transfer_to_jd(self, customer_service_id: str, user_id: str, store_id: str):
|
|
|
|
|
|
"""执行京东平台转接操作"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
from Utils.JD.JdUtils import WebsocketManager as JDWSManager
|
|
|
|
|
|
jd_mgr = JDWSManager()
|
|
|
|
|
|
shop_key = f"京东:{store_id}"
|
|
|
|
|
|
entry = jd_mgr.get_connection(shop_key)
|
|
|
|
|
|
|
|
|
|
|
|
if not entry:
|
|
|
|
|
|
print(f"[JD Transfer] 未找到京东连接: {shop_key}")
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
platform_info = entry.get('platform', {})
|
|
|
|
|
|
websocket = platform_info.get('ws') # 京东使用'ws'字段
|
|
|
|
|
|
aid = platform_info.get('aid', '')
|
|
|
|
|
|
pin_zj = platform_info.get('pin_zj', '')
|
|
|
|
|
|
|
|
|
|
|
|
print(f"[JD Transfer] 找到京东连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
|
|
|
|
|
|
print(f"[JD Transfer] 连接信息: has_ws={bool(websocket)}, aid={aid}, pin_zj={pin_zj}")
|
|
|
|
|
|
|
|
|
|
|
|
if websocket:
|
|
|
|
|
|
# 执行转接操作
|
|
|
|
|
|
def transfer_in_loop():
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 导入京东工具类
|
|
|
|
|
|
from Utils.JD.JdUtils import FixJdCookie
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
# 创建临时实例用于转接
|
|
|
|
|
|
jd_instance = FixJdCookie()
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
# 执行转接操作
|
|
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
|
|
try:
|
|
|
|
|
|
result = loop.run_until_complete(
|
|
|
|
|
|
jd_instance.transfer_customer(
|
|
|
|
|
|
websocket, aid, user_id, pin_zj, customer_service_id
|
|
|
|
|
|
)
|
|
|
|
|
|
)
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
if result:
|
|
|
|
|
|
print(f"[JD Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[JD Transfer] ❌ 转接失败: user_id={user_id}")
|
|
|
|
|
|
finally:
|
|
|
|
|
|
loop.close()
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[JD Transfer] 转接过程异常: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# 在新线程中执行转接操作
|
|
|
|
|
|
import threading
|
|
|
|
|
|
transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True)
|
|
|
|
|
|
transfer_thread.start()
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[JD Transfer] 条件不足: has_ws={bool(websocket)}, aid='{aid}', pin_zj='{pin_zj}'")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[JD Transfer] 京东转接失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def _transfer_to_dy(self, customer_service_id: str, user_id: str, store_id: str):
|
|
|
|
|
|
"""执行抖音平台转接操作"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
from Utils.Dy.DyUtils import DouYinWebsocketManager as DYWSManager
|
|
|
|
|
|
dy_mgr = DYWSManager()
|
|
|
|
|
|
shop_key = f"抖音:{store_id}"
|
|
|
|
|
|
entry = dy_mgr.get_connection(shop_key)
|
|
|
|
|
|
|
|
|
|
|
|
if not entry:
|
|
|
|
|
|
print(f"[DY Transfer] 未找到抖音连接: {shop_key}")
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
platform_info = entry.get('platform', {})
|
|
|
|
|
|
dy_instance = platform_info.get('douyin_bot') # 修正字段名称
|
|
|
|
|
|
cookie_dict = platform_info.get('cookie', {})
|
|
|
|
|
|
|
|
|
|
|
|
print(f"[DY Transfer] 找到抖音连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
|
|
|
|
|
|
print(f"[DY Transfer] 连接信息: has_douyin_bot={bool(dy_instance)}, has_cookie={bool(cookie_dict)}")
|
|
|
|
|
|
|
|
|
|
|
|
if dy_instance:
|
|
|
|
|
|
# 执行转接操作
|
|
|
|
|
|
def transfer_in_loop():
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 抖音转接通过message_handler执行
|
|
|
|
|
|
if dy_instance and hasattr(dy_instance, 'message_handler') and dy_instance.message_handler:
|
|
|
|
|
|
# 获取实际的抖音店铺ID(从cookie中获取)
|
|
|
|
|
|
shop_id = cookie_dict.get('SHOP_ID', store_id) # 优先使用cookie中的SHOP_ID
|
|
|
|
|
|
print(f"[DY Transfer] 使用shop_id: {shop_id}")
|
2025-09-20 16:13:23 +08:00
|
|
|
|
print(
|
|
|
|
|
|
f"[DY Transfer] 转接参数: receiver_id={user_id}, shop_id={shop_id}, staff_id={customer_service_id}")
|
|
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
# 检查是否是自己转给自己的情况
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 获取可用客服列表来验证
|
|
|
|
|
|
staff_list = dy_instance.message_handler.get_casl()
|
|
|
|
|
|
if staff_list:
|
|
|
|
|
|
print(f"[DY Transfer] 当前可用客服数量: {len(staff_list)}")
|
|
|
|
|
|
if len(staff_list) <= 1:
|
|
|
|
|
|
print(f"[DY Transfer] ⚠️ 只有一个客服在线,可能无法转接")
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
# 查找目标客服信息
|
|
|
|
|
|
target_staff = None
|
|
|
|
|
|
for staff in staff_list:
|
|
|
|
|
|
if str(staff.get('staffId', '')) == str(customer_service_id):
|
|
|
|
|
|
target_staff = staff
|
|
|
|
|
|
break
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
if target_staff:
|
2025-09-20 16:13:23 +08:00
|
|
|
|
print(
|
|
|
|
|
|
f"[DY Transfer] 找到目标客服: {target_staff.get('staffName', 'Unknown')} (ID: {customer_service_id})")
|
2025-09-15 14:16:14 +08:00
|
|
|
|
else:
|
|
|
|
|
|
print(f"[DY Transfer] ⚠️ 未找到目标客服ID: {customer_service_id}")
|
2025-09-20 16:13:23 +08:00
|
|
|
|
print(
|
|
|
|
|
|
f"[DY Transfer] 可用客服列表: {[{'id': s.get('staffId'), 'name': s.get('staffName')} for s in staff_list]}")
|
2025-09-15 14:16:14 +08:00
|
|
|
|
else:
|
|
|
|
|
|
print(f"[DY Transfer] ⚠️ 无法获取客服列表")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[DY Transfer] 获取客服列表时出错: {e}")
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
# 执行同步转接操作
|
|
|
|
|
|
result = dy_instance.message_handler.transfer_conversation(
|
|
|
|
|
|
receiver_id=user_id,
|
|
|
|
|
|
shop_id=shop_id,
|
|
|
|
|
|
staff_id=customer_service_id
|
|
|
|
|
|
)
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
if result:
|
|
|
|
|
|
print(f"[DY Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[DY Transfer] ❌ 转接失败: user_id={user_id}")
|
2025-09-20 16:13:23 +08:00
|
|
|
|
print(
|
|
|
|
|
|
f"[DY Transfer] 💡 可能原因:1) 只有一个客服无法转接 2) 客服ID不存在 3) 权限不足 4) 会话状态不允许转接")
|
2025-09-15 14:16:14 +08:00
|
|
|
|
else:
|
|
|
|
|
|
print(f"[DY Transfer] ⚠️ 抖音实例或message_handler不可用")
|
2025-09-20 16:13:23 +08:00
|
|
|
|
|
2025-09-15 14:16:14 +08:00
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[DY Transfer] 转接过程异常: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# 在新线程中执行转接操作
|
|
|
|
|
|
import threading
|
|
|
|
|
|
transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True)
|
|
|
|
|
|
transfer_thread.start()
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[DY Transfer] 条件不足: has_douyin_bot={bool(dy_instance)}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[DY Transfer] 抖音转接失败: {e}")
|
|
|
|
|
|
|
2025-09-13 19:54:30 +08:00
|
|
|
|
def _transfer_to_qianniu(self, customer_service_id: str, user_id: str, store_id: str):
|
|
|
|
|
|
"""执行千牛平台转接操作"""
|
2025-09-13 15:19:39 +08:00
|
|
|
|
try:
|
2025-09-13 19:54:30 +08:00
|
|
|
|
# TODO: 实现千牛平台转接逻辑
|
|
|
|
|
|
print(f"[QN Transfer] 千牛平台转接功能待实现: user_id={user_id}, cs_id={customer_service_id}")
|
2025-09-13 15:19:39 +08:00
|
|
|
|
except Exception as e:
|
2025-09-13 19:54:30 +08:00
|
|
|
|
print(f"[QN Transfer] 千牛转接失败: {e}")
|
2025-09-13 15:19:39 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
def _handle_transfer(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理转接消息"""
|
|
|
|
|
|
# 新版转接消息格式: {"type": "transfer", "content": "客服ID", "receiver": {"id": "用户ID"}, "store_id": "店铺ID"}
|
|
|
|
|
|
customer_service_id = message.get('content', '') # 目标客服ID
|
|
|
|
|
|
receiver_info = message.get('receiver', {})
|
|
|
|
|
|
user_id = receiver_info.get('id', '') # 用户ID
|
|
|
|
|
|
store_id = message.get('store_id', '')
|
|
|
|
|
|
|
|
|
|
|
|
print(f"转接消息: 顾客{user_id}已转接给客服{customer_service_id} (店铺: {store_id})")
|
|
|
|
|
|
|
|
|
|
|
|
# 根据店铺ID确定平台类型并执行转接
|
|
|
|
|
|
try:
|
|
|
|
|
|
platform_type = self._get_platform_by_store_id(store_id)
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
if platform_type == "京东":
|
2025-09-15 14:16:14 +08:00
|
|
|
|
# 京东转接逻辑
|
|
|
|
|
|
self._transfer_to_jd(customer_service_id, user_id, store_id)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
elif platform_type == "抖音":
|
2025-09-15 14:16:14 +08:00
|
|
|
|
# 抖音转接逻辑
|
|
|
|
|
|
self._transfer_to_dy(customer_service_id, user_id, store_id)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
elif platform_type == "千牛":
|
2025-09-13 19:54:30 +08:00
|
|
|
|
# 千牛转接逻辑
|
|
|
|
|
|
self._transfer_to_qianniu(customer_service_id, user_id, store_id)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
elif platform_type == "拼多多":
|
|
|
|
|
|
self._transfer_to_pdd(customer_service_id, user_id, store_id)
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[Transfer] 未知平台类型或未找到店铺: {platform_type}, store_id={store_id}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"执行转接操作失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# 保持旧版回调兼容性
|
|
|
|
|
|
if self.transfer_callback:
|
|
|
|
|
|
self.transfer_callback(customer_service_id, user_id, store_id)
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_close(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理店铺关闭"""
|
|
|
|
|
|
store_id = message.get('store_id', '')
|
|
|
|
|
|
print(f"店铺{store_id}已关闭")
|
|
|
|
|
|
|
|
|
|
|
|
if self.close_callback:
|
|
|
|
|
|
self.close_callback(store_id)
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_pong(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理心跳响应"""
|
|
|
|
|
|
uuid_received = message.get('uuid', '')
|
|
|
|
|
|
print(f"收到心跳响应: {uuid_received}")
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_connect_success(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理连接成功消息(旧版兼容)"""
|
|
|
|
|
|
print("后端连接成功(connect_success)")
|
|
|
|
|
|
if self.token:
|
|
|
|
|
|
self.get_store()
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_login(self, message: Dict[str, Any]):
|
2025-09-13 15:19:39 +08:00
|
|
|
|
"""处理平台登录消息(新版:type=login, cookies/login_params, store_id, platform_name)"""
|
2025-09-12 20:42:00 +08:00
|
|
|
|
cookies = message.get('cookies', '')
|
|
|
|
|
|
store_id = message.get('store_id', '')
|
|
|
|
|
|
platform_name = message.get('platform_name', '')
|
2025-09-13 15:19:39 +08:00
|
|
|
|
content = message.get('content', '')
|
|
|
|
|
|
data = message.get('data', {})
|
2025-09-13 19:54:30 +08:00
|
|
|
|
|
2025-09-29 15:38:34 +08:00
|
|
|
|
# 🔥 判断是登录参数模式还是普通Cookie模式(支持拼多多和抖音)
|
|
|
|
|
|
if (platform_name in ["拼多多", "抖音"] and
|
|
|
|
|
|
(("拼多多登录" in content and data.get('login_params')) or
|
|
|
|
|
|
("抖音登录" in content and data.get('login_flow')))):
|
|
|
|
|
|
# 登录参数模式 - 传递完整的消息JSON给处理器
|
|
|
|
|
|
print(f"收到{platform_name}登录参数: 平台={platform_name}, 店铺={store_id}, 类型={content}")
|
2025-09-13 15:19:39 +08:00
|
|
|
|
if self.login_callback:
|
2025-09-29 15:38:34 +08:00
|
|
|
|
# 传递完整的JSON消息,让处理器来解析登录参数
|
2025-09-16 09:05:11 +08:00
|
|
|
|
import json
|
|
|
|
|
|
full_message = json.dumps(message)
|
|
|
|
|
|
self.login_callback(platform_name, store_id, full_message)
|
2025-09-13 15:19:39 +08:00
|
|
|
|
else:
|
|
|
|
|
|
# 普通Cookie模式
|
|
|
|
|
|
print(f"收到登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}")
|
|
|
|
|
|
if self.login_callback:
|
|
|
|
|
|
self.login_callback(platform_name, store_id, cookies)
|
2025-09-12 20:42:00 +08:00
|
|
|
|
|
|
|
|
|
|
def _handle_error_message(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理错误消息"""
|
|
|
|
|
|
error_msg = message.get('error', '未知错误')
|
2025-09-28 17:00:02 +08:00
|
|
|
|
content = message.get('content', '')
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-28 17:00:02 +08:00
|
|
|
|
# 检查是否为token错误(无论type是error还是error_token)
|
|
|
|
|
|
if content == "无效的exe_token" or "无效的exe_token" in content:
|
|
|
|
|
|
print(f"[错误] 检测到token错误: {content}")
|
|
|
|
|
|
self._handle_token_error(message)
|
|
|
|
|
|
return
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
print(f"后端连接错误: {error_msg}")
|
|
|
|
|
|
if self.error_callback:
|
|
|
|
|
|
self.error_callback(error_msg, message)
|
|
|
|
|
|
|
2025-09-28 17:00:02 +08:00
|
|
|
|
def _handle_token_error(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理token错误消息 - 无效token时停止重连并显示错误"""
|
|
|
|
|
|
error_content = message.get('content', '无效的exe_token')
|
|
|
|
|
|
print(f"[错误] Token验证失败: {error_content}")
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-28 17:00:02 +08:00
|
|
|
|
# 停止重连机制
|
|
|
|
|
|
self.should_stop = True
|
|
|
|
|
|
self.is_reconnecting = False
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-28 17:00:02 +08:00
|
|
|
|
# 触发token错误回调
|
|
|
|
|
|
if self.token_error_callback:
|
|
|
|
|
|
self.token_error_callback(error_content)
|
2025-09-29 15:38:34 +08:00
|
|
|
|
|
2025-09-28 17:00:02 +08:00
|
|
|
|
# 主动关闭连接
|
|
|
|
|
|
if self.websocket:
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe(self.websocket.close(), self.loop)
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
def _handle_staff_list(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理客服列表更新消息"""
|
|
|
|
|
|
staff_list = message.get('data', {}).get('staff_list', [])
|
|
|
|
|
|
store_id = message.get('store_id', '')
|
|
|
|
|
|
print(f"店铺{store_id}的客服列表已更新,共{len(staff_list)}个客服:")
|
|
|
|
|
|
for staff in staff_list:
|
|
|
|
|
|
pin = staff.get('pin', '')
|
|
|
|
|
|
nickname = staff.get('nickname', '')
|
|
|
|
|
|
print(f" - {nickname} ({pin})")
|
|
|
|
|
|
|
2025-09-13 19:54:30 +08:00
|
|
|
|
if self.customers_callback: # 假设客服列表更新也触发客服列表回调
|
2025-09-12 20:42:00 +08:00
|
|
|
|
self.customers_callback(staff_list, store_id)
|
|
|
|
|
|
|
2025-09-29 15:38:34 +08:00
|
|
|
|
def _handle_version_response(self, message: Dict[str, Any]):
|
|
|
|
|
|
"""处理版本检查响应"""
|
|
|
|
|
|
latest_version = message.get('latest_version')
|
|
|
|
|
|
download_url = message.get('download_url')
|
|
|
|
|
|
|
|
|
|
|
|
print(f"收到版本检查响应: 最新版本={latest_version}, 下载地址={download_url}")
|
|
|
|
|
|
|
|
|
|
|
|
if self.version_callback:
|
|
|
|
|
|
self.version_callback(message)
|
|
|
|
|
|
|
2025-09-12 20:42:00 +08:00
|
|
|
|
# ==================== 辅助方法 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def set_token(self, token: str):
|
|
|
|
|
|
"""设置或更新令牌"""
|
|
|
|
|
|
self.token = token
|
|
|
|
|
|
|
|
|
|
|
|
def get_connection_info(self) -> Dict[str, Any]:
|
|
|
|
|
|
"""获取连接信息"""
|
|
|
|
|
|
return {
|
|
|
|
|
|
'url': self.url,
|
|
|
|
|
|
'token': self.token,
|
|
|
|
|
|
'uuid': self.uuid,
|
|
|
|
|
|
'is_connected': self.is_connected
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 使用示例
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
|
pass
|
|
|
|
|
|
# import time
|
|
|
|
|
|
#
|
|
|
|
|
|
# def on_store_list(stores):
|
|
|
|
|
|
# print(f"回调: 收到{len(stores)}个店铺")
|
|
|
|
|
|
#
|
|
|
|
|
|
# def on_message(data, store_id):
|
|
|
|
|
|
# print(f"回调: 店铺{store_id}收到消息: {data.get('content')}")
|
|
|
|
|
|
#
|
|
|
|
|
|
# def on_error(error, message):
|
|
|
|
|
|
# print(f"回调: 发生错误: {error}")
|
|
|
|
|
|
#
|
|
|
|
|
|
# def on_login(platform_name, store_id, cookies):
|
|
|
|
|
|
# print(f"回调: 登录指令 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}")
|
|
|
|
|
|
# # 此处触发对应平台的WS连接/更新cookies逻辑,并将该平台WS与store_id绑定
|
|
|
|
|
|
#
|
|
|
|
|
|
# def on_success():
|
|
|
|
|
|
# print("回调: 后端连接成功")
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 创建客户端(新版:单连接)
|
|
|
|
|
|
# client = BackendClient.from_exe_token("your_exe_token_here")
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 设置回调
|
|
|
|
|
|
# client.set_callbacks(
|
|
|
|
|
|
# store_list=on_store_list,
|
|
|
|
|
|
# message=on_message,
|
|
|
|
|
|
# error=on_error,
|
|
|
|
|
|
# login=on_login,
|
|
|
|
|
|
# success=on_success
|
|
|
|
|
|
# )
|
|
|
|
|
|
#
|
|
|
|
|
|
# try:
|
|
|
|
|
|
# # 连接
|
|
|
|
|
|
# client.connect()
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 等待连接
|
|
|
|
|
|
# time.sleep(2)
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 发送心跳
|
|
|
|
|
|
# client.send_ping()
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 保持运行
|
|
|
|
|
|
# while True:
|
|
|
|
|
|
# time.sleep(30)
|
|
|
|
|
|
# client.send_ping()
|
|
|
|
|
|
#
|
|
|
|
|
|
# except KeyboardInterrupt:
|
|
|
|
|
|
# print("用户中断")
|
|
|
|
|
|
# finally:
|
|
|
|
|
|
# client.disconnect()
|