[patch] 设计PDD的处理消息架构为msg_type 通用模式 处理PDD支持图片类型和 商品卡片类型的消息的发送回复 修改GUI托盘内部样式

This commit is contained in:
2025-10-21 14:20:19 +08:00
parent f7606f09cf
commit 9b21287bd0
4 changed files with 375 additions and 31 deletions

View File

@@ -614,9 +614,9 @@ class FixJdCookie:
try:
while not stop_event.is_set(): # 检查是否收到中止信号
try:
print(f"等待监听消息-{datetime.now()}")
print(f"[JD]等待监听消息-{datetime.now()}")
response = await asyncio.wait_for(ws.recv(), timeout=1)
print(f"原始消息类型:{type(response)}, 消息体为: {response}")
print(f"[JD]原始消息类型:{type(response)}, 消息体为: {response}")
# 🔧 修复:检测被踢下线消息
json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response
@@ -654,10 +654,10 @@ class FixJdCookie:
await self.process_incoming_message(response, ws, aid, pin_zj, vender_id, store)
print(json_resp)
# print(json_resp) 暂去解析后消息结构
ver = json_resp.get("ver")
print(f"版本{ver}")
print(f"[jd]信息编码版本号:{ver}")
except asyncio.TimeoutError:
continue
except websockets.ConnectionClosed:

View File

@@ -2679,22 +2679,345 @@ class ChatPdd:
traceback.print_exc()
return False
async def send_message_external(self, uid, content):
"""外部消息发送接口,用于接收后端转发的回复"""
self._log(f"[External] 收到外部消息发送请求: uid={uid}, content_len={len(content) if content else 0}", "INFO")
async def _download_image(self, image_url):
"""下载图片并返回base64编码的数据
Args:
image_url: 图片URL
Returns:
tuple: (base64_data, file_size_kb) 或 (None, None)
"""
try:
self._log(f"🔽 [PDD图片] 开始下载图片: {image_url[:100]}...", "INFO")
# 使用线程池下载图片(避免阻塞)
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.get(image_url, timeout=10)
)
if response.status_code != 200:
self._log(f"❌ [PDD图片] 下载失败HTTP状态码: {response.status_code}", "ERROR")
return None, None
# 获取图片数据
image_data = response.content
file_size_kb = len(image_data) // 1024 # 转换为KB
# 检查文件大小限制10MB
if file_size_kb > 10240:
self._log(f"❌ [PDD图片] 图片过大: {file_size_kb}KB超过10MB限制", "ERROR")
return None, None
# 转换为base64
base64_data = base64.b64encode(image_data).decode()
self._log(f"✅ [PDD图片] 图片下载成功,大小: {file_size_kb}KB", "SUCCESS")
return base64_data, file_size_kb
except Exception as e:
self._log(f"❌ [PDD图片] 下载图片失败: {e}", "ERROR")
traceback.print_exc()
return None, None
async def _pre_upload_image(self):
"""预上传获取上传签名
Returns:
str: upload_signature 或 None
"""
try:
self._log("📝 [PDD图片] 请求预上传签名...", "DEBUG")
url = "https://mms.pinduoduo.com/plateau/file/pre_upload"
data = {"chat_type_id": 1, "file_usage": 1}
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, json=data)
)
if response.status_code == 200:
result = response.json()
upload_signature = result.get("result", {}).get("upload_signature")
if upload_signature:
self._log("✅ [PDD图片] 预上传签名获取成功", "SUCCESS")
return upload_signature
else:
self._log("❌ [PDD图片] 响应中没有upload_signature", "ERROR")
return None
else:
self._log(f"❌ [PDD图片] 预上传失败HTTP状态码: {response.status_code}", "ERROR")
return None
except Exception as e:
self._log(f"❌ [PDD图片] 预上传请求失败: {e}", "ERROR")
traceback.print_exc()
return None
async def _upload_image_to_pdd(self, base64_data, upload_signature):
"""上传图片到拼多多服务器
Args:
base64_data: 图片的base64编码
upload_signature: 预上传获取的签名
Returns:
dict: {'url': img_url, 'width': width, 'height': height} 或 None
"""
try:
self._log("📤 [PDD图片] 上传图片到PDD服务器...", "INFO")
url = "https://file.pinduoduo.com/v2/store_image"
payload = {
"image": f"data:image/jpeg;base64,{base64_data}",
"upload_sign": upload_signature
}
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, json=payload)
)
if response.status_code == 200:
result = response.json()
img_url = result.get("url")
width = result.get("width")
height = result.get("height")
if img_url and width and height:
self._log(f"✅ [PDD图片] 图片上传成功: {img_url[:50]}... ({width}x{height})", "SUCCESS")
return {
"url": img_url,
"width": width,
"height": height
}
else:
self._log("❌ [PDD图片] 上传响应缺少必要字段", "ERROR")
self._log(f"响应内容: {response.text[:200]}", "DEBUG")
return None
else:
self._log(f"❌ [PDD图片] 上传失败HTTP状态码: {response.status_code}", "ERROR")
self._log(f"响应内容: {response.text[:200]}", "DEBUG")
return None
except Exception as e:
self._log(f"❌ [PDD图片] 上传图片失败: {e}", "ERROR")
traceback.print_exc()
return None
async def _send_image_message_to_user(self, uid, img_url, width, height, file_size_kb, base64_data):
"""发送图片消息给用户
Args:
uid: 用户ID
img_url: PDD服务器返回的图片URL
width: 图片宽度
height: 图片高度
file_size_kb: 文件大小KB
base64_data: 图片base64数据用于缩略图
Returns:
bool: 是否发送成功
"""
try:
self._log(f"📨 [PDD图片] 发送图片消息给用户 {uid}", "INFO")
ts = int(time.time() * 1000)
url = "https://mms.pinduoduo.com/plateau/chat/send_message"
# 参考 chat.py 的图片消息格式
data = {
"data": {
"cmd": "send_message",
"anti_content": "0asWfxUeMwVeGxYVoXeqYL7CLRsBPLYHBd1WqQguPf-gOONP68B3pCYl52i-hv-NYbVCeS-0oX3P68z3pHYQp2iUWvNNYOaCkm36348NXVKtpFKccGEYNycj5oPYgO1YTu5oWdNEQJFUXTpJAYvP54AzLPQ-gIcyXOKqu8xGrPjy0XyOY9ynYTxXYPYnGuaX0uaeIUCjp0cOfIJMun0YOUvynYXyXUuqXUvYXGPYsTEx0van0vYn9gt2UTuuPZPt0qLlrgqOrtTOpGCnrNIXquSvAivYy22PxxKvE33OefC5PtZVkL2cTfTaXYg7I64YgwdamBVP4ujQ0EflrngJideEX_edg608qQgjyA6PpuYljE2Qdv3ljidadMUt0NJzA_T4BV944KgfLe-sMz4cdwKw2dFkFvv74mMBedMwxpMkgvbx8Wv3rS3fTAEl5Dsla7vz15--hkSB6uTHEt46l3aY94LkeBguJkluz-b-1B-wrtGum-YDt7Upk-UhIJcpIjcUGnenVIhhN2aV_g2gxX9DnWU0ty5QOP_",
"request_id": ts,
"message": {
"to": {
"role": "user",
"uid": uid
},
"from": {
"role": "mall_cs"
},
"ts": int(ts / 1000),
"content": img_url, # 图片URL
"msg_id": 'null',
"type": 1, # type=1 表示图片
"is_aut": 0,
"manual_reply": 1,
"status": "read",
"is_read": 1,
"hash": "5b368ed74a3de79c2e8da624400e36b427b44637a110f4dc2c08ccfe34cba7be",
"size": {
"height": height,
"width": width,
"image_size": file_size_kb
},
"info": {
"thumb_data": f"data:image/jpeg;base64,{base64_data[:1000]}" # 缩略图只取前1000字符
}
},
"random": "67f7f6f44204086b9619d936fed35dcf"
},
"client": "WEB",
"anti_content": "0asWfxUEMwVEfxFbeswd-fTDtzoS3cDHsWTMeA_MVigWOS3hCHfBEF-hHM1wd9LHyTCtfvkKeAwF1cU78wHKBAKeMsKDMZ5eMwMkB3ZEB3Vxs4mIy6kEpvoXbW6StBxMDBfFEM31EMs5DMsIkzhKDMeKEBZIeMZOEMsWd009EWPZXtUhxXHXqOGB_oqCClHXSnqgovniTadnXt2shz7sqobZ_VEBhVez4O92uxXH2dIYgayswaG4Sf_ajlpo0nGig8nYCzXYwTyo0j4C6jys6PqOjXvJslpOmXjseaY5ofYdmdnUE9BVt24uBfkE-gMvgh0wFw2D-egVzv5SFsKCFwZ_-kguJxBWeLD7Jfa1zn_hsn9kEvekM-RZSg6y_KSqlglxaqVzxePlBCSsfWsex-eW5_rBfurZjELM1T31cokHcpWxU5GekQVhVOXnxVCX0g29fM6Vy_cPUr4m"
}
data_str = json.dumps(data, separators=(',', ':'))
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data_str)
)
self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
self._log(f"PDD返回体: {response.text}", "DEBUG")
if response.status_code == 200:
self._log(f"✅ 已发送图片消息给用户 {uid}", "SUCCESS")
return True
else:
self._log(f"❌ 发送图片消息失败HTTP状态码: {response.status_code}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 发送图片消息失败: {e}", "ERROR")
traceback.print_exc()
return False
async def _send_image_external(self, uid, image_url):
"""外部图片发送的完整流程
Args:
uid: 用户ID
image_url: 图片URL
Returns:
bool: 是否发送成功
"""
try:
self._log(f"🖼️ [PDD图片] 开始处理图片发送流程...", "INFO")
# 步骤1: 下载图片
base64_data, file_size_kb = await self._download_image(image_url)
if not base64_data:
self._log("❌ [PDD图片] 下载图片失败,中止流程", "ERROR")
return False
# 步骤2: 预上传获取签名
upload_signature = await self._pre_upload_image()
if not upload_signature:
self._log("❌ [PDD图片] 获取上传签名失败,中止流程", "ERROR")
return False
# 步骤3: 上传图片到PDD
upload_result = await self._upload_image_to_pdd(base64_data, upload_signature)
if not upload_result:
self._log("❌ [PDD图片] 上传图片失败,中止流程", "ERROR")
return False
# 步骤4: 发送图片消息给用户
result = await self._send_image_message_to_user(
uid=uid,
img_url=upload_result['url'],
width=upload_result['width'],
height=upload_result['height'],
file_size_kb=file_size_kb,
base64_data=base64_data
)
if result:
self._log(f"✅ [PDD图片] 图片发送完整流程成功!", "SUCCESS")
else:
self._log(f"❌ [PDD图片] 图片发送完整流程失败", "ERROR")
return result
except Exception as e:
self._log(f"❌ [PDD图片] 图片发送流程异常: {e}", "ERROR")
traceback.print_exc()
return False
async def _send_product_card_external(self, uid, goods_id):
"""发送商品卡片给用户
Args:
uid: 用户ID
goods_id: 商品ID
Returns:
bool: 是否发送成功
"""
try:
self._log(f"🛒 [PDD商品卡] 开始发送商品卡片: goods_id={goods_id}, uid={uid}", "INFO")
# 参考 chat.py 的商品卡片发送API
url = "https://mms.pinduoduo.com/plateau/message/send/mallGoodsCard"
data = {
"uid": uid,
"goods_id": goods_id,
"biz_type": 3
}
data_str = json.dumps(data, separators=(',', ':'))
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data_str)
)
self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
self._log(f"PDD返回体: {response.text}", "DEBUG")
if response.status_code == 200:
self._log(f"✅ 已发送商品卡片给用户 {uid}: goods_id={goods_id}", "SUCCESS")
return True
else:
self._log(f"❌ 发送商品卡片失败HTTP状态码: {response.status_code}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 发送商品卡片失败: {e}", "ERROR")
traceback.print_exc()
return False
async def send_message_external(self, uid, content, msg_type="text"):
"""外部消息发送接口,用于接收后端转发的回复
Args:
uid: 用户ID
content: 消息内容(文本/图片URL/商品ID
msg_type: 消息类型text/image/video/product_card
"""
self._log(f"[External] 收到外部消息发送请求: uid={uid}, msg_type={msg_type}, content_len={len(content) if content else 0}", "INFO")
try:
if not uid or not content:
self._log("❌ [External] 参数不完整", "ERROR")
return False
result = await self.send_ai_reply(uid, content)
# 根据消息类型分发到不同的处理函数
if msg_type == "image":
result = await self._send_image_external(uid, content)
elif msg_type == "video":
# TODO: 视频发送功能待实现
self._log("⚠️ [External] 视频发送功能暂未实现", "WARNING")
return False
elif msg_type == "product_card":
# 商品卡片content 为商品ID
result = await self._send_product_card_external(uid, content)
else: # text 或其他类型默认作为文本发送
result = await self.send_ai_reply(uid, content)
if result:
self._log(f"✅ [External] 外部消息发送成功: uid={uid}", "SUCCESS")
self._log(f"✅ [External] 外部消息发送成功: uid={uid}, type={msg_type}", "SUCCESS")
else:
self._log(f"❌ [External] 外部消息发送失败: uid={uid}", "ERROR")
self._log(f"❌ [External] 外部消息发送失败: uid={uid}, type={msg_type}", "ERROR")
return result
except Exception as e:
self._log(f"❌ [External] 外部消息发送异常: {e}", "ERROR")
traceback.print_exc()
return False
async def get_all_customer(self, ws):
@@ -2957,6 +3280,7 @@ class ChatPdd:
(goods_info and goods_info.get('goodsID') and not goods_info.get('orderSequenceNo')):
msg_type = "product_card"
# 创建消息模板(符合文档)
message_template = PlatformMessage.create_text_message(
content=content,
@@ -2999,7 +3323,7 @@ class ChatPdd:
while True:
try:
message = await wss.recv()
self._log(f"收到新消息 {base64.b64encode(message).decode()}", "DEBUG")
# 收到新消息日志已删除(减少日志噪音)
try:
text = self.ctx.call("dencode_data", base64.b64encode(message).decode())
if not isinstance(text, dict):
@@ -3007,18 +3331,19 @@ class ChatPdd:
push = text.get("push_data") or {}
data = push.get("data")
if not data:
self._log("push_data.data 为空,忽略", "DEBUG")
# push_data.data 为空日志已删除(减少日志噪音)
continue
# ✅ 只打印解析成功的消息
self._log(f"✅ [PDD] 解析消息成功,数据类型: {type(data)}, 消息数: {len(data) if isinstance(data, list) else 1}", "DEBUG")
print(f"原始数据格式打印:{data}")
for d in data:
await self.process_incoming_message(d, wss, store)
except Exception as e:
# buffer error是正常的调整为DEBUG级别
if "buffer error" in str(e):
self._log(f"解析消息失败: {e}", "DEBUG")
else:
# buffer error是正常的心跳或无关消息,不再打印
if "buffer error" not in str(e):
# 只记录非buffer error的真实错误
self._log(f"解析消息失败: {e}", "ERROR")
traceback.print_exc()
except Exception as e:

View File

@@ -150,12 +150,22 @@ class BackendClient:
# 消息循环
async for message in self.websocket:
try:
# 打印原始文本帧与长度
# 🔍 添加心跳检测日志
try:
raw_len = len(message.encode('utf-8')) if isinstance(message, str) else len(message)
print(f"后端发送消息体内容:{message}")
# 解析消息类型
data_preview = json.loads(message)
msg_type = data_preview.get('type', 'unknown')
# 心跳相关消息用DEBUG级别其他消息用INFO级别
if msg_type in ['pong', 'connection_status_ack']:
self._log(f"💓 [心跳] 收到后端响应: {msg_type}", "DEBUG")
else:
self._log(f"📨 [后端] 收到消息: type={msg_type}, 长度={raw_len}字节", "DEBUG")
print(f"后端发送消息体内容:{message}")
except Exception:
pass
data = json.loads(message)
self.on_message_received(data)
except json.JSONDecodeError:
@@ -696,7 +706,8 @@ class BackendClient:
store_id = message.get('store_id', '')
data = message.get('data')
content = message.get('content', '')
print(f"[{store_id}] [{message.get('msg_type', 'unknown')}] : {content}")
msg_type = message.get('msg_type', 'text') # 获取消息类型,默认为text
print(f"[{store_id}] [{msg_type}] : {content}")
# 尝试将后端AI/客服回复转发到对应平台
try:
@@ -713,7 +724,8 @@ class BackendClient:
elif platform_type == "千牛":
self._forward_to_qianniu(store_id, recv_pin, content)
elif platform_type == "拼多多":
self._forward_to_pdd(store_id, recv_pin, content)
# 传递msg_type参数支持图片/视频等类型
self._forward_to_pdd(store_id, recv_pin, content, msg_type)
else:
print(f"[Forward] 未知平台类型或未找到店铺: {platform_type}, store_id={store_id}")
except Exception as e:
@@ -867,8 +879,15 @@ class BackendClient:
except Exception as e:
print(f"[QN Forward] 转发失败: {e}")
def _forward_to_pdd(self, store_id: str, recv_pin: str, content: str):
"""转发消息到拼多多平台"""
def _forward_to_pdd(self, store_id: str, recv_pin: str, content: str, msg_type: str = "text"):
"""转发消息到拼多多平台
Args:
store_id: 店铺ID
recv_pin: 接收者ID
content: 消息内容文本或图片URL
msg_type: 消息类型text/image/video
"""
try:
from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager
pdd_mgr = PDDWSManager()
@@ -884,22 +903,22 @@ class BackendClient:
loop = platform_info.get('loop')
print(
f"[PDD Forward] shop_key={shop_key} has_pdd_instance={bool(pdd_instance)} has_loop={bool(loop)} recv_pin={recv_pin}")
f"[PDD Forward] shop_key={shop_key} has_pdd_instance={bool(pdd_instance)} has_loop={bool(loop)} recv_pin={recv_pin} msg_type={msg_type}")
if pdd_instance and loop and content:
# 在拼多多实例的事件循环中发送消息
def send_in_loop():
try:
# 在事件循环中执行发送
# 在事件循环中执行发送传递msg_type参数
future = asyncio.run_coroutine_threadsafe(
pdd_instance.send_message_external(recv_pin, content),
pdd_instance.send_message_external(recv_pin, content, msg_type),
loop
)
# 等待结果
try:
result = future.result(timeout=10) # 拼多多可能需要更长时间
if result:
print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}")
print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, type={msg_type}, content_len={len(content)}")
else:
print(f"[PDD Forward] 转发失败: 拼多多实例返回False")
except Exception as fe:

10
main.py
View File

@@ -82,7 +82,7 @@ class LoginWindow(QMainWindow):
def initUI(self):
# 设置窗口基本属性
self.setWindowTitle(f'AI客服智能助手 v{config.APP_VERSION}')
self.setWindowTitle(f'水滴AI客服智能助手 v{config.APP_VERSION}')
# 只设置宽度,高度自适应内容
self.setFixedWidth(450) # 固定宽度
# 不设置固定高度,让窗口根据内容自适应
@@ -101,7 +101,7 @@ class LoginWindow(QMainWindow):
central_widget.setLayout(main_layout)
# 添加标题和副标题
title_label = QLabel('AI客服智能助手')
title_label = QLabel('水滴AI客服智能助手')
title_label.setObjectName("title")
title_label.setAlignment(Qt.AlignCenter)
title_label.setFont(QFont('Microsoft YaHei', 16, QFont.Bold)) # 稍微减小字体
@@ -867,7 +867,7 @@ class LoginWindow(QMainWindow):
self.tray_icon.setContextMenu(self.tray_menu)
# 设置托盘提示
self.tray_icon.setToolTip("AI客服智能助手")
self.tray_icon.setToolTip("水滴AI客服智能助手")
# 双击托盘图标显示窗口
self.tray_icon.activated.connect(self.tray_icon_activated)
@@ -888,7 +888,7 @@ class LoginWindow(QMainWindow):
# 1. 显示后端连接状态
if ws_manager and ws_manager.backend_client and ws_manager.backend_client.is_connected:
backend_status = QAction("后端已连接", self)
backend_status = QAction("AI服务已连接", self)
backend_status.setEnabled(False) # 不可点击
self.tray_menu.addAction(backend_status)
@@ -1113,7 +1113,7 @@ class LoginWindow(QMainWindow):
# 首次最小化时显示提示消息
if not hasattr(self, '_tray_message_shown'):
self.tray_icon.showMessage(
"AI客服智能助手",
"水滴AI客服智能助手",
"程序已最小化到系统托盘。双击托盘图标可重新显示窗口。",
QSystemTrayIcon.Information,
3000