[patch] 因pdd内部回复消息接口与登录接口不同步导致监听过程中发送消息接口返回会话过期处理优化逻辑 并设计开发提示用户 并提供用户是否重新发送会话过期消息体按钮

This commit is contained in:
2025-10-24 17:47:28 +08:00
parent 9effc859ac
commit 7a1ad47439
3 changed files with 393 additions and 50 deletions

View File

@@ -2047,7 +2047,7 @@ class ChatPdd:
# 降级处理:返回相对路径
return relative_path
def __init__(self, cookie, chat_list_stat, csname=None, text=None, log_callback=None):
def __init__(self, cookie, chat_list_stat, csname=None, text=None, log_callback=None, session_timeout_callback=None):
# 检查JS文件
self.check_js_files()
@@ -2058,6 +2058,7 @@ class ChatPdd:
# 首先设置log_callback以便在初始化过程中使用日志
self.log_callback = log_callback
self.session_timeout_callback = session_timeout_callback # 会话超时回调
# 读取JS文件
try:
@@ -2192,6 +2193,13 @@ class ChatPdd:
self.store_id = "538727ec-c84d-4458-8ade-3960c9ab802c" # 可以根据需要修改
self.user_tokens = {} # 存储用户token信息
# 🔥 新增:会话过期管理
self.session_expired_notified = False # 防止重复通知后端
self.last_session_check_time = 0 # 最后检测时间(避免频繁检测)
self.session_notify_time = 0 # 发送会话过期通知的时间戳
self.session_timeout_timer = None # 超时检测定时器
self.session_timeout_duration = 66 # 超时时长(秒),可配置
def _log(self, message, level="INFO"):
"""内部日志方法"""
if self.log_callback:
@@ -2661,13 +2669,37 @@ class ChatPdd:
# 打印HTTP状态与返回体片段
self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
try:
self._log(f"PDD返回体: {response.text[:500]}", "DEBUG")
except Exception:
pass
self._log(f"PDD返回体: {response.text[:200]}", "DEBUG")
# 🔥 检查响应状态码
if response.status_code == 200:
self._log(f"✅ 已发送AI回复给用户 {uid}: {content}", "SUCCESS")
# ✅ HTTP 200但需要进一步检查响应体
try:
response_data = response.json()
# 检测会话过期error_code=43001
if not response_data.get('success') and response_data.get('error_code') == 43001:
error_msg = response_data.get('error_msg', '会话已过期')
self._log(f"🔴 检测到会话过期: {error_msg}", "ERROR")
# 通知后端Cookie失效
await self._notify_backend_session_expired()
return False
# 检查其他错误
if not response_data.get('success'):
error_code = response_data.get('error_code', 'unknown')
error_msg = response_data.get('error_msg', '未知错误')
self._log(f"❌ 发送失败 [错误码:{error_code}]: {error_msg}", "ERROR")
return False
# 发送成功
self._log(f"✅ 已发送AI回复给用户 {uid}", "SUCCESS")
return True
except (ValueError, KeyError):
# JSON解析失败保守处理
self._log(f"✅ 已发送AI回复给用户 {uid}", "SUCCESS")
return True
else:
self._log(f"❌ 发送AI回复失败HTTP状态码: {response.status_code}", "ERROR")
@@ -2679,6 +2711,141 @@ class ChatPdd:
traceback.print_exc()
return False
async def _notify_backend_session_expired(self):
"""向后端发送会话过期通知"""
try:
# 防重复通知机制:如果已经通知过,不再重复发送
if self.session_expired_notified:
return
# 检查最后通知时间避免频繁发送间隔至少30秒
import time
current_time = time.time()
if current_time - self.last_session_check_time < 30:
return
from WebSocket.backend_singleton import get_websocket_manager
ws_manager = get_websocket_manager()
if ws_manager and ws_manager.backend_client:
# 使用统一的 connect_message 格式
message = {
"type": "connect_message",
"store_id": self.store_id,
"status": False,
"content": "cookies失效需要重新登录"
}
# 发送消息到后端
ws_manager.backend_client.send_message(message)
# 标记已通知
self.session_expired_notified = True
self.last_session_check_time = current_time
self.session_notify_time = current_time
self._log(f"📤 已向后端发送会话过期通知", "WARNING")
self._log(f"⏳ 等待后端重新下发Cookie...", "INFO")
# 启动超时检测定时器
self._start_session_timeout_check()
else:
self._log("⚠️ WebSocket未连接无法发送通知", "WARNING")
except Exception as e:
self._log(f"❌ [PDD] 发送会话过期通知失败: {e}", "ERROR")
import traceback as _traceback
self._log(f"详细错误: {_traceback.format_exc()}", "DEBUG")
def _start_session_timeout_check(self):
"""启动会话恢复超时检测定时器"""
try:
# 取消之前的定时器
self._cancel_session_timeout_check()
import threading
def timeout_callback():
try:
self._log(f"⏰ 会话恢复等待超时({self.session_timeout_duration}秒)", "WARNING")
self._on_session_timeout()
except Exception as e:
self._log(f"❌ 超时回调失败: {e}", "ERROR")
# 启动定时器
self.session_timeout_timer = threading.Timer(
self.session_timeout_duration,
timeout_callback
)
self.session_timeout_timer.daemon = True
self.session_timeout_timer.start()
except Exception as e:
self._log(f"❌ 启动超时检测失败: {e}", "ERROR")
def _cancel_session_timeout_check(self):
"""取消超时检测定时器"""
try:
if self.session_timeout_timer and self.session_timeout_timer.is_alive():
self.session_timeout_timer.cancel()
self.session_timeout_timer = None
except Exception:
pass
def _on_session_timeout(self):
"""超时时的处理逻辑 - 检查是否已恢复,未恢复则提示用户"""
try:
import time
# 检查是否已经重连成功
if not self.session_expired_notified:
self._log("✅ 会话已自动恢复", "SUCCESS")
return
# 计算等待时长
elapsed_time = int(time.time() - self.session_notify_time)
self._log(f"⚠️ 等待{elapsed_time}秒后,会话仍未恢复", "WARNING")
# 调用专用的超时回调函数
if self.session_timeout_callback:
try:
retry_info = {
"store_id": self.store_id,
"elapsed_time": elapsed_time,
"timeout_duration": self.session_timeout_duration,
"pdd_instance": self
}
self.session_timeout_callback(retry_info)
except Exception as callback_error:
self._log(f"❌ 超时回调失败: {callback_error}", "ERROR")
else:
self._log("⚠️ 未设置超时回调", "WARNING")
except Exception as e:
self._log(f"❌ 超时处理失败: {e}", "ERROR")
async def retry_session_recovery(self):
"""用户点击重试后调用此方法"""
try:
self._log("🔄 用户选择重试,重新发送会话过期通知", "INFO")
# 重置标志位,允许重新发送
self.session_expired_notified = False
self.last_session_check_time = 0
# 取消旧的定时器
self._cancel_session_timeout_check()
# 重新发送通知
await self._notify_backend_session_expired()
self._log("✅ 重试请求已发送", "SUCCESS")
except Exception as e:
self._log(f"❌ 重试失败: {e}", "ERROR")
async def _download_image(self, image_url):
"""下载图片并返回base64编码的数据
@@ -2875,7 +3042,34 @@ class ChatPdd:
self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
self._log(f"PDD返回体: {response.text}", "DEBUG")
# 🔥 检查响应状态码
if response.status_code == 200:
# ✅ HTTP 200但需要进一步检查响应体
try:
response_data = response.json()
# 检测会话过期error_code=43001
if not response_data.get('success') and response_data.get('error_code') == 43001:
error_msg = response_data.get('error_msg', '会话已过期')
self._log(f"🔴 图片发送会话过期: {error_msg}", "ERROR")
# 通知后端Cookie失效
await self._notify_backend_session_expired()
return False
# 检查其他错误
if not response_data.get('success'):
error_code = response_data.get('error_code', 'unknown')
error_msg = response_data.get('error_msg', '未知错误')
self._log(f"❌ 发送图片失败 [错误码:{error_code}]: {error_msg}", "ERROR")
return False
# 发送成功
self._log(f"✅ 已发送图片消息给用户 {uid}", "SUCCESS")
return True
except (ValueError, KeyError):
# JSON解析失败保守处理
self._log(f"✅ 已发送图片消息给用户 {uid}", "SUCCESS")
return True
else:
@@ -2971,8 +3165,35 @@ class ChatPdd:
self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
self._log(f"PDD返回体: {response.text}", "DEBUG")
# 🔥 检查响应状态码
if response.status_code == 200:
self._log(f"✅ 已发送商品卡片给用户 {uid}: goods_id={goods_id}", "SUCCESS")
# ✅ HTTP 200但需要进一步检查响应体
try:
response_data = response.json()
# 检测会话过期error_code=43001
if not response_data.get('success') and response_data.get('error_code') == 43001:
error_msg = response_data.get('error_msg', '会话已过期')
self._log(f"🔴 商品卡发送会话过期: {error_msg}", "ERROR")
# 通知后端Cookie失效
await self._notify_backend_session_expired()
return False
# 检查其他错误
if not response_data.get('success'):
error_code = response_data.get('error_code', 'unknown')
error_msg = response_data.get('error_msg', '未知错误')
self._log(f"❌ 发送商品卡失败 [错误码:{error_code}]: {error_msg}", "ERROR")
return False
# 发送成功
self._log(f"✅ 已发送商品卡片给用户 {uid}", "SUCCESS")
return True
except (ValueError, KeyError):
# JSON解析失败保守处理
self._log(f"✅ 已发送商品卡片给用户 {uid}", "SUCCESS")
return True
else:
self._log(f"❌ 发送商品卡片失败HTTP状态码: {response.status_code}", "ERROR")
@@ -3143,6 +3364,9 @@ class ChatPdd:
if msg_type == 8: # 平台系统卡片消息(如:售后提醒、平台通知等)
self._log(f"🚫 过滤系统卡片消息(type=8): {message_info.get('content', '')[:50]}", "DEBUG")
return True
if msg_type == 64: # 🔥 新增:售后卡片消息(如:消费者申请售后)
self._log(f"🚫 过滤售后卡片消息(type=64): {message_info.get('content', '')[:50]}", "DEBUG")
return True
# 2. 基于模板名称识别机器人消息
template_name = message_info.get("template_name", "")
@@ -3150,6 +3374,7 @@ class ChatPdd:
"mall_robot_man_intervention_and_restart", # 机器人暂停接待消息
"mall_robot_text_msg", # 机器人自动回复消息
"aftersales_hosting_warning_card", # 售后托管警告卡片
"apply_for_consultation_card_new", # 🔥 新增:售后协商申请卡片
# 可根据实际情况添加更多机器人模板
]
if template_name in robot_templates:
@@ -3182,6 +3407,8 @@ class ChatPdd:
"请尽快解决售后问题", # 平台售后提醒
"平台介入退款", # 平台售后提醒
"请尽快处理售后", # 平台售后提醒
"消费者申请售后", # 🔥 新增:售后申请通知
"建议先与消费者友好协商", # 🔥 新增:售后协商提示
]
if any(pattern in content for pattern in robot_content_patterns):
@@ -3393,6 +3620,14 @@ class ChatPdd:
self.reconnect_attempts = 0
self._log("✅ WebSocket-PDD连接成功", "SUCCESS")
# 重置会话过期标志位(新会话开始)
self.session_expired_notified = False
self.last_session_check_time = 0
self.session_notify_time = 0
# 取消超时检测定时器(会话已恢复)
self._cancel_session_timeout_check()
z = self.encodeex.call("encode_token", str(self.user_id), self.auth_token)
if z:
await websocket.send(bytes(z))
@@ -3492,9 +3727,10 @@ class ChatPdd:
class PddListenerForGUI:
"""用于GUI集成的拼多多监听包装器"""
def __init__(self, log_callback: Optional[Callable] = None):
def __init__(self, log_callback: Optional[Callable] = None, session_timeout_signal=None):
self.pdd_bot = None
self.log_callback = log_callback
self.session_timeout_signal = session_timeout_signal # Qt信号对象
self.running = False
self.main_thread = None
self.loop = None
@@ -3508,6 +3744,21 @@ class PddListenerForGUI:
else:
print(f"[{log_type}] {message}")
def _on_session_timeout(self, retry_info: dict):
"""会话恢复超时的GUI处理回调"""
try:
self._log(f"⚠️ 会话恢复超时(已等待{retry_info['elapsed_time']}秒)", "WARNING")
# 使用Qt信号机制线程安全
if self.session_timeout_signal:
self.session_timeout_signal.emit(retry_info)
self._log(f"✅ 已触发超时对话框", "INFO")
else:
self._log("⚠️ 未设置超时信号,无法显示对话框", "WARNING")
except Exception as e:
self._log(f"❌ 超时处理回调失败: {e}", "ERROR")
async def start_listening(self, cookie_dict: Dict[str, str], chat_list_stat: bool = False,
csname: Optional[str] = None, text: Optional[str] = None) -> bool:
try:
@@ -3523,13 +3774,21 @@ class PddListenerForGUI:
self.startup_error = None
self.startup_event.clear()
# 创建ChatPdd实例
# 如果已有旧实例,先清理定时器
if self.pdd_bot:
try:
self.pdd_bot._cancel_session_timeout_check()
except Exception:
pass
# 创建ChatPdd实例传递超时回调
self.pdd_bot = ChatPdd(
cookie=cookie_dict,
text=text,
chat_list_stat=chat_list_stat,
csname=csname,
log_callback=self.log_callback
log_callback=self.log_callback,
session_timeout_callback=self._on_session_timeout
)
self.running = True
@@ -3690,13 +3949,21 @@ class PddListenerForGUI:
self._log(f"✅ [PDD] cookies解析成功包含 {len(cookie_dict)} 个字段", "SUCCESS")
# 创建ChatPdd实例
# 如果已有旧实例,先清理定时器
if self.pdd_bot:
try:
self.pdd_bot._cancel_session_timeout_check()
except Exception:
pass
# 创建ChatPdd实例传递超时回调
self.pdd_bot = ChatPdd(
cookie=cookie_dict,
text=None,
chat_list_stat=False,
csname=None,
log_callback=self.log_callback
log_callback=self.log_callback,
session_timeout_callback=self._on_session_timeout
)
# 设置store_id和后端服务

View File

@@ -49,6 +49,9 @@ class WebSocketManager:
self.platform_signals = PlatformConnectionSignals()
self.platform_signals.platform_connected.connect(self._on_platform_signal_received)
# 🔥 会话超时信号(用于拼多多会话过期重试)
self.session_timeout_signal = None
self.callbacks = {
'log': None,
'success': None,
@@ -384,6 +387,16 @@ class WebSocketManager:
from Utils.Pdd.PddUtils import WebsocketManager as PDDWSManager
pdd_mgr = PDDWSManager()
conn_info = pdd_mgr.get_connection(key)
# 清理旧listener的定时器避免遗留定时器触发
old_listener = listener_info.get('listener')
if old_listener and hasattr(old_listener, 'pdd_bot') and old_listener.pdd_bot:
try:
old_listener.pdd_bot._cancel_session_timeout_check()
self._log(f"✅ [PDD] 旧监听器的定时器已清理", "DEBUG")
except Exception:
pass
if conn_info and conn_info.get('platform'):
# 关闭WebSocket连接
ws = conn_info['platform'].get('ws')
@@ -685,39 +698,38 @@ class WebSocketManager:
def _start_pdd_listener(self, store_id: str, data: str, store_name: str = ""):
"""启动拼多多平台监听"""
try:
# 在创建新实例前,清理旧实例的定时器(避免遗留定时器触发)
shop_key = f"拼多多:{store_id}"
old_listener_info = self.platform_listeners.get(shop_key)
if old_listener_info:
try:
old_listener = old_listener_info.get('listener')
if old_listener and hasattr(old_listener, 'pdd_bot') and old_listener.pdd_bot:
old_listener.pdd_bot._cancel_session_timeout_check()
self._log(f"✅ 已清理旧监听器定时器", "DEBUG")
except Exception:
pass
def _runner():
try:
self._log("🚀 开始创建拼多多监听器实例", "DEBUG")
listener = PDDListenerForGUI_WS(log_callback=self._log)
self._log("✅ 拼多多监听器实例创建成功", "DEBUG")
# 传递session_timeout_signal用于会话过期重试
listener = PDDListenerForGUI_WS(
log_callback=self._log,
session_timeout_signal=self.session_timeout_signal
)
# 立即保存listener引用到platform_listeners用于后续清理定时器
shop_key = f"拼多多:{store_id}"
if shop_key in self.platform_listeners:
self.platform_listeners[shop_key]['listener'] = listener
# 判断是登录参数还是Cookie
if self._is_pdd_login_params(data):
# 使用登录参数启动
self._log("📋 使用登录参数启动拼多多监听器", "INFO")
self._log("🔄 开始执行 start_with_login_params", "DEBUG")
result = asyncio.run(listener.start_with_login_params(store_id=store_id, login_params=data))
self._log(f"📊 start_with_login_params 执行结果: {result}", "DEBUG")
# 详细的结果分析仅日志记录GUI 已在主线程中通知)
if result == "need_verification_code":
self._log("✅ [PDD] 登录流程正常,已发送验证码需求通知给后端", "SUCCESS")
elif result == "verification_code_error":
self._log("⚠️ [PDD] 验证码错误,已发送错误通知给后端", "WARNING")
elif result:
self._log("✅ [PDD] 登录成功,平台连接已建立", "SUCCESS")
else:
self._log("❌ [PDD] 登录失败", "ERROR")
else:
# 使用Cookie启动兼容旧方式
self._log("🍪 使用Cookie启动拼多多监听器", "INFO")
self._log("🔄 开始执行 start_with_cookies", "DEBUG")
result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookies=data))
self._log(f"📊 start_with_cookies 执行结果: {result}", "DEBUG")
# Cookie启动成功时记录日志GUI 已在主线程中通知)
if result:
self._log("✅ [PDD] Cookie启动成功平台连接已建立", "SUCCESS")
# 根据实际登录结果上报状态给后端
if self.backend_client and result not in ["need_verification_code", "verification_code_error",
@@ -802,29 +814,19 @@ class WebSocketManager:
def _is_pdd_login_params(self, data: str) -> bool:
"""判断是否为拼多多登录参数"""
try:
self._log(f"🔍 [DEBUG] 检查是否为登录参数,数据长度: {len(data)}", "DEBUG")
self._log(f"🔍 [DEBUG] 数据前100字符: {data[:100]}", "DEBUG")
import json
parsed_data = json.loads(data)
self._log(f"🔍 [DEBUG] JSON解析成功键: {list(parsed_data.keys())}", "DEBUG")
login_params = parsed_data.get("data", {}).get("login_params", {})
self._log(f"🔍 [DEBUG] login_params存在: {bool(login_params)}", "DEBUG")
if not login_params:
self._log("🔍 [DEBUG] login_params为空返回False", "DEBUG")
return False
# 检查必需的登录参数字段
required_fields = ["username", "password", "anti_content", "risk_sign", "timestamp"]
has_all_fields = all(field in login_params for field in required_fields)
self._log(f"🔍 [DEBUG] 包含所有必需字段: {has_all_fields}", "DEBUG")
self._log(f"🔍 [DEBUG] 现有字段: {list(login_params.keys())}", "DEBUG")
return has_all_fields
except Exception as e:
self._log(f"🔍 [DEBUG] 解析失败: {e}", "DEBUG")
except Exception:
return False
def send_message(self, message: dict):

74
main.py
View File

@@ -38,6 +38,12 @@ class BalanceInsufficientSignals(QObject):
balance_insufficient = pyqtSignal(str) # (balance_message)
# 新增: 会话超时信号类(用于线程安全的超时重试提示)
class SessionTimeoutSignals(QObject):
"""会话超时信号"""
session_timeout = pyqtSignal(dict) # (retry_info)
# 新增: 用户名密码输入对话框类
class LoginWindow(QMainWindow):
def __init__(self):
@@ -71,6 +77,10 @@ class LoginWindow(QMainWindow):
self.balance_insufficient_signals = BalanceInsufficientSignals()
self.balance_insufficient_signals.balance_insufficient.connect(self._show_balance_insufficient_dialog)
# 创建会话超时信号对象(线程安全)
self.session_timeout_signals = SessionTimeoutSignals()
self.session_timeout_signals.session_timeout.connect(self._show_session_timeout_dialog)
# 横幅相关
self.promo_banner = None
self.banner_shadow = None # 阴影效果引用
@@ -735,6 +745,67 @@ class LoginWindow(QMainWindow):
import traceback
self.add_log(f"详细错误: {traceback.format_exc()}", "ERROR")
def _show_session_timeout_dialog(self, retry_info: dict):
"""显示会话超时重试对话框(信号槽函数,始终在主线程中执行)"""
try:
elapsed_time = retry_info.get('elapsed_time', 0)
store_id = retry_info.get('store_id', 'unknown')
pdd_instance = retry_info.get('pdd_instance')
self.add_log(f"🎯 主线程收到会话超时信号: 已等待{elapsed_time}", "INFO")
# 显示重试对话框
reply = QMessageBox.question(
self,
"拼多多会话过期",
f"拼多多会话已过期,等待后端处理超时(已等待{elapsed_time}秒)。\n\n"
f"可能原因:\n"
f"1. 后端正在处理验证码,需要更多时间\n"
f"2. 网络连接问题\n"
f"3. 后端服务异常\n\n"
f"是否重新发送通知给后端?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes
)
if reply == QMessageBox.Yes:
self.add_log("✅ [GUI] 用户选择重试", "INFO")
# 调用重试方法
if pdd_instance and hasattr(pdd_instance, 'retry_session_recovery'):
try:
# 在pdd_instance的事件循环中执行重试
import asyncio
import threading
def retry_in_thread():
try:
# 创建新的事件循环或使用现有的
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 执行重试
loop.run_until_complete(pdd_instance.retry_session_recovery())
self.add_log("✅ [GUI] 重试请求已发送", "SUCCESS")
except Exception as e:
self.add_log(f"❌ [GUI] 执行重试失败: {e}", "ERROR")
# 在新线程中执行异步操作
retry_thread = threading.Thread(target=retry_in_thread, daemon=True)
retry_thread.start()
except Exception as e:
self.add_log(f"❌ [GUI] 启动重试失败: {e}", "ERROR")
else:
self.add_log(" [GUI] 用户取消重试", "INFO")
except Exception as e:
self.add_log(f"❌ 显示会话超时对话框失败: {e}", "ERROR")
import traceback
self.add_log(f"详细错误: {traceback.format_exc()}", "ERROR")
def _show_balance_insufficient_dialog(self, balance_msg: str):
"""显示余额不足提示框(信号槽函数,始终在主线程中执行)"""
try:
@@ -1072,6 +1143,9 @@ class LoginWindow(QMainWindow):
balance_insufficient=self.on_balance_insufficient # 新增:余额不足回调
)
# 🔥 设置会话超时信号(用于拼多多会话过期重试)
ws_manager.session_timeout_signal = self.session_timeout_signals.session_timeout
# 连接后端
success = ws_manager.connect_backend(token)