Files
shuidrop_gui/WebSocket/BackendClientPing.py

1160 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# WebSocket/BackendClient.py
import json
import threading
import time
import websockets
import uuid
import asyncio
from typing import List, Dict, Any, Optional, Callable
from config import get_gui_ws_url
class BackendClient:
"""后端WebSocket客户端"""
def __init__(self, url: str, token: str = None):
self.token = token
self.uuid = str(uuid.uuid4())
self.url = url
# 消息处理回调函数
self.store_list_callback: Optional[Callable] = None
self.customers_callback: Optional[Callable] = None
self.message_callback: Optional[Callable] = None
self.transfer_callback: Optional[Callable] = None
self.close_callback: Optional[Callable] = None
self.error_callback: Optional[Callable] = None
self.login_callback: Optional[Callable] = None # 新增平台登录下发cookies回调
self.success_callback: Optional[Callable] = None # 新增:后端连接成功回调
self.is_connected = False
# 新增:重连机制相关属性
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10
self.base_reconnect_delay = 2.0
self.max_reconnect_delay = 60.0
self.reconnect_backoff = 1.5
self.is_reconnecting = False
self.should_stop = False
self.websocket = None
self.loop = None
self.thread = None
def connect(self):
"""连接到WebSocket服务器"""
if self.is_connected:
return
self.should_stop = False
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
def disconnect(self):
"""断开WebSocket连接"""
self.should_stop = True
if self.loop and self.loop.is_running():
asyncio.run_coroutine_threadsafe(self._close(), self.loop)
if self.thread and self.thread.is_alive():
self.thread.join(timeout=3)
self.is_connected = False
async def _close(self):
"""异步关闭连接"""
if self.websocket:
await self.websocket.close()
self.is_connected = False
def _run_loop(self):
"""在新线程中运行事件循环"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
try:
self.loop.run_until_complete(self._connect_and_listen())
except Exception as e:
print(f"WebSocket异常: {e}")
finally:
self.loop.close()
async def _connect_and_listen(self):
"""连接并监听消息 - 带重连机制"""
while not self.should_stop:
try:
print(f"正在连接后端WebSocket: {self.url}")
# 建立连接可配置的ping设置
from config import WS_PING_INTERVAL, WS_PING_TIMEOUT, WS_ENABLE_PING
if WS_ENABLE_PING:
self.websocket = await websockets.connect(
self.url,
ping_interval=WS_PING_INTERVAL, # 可配置ping间隔
ping_timeout=WS_PING_TIMEOUT, # 可配置ping超时
close_timeout=10 # 10秒关闭超时
)
print(f"[连接] 已启用心跳ping_interval={WS_PING_INTERVAL}s, ping_timeout={WS_PING_TIMEOUT}s")
else:
self.websocket = await websockets.connect(self.url)
print("[连接] 已禁用心跳机制")
self.is_connected = True
self.reconnect_attempts = 0 # 重置重连计数
self.is_reconnecting = False
print("后端WebSocket连接成功")
self.on_connected()
# 消息循环
async for message in self.websocket:
try:
# 打印原始文本帧与长度
try:
raw_len = len(message.encode('utf-8')) if isinstance(message, str) else len(message)
print(f"后端发送消息体内容:{message}")
except Exception:
pass
data = json.loads(message)
self.on_message_received(data)
except json.JSONDecodeError:
print(f"JSON解析错误: {message}")
except websockets.ConnectionClosed as e:
self.is_connected = False
self._handle_connection_closed(e)
if not await self._should_reconnect():
break
await self._wait_before_reconnect()
except websockets.InvalidURI as e:
self.is_connected = False
print(f"无效的WebSocket URI: {e}")
self.on_error(f"无效的WebSocket URI: {e}")
break
except (websockets.WebSocketException, OSError, ConnectionError) as e:
self.is_connected = False
self._handle_network_error(e)
if not await self._should_reconnect():
break
await self._wait_before_reconnect()
except Exception as e:
self.is_connected = False
self._handle_general_error(e)
if not await self._should_reconnect():
break
await self._wait_before_reconnect()
def _handle_connection_closed(self, error):
"""处理连接关闭"""
error_msg = f"WebSocket连接已关闭: {error.code} {error.reason if hasattr(error, 'reason') else ''}"
print(f"[重连] {error_msg}")
# 特殊处理ping超时等情况
if hasattr(error, 'code'):
if error.code == 1011: # Internal error (ping timeout)
print("[重连] 检测到ping超时这是常见的网络问题")
elif error.code == 1006: # Abnormal closure
print("[重连] 检测到异常关闭,可能是网络中断")
if not self.is_reconnecting:
self.on_error(error_msg)
def _handle_network_error(self, error):
"""处理网络错误"""
error_msg = f"网络连接错误: {type(error).__name__} - {str(error)}"
print(f"[重连] {error_msg}")
if not self.is_reconnecting:
self.on_error(error_msg)
def _handle_general_error(self, error):
"""处理一般错误"""
error_msg = f"WebSocket连接异常: {type(error).__name__} - {str(error)}"
print(f"[重连] {error_msg}")
if not self.is_reconnecting:
self.on_error(error_msg)
async def _should_reconnect(self) -> bool:
"""判断是否应该重连"""
if self.should_stop:
print("[重连] 程序正在关闭,停止重连")
return False
if self.reconnect_attempts >= self.max_reconnect_attempts:
print(f"[重连] 已达到最大重连次数({self.max_reconnect_attempts}),停止重连")
# 通知上层重连失败
if not self.is_reconnecting:
self.on_error(f"重连失败:已达到最大重连次数({self.max_reconnect_attempts})")
return False
return True
async def _wait_before_reconnect(self):
"""重连前等待(指数退避)"""
delay = min(
self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts),
self.max_reconnect_delay
)
self.reconnect_attempts += 1
self.is_reconnecting = True
print(f"[重连] 第{self.reconnect_attempts}次重连尝试,等待{delay:.1f}秒...")
# 分割等待时间,支持快速退出
wait_steps = max(1, int(delay))
for i in range(wait_steps):
if self.should_stop:
return
await asyncio.sleep(1)
# 处理小数部分
remaining = delay - wait_steps
if remaining > 0 and not self.should_stop:
await asyncio.sleep(remaining)
@classmethod
def from_exe_token(cls, exe_token: str):
"""使用 exe_token 构造单连接客户端ws/gui/<token>/"""
url = get_gui_ws_url(exe_token)
return cls(url=url, token=exe_token)
def set_callbacks(self,
store_list: Callable = None,
customers: Callable = None,
message: Callable = None,
transfer: Callable = None,
close: Callable = None,
error: Callable = None,
login: Callable = None,
success: Callable = None):
"""设置各种消息类型的回调函数"""
if store_list:
self.store_list_callback = store_list
if customers:
self.customers_callback = customers
if message:
self.message_callback = message
if transfer:
self.transfer_callback = transfer
if close:
self.close_callback = close
if error:
self.error_callback = error
if login:
self.login_callback = login
if success:
self.success_callback = success
def on_connected(self):
"""连接成功时的处理"""
if self.reconnect_attempts > 0:
print(f"[重连] 后端WebSocket重连成功(第{self.reconnect_attempts}次尝试)")
else:
print("后端WebSocket连接成功")
# 重连成功后可选择上报状态给后端
if self.reconnect_attempts > 0:
self._report_reconnect_status()
# 不再主动请求 get_store避免与后端不兼容导致协程未完成
def _report_reconnect_status(self):
"""重连成功后上报当前状态(可选)"""
try:
# 获取当前已连接的平台列表
from WebSocket.backend_singleton import get_websocket_manager
manager = get_websocket_manager()
if hasattr(manager, 'platform_listeners') and manager.platform_listeners:
for platform_key, listener_info in manager.platform_listeners.items():
store_id = listener_info.get('store_id')
platform = listener_info.get('platform')
if store_id and platform:
# 上报平台仍在连接状态
try:
reconnect_message = {
"type": "connect_message",
"store_id": store_id,
"status": True,
"content": f"GUI重连成功{platform}平台状态正常"
}
# 异步发送,不阻塞连接过程
asyncio.run_coroutine_threadsafe(
self._send_to_backend(reconnect_message),
self.loop
)
print(f"[重连] 已上报{platform}平台状态")
except Exception as e:
print(f"[重连] 上报{platform}平台状态失败: {e}")
else:
print("[重连] 当前无活跃平台连接,跳过状态上报")
except Exception as e:
print(f"[重连] 状态上报过程异常: {e}")
def on_message_received(self, message: Dict[str, Any]):
"""处理接收到的消息 - 根据WebSocket文档v2更新"""
# 统一打印后端下发的完整消息结构体
try:
import json as _json
print("=== Backend -> GUI Message ===")
print(_json.dumps(message, ensure_ascii=False, indent=2))
print("=== End Message ===")
except Exception:
pass
msg_type = message.get('type')
try:
if msg_type == 'get_store':
self._handle_store_list(message)
elif msg_type == 'get_customers':
self._handle_customers_list(message)
elif msg_type == 'success':
print("后端连接服务成功")
# 可在此触发上层UI通知
if self.success_callback:
try:
self.success_callback()
except Exception:
pass
elif msg_type == 'message':
self._handle_message(message)
elif msg_type == 'transfer':
self._handle_transfer(message)
elif msg_type == 'close':
self._handle_close(message)
elif msg_type == 'pong':
self._handle_pong(message)
elif msg_type == 'connect_success': # 兼容旧版
self._handle_connect_success(message)
elif msg_type == 'login': # 新版后台下发平台cookies
self._handle_login(message)
elif msg_type == 'error':
self._handle_error_message(message)
elif msg_type == 'staff_list':
self._handle_staff_list(message)
else:
print(f"未知消息类型: {msg_type}")
except Exception as e:
error_msg = f"处理消息异常: {e}"
print(error_msg)
if self.error_callback:
self.error_callback(error_msg, message)
def on_error(self, error: str):
"""错误处理"""
print(f"后端连接错误: {error}")
if self.error_callback:
self.error_callback(error, None)
# ==================== 发送消息方法 ====================
def send_message(self, message: Dict[str, Any]):
"""
发送消息到后端
Args:
message: 要发送的消息字典
"""
if not self.is_connected or not self.loop:
error_msg = "WebSocket未连接"
if self.is_reconnecting:
error_msg += "(正在重连中..."
raise Exception(error_msg)
try:
future = asyncio.run_coroutine_threadsafe(
self._send_to_backend(message), self.loop
)
return future.result(timeout=8)
except Exception as e:
# 发送失败时检查连接状态
if not self.is_connected:
print(f"[重连] 消息发送失败,连接已断开: {e}")
raise e
async def _send_to_backend(self, message: Dict[str, Any]):
"""异步发送消息到后端"""
if not self.websocket:
raise Exception("WebSocket连接不存在")
import json
message_str = json.dumps(message, ensure_ascii=False)
await self.websocket.send(message_str)
print(f"发送消息到后端: {message}")
def send_ping(self, custom_uuid: str = None):
"""
发送心跳包
"""
# 生成简单的ping UUID
ping_uuid = custom_uuid or f"ping_{int(time.time())}"
ping_message = {
'type': 'ping',
'uuid': ping_uuid
}
return self.send_message(ping_message)
def get_store(self):
"""获取店铺信息"""
message = {
'type': 'get_store',
'token': self.token or ''
}
return self.send_message(message)
def send_text_message(self, content: str, sender_id: str, store_id: str, pin_image: str = None):
"""发送文本消息 - 根据WebSocket文档v2更新"""
message = {
'type': 'message',
'content': content,
'msg_type': 'text',
'sender': {'id': sender_id},
'store_id': store_id
}
if pin_image:
message['pin_image'] = pin_image
return self.send_message(message)
def send_image_message(self, image_url: str, sender_id: str, store_id: str, pin_image: str = None):
"""发送图片消息 - 根据WebSocket文档v2更新"""
message = {
'type': 'message',
'content': image_url,
'msg_type': 'image',
'sender': {'id': sender_id},
'store_id': store_id
}
if pin_image:
message['pin_image'] = pin_image
return self.send_message(message)
def send_video_message(self, video_url: str, sender_id: str, store_id: str, pin_image: str = None):
"""发送视频消息 - 根据WebSocket文档v2更新"""
message = {
'type': 'message',
'content': video_url,
'msg_type': 'video',
'sender': {'id': sender_id},
'store_id': store_id
}
if pin_image:
message['pin_image'] = pin_image
return self.send_message(message)
def send_order_card(self, product_id: str, order_number: str, sender_id: str, store_id: str, pin_image: str = None):
"""发送订单卡片 - 根据WebSocket文档v2更新"""
message = {
'type': 'message',
'content': f'商品id{product_id} 订单号:{order_number}',
'msg_type': 'order_card',
'sender': {'id': sender_id},
'store_id': store_id
}
if pin_image:
message['pin_image'] = pin_image
return self.send_message(message)
def send_product_card(self, product_url: str, sender_id: str, store_id: str, pin_image: str = None):
"""发送商品卡片 - 根据WebSocket文档v2更新"""
message = {
'type': 'message',
'content': product_url,
'msg_type': 'product_card',
'sender': {'id': sender_id},
'store_id': store_id
}
if pin_image:
message['pin_image'] = pin_image
return self.send_message(message)
def send_staff_list(self, staff_list: List[Dict], store_id: str):
"""发送客服列表 - 根据WebSocket文档v2更新"""
message = {
'type': 'staff_list',
'content': '客服列表更新',
'data': {'staff_list': staff_list},
'store_id': store_id
}
return self.send_message(message)
# 保持向后兼容的旧方法(标记为已废弃)
def send_file_message(self, content: str, uid: str, pin: str, store_id: str):
"""发送文件消息 - 已废弃请使用send_video_message或send_image_message"""
print("警告: send_file_message已废弃请根据文件类型使用send_video_message或send_image_message")
# 尝试自动检测文件类型
content_lower = content.lower()
if any(ext in content_lower for ext in ['.mp4', '.avi', '.mov', '.wmv', '.flv']):
return self.send_video_message(content, uid, store_id)
elif any(ext in content_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']):
return self.send_image_message(content, uid, store_id)
else:
# 默认作为文本消息发送
return self.send_text_message(content, uid, store_id)
def send_transfer(self, customer: str, pin: str, store_id: str):
"""发送转接消息"""
message = {
'type': 'transfer',
'customer': customer, # 客服名称
'pin': pin, # 顾客名称
'store_id': store_id
}
return self.send_message(message)
def close_store(self, store_id: str):
"""关闭店铺"""
message = {
'type': 'close',
'store_id': store_id
}
return self.send_message(message)
# ==================== 消息处理方法 ====================
def _handle_store_list(self, message: Dict[str, Any]):
"""处理店铺列表"""
store_list = message.get('store_list', [])
print(f"获取到{len(store_list)}个店铺:")
for store in store_list:
merchant_name = store.get('merchant_name', '')
store_id = store.get('store_id', '')
store_platform = store.get('store_platform', '')
print(f" - {merchant_name} (ID: {store_id}, 平台: {store_platform})")
if self.store_list_callback:
self.store_list_callback(store_list)
def _handle_customers_list(self, message: Dict[str, Any]):
"""处理客服列表"""
customers = message.get('customers', [])
store_id = message.get('store_id', '')
print(f"店铺{store_id}的客服列表,共{len(customers)}个客服:")
for customer in customers:
pin = customer.get('pin', '')
nickname = customer.get('nickname', '')
print(f" - {nickname} ({pin})")
if self.customers_callback:
self.customers_callback(customers, store_id)
def _handle_message(self, message: Dict[str, Any]):
"""处理消息"""
store_id = message.get('store_id', '')
data = message.get('data')
content = message.get('content', '')
print(f"[{store_id}] [{message.get('msg_type', 'unknown')}] : {content}")
# 尝试将后端AI/客服回复转发到对应平台
try:
receiver = message.get('receiver') or (data.get('receiver') if isinstance(data, dict) else None) or {}
recv_pin = receiver.get('id')
if recv_pin and store_id:
# 根据store_id动态确定平台类型
platform_type = self._get_platform_by_store_id(store_id)
if platform_type == "京东":
self._forward_to_jd(store_id, recv_pin, content)
elif platform_type == "抖音":
self._forward_to_douyin(store_id, recv_pin, content)
elif platform_type == "千牛":
self._forward_to_qianniu(store_id, recv_pin, content)
elif platform_type == "拼多多":
self._forward_to_pdd(store_id, recv_pin, content)
else:
print(f"[Forward] 未知平台类型或未找到店铺: {platform_type}, store_id={store_id}")
except Exception as e:
print(f"转发到平台失败: {e}")
if self.message_callback:
self.message_callback(data if data else message, store_id)
def _get_platform_by_store_id(self, store_id: str) -> str:
"""根据店铺ID获取平台类型"""
try:
# 从WebSocket管理器获取平台信息
from WebSocket.backend_singleton import get_websocket_manager
manager = get_websocket_manager()
if manager and hasattr(manager, 'platform_listeners'):
for key, listener_info in manager.platform_listeners.items():
if listener_info.get('store_id') == store_id:
return listener_info.get('platform', '')
return ""
except Exception as e:
print(f"获取平台类型失败: {e}")
return ""
def _forward_to_jd(self, store_id: str, recv_pin: str, content: str):
"""转发消息到京东平台"""
try:
from Utils.JD.JdUtils import WebsocketManager as JDWSManager
jd_mgr = JDWSManager()
shop_key = f"京东:{store_id}"
entry = jd_mgr.get_connection(shop_key)
if not entry:
print(f"[JD Forward] 未找到连接: {shop_key}")
return
platform_info = (entry or {}).get('platform') or {}
ws = platform_info.get('ws')
aid = platform_info.get('aid')
pin_zj = platform_info.get('pin_zj')
vender_id = platform_info.get('vender_id')
loop = platform_info.get('loop')
print(
f"[JD Forward] shop_key={shop_key} has_ws={bool(ws)} aid={aid} pin_zj={pin_zj} vender_id={vender_id} has_loop={bool(loop)} recv_pin={recv_pin}")
if ws and aid and pin_zj and vender_id and loop and content:
async def _send():
import hashlib as _hashlib
import time as _time
import json as _json
msg = {
"ver": "4.3",
"type": "chat_message",
"from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"},
"to": {"app": "im.customer", "pin": recv_pin},
"id": _hashlib.md5(str(int(_time.time() * 1000)).encode()).hexdigest(),
"lang": "zh_CN",
"aid": aid,
"timestamp": int(_time.time() * 1000),
"readFlag": 0,
"body": {
"content": content,
"translated": False,
"param": {"cusVenderId": vender_id},
"type": "text"
}
}
await ws.send(_json.dumps(msg))
import asyncio as _asyncio
_future = _asyncio.run_coroutine_threadsafe(_send(), loop)
try:
_future.result(timeout=2)
print(f"[JD Forward] 已转发到平台: pin={recv_pin}, content_len={len(content)}")
except Exception as fe:
print(f"[JD Forward] 转发提交失败: {fe}")
else:
print("[JD Forward] 条件不足,未转发:",
{
'has_ws': bool(ws), 'has_aid': bool(aid), 'has_pin_zj': bool(pin_zj),
'has_vender_id': bool(vender_id), 'has_loop': bool(loop), 'has_content': bool(content)
})
except Exception as e:
print(f"[JD Forward] 转发失败: {e}")
def _forward_to_douyin(self, store_id: str, recv_pin: str, content: str):
"""转发消息到抖音平台"""
try:
from Utils.Dy.DyUtils import DouYinWebsocketManager
dy_mgr = DouYinWebsocketManager()
shop_key = f"抖音:{store_id}"
entry = dy_mgr.get_connection(shop_key)
if not entry:
print(f"[DY Forward] 未找到连接: {shop_key}")
return
platform_info = entry.get('platform', {})
douyin_bot = platform_info.get('douyin_bot')
message_handler = platform_info.get('message_handler')
print(
f"[DY Forward] shop_key={shop_key} has_bot={bool(douyin_bot)} has_handler={bool(message_handler)} recv_pin={recv_pin}")
if douyin_bot and message_handler and content:
# 在消息处理器的事件循环中发送消息
def send_in_loop():
try:
# 获取消息处理器的事件循环
loop = message_handler._loop
if loop and not loop.is_closed():
# 在事件循环中执行发送
future = asyncio.run_coroutine_threadsafe(
message_handler.send_message_external(recv_pin, content),
loop
)
# 等待结果
try:
result = future.result(timeout=5)
if result:
print(f"[DY Forward] 已转发到平台: pin={recv_pin}, content_len={len(content)}")
else:
print(f"[DY Forward] 转发失败: 消息处理器返回False")
except Exception as fe:
print(f"[DY Forward] 转发执行失败: {fe}")
else:
print(f"[DY Forward] 事件循环不可用")
except Exception as e:
print(f"[DY Forward] 发送过程异常: {e}")
# 在新线程中执行发送操作
import threading
send_thread = threading.Thread(target=send_in_loop, daemon=True)
send_thread.start()
else:
print("[DY Forward] 条件不足,未转发:",
{
'has_bot': bool(douyin_bot),
'has_handler': bool(message_handler),
'has_content': bool(content)
})
except Exception as e:
print(f"[DY Forward] 转发失败: {e}")
def _forward_to_qianniu(self, store_id: str, recv_pin: str, content: str):
"""转发消息到千牛平台"""
try:
# TODO: 实现千牛平台的消息转发逻辑
print(
f"[QN Forward] 千牛平台消息转发功能待实现: store_id={store_id}, recv_pin={recv_pin}, content={content}")
except Exception as e:
print(f"[QN Forward] 转发失败: {e}")
def _forward_to_pdd(self, store_id: str, recv_pin: str, content: str):
"""转发消息到拼多多平台"""
try:
from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager
pdd_mgr = PDDWSManager()
shop_key = f"拼多多:{store_id}"
entry = pdd_mgr.get_connection(shop_key)
if not entry:
print(f"[PDD Forward] 未找到连接: {shop_key}")
return
platform_info = entry.get('platform', {})
pdd_instance = platform_info.get('pdd_instance')
loop = platform_info.get('loop')
print(
f"[PDD Forward] shop_key={shop_key} has_pdd_instance={bool(pdd_instance)} has_loop={bool(loop)} recv_pin={recv_pin}")
if pdd_instance and loop and content:
# 在拼多多实例的事件循环中发送消息
def send_in_loop():
try:
# 在事件循环中执行发送
future = asyncio.run_coroutine_threadsafe(
pdd_instance.send_message_external(recv_pin, content),
loop
)
# 等待结果
try:
result = future.result(timeout=10) # 拼多多可能需要更长时间
if result:
print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}")
else:
print(f"[PDD Forward] 转发失败: 拼多多实例返回False")
except Exception as fe:
print(f"[PDD Forward] 转发执行失败: {fe}")
except Exception as e:
print(f"[PDD Forward] 发送过程异常: {e}")
# 在新线程中执行发送操作
import threading
send_thread = threading.Thread(target=send_in_loop, daemon=True)
send_thread.start()
else:
print("[PDD Forward] 条件不足,未转发:",
{
'has_pdd_instance': bool(pdd_instance),
'has_loop': bool(loop),
'has_content': bool(content)
})
except Exception as e:
print(f"[PDD Forward] 转发失败: {e}")
def _transfer_to_pdd(self, customer_service_id: str, user_id: str, store_id: str):
"""执行拼多多平台转接操作"""
try:
from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager
pdd_mgr = PDDWSManager()
shop_key = f"拼多多:{store_id}"
entry = pdd_mgr.get_connection(shop_key)
if not entry:
print(f"[PDD Transfer] 未找到拼多多连接: {shop_key}")
return
platform_info = entry.get('platform', {})
pdd_instance = platform_info.get('pdd_instance')
loop = platform_info.get('loop')
print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
if pdd_instance and loop:
# 设置目标客服ID并执行转接
def transfer_in_loop():
try:
# 设置目标客服ID
pdd_instance.csid = customer_service_id
# 在事件循环中执行转接
future = asyncio.run_coroutine_threadsafe(
pdd_instance.handle_transfer_message({
"content": customer_service_id,
"receiver": {"id": user_id}
}),
loop
)
# 等待转接结果
try:
result = future.result(timeout=15) # 转接可能需要更长时间
if result:
print(f"[PDD Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
else:
print(f"[PDD Transfer] ❌ 转接失败: user_id={user_id}")
except Exception as fe:
print(f"[PDD Transfer] 转接执行失败: {fe}")
except Exception as e:
print(f"[PDD Transfer] 转接过程异常: {e}")
# 在新线程中执行转接操作
import threading
transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True)
transfer_thread.start()
else:
print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}")
except Exception as e:
print(f"[PDD Transfer] 拼多多转接失败: {e}")
def _transfer_to_jd(self, customer_service_id: str, user_id: str, store_id: str):
"""执行京东平台转接操作"""
try:
from Utils.JD.JdUtils import WebsocketManager as JDWSManager
jd_mgr = JDWSManager()
shop_key = f"京东:{store_id}"
entry = jd_mgr.get_connection(shop_key)
if not entry:
print(f"[JD Transfer] 未找到京东连接: {shop_key}")
return
platform_info = entry.get('platform', {})
websocket = platform_info.get('ws') # 京东使用'ws'字段
aid = platform_info.get('aid', '')
pin_zj = platform_info.get('pin_zj', '')
print(f"[JD Transfer] 找到京东连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
print(f"[JD Transfer] 连接信息: has_ws={bool(websocket)}, aid={aid}, pin_zj={pin_zj}")
if websocket:
# 执行转接操作
def transfer_in_loop():
try:
# 导入京东工具类
from Utils.JD.JdUtils import FixJdCookie
# 创建临时实例用于转接
jd_instance = FixJdCookie()
# 执行转接操作
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
jd_instance.transfer_customer(
websocket, aid, user_id, pin_zj, customer_service_id
)
)
if result:
print(f"[JD Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
else:
print(f"[JD Transfer] ❌ 转接失败: user_id={user_id}")
finally:
loop.close()
except Exception as e:
print(f"[JD Transfer] 转接过程异常: {e}")
# 在新线程中执行转接操作
import threading
transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True)
transfer_thread.start()
else:
print(f"[JD Transfer] 条件不足: has_ws={bool(websocket)}, aid='{aid}', pin_zj='{pin_zj}'")
except Exception as e:
print(f"[JD Transfer] 京东转接失败: {e}")
def _transfer_to_dy(self, customer_service_id: str, user_id: str, store_id: str):
"""执行抖音平台转接操作"""
try:
from Utils.Dy.DyUtils import DouYinWebsocketManager as DYWSManager
dy_mgr = DYWSManager()
shop_key = f"抖音:{store_id}"
entry = dy_mgr.get_connection(shop_key)
if not entry:
print(f"[DY Transfer] 未找到抖音连接: {shop_key}")
return
platform_info = entry.get('platform', {})
dy_instance = platform_info.get('douyin_bot') # 修正字段名称
cookie_dict = platform_info.get('cookie', {})
print(f"[DY Transfer] 找到抖音连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
print(f"[DY Transfer] 连接信息: has_douyin_bot={bool(dy_instance)}, has_cookie={bool(cookie_dict)}")
if dy_instance:
# 执行转接操作
def transfer_in_loop():
try:
# 抖音转接通过message_handler执行
if dy_instance and hasattr(dy_instance, 'message_handler') and dy_instance.message_handler:
# 获取实际的抖音店铺ID从cookie中获取
shop_id = cookie_dict.get('SHOP_ID', store_id) # 优先使用cookie中的SHOP_ID
print(f"[DY Transfer] 使用shop_id: {shop_id}")
print(
f"[DY Transfer] 转接参数: receiver_id={user_id}, shop_id={shop_id}, staff_id={customer_service_id}")
# 检查是否是自己转给自己的情况
try:
# 获取可用客服列表来验证
staff_list = dy_instance.message_handler.get_casl()
if staff_list:
print(f"[DY Transfer] 当前可用客服数量: {len(staff_list)}")
if len(staff_list) <= 1:
print(f"[DY Transfer] ⚠️ 只有一个客服在线,可能无法转接")
# 查找目标客服信息
target_staff = None
for staff in staff_list:
if str(staff.get('staffId', '')) == str(customer_service_id):
target_staff = staff
break
if target_staff:
print(
f"[DY Transfer] 找到目标客服: {target_staff.get('staffName', 'Unknown')} (ID: {customer_service_id})")
else:
print(f"[DY Transfer] ⚠️ 未找到目标客服ID: {customer_service_id}")
print(
f"[DY Transfer] 可用客服列表: {[{'id': s.get('staffId'), 'name': s.get('staffName')} for s in staff_list]}")
else:
print(f"[DY Transfer] ⚠️ 无法获取客服列表")
except Exception as e:
print(f"[DY Transfer] 获取客服列表时出错: {e}")
# 执行同步转接操作
result = dy_instance.message_handler.transfer_conversation(
receiver_id=user_id,
shop_id=shop_id,
staff_id=customer_service_id
)
if result:
print(f"[DY Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}")
else:
print(f"[DY Transfer] ❌ 转接失败: user_id={user_id}")
print(
f"[DY Transfer] 💡 可能原因1) 只有一个客服无法转接 2) 客服ID不存在 3) 权限不足 4) 会话状态不允许转接")
else:
print(f"[DY Transfer] ⚠️ 抖音实例或message_handler不可用")
except Exception as e:
print(f"[DY Transfer] 转接过程异常: {e}")
# 在新线程中执行转接操作
import threading
transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True)
transfer_thread.start()
else:
print(f"[DY Transfer] 条件不足: has_douyin_bot={bool(dy_instance)}")
except Exception as e:
print(f"[DY Transfer] 抖音转接失败: {e}")
def _transfer_to_qianniu(self, customer_service_id: str, user_id: str, store_id: str):
"""执行千牛平台转接操作"""
try:
# TODO: 实现千牛平台转接逻辑
print(f"[QN Transfer] 千牛平台转接功能待实现: user_id={user_id}, cs_id={customer_service_id}")
except Exception as e:
print(f"[QN Transfer] 千牛转接失败: {e}")
def _handle_transfer(self, message: Dict[str, Any]):
"""处理转接消息"""
# 新版转接消息格式: {"type": "transfer", "content": "客服ID", "receiver": {"id": "用户ID"}, "store_id": "店铺ID"}
customer_service_id = message.get('content', '') # 目标客服ID
receiver_info = message.get('receiver', {})
user_id = receiver_info.get('id', '') # 用户ID
store_id = message.get('store_id', '')
print(f"转接消息: 顾客{user_id}已转接给客服{customer_service_id} (店铺: {store_id})")
# 根据店铺ID确定平台类型并执行转接
try:
platform_type = self._get_platform_by_store_id(store_id)
if platform_type == "京东":
# 京东转接逻辑
self._transfer_to_jd(customer_service_id, user_id, store_id)
elif platform_type == "抖音":
# 抖音转接逻辑
self._transfer_to_dy(customer_service_id, user_id, store_id)
elif platform_type == "千牛":
# 千牛转接逻辑
self._transfer_to_qianniu(customer_service_id, user_id, store_id)
elif platform_type == "拼多多":
self._transfer_to_pdd(customer_service_id, user_id, store_id)
else:
print(f"[Transfer] 未知平台类型或未找到店铺: {platform_type}, store_id={store_id}")
except Exception as e:
print(f"执行转接操作失败: {e}")
# 保持旧版回调兼容性
if self.transfer_callback:
self.transfer_callback(customer_service_id, user_id, store_id)
def _handle_close(self, message: Dict[str, Any]):
"""处理店铺关闭"""
store_id = message.get('store_id', '')
print(f"店铺{store_id}已关闭")
if self.close_callback:
self.close_callback(store_id)
def _handle_pong(self, message: Dict[str, Any]):
"""处理心跳响应"""
uuid_received = message.get('uuid', '')
print(f"收到心跳响应: {uuid_received}")
def _handle_connect_success(self, message: Dict[str, Any]):
"""处理连接成功消息(旧版兼容)"""
print("后端连接成功(connect_success)")
if self.token:
self.get_store()
def _handle_login(self, message: Dict[str, Any]):
"""处理平台登录消息新版type=login, cookies/login_params, store_id, platform_name"""
cookies = message.get('cookies', '')
store_id = message.get('store_id', '')
platform_name = message.get('platform_name', '')
content = message.get('content', '')
data = message.get('data', {})
# 判断是拼多多登录参数还是普通Cookie
if platform_name == "拼多多" and ("拼多多登录" in content) and data.get('login_params'):
# 拼多多登录参数模式 - 传递完整的消息JSON给处理器
print(f"收到拼多多登录参数: 平台={platform_name}, 店铺={store_id}, 类型={content}")
if self.login_callback:
# 传递完整的JSON消息让拼多多处理器来解析login_params
import json
full_message = json.dumps(message)
self.login_callback(platform_name, store_id, full_message)
else:
# 普通Cookie模式
print(f"收到登录指令: 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}")
if self.login_callback:
self.login_callback(platform_name, store_id, cookies)
def _handle_error_message(self, message: Dict[str, Any]):
"""处理错误消息"""
error_msg = message.get('error', '未知错误')
print(f"后端连接错误: {error_msg}")
if self.error_callback:
self.error_callback(error_msg, message)
def _handle_staff_list(self, message: Dict[str, Any]):
"""处理客服列表更新消息"""
staff_list = message.get('data', {}).get('staff_list', [])
store_id = message.get('store_id', '')
print(f"店铺{store_id}的客服列表已更新,共{len(staff_list)}个客服:")
for staff in staff_list:
pin = staff.get('pin', '')
nickname = staff.get('nickname', '')
print(f" - {nickname} ({pin})")
if self.customers_callback: # 假设客服列表更新也触发客服列表回调
self.customers_callback(staff_list, store_id)
# ==================== 辅助方法 ====================
def set_token(self, token: str):
"""设置或更新令牌"""
self.token = token
def get_connection_info(self) -> Dict[str, Any]:
"""获取连接信息"""
return {
'url': self.url,
'token': self.token,
'uuid': self.uuid,
'is_connected': self.is_connected
}
# 使用示例
if __name__ == '__main__':
pass
# import time
#
# def on_store_list(stores):
# print(f"回调: 收到{len(stores)}个店铺")
#
# def on_message(data, store_id):
# print(f"回调: 店铺{store_id}收到消息: {data.get('content')}")
#
# def on_error(error, message):
# print(f"回调: 发生错误: {error}")
#
# def on_login(platform_name, store_id, cookies):
# print(f"回调: 登录指令 平台={platform_name}, 店铺={store_id}, cookies_len={len(cookies) if cookies else 0}")
# # 此处触发对应平台的WS连接/更新cookies逻辑并将该平台WS与store_id绑定
#
# def on_success():
# print("回调: 后端连接成功")
#
# # 创建客户端(新版:单连接)
# client = BackendClient.from_exe_token("your_exe_token_here")
#
# # 设置回调
# client.set_callbacks(
# store_list=on_store_list,
# message=on_message,
# error=on_error,
# login=on_login,
# success=on_success
# )
#
# try:
# # 连接
# client.connect()
#
# # 等待连接
# time.sleep(2)
#
# # 发送心跳
# client.send_ping()
#
# # 保持运行
# while True:
# time.sleep(30)
# client.send_ping()
#
# except KeyboardInterrupt:
# print("用户中断")
# finally:
# client.disconnect()