diff --git a/Utils/Dy/DyUtils.py b/Utils/Dy/DyUtils.py index 2e1c8d6..5fc99df 100644 --- a/Utils/Dy/DyUtils.py +++ b/Utils/Dy/DyUtils.py @@ -119,11 +119,17 @@ class DouYinBackendService: msg_type = platform_message.get('msg_type', 'text') content_for_backend = platform_message.get('content', '') - + + pin_image = platform_message.get('pin_image') + if not pin_image: + pin_image = "" + else: + pass # 构造标准消息格式 msg = { 'type': 'message', 'content': content_for_backend, + 'pin_image': pin_image, 'msg_type': msg_type, 'sender': {'id': sender_id}, 'store_id': store_id diff --git a/Utils/Pdd/PddUtils.py b/Utils/Pdd/PddUtils.py index e6fd824..a7fc948 100644 --- a/Utils/Pdd/PddUtils.py +++ b/Utils/Pdd/PddUtils.py @@ -87,7 +87,7 @@ class PddBackendService: if isinstance(platform_message, dict): if 'store_id' not in platform_message and self.current_store_id: platform_message['store_id'] = self.current_store_id - + # 通过统一后端连接发送 backend.send_message(platform_message) return True @@ -503,7 +503,7 @@ class ChatPdd: if not uid or not content: self._log("❌ [External] 参数不完整", "ERROR") return False - + result = await self.send_ai_reply(uid, content) if result: self._log(f"✅ [External] 外部消息发送成功: uid={uid}", "SUCCESS") @@ -962,23 +962,23 @@ class PddListenerForGUI: """使用后端下发的登录参数执行登录并启动监听""" try: self._log("🔵 [PDD] 收到后端登录参数,开始执行登录获取cookies", "INFO") - + # 1. 解析登录参数 params_dict = self._parse_login_params(login_params) if not params_dict: self._log("❌ [PDD] 登录参数解析失败", "ERROR") return False - + # 2. 执行登录获取Cookie cookies = await self._execute_pdd_login(params_dict) if not cookies: self._log("❌ [PDD] 登录失败,无法获取cookies", "ERROR") return False - + # 3. 使用获取的Cookie继续原有流程 self._log("✅ [PDD] 登录成功,使用获取的cookies连接平台", "SUCCESS") return await self.start_with_cookies(store_id, cookies) - + except Exception as e: self._log(f"❌ [PDD] 使用登录参数启动失败: {str(e)}", "ERROR") import traceback @@ -1123,12 +1123,12 @@ class PddListenerForGUI: login_request = self._build_login_request(login_params) if not login_request: return "" - + # 2. 发送登录请求 response_data = await self._send_login_request(login_request) if not response_data: return "" - + # 3. 处理登录响应 if "需要验证图形验证码" in str(response_data): self._log("⚠️ [PDD] 登录需要验证码,暂不支持自动处理", "WARNING") @@ -1148,7 +1148,7 @@ class PddListenerForGUI: error_msg = response_data.get("errorMsg", "登录失败") self._log(f"❌ [PDD] 登录失败: {error_msg}", "ERROR") return "" - + except Exception as e: self._log(f"❌ [PDD] 执行登录失败: {e}", "ERROR") import traceback @@ -1163,13 +1163,13 @@ class PddListenerForGUI: anti_content = login_params.get("anti_content", "") risk_sign = login_params.get("risk_sign", "") timestamp = login_params.get("timestamp", int(time.time() * 1000)) - + if not all([username, password, anti_content, risk_sign]): self._log("❌ [PDD] 登录参数不完整", "ERROR") return {} - + import random - + # 构造请求头 headers = { "authority": "mms.pinduoduo.com", @@ -1189,7 +1189,7 @@ class PddListenerForGUI: "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "anti-content": anti_content # 后端提供的关键参数 } - + # 构造请求体(完全复用pdd_login/login.py的结构) payload = { "username": username, @@ -1249,14 +1249,14 @@ class PddListenerForGUI: "timestamp": timestamp, "crawlerInfo": anti_content # 后端提供 } - + self._log("✅ [PDD] 登录请求构造成功", "INFO") return { "url": "https://mms.pinduoduo.com/janus/api/auth", "headers": headers, "payload": payload } - + except Exception as e: self._log(f"❌ [PDD] 构造登录请求失败: {e}", "ERROR") return {} @@ -1266,22 +1266,22 @@ class PddListenerForGUI: try: import requests import asyncio - + url = login_request["url"] headers = login_request["headers"] payload = login_request["payload"] - + # 使用线程池执行同步请求 def _send_request(): response = requests.post(url, headers=headers, json=payload, timeout=30) return response - + # 在事件循环中执行 loop = asyncio.get_event_loop() response = await loop.run_in_executor(None, _send_request) - + self._log(f"✅ [PDD] 登录请求发送完成,状态码: {response.status_code}", "INFO") - + if response.status_code == 200: result = response.json() cookies = response.cookies.get_dict() @@ -1294,7 +1294,7 @@ class PddListenerForGUI: else: self._log(f"❌ [PDD] 登录请求失败,HTTP状态码: {response.status_code}", "ERROR") return {} - + except Exception as e: self._log(f"❌ [PDD] 发送登录请求异常: {e}", "ERROR") return {} diff --git a/Utils/QianNiu/QianNiuUtils.py b/Utils/QianNiu/QianNiuUtils.py index e2d07ed..610796e 100644 --- a/Utils/QianNiu/QianNiuUtils.py +++ b/Utils/QianNiu/QianNiuUtils.py @@ -18,18 +18,114 @@ import websockets import aiohttp import sys import io +import threading +from Utils.message_models import PlatformMessage +import config # 设置标准输出编码为UTF-8 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') -# 配置日志 +# 千牛WebSocket管理器类 +class QianNiuWebsocketManager: + """千牛WebSocket管理器 - 管理多店铺连接""" + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._store = {} + cls._instance._lock = threading.RLock() + return cls._instance + + def on_connect(self, shop_key, qianniu_client, **kwargs): + """存储千牛连接信息""" + with self._lock: + entry = self._store.setdefault(shop_key, { + 'platform': None, + 'customers': [], + 'user_assignments': {} + }) + entry['platform'] = { + 'qianniu_client': qianniu_client, + 'message_handler': qianniu_client.message_handler if hasattr(qianniu_client, + 'message_handler') else None, + 'last_heartbeat': datetime.now(), + **kwargs + } + return entry + + def get_connection(self, shop_key): + """获取千牛连接信息""" + with self._lock: + return self._store.get(shop_key) + + def remove_connection(self, shop_key): + """移除千牛连接""" + with self._lock: + if shop_key in self._store: + del self._store[shop_key] + + def get_all_connections(self): + """获取所有千牛连接""" + with self._lock: + return dict(self._store) + + def update_heartbeat(self, shop_key): + """更新心跳时间""" + with self._lock: + if shop_key in self._store and self._store[shop_key]['platform']: + self._store[shop_key]['platform']['last_heartbeat'] = datetime.now() + + +# 千牛(淘宝)平台专用配置 +QIANNIU_CONFIG = { + # 赛牛插件服务配置 + "host": "127.0.0.1", + "port": 3030, + "ws_url": "ws://127.0.0.1:3030", + "http_api_url": "http://127.0.0.1:3030/QianNiu/Function", + "http_api_url_avatar": "http://127.0.0.1:3030/QianNiu/Api", + + # 认证配置 + "access_type": 1, # 企业版 + "access_id": "maguabishop", + "access_key": "bWFndWFfYmlzaG9w", + + # 默认签名配置 + "sign_key": b'111111', + "sha256": True, + + # 测试配置 + "test_store_id": "test_store_001", + "test_store_name": "测试店铺", + "test_user_nick": "tb420723827:redboat", + "default_store_id": "4c4025e3-8702-42fc-bdc2-671e335c0ff7", + + # 连接配置 + "connect_timeout": 60, + "reconnect_attempts": 3, + "reconnect_delay": 5, + "heartbeat_interval": 20, + "message_timeout": 5.0, + + # DLL配置 + "dll_startup_timeout": 15, + "dll_ready_timeout": 30, + "dll_ready_check_delay": 2, + "service_startup_delay": 2 +} + +# 配置日志(修复Unicode编码问题) logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ - logging.FileHandler("sainiu_test.log"), + logging.FileHandler("sainiu_test.log", encoding='utf-8'), logging.StreamHandler() ] ) @@ -37,7 +133,8 @@ logger = logging.getLogger("SaiNiuTest") # 赛牛插件配置 QN_WS_URL = "ws://127.0.0.1:3030" # 赛牛插件WebSocket地址 -QN_HTTP_API_URL = "http://127.0.0.1:3030/QianNiu/Api" # 赛牛插件HTTP API地址 +QN_HTTP_API_URL = "http://127.0.0.1:3030/QianNiu/Function" +QN_HTTP_API_URL_AVATAR = 'http://127.0.0.1:3030/QianNiu/Api' # 赛牛插件HTTP API地址 STORE_ID = "test_store_001" # 测试店铺ID STORE_NAME = "测试店铺" # 测试店铺名称 USER_NICK = "tb420723827:redboat" # 测试账号 @@ -57,124 +154,297 @@ class SaiNiuService: self.port = 3030 # 默认端口 self.sign_key = b'' # 默认签名密钥 self.sha256 = True # 使用SHA256签名 - self.python32_path = r"F:\飞书下载地址\shuidrop_gui\Utils\PythonNew32\python32.exe" + + # 获取项目根目录(相对于当前文件) + current_dir = os.path.dirname(os.path.abspath(__file__)) # Utils/QianNiu/ + project_root = os.path.dirname(os.path.dirname(current_dir)) # shuidrop_gui/ + self.python32_path = os.path.join(project_root, "Utils", "PythonNew32", "python32.exe") + self.dll_dir = os.path.join(project_root, "Utils", "PythonNew32") self.dll_process = None - def load_dll(self, dll_path='F:\\飞书下载地址\\shuidrop_gui\\Utils\\PythonNew32\\SaiNiuApi.dll'): + # 验证关键路径 + self._validate_paths() + + def _validate_paths(self): + """验证关键路径是否存在""" + print(f"🔍 验证路径配置...") + print(f"📁 DLL目录: {self.dll_dir}") + print(f"🐍 Python32路径: {self.python32_path}") + + if not os.path.exists(self.dll_dir): + print(f"⚠️ DLL目录不存在: {self.dll_dir}") + else: + print(f"✅ DLL目录存在") + + if not os.path.exists(self.python32_path): + print(f"⚠️ Python32执行文件不存在: {self.python32_path}") + else: + print(f"✅ Python32执行文件存在") + + # 检查DLL文件 + dll_path = os.path.join(self.dll_dir, "SaiNiuApi.dll") + if not os.path.exists(dll_path): + print(f"⚠️ SaiNiuApi.dll文件不存在: {dll_path}") + else: + print(f"✅ SaiNiuApi.dll文件存在") + + def _read_dll_log(self, log_file_path): + """读取DLL日志文件""" + try: + if os.path.exists(log_file_path): + with open(log_file_path, 'r', encoding='utf-8') as f: + content = f.read() + if content: + print("📄 DLL启动日志文件内容:") + print("-" * 50) + print(content) + print("-" * 50) + else: + print("📄 DLL日志文件为空") + else: + print("📄 DLL日志文件不存在") + except Exception as e: + print(f"❌ 读取DLL日志文件失败: {e}") + + def load_dll(self, dll_path=None): """加载DLL文件 - 使用Python32执行""" try: + # 如果没有指定路径,使用默认的相对路径 + if dll_path is None: + dll_path = os.path.join(self.dll_dir, "SaiNiuApi.dll") + + # 验证路径是否存在 + if not os.path.exists(dll_path): + print(f"❌ DLL文件不存在: {dll_path}") + return False + + if not os.path.exists(self.python32_path): + print(f"❌ Python32执行文件不存在: {self.python32_path}") + return False + # 切换到DLL目录 dll_dir = os.path.dirname(dll_path) + original_dir = os.getcwd() # 保存当前目录 os.chdir(dll_dir) print(f"📁 切换到DLL目录: {dll_dir}") - # 创建DLL启动脚本 - 完全按照demo的代码 + # 创建DLL启动脚本 - 增强版,带详细状态输出和日志 script_content = '''import ctypes import hashlib import time import json +import sys +import os -# 加载DLL文件 -sainiu_api = ctypes.CDLL('SaiNiuApi.dll') +# 强制刷新输出缓冲区 +sys.stdout.reconfigure(line_buffering=True) +sys.stderr.reconfigure(line_buffering=True) -# 定义函数参数类型和返回值类型 -sainiu_api.Access_ServerStart.argtypes = [ - ctypes.c_int, - ctypes.c_bool, - ctypes.c_bool, - ctypes.c_bool, - ctypes.c_bool, - ctypes.c_int, - ctypes.c_char_p, - ctypes.c_bool, - ctypes.c_bool -] -sainiu_api.Access_ServerStart.restype = ctypes.c_char_p +# 创建日志文件 +log_file = open("dll_startup.log", "w", encoding="utf-8") -# 调用函数时传入的参数 -port = 3030 -web_socket = True -http_server = True -remote = False -log = True -ws_max = 0 -sign_key = b'' -sha256 = True -version_tip = True +def log_print(msg): + """同时输出到控制台和日志文件""" + print(msg) + log_file.write(msg + "\\n") + log_file.flush() + sys.stdout.flush() -# 启动服务器 -result = sainiu_api.Access_ServerStart( - port, - web_socket, - http_server, - remote, - log, - ws_max, - sign_key, - sha256, - version_tip -) +log_print("=== SaiNiu DLL 启动脚本开始 ===") +log_print(f"Python版本: {sys.version}") +log_print(f"当前工作目录: {os.getcwd()}") +log_print(f"时间戳: {time.strftime('%Y-%m-%d %H:%M:%S')}") -# 解码并打印结果 try: - result_str = result.decode('gbk') - print(f"Access_ServerStart 服务器启动: {result_str}") -except UnicodeDecodeError: - print(f"Access_ServerStart 服务器启动: {result}") + log_print("正在加载 SaiNiuApi.dll...") + # 加载DLL文件 + sainiu_api = ctypes.CDLL('SaiNiuApi.dll') + log_print("✅ SaiNiuApi.dll 加载成功") -# 保持进程运行 -try: - while True: - time.sleep(1) -except KeyboardInterrupt: - pass + # 定义函数参数类型和返回值类型 + sainiu_api.Access_ServerStart.argtypes = [ + ctypes.c_int, + ctypes.c_bool, + ctypes.c_bool, + ctypes.c_bool, + ctypes.c_bool, + ctypes.c_int, + ctypes.c_char_p, + ctypes.c_bool, + ctypes.c_bool + ] + sainiu_api.Access_ServerStart.restype = ctypes.c_char_p + log_print("✅ DLL函数类型定义完成") + + # 调用函数时传入的参数 + port = 3030 + web_socket = True + http_server = True + remote = False + log = True + ws_max = 0 + sign_key = b'' + sha256 = True + version_tip = True + + log_print(f"正在启动服务器 - 端口: {port}") + # 启动服务器 + result = sainiu_api.Access_ServerStart( + port, + web_socket, + http_server, + remote, + log, + ws_max, + sign_key, + sha256, + version_tip + ) + + # 解码并打印结果 + try: + result_str = result.decode('gbk') + log_print(f"✅ Access_ServerStart 服务器启动结果: {result_str}") + except UnicodeDecodeError: + log_print(f"✅ Access_ServerStart 服务器启动结果: {result}") + + log_print("=== DLL服务启动完成,进入监控模式 ===") + + # 保持进程运行 + try: + heartbeat_count = 0 + while True: + time.sleep(5) + heartbeat_count += 1 + log_print(f"❤️ DLL服务心跳 #{heartbeat_count} - 服务正常运行中...") + except KeyboardInterrupt: + log_print("收到中断信号,正在停止服务...") + +except Exception as e: + log_print(f"❌ DLL启动失败: {e}") + import traceback + error_trace = traceback.format_exc() + log_print(f"错误详情: {error_trace}") + log_file.close() + sys.exit(1) +finally: + if 'log_file' in locals(): + log_file.close() ''' - # 保存脚本 + # 保存脚本(先清理旧的) script_path = os.path.join(dll_dir, "dll_loader.py") + if os.path.exists(script_path): + print(f"🧹 清理旧的启动脚本: {script_path}") + os.remove(script_path) + + print(f"📝 创建新的启动脚本: {script_path}") with open(script_path, "w", encoding="utf-8") as f: f.write(script_content) # 使用Python32启动脚本 + print(f"🚀 启动Python32进程: {self.python32_path}") + print(f"📄 脚本路径: {script_path}") + self.dll_process = subprocess.Popen( - [self.python32_path, script_path], + [self.python32_path, "-u", script_path], # -u 参数强制不缓冲输出 stdout=subprocess.PIPE, stderr=subprocess.PIPE, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, cwd=dll_dir, encoding='gbk', - text=True + text=True, + bufsize=0 # 不缓冲 ) - # 等待进程启动并获取结果 - try: - stdout, stderr = self.dll_process.communicate(timeout=15) + # 等待进程启动(改进版) + print("⏰ 等待Python32进程启动DLL服务...") - if stderr: - print(f"❌ DLL启动错误: {stderr}") + # 给进程一些时间启动 + time.sleep(3) + + # 检查进程是否还在运行 + if self.dll_process.poll() is not None: + # 进程已结束,获取输出 + stdout, stderr = self.dll_process.communicate() + print(f"❌ Python32进程意外退出") + print(f"标准输出: {stdout}") + print(f"错误输出: {stderr}") + return False + + # 进程还在运行,检查进程状态和日志文件(增强版) + print("🔍 监控Python32进程状态和DLL日志...") + log_file_path = os.path.join(dll_dir, "dll_startup.log") + + # 等待一段时间让DLL有机会启动 + for i in range(15): # 延长到15秒 + if self.dll_process.poll() is not None: + # 进程已退出 + stdout, stderr = self.dll_process.communicate() + print(f"❌ Python32进程退出 (退出码: {self.dll_process.returncode})") + if stdout: + print(f"📋 标准输出: {stdout}") + if stderr: + print(f"❌ 错误输出: {stderr}") + + # 尝试读取日志文件了解详细情况 + self._read_dll_log(log_file_path) return False - print(f"✅ 赛牛服务器启动结果: {stdout}") + # 检查日志文件是否有新内容 + if os.path.exists(log_file_path): + try: + with open(log_file_path, 'r', encoding='utf-8') as f: + log_content = f.read() + if log_content: + print(f"📋 DLL日志 (第{i + 1}秒):") + for line in log_content.strip().split('\n'): + if line.strip(): + print(f" {line}") - # 检查是否启动成功 - if "SUCCESS" in stdout or "成功" in stdout or "200" in stdout: - self.dll_loaded = True - print("✅ DLL加载成功") - return True - else: - print("❌ 服务器启动失败") - return False + # 检查是否有启动成功的标志 + if "Access_ServerStart 服务器启动结果:" in log_content: + if any(keyword in log_content for keyword in ["成功", "SUCCESS", "200"]): + print("✅ 检测到DLL启动成功标志") + self.dll_loaded = True + return True - except subprocess.TimeoutExpired: - # 超时不一定表示失败,可能服务正在运行 - print("⏰ DLL启动超时,但服务可能已在后台运行") + # 检查是否有错误 + if "❌ DLL启动失败:" in log_content: + print("❌ 检测到DLL启动失败") + return False + except Exception as e: + print(f"⚠️ 读取日志文件失败: {e}") + + print(f"⏰ 等待DLL启动... ({i + 1}/15)") + time.sleep(1) + + # 15秒后进程仍在运行,检查最终状态 + if self.dll_process.poll() is None: + print("✅ Python32进程仍在运行") + # 最后读取一次日志文件 + self._read_dll_log(log_file_path) self.dll_loaded = True return True + else: + print("❌ 进程已退出,启动失败") + self._read_dll_log(log_file_path) + return False except Exception as e: print(f"❌ 加载SaiNiu DLL失败: {e}") + # 恢复原始目录 + try: + os.chdir(original_dir) + except: + pass return False - + finally: + # 确保恢复原始目录 + try: + os.chdir(original_dir) + except: + pass def sign_get_sign(self, post, timestamp): """计算签名 - 按照官方demo的逻辑修改""" @@ -262,8 +532,8 @@ except KeyboardInterrupt: # 4. 清理可能的临时文件 temp_files = [ - os.path.join(os.path.dirname(self.dll_path), "dll_loader.py"), - os.path.join(os.path.dirname(self.dll_path), "*.tmp") + os.path.join(self.dll_dir, "dll_loader.py"), + os.path.join(self.dll_dir, "*.tmp") ] for pattern in temp_files: for f in glob.glob(pattern): @@ -317,213 +587,113 @@ class MockStore: self.store_account = account self.store_password = password -@dataclass -class MessageTemplate: - """通用消息结构体 - 严格按照WebSocket文档v2格式""" - # 必填字段 - type: str # 消息类型:"message" | "staff_list" | "ping" | "pong" | "transfer" | "connect_success" | "error" - - # 条件必填字段(根据消息类型) - content: Optional[str] = None # 消息内容 - msg_type: Optional[str] = None # 消息实际类型:"text" | "image" | "video" | "product_card" | "order_card" - pin_image: Optional[str] = None # 用户头像URL - sender: Optional[Dict] = None # 发送者信息,必须包含id字段 - store_id: Optional[str] = None # 店铺ID - - # 可选字段 - receiver: Optional[Dict] = None # 接收者信息 - data: Optional[Dict] = None # 扩展数据(如客服列表、验证链接等) - uuid: Optional[str] = None # 心跳包UUID - token: Optional[str] = None # 认证令牌(心跳包可能需要) - - def __post_init__(self): - """初始化后处理""" - # 自动设置msg_type(仅对message类型) - if self.type == "message" and self.msg_type is None and self.content: - self.msg_type = self._detect_msg_type() - - def _detect_msg_type(self) -> str: - """自动检测消息类型""" - if not self.content: - return "text" - - content = self.content.lower() - - # 图片检测 - if any(ext in content for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']): - return "image" - - # 视频检测 - if any(ext in content for ext in ['.mp4', '.avi', '.mov', '.wmv', '.flv']): - return "video" - - # 订单卡片检测 - 支持多种格式 - if ("商品id:" in self.content and "订单号:" in self.content) or \ - ("商品ID:" in self.content and "订单号:" in self.content) or \ - ("咨询订单号:" in self.content and "商品ID:" in self.content): - return "order_card" - - # 商品卡片检测 - if any(keyword in content for keyword in ['goods.html', 'item.html', 'item.jd.com', '商品卡片id:']): - return "product_card" - - # 默认文本消息 - return "text" - - def to_json(self) -> str: - """序列化为JSON字符串""" - return json.dumps(self.to_dict(), ensure_ascii=False) - - def to_dict(self) -> Dict[str, Any]: - """转换为字典,过滤None值""" - result = {} - for key, value in asdict(self).items(): - if value is not None: - result[key] = value - return result - - @classmethod - def from_json(cls, json_str: str): - """从JSON字符串恢复结构体""" - data = json.loads(json_str) - return cls(**data) - - @classmethod - def from_dict(cls, data: Dict[str, Any]): - """从字典创建实例""" - return cls(**data) - - @classmethod - def create_text_message(cls, content: str, sender_id: str, store_id: str, pin_image: str = None): - """创建文本消息""" - return cls( - type="message", - content=content, - msg_type="text", - pin_image=pin_image, - sender={"id": sender_id}, - store_id=store_id - ) - - @classmethod - def create_image_message(cls, image_url: str, sender_id: str, store_id: str, pin_image: str = None): - """创建图片消息""" - return cls( - type="message", - content=image_url, - msg_type="image", - pin_image=pin_image, - sender={"id": sender_id}, - store_id=store_id - ) - - @classmethod - def create_video_message(cls, video_url: str, sender_id: str, store_id: str, pin_image: str = None): - """创建视频消息""" - return cls( - type="message", - content=video_url, - msg_type="video", - pin_image=pin_image, - sender={"id": sender_id}, - store_id=store_id - ) - - @classmethod - def create_order_card_message(cls, product_id: str, order_number: str, sender_id: str, store_id: str, pin_image: str = None): - """创建订单卡片消息""" - return cls( - type="message", - content=f"商品id:{product_id} 订单号:{order_number}", - msg_type="order_card", - pin_image=pin_image, - sender={"id": sender_id}, - store_id=store_id - ) - - @classmethod - def create_product_card_message(cls, product_url: str, sender_id: str, store_id: str, pin_image: str = None): - """创建商品卡片消息""" - return cls( - type="message", - content=product_url, - msg_type="product_card", - pin_image=pin_image, - sender={"id": sender_id}, - store_id=store_id - ) - - @classmethod - def create_staff_list_message(cls, staff_list: list, store_id: str): - """创建客服列表消息""" - return cls( - type="staff_list", - content="客服列表更新", - data={"staff_list": staff_list}, - store_id=store_id - ) - - @classmethod - def create_ping_message(cls, uuid_str: str, token: str = None): - """创建心跳检测消息""" - message = cls( - type="ping", - uuid=uuid_str - ) - if token: - message.token = token - return message - - @classmethod - def create_pong_message(cls, uuid_str: str): - """创建心跳回复消息""" - return cls( - type="pong", - uuid=uuid_str - ) - class AIServiceConnector: - """AI服务连接器 - 负责与AI后端服务通信(已改为单后端连接)""" + """AI服务连接器 - 负责与AI后端服务通信(单连接多店铺架构)""" - def __init__(self, backend_host="192.168.5.155", backend_port=8000): - # 旧版后端WS地址移除:统一使用单后端连接(ws/gui//) + def __init__(self, backend_host=config.BACKEND_HOST, backend_port=config.BACKEND_PORT): + # 使用新的单连接多店铺架构 + self.backend_host = backend_host + self.backend_port = backend_port self.websocket = None self.store_id = None self.connected = False - self.connect_timeout = 60 + self.connect_timeout = QIANNIU_CONFIG["connect_timeout"] self.service_cookie = None - self.message_handlers = {} # 新增:消息处理器字典 - self.pending_ai_replies = {} # 新增:等待AI回复的Future + self.message_handlers = {} # 消息处理器字典 + self.pending_ai_replies = {} # 等待AI回复的结果存储 + self._cleanup_task = None # 清理任务 self.reconnect_attempts = 0 - self.max_reconnect_attempts = 3 - self.reconnect_delay = 5 # 重连间隔时间(秒) + self.max_reconnect_attempts = QIANNIU_CONFIG["reconnect_attempts"] + self.reconnect_delay = QIANNIU_CONFIG["reconnect_delay"] - - async def connect(self, store_id, auth_dict=None): - """连接后端AI服务:已不再直接连接后端,统一走单后端连接""" - self.store_id = store_id - self.connected = True - return True - - def send_to_backend_single(self, content: str, msg_type: str, sender_pin: str): - """通过单后端连接发送消息,携带store_id""" + async def connect_with_token(self, exe_token): + """使用用户token连接后端AI服务(新架构)""" try: - from WebSocket.backend_singleton import get_backend_client - backend = get_backend_client() - if not backend: - return False - message = { - 'type': 'message', - 'content': content, - 'msg_type': msg_type, - 'sender': {'id': sender_pin}, - 'store_id': self.store_id - } - backend.send_message(message) - return True - except Exception: + # 使用新的单连接多店铺URL格式 + full_url = config.get_gui_ws_url(exe_token) + + print(f"🔗 连接到后端服务: {full_url}") + + # 添加连接超时和重试机制 + self.websocket = await asyncio.wait_for( + websockets.connect(full_url), + timeout=self.connect_timeout + ) + + # 等待连接确认(设置超时) + try: + response = await asyncio.wait_for(self.websocket.recv(), timeout=16.0) + data = json.loads(response) + + print(f"🔗 收到后端连接响应: {data}") + + # 处理新架构的连接成功消息 + if data.get("type") == "success" and data.get("content") == "connected": + self.connected = True + self._log(f"✅ 后端服务连接成功(单连接多店铺架构)", "SUCCESS") + + # 启动清理任务 + self._start_cleanup_task() + return True + + elif data.get("type") == "connect_success": + # 兼容旧版连接成功消息 + self.connected = True + self._log(f"✅ AI服务连接成功: {data.get('content')}", "SUCCESS") + + # 从连接成功消息中提取认证信息 + content = data.get("content", "") + if content: + try: + # 解析认证信息 + self.service_cookie = { + 'cookie': content, + 'status': True + } + self._log(f"✅ 从服务获取到认证信息", "SUCCESS") + except Exception as e: + self._log(f"❌ 解析认证信息失败: {e}", "ERROR") + self.service_cookie = None + + # 启动清理任务 + self._start_cleanup_task() + return True + + elif data.get("type") == "error": + # 处理错误消息 + error_data = data.get("data", {}) + if error_data.get('verify_link'): + error_msg = error_data.get('verify_link') + self._log(f"❌ 服务器返回错误,需要验证: {error_msg}", "ERROR") + self.service_cookie = { + 'status': False, + 'verify_link': error_msg + } + else: + error_msg = data.get("message", "未知错误") + self._log(f"❌ 服务器返回错误: {error_msg}", "ERROR") + self.service_cookie = { + 'status': False, + 'verify_link': None + } + return False + + else: + self._log(f"❌ 服务连接失败: {data.get('content')}", "ERROR") + return False + + except asyncio.TimeoutError: + self._log("⏰ 等待连接确认超时,但连接可能已建立", "WARNING") + self.connected = True + return True + + except asyncio.TimeoutError: + self._log(f"❌ 连接服务超时({self.connect_timeout}秒)", "ERROR") + return False + except Exception as e: + self._log(f"❌ 连接服务失败: {e}", "ERROR") return False async def start_message_loop(self): @@ -563,8 +733,8 @@ class AIServiceConnector: self._log(f"❌ 消息循环异常: {e}", "ERROR") await asyncio.sleep(1) - async def _reconnect(self): - """重连机制""" + async def _reconnect(self, exe_token=None): + """重连机制(新架构)""" if self.reconnect_attempts >= self.max_reconnect_attempts: self._log("❌ 重连次数超过限制,停止重连", "ERROR") return False @@ -577,8 +747,13 @@ class AIServiceConnector: if self.websocket: await self.websocket.close() - # 重新连接 - success = await self.connect(self.store_id) + # 重新连接(需要exe_token) + if exe_token: + success = await self.connect_with_token(exe_token) + else: + self._log("❌ 重连失败:缺少exe_token", "ERROR") + return False + if success: self.reconnect_attempts = 0 # 重置重试计数 self._log("✅ 重连成功", "SUCCESS") @@ -602,123 +777,149 @@ class AIServiceConnector: try: message_dict = message_template.to_dict() - user_id = message_template.sender.get("id", "") # 使用sender.id作为key + + # 使用sender.id作为匹配键(因为AI回复会使用receiver.id) + sender_info = message_dict.get("sender", {}) + sender_id = sender_info.get("id") if sender_info else None + + if not sender_id: + self._log("❌ 消息中没有发送者ID", "ERROR") + return None # 详细的调试信息 print("=" * 60) print("🔍 调试信息 - 准备发送消息到后端:") print(f"连接状态: {self.connected}") print(f"WebSocket状态: {self.websocket is not None}") - print(f"消息ID: {message_id}") + print(f"消息ID: {sender_id}") print(f"消息内容: {message_dict.get('content', '')[:100]}...") print("=" * 60) - # 创建Future来等待回复 - future = asyncio.Future() - self.pending_ai_replies[user_id] = future + # 创建回复等待记录(添加时间戳用于清理) + self.pending_ai_replies[sender_id] = { + 'reply': None, + 'received': False, + 'created_at': asyncio.get_event_loop().time() + } await self.websocket.send(json.dumps(message_dict)) self._log(f"📤 已发送消息到AI服务: {message_template.content[:50]}...", "DEBUG") - # 等待AI回复(设置超时) + # 轮询等待AI回复(设置超时) try: - ai_reply = await asyncio.wait_for(future, timeout=15.0) - self._log(f"✅ 成功获取AI回复: {ai_reply[:50]}...", "SUCCESS") - return ai_reply + timeout = 15.0 + start_time = asyncio.get_event_loop().time() - except asyncio.TimeoutError: - self._log("⏰ 等待AI回复超时(15秒)", "WARNING") - self.pending_ai_replies.pop(user_id, None) + while True: + # 检查是否收到回复 + if sender_id in self.pending_ai_replies: + reply_info = self.pending_ai_replies[sender_id] + if reply_info.get('received'): + ai_reply = reply_info.get('reply') + self.pending_ai_replies.pop(sender_id, None) + self._log(f"✅ 成功获取AI回复: {ai_reply[:50]}...", "SUCCESS") + return ai_reply + + # 检查超时 + if asyncio.get_event_loop().time() - start_time > timeout: + self._log("⏰ 等待AI回复超时(15秒)", "WARNING") + self.pending_ai_replies.pop(sender_id, None) + return None + + # 短暂等待后继续检查 + await asyncio.sleep(0.1) + + except Exception as e: + self._log(f"❌ 等待AI回复异常: {e}", "ERROR") + self.pending_ai_replies.pop(sender_id, None) return None except ConnectionError as e: self._log(f"❌ 连接已断开: {e}", "ERROR") - self.pending_ai_replies.pop(user_id, None) + self.pending_ai_replies.pop(sender_id, None) return None except websockets.exceptions.ConnectionClosed: self._log("❌ 发送消息时连接已关闭", "ERROR") self.connected = False - self.pending_ai_replies.pop(user_id, None) + self.pending_ai_replies.pop(sender_id, None) return None except Exception as e: self._log(f"❌ 发送消息到AI服务失败: {e}", "ERROR") - self.pending_ai_replies.pop(user_id, None) + self.pending_ai_replies.pop(sender_id, None) return None async def _dispatch_message(self, data, message_type): - """分发消息到对应的处理器""" - # 处理customer_message类型(客服消息) - if message_type == "customer_message": - handler = self.message_handlers.get("customer_message") - if handler and callable(handler): - asyncio.create_task(handler(data)) + """分发消息到对应的处理器(更新为新协议)""" - # 处理message类型(AI回复) - elif message_type == "message": - # 检查是否有等待AI回复的Future - 使用receiver.id匹配 + # 处理message类型(统一消息处理) + if message_type == "message": + # 使用receiver.id作为匹配键 receiver_info = data.get("receiver", {}) - user_id = receiver_info.get("id", "") - if user_id and user_id in self.pending_ai_replies: - future = self.pending_ai_replies.pop(user_id) - if not future.done(): - future.set_result(data.get("content", "")) + receiver_id = receiver_info.get("id") if receiver_info else None - # 同时调用消息处理器(用于监控) - handler = self.message_handlers.get("message") + # 添加调试日志 + self._log(f"处理AI回复 - receiver_id: {receiver_id}", "DEBUG") + self._log(f"当前等待的回复列表: {list(self.pending_ai_replies.keys())}", "DEBUG") + + if receiver_id and receiver_id in self.pending_ai_replies: + # 这是对买家消息的AI回复,设置回复结果 + reply_info = self.pending_ai_replies.get(receiver_id) + if reply_info and not reply_info.get('received'): + content = data.get("content", "") + self._log(f"设置AI回复结果: {content[:50]}...", "DEBUG") + reply_info['reply'] = content + reply_info['received'] = True + else: + self._log("回复已接收或已取消", "WARNING") + else: + # 这是后端主动推送的消息,需要转发给买家 + self._log(f"后端主动推送消息: {receiver_id}", "WARNING") + # 打印所有等待中的回复,用于调试 + for mid, info in self.pending_ai_replies.items(): + self._log(f"等待中的回复 - {mid}: {info}", "DEBUG") + + # 只有在没有对应等待时才调用消息处理器(避免重复处理) + handler = self.message_handlers.get("message") + if handler and callable(handler): + # 添加任务管理避免泄露 + task = asyncio.create_task(handler(data)) + task.add_done_callback(self._handle_task_completion) + + # 处理transfer类型消息 + elif message_type == "transfer": + receiver_info = data.get("receiver", {}) + receiver_id = receiver_info.get("id") if receiver_info else None + + if receiver_id and receiver_id in self.pending_ai_replies: + reply_info = self.pending_ai_replies.get(receiver_id) + if reply_info and not reply_info.get('received'): + reply_info['reply'] = data + reply_info['received'] = True + + # 处理其他协议消息类型 + elif message_type in ["connect_success", "error", "staff_list"]: + handler = self.message_handlers.get(message_type) if handler and callable(handler): - asyncio.create_task(handler(data)) + task = asyncio.create_task(handler(data)) + task.add_done_callback(self._handle_task_completion) - # 可以添加其他消息类型的处理 + # 记录未处理的消息类型 else: self._log(f"📨 未处理的消息类型: {message_type}", "DEBUG") + def _handle_task_completion(self, task): + """处理异步任务完成(避免任务泄露)""" + try: + if task.exception(): + self._log(f"❌ 异步任务异常: {task.exception()}", "ERROR") + except Exception as e: + self._log(f"❌ 处理任务完成时异常: {e}", "ERROR") + def register_message_handler(self, message_type, handler): """注册消息处理器""" self.message_handlers[message_type] = handler self._log(f"✅ 注册消息处理器: {message_type}", "DEBUG") - # async def listen_for_backend_messages(self, message_callback): - # """监听后端发送的消息""" - # self._log(f"🎯 进入后端消息监听方法,连接状态: {self.connected}", "DEBUG") - # - # if not self.connected or not self.websocket: - # self._log("❌ AI服务未连接,无法监听消息", "ERROR") - # return - # - # self._log("🔵 开始监听后端消息...", "INFO") - # RECV_TIMEOUT = 15.0 - # - # try: - # while self.connected and self.websocket: - # try: - # message = await asyncio.wait_for( - # self.websocket.recv(), - # timeout=RECV_TIMEOUT - # ) - # - # data = json.loads(message) - # self._log(f"📥 收到消息类型: {data.get('type')}", "DEBUG") - # - # if data.get("type") == "customer_message": - # self._log("✅ 处理客服消息", "INFO") - # if callable(message_callback): - # # 使用create_task避免阻塞主循环 - # asyncio.create_task(message_callback(data)) - # - # except asyncio.TimeoutError: - # # 超时是正常的,继续监听 - # continue - # except websockets.exceptions.ConnectionClosed: - # self._log("🔌 连接已关闭", "WARNING") - # self.connected = False - # break - # except Exception as e: - # self._log(f"❌ 消息处理异常: {e}", "ERROR") - # await asyncio.sleep(1) # 避免频繁报错 - # - # except Exception as e: - # self._log(f"❌ 监听循环异常: {e}", "ERROR") - async def _parse_ai_response(self, response_data): """解析后端返回的AI回复""" try: @@ -754,8 +955,52 @@ class AIServiceConnector: self._log(f"❌ 解析AI回复失败: {e}", "ERROR") return None + def _start_cleanup_task(self): + """启动清理任务""" + if self._cleanup_task and not self._cleanup_task.done(): + self._cleanup_task.cancel() + self._cleanup_task = asyncio.create_task(self._periodic_cleanup()) + + async def _periodic_cleanup(self): + """定期清理超时的回复等待""" + while self.connected: + try: + current_time = asyncio.get_event_loop().time() + expired_ids = [] + + # 检查超时的回复等待(30秒超时) + for message_id, reply_info in self.pending_ai_replies.items(): + if isinstance(reply_info, dict): + create_time = reply_info.get('created_at', 0) + if current_time - create_time > 30: # 30秒超时 + expired_ids.append(message_id) + else: + # 兼容旧格式 + expired_ids.append(message_id) + + # 清理超时的回复等待 + for message_id in expired_ids: + self.pending_ai_replies.pop(message_id, None) + self._log(f"🧹 清理超时回复等待: {message_id}", "DEBUG") + + # 每10秒清理一次 + await asyncio.sleep(10) + + except asyncio.CancelledError: + break + except Exception as e: + self._log(f"❌ 清理异常: {e}", "ERROR") + await asyncio.sleep(10) + async def close(self): """关闭连接""" + # 停止清理任务 + if self._cleanup_task and not self._cleanup_task.done(): + self._cleanup_task.cancel() + + # 清理所有等待的回复 + self.pending_ai_replies.clear() + if self.websocket: await self.websocket.close() self.connected = False @@ -783,10 +1028,11 @@ class AIServiceConnector: class AIServiceIntegration: - """AI服务集成类 - 负责处理AI回复相关功能""" + """AI服务集成类 - 负责处理AI回复相关功能(单连接多店铺架构)""" - def __init__(self, store_id="4c4025e3-8702-42fc-bdc2-671e335c0ff7"): - self.store_id = store_id + def __init__(self, store_id=None, exe_token=None): + self.store_id = store_id or QIANNIU_CONFIG["default_store_id"] + self.exe_token = exe_token # 新增:用户执行令牌 self.ai_service = AIServiceConnector() self.loop = None self._ensure_event_loop() @@ -802,10 +1048,15 @@ class AIServiceIntegration: asyncio.set_event_loop(self.loop) async def initialize_ai_service(self): - """初始化AI服务""" + """初始化AI服务(新架构)""" try: - self._log("正在初始化AI服务...", "INFO") - success = await self.ai_service.connect(self.store_id) + self._log("正在初始化AI服务(单连接多店铺架构)...", "INFO") + + if not self.exe_token: + self._log("❌ 缺少exe_token,无法初始化AI服务", "ERROR") + return False + + success = await self.ai_service.connect_with_token(self.exe_token) if success: self._log("AI服务初始化成功", "SUCCESS") @@ -836,34 +1087,35 @@ class AIServiceIntegration: if not await self.ensure_connection(): return "您好,感谢您的咨询,我们会尽快回复您!" + if message_content == "当前用户来自 商品详情页": + return "查看商品中~~~~" - # 创建消息模板对象 - message_template = MessageTemplate( + # 检测消息类型 + msg_type = "text" # 默认文本类型 + control_type = str(message_content) + if control_type.__contains__("http") and ( + control_type.__contains__(".jpg") or control_type.__contains__(".png")): + msg_type = "image" + elif control_type.__contains__("http") and ( + control_type.__contains__(".mp4") or control_type.__contains__(".mov")): + msg_type = "video" + elif control_type.__contains__("https://h5.m.taobao.com/awp"): + msg_type = "product_card" + elif control_type.__contains__("订单号"): + msg_type = "order_card" + + # 创建消息模板对象(避免重复创建) + message_template = PlatformMessage( type="message", content=message_content, + msg_type=msg_type, sender={ "id": sender_nick, - "name": f"淘宝用户_{sender_nick}", - "is_customer": True }, - data={ - "msg_type": "text", - "pin_image": avatar_url - }, - store_id=self.store_id + pin_image=avatar_url, + store_id=self.store_id, ) - control_type = str(message_content) - if control_type.__contains__("http") and (control_type.__contains__(".jpg") or control_type.__contains__(".png")): - message_template = MessageTemplate( - type="message", - content=message_content, - msg_type="image", - pin_image=avatar_url, - sender={"id": sender_nick}, - store_id=self.store_id - ) - print(f"最终发送给后端的数据📕:{message_template.to_dict()}") # 获取AI回复 @@ -903,8 +1155,6 @@ class AIServiceIntegration: # self._log(f"❌ 启动后端监听失败: {e}", "ERROR") # return False - - def _log(self, message, level="INFO"): """日志记录""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -968,7 +1218,7 @@ class QianNiuClient: def __init__(self, user_nick=None): self.user_nick: str = user_nick self.qn_ws = None - self.pending: Dict[str, asyncio.Future] = {} + self.pending: Dict[str, dict] = {} self.is_connected = False self.is_authenticated = False # 改为连接成功即认证 self.message_handler = None @@ -984,7 +1234,6 @@ class QianNiuClient: # 赛牛服务 self.sainiu_service = SaiNiuService() - async def __aenter__(self): """异步上下文管理器入口""" self._http_session = aiohttp.ClientSession() @@ -1001,10 +1250,10 @@ class QianNiuClient: form = aiohttp.FormData() form.add_field("post", post_name) form.add_field("data", json.dumps(data_obj)) - logger.debug(f"[TB-DIAG] HTTP CALL -> {post_name} url={QN_HTTP_API_URL} data={data_obj}") + logger.debug(f"[TB-DIAG] HTTP CALL -> {post_name} url={QN_HTTP_API_URL_AVATAR} data={data_obj}") try: async with aiohttp.ClientSession() as session: - async with session.post(QN_HTTP_API_URL, data=form, timeout=10) as resp: + async with session.post(QN_HTTP_API_URL_AVATAR, data=form, timeout=10) as resp: text = await resp.text() logger.debug(f"[TB-DIAG] HTTP RESP <- {post_name} status={resp.status} body={text[:300]}") if resp.status != 200: @@ -1236,8 +1485,8 @@ class QianNiuClient: async def _start_sainiu_service(self): """启动赛牛DLL服务""" try: - # 加载DLL - if not self.sainiu_service.load_dll('F:\\飞书下载地址\\shuidrop_gui\\Utils\\PythonNew32\\SaiNiuApi.dll'): + # 加载DLL(使用默认相对路径) + if not self.sainiu_service.load_dll(): return False success = True @@ -1359,6 +1608,8 @@ class QianNiuClient: if code == 200: self.is_authenticated = True print("✅ 授权和初始化成功") + # 初始化完成后再等待一下确保服务完全就绪 + await asyncio.sleep(2) return True else: error_msg = return_data.get("msg", "未知错误") @@ -1433,11 +1684,9 @@ class QianNiuClient: # 发送心跳 heartbeat_msg = {"type": "ping"} await self.qn_ws.send(json.dumps(heartbeat_msg)) - logger.debug("发送心跳") print("=== 断点8: 心跳已发送 ===") await asyncio.sleep(20) except Exception as e: - logger.error(f"心跳发送失败: {e}") self.is_connected = False break @@ -1484,7 +1733,6 @@ class QianNiuClient: # 处理心跳响应 if msg_type == "pong": - logger.debug("收到pong响应") print("=== 断点13: 收到心跳响应(pong) ===") return @@ -1509,11 +1757,12 @@ class QianNiuClient: print(f"=== API响应: type={msg_type}, codeType={code_type} ===") print(f"响应内容: {json.dumps(data, indent=2, ensure_ascii=False)}") - # 处理pending的Future + # 处理等待的响应 if trace_id and trace_id in self.pending: - future = self.pending[trace_id] - if not future.done(): - future.set_result(data) + pending_info = self.pending[trace_id] + if not pending_info.get('received'): + pending_info['response'] = data + pending_info['received'] = True return # 处理买家消息 @@ -1543,19 +1792,28 @@ class QianNiuClient: logger.info(f"收到买家消息: {sender_nick} -> {message_content}") print(f"=== 断点14: 收到买家消息 - {sender_nick}: {message_content} ===") - # 调用消息处理回调 - if self.message_handler: - await self.message_handler({ - "type": "message", - "data": { - "msg_type": "text", - "content": message_content, - "pin": sender_nick, - "uid": str(uuid.uuid4()) - }, - "store_id": STORE_ID - }) - return + # 构造标准格式消息(但保留原始数据) + processed_message = { + "type": "message", + "content": message_content, + "msg_type": "text", + "sender": { + "id": sender_nick, + "name": f"淘宝用户_{sender_nick}", + "is_customer": True + }, + "store_id": STORE_ID, + "message_id": str(uuid.uuid4()), + "platform": "淘宝", + # 新增:保存原始数据 + "raw_data": msg_data + } + + print("=== 转换后的消息 ===") + print(json.dumps(processed_message, indent=2, ensure_ascii=False)) + print("===================") + + await self.message_handler(processed_message) # 记录未处理的消息类型 print(f"=== 未处理的消息类型: {msg_type} ===") @@ -1600,8 +1858,11 @@ class QianNiuClient: try: trace_id = str(uuid.uuid4()) - future = asyncio.Future() - self.pending[trace_id] = future + self.pending[trace_id] = { + 'response': None, + 'received': False, + 'created_at': time.time() + } # 构建发送消息请求 send_msg = { @@ -1628,25 +1889,46 @@ class QianNiuClient: print("=== 断点19: 消息已发送到赛牛插件 ===") print(f"发送的消息内容: {json.dumps(send_msg, indent=2, ensure_ascii=False)}") - # 等待响应 + # 轮询等待响应 try: - response = await asyncio.wait_for(future, timeout=5.0) - return_data = response.get("returnData") or response.get("data", {}) - code = return_data.get("code", 0) + timeout = 5.0 + start_time = time.time() - if code in [200, 0, 403]: # 403也视为成功(测试账号特殊处理) - logger.info("消息发送成功") - print("=== 断点20: 消息发送成功 ===") - return True - else: - error_msg = return_data.get("msg", "未知错误") - logger.warning(f"消息发送返回非成功码: {code}, 错误信息: {error_msg}") - print(f"=== 断点21: 消息发送失败 - 错误码: {code}, 错误信息: {error_msg} ===") - return False - except asyncio.TimeoutError: - logger.warning("消息发送超时,但可能已成功") - print("=== 断点22: 消息发送超时 ===") - return True + while True: + # 检查是否收到响应 + if trace_id in self.pending: + pending_info = self.pending[trace_id] + if pending_info.get('received'): + response = pending_info.get('response') + self.pending.pop(trace_id, None) + + return_data = response.get("returnData") or response.get("data", {}) + code = return_data.get("code", 0) + + if code in [200, 0, 403]: # 403也视为成功(测试账号特殊处理) + logger.info("消息发送成功") + print("=== 断点20: 消息发送成功 ===") + return True + else: + error_msg = return_data.get("msg", "未知错误") + logger.warning(f"消息发送返回非成功码: {code}, 错误信息: {error_msg}") + print(f"=== 断点21: 消息发送失败 - 错误码: {code}, 错误信息: {error_msg} ===") + return False + + # 检查超时 + if time.time() - start_time > timeout: + logger.warning("消息发送超时,但可能已成功") + print("=== 断点22: 消息发送超时 ===") + self.pending.pop(trace_id, None) + return True + + # 短暂等待后继续检查 + await asyncio.sleep(0.1) + + except Exception as e: + logger.error(f"等待响应异常: {e}") + self.pending.pop(trace_id, None) + return False except Exception as e: logger.error(f"发送消息异常: {e}") @@ -1663,13 +1945,18 @@ class QianNiuClient: class TestMessageHandler: - """测试消息处理器""" + """测试消息处理器(单连接多店铺架构)""" - def __init__(self, qn_client): + def __init__(self, qn_client, store_id=None, exe_token=None): self.qn_client = qn_client self.received_messages = [] - self.ai_service = AIServiceIntegration() - self.backend_listening_task = None # 新增:保存监听任务 + self.ai_service = AIServiceIntegration(store_id=store_id, exe_token=exe_token) + self.backend_listening_task = None # 保存监听任务 + self.store_id = store_id + self.exe_token = exe_token + # 消息去重缓存 + self.message_cache = {} # 缓存最近处理的消息 + self.cache_expire_time = 300 # 缓存过期时间(5分钟) async def initialize(self): """初始化消息处理器和AI服务""" @@ -1680,10 +1967,11 @@ class TestMessageHandler: logger.error("AI服务初始化失败") return False - # 注册消息处理器 + # 注册消息处理器(更新为新协议) self.ai_service.register_message_handlers({ - "customer_message": self.handle_customer_message, - "message": self.handle_ai_message # 新增:处理AI消息 + "message": self.handle_customer_message, # 处理后端主动推送的消息 + "connect_success": self.handle_connect_success, + "error": self.handle_error_message }) return True @@ -1691,7 +1979,6 @@ class TestMessageHandler: logger.error(f"消息处理器初始化失败: {e}") return False - async def start_backend_listening(self): """启动后端消息监听""" try: @@ -1736,9 +2023,56 @@ class TestMessageHandler: self._log(f"❌ 处理AI消息异常: {e}", "ERROR") return False + def _generate_message_hash(self, message): + """生成消息的唯一标识符(用于去重)""" + try: + # 使用发送者、内容、消息类型生成hash + sender_id = message.get("sender", {}).get("id", "") + content = message.get("content", "") + msg_type = message.get("msg_type", "") + + # 创建唯一标识字符串 + unique_string = f"{sender_id}:{content}:{msg_type}" + + # 生成MD5哈希 + return hashlib.md5(unique_string.encode('utf-8')).hexdigest() + except Exception as e: + self._log(f"生成消息哈希失败: {e}", "ERROR") + return None + + def _is_duplicate_message(self, message_hash): + """检查是否为重复消息""" + if not message_hash: + return False + + current_time = time.time() + + # 清理过期的缓存 + expired_keys = [] + for key, timestamp in self.message_cache.items(): + if current_time - timestamp > self.cache_expire_time: + expired_keys.append(key) + + for key in expired_keys: + del self.message_cache[key] + + # 检查是否为重复消息 + if message_hash in self.message_cache: + return True + + # 添加到缓存 + self.message_cache[message_hash] = current_time + return False + async def handle_message(self, message): """处理接收到的买家消息""" try: + # 消息去重检查 + message_hash = self._generate_message_hash(message) + if self._is_duplicate_message(message_hash): + self._log(f"⚠️ 检测到重复消息,跳过处理: {message.get('content', '')[:50]}...", "WARNING") + return + self.received_messages.append(message) # 新增:打印完整的接收消息体 @@ -1747,16 +2081,29 @@ class TestMessageHandler: print(json.dumps(message, indent=2, ensure_ascii=False)) print("=" * 60) - msg_data = message.get("data", {}) - # dingdan_id = "738740221403" - # store_msg_data = msg_data.get("data", {}) - # store_msg_data_detail = store_msg_data.get("E3_keyValueDescArray", []) - # if len(store_msg_data_detail) > 0: - # store_msg_id = store_msg_data_detail[0].get("desc", "") - # else: - # store_msg_id = "" - content = msg_data.get("content", "") - sender_nick = msg_data.get("pin", "") + self.received_messages.append(message) + + # 新增:分别打印原始数据和转换后的数据 + print("=" * 60) + print("📨 收到买家消息 - 转换后格式:") + print(json.dumps(message, indent=2, ensure_ascii=False)) + + if "raw_data" in message: + print("\n📨 收到买家消息 - 原始格式:") + print(json.dumps(message["raw_data"], indent=2, ensure_ascii=False)) + print("=" * 60) + + # 使用转换后的数据 + content = message.get("content", "") + sender_nick = message.get("sender", {}).get("id", "") + + # 获取头像URL(如果存在) 先拿到对应的sendernick + raw_data = message.get("raw_data", {}) + if raw_data: + sender_nick = raw_data.get("senderNick", "") + avatar_url = await self.qn_client.fetch_buyer_avatar_by_http(sender_nick) + else: + pass if not content or not sender_nick: self._log("消息内容或发送者昵称为空", "WARNING") @@ -1764,17 +2111,6 @@ class TestMessageHandler: self._log(f"处理买家消息: {sender_nick} -> {content}", "INFO") - # # 获取买家头像 - # avatar_url = "" - # try: - # avatar_url = await self.qn_client.fetch_buyer_avatar_by_http(sender_nick) - # if avatar_url: - # self._log(f"成功获取头像!!!yes: {sender_nick}", "SUCCESS") - # else: - # self._log(f"未获取到头像nononno: {sender_nick}", "WARNING") - # except Exception as e: - # self._log(f"获取头像异常aaaa: {e}", "ERROR") - # 获取AI回复(带重试机制) max_retries = 2 # 减少重试次数,避免长时间等待 for retry in range(max_retries): @@ -1782,7 +2118,7 @@ class TestMessageHandler: ai_reply = await self.ai_service.get_ai_reply( message_content=content, sender_nick=sender_nick, - # avatar_url=avatar_url # 新增: 传递 头像url + avatar_url=avatar_url # 新增: 传递 头像url ) if ai_reply: @@ -1824,9 +2160,9 @@ class TestMessageHandler: self._log("❌ 客服消息内容为空", "WARNING") return False - # 优先从receiver获取,如果没有再从data获取 + # 从receiver中获取接收者ID receiver_info = message_data.get("receiver", {}) - receiver_id = receiver_info.get("id", "") or message_data.get("data", {}).get("pin", "") + receiver_id = receiver_info.get("id", "") if receiver_info else "" if not receiver_id: self._log("❌ 无法确定消息接收者", "WARNING") @@ -1848,6 +2184,32 @@ class TestMessageHandler: self._log(f"❌ 处理客服消息异常: {e}", "ERROR") return False + async def handle_connect_success(self, message_data): + """处理连接成功消息""" + try: + content = message_data.get("content", "") + self._log(f"✅ 收到连接成功确认: {content}", "SUCCESS") + return True + except Exception as e: + self._log(f"❌ 处理连接成功消息异常: {e}", "ERROR") + return False + + async def handle_error_message(self, message_data): + """处理错误消息""" + try: + content = message_data.get("content", "") + verify_link = message_data.get("data", {}).get("verify_link", "") + + error_msg = f"收到错误消息: {content}" + if verify_link: + error_msg += f", 验证链接: {verify_link}" + + self._log(error_msg, "ERROR") + return True + except Exception as e: + self._log(f"❌ 处理错误消息异常: {e}", "ERROR") + return False + async def close(self): """关闭消息处理器""" if self.ai_service: @@ -1882,8 +2244,19 @@ class QianNiuListenerForGUI: self.log_callback = log_callback self.log_signal = None # 添加日志信号属性 self.tasks = [] # 新增:任务列表 + self.store_id = None self.user_nick = None + @property + def message_handler(self): + """获取消息处理器实例""" + return self._message_handler + + @message_handler.setter + def message_handler(self, value): + """设置消息处理器实例""" + self._message_handler = value + def _log(self, message, log_type="INFO"): """内部日志方法""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -1903,28 +2276,80 @@ class QianNiuListenerForGUI: color = color_map.get(log_type, "") print(f"{color}[{timestamp}] [{log_type}] {message}\033[0m") - async def _get_first_user(self): - """获取第一个可用的账号""" - try: - self._log("🔵 获取所有连接的账号...", "INFO") - connected_users = await self.get_all_connected_users() + async def _get_first_user(self, max_retries=2, retry_delay=2): + """获取第一个可用的账号(初始化完成后调用,减少重试)""" + for retry in range(max_retries): + try: + self._log(f"🔵 获取所有连接的账号... (尝试 {retry + 1}/{max_retries})", "INFO") + connected_users = await self.get_all_connected_users() - if not connected_users: - self._log("❌ 未找到可用的千牛账号", "ERROR") - return None + if not connected_users: + if retry < max_retries - 1: + self._log(f"❌ 未找到可用的千牛账号,{retry_delay}秒后重试...", "WARNING") + await asyncio.sleep(retry_delay) + continue + else: + self._log("❌ 未找到可用的千牛账号(所有重试已耗尽)", "ERROR") + self._log("💡 请确认:1) 千牛客户端已启动 2) 已登录账号 3) DLL初始化已完成", "INFO") + return None - # 获取第一个账号 - first_user = next(iter(connected_users.values())) - user_nick = first_user["userNick"] + # 获取第一个账号 + first_user = next(iter(connected_users.values())) + user_nick = first_user.get("userNick") - self._log(f"✅ 获取到账号: {user_nick}", "SUCCESS") - self._log(f"账号详情: UID={first_user.get('userUid')}, 版本={first_user.get('version')}", "DEBUG") + if not user_nick: + self._log("❌ 账号信息中缺少userNick字段", "ERROR") + if retry < max_retries - 1: + await asyncio.sleep(retry_delay) + continue + return None - return user_nick + self._log(f"✅ 获取到账号: {user_nick}", "SUCCESS") + self._log(f"账号详情: UID={first_user.get('userUid')}, 版本={first_user.get('version')}", "DEBUG") - except Exception as e: - self._log(f"❌ 获取账号失败: {e}", "ERROR") - return None + return user_nick + + except Exception as e: + if retry < max_retries - 1: + self._log(f"❌ 获取账号失败: {e},{retry_delay}秒后重试...", "WARNING") + await asyncio.sleep(retry_delay) + else: + self._log(f"❌ 获取账号失败(所有重试已耗尽): {e}", "ERROR") + return None + + async def _wait_for_dll_ready(self, max_attempts=15, delay=2): + """等待DLL服务完全就绪(改进版)""" + import socket + + for attempt in range(max_attempts): + try: + self._log(f"🔍 检查DLL服务状态 (尝试 {attempt + 1}/{max_attempts})", "DEBUG") + + # 首先检查端口是否在监听 + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(2) + result = s.connect_ex(('127.0.0.1', 3030)) + if result != 0: + self._log(f"⏰ 端口3030尚未开启,等待 {delay} 秒...", "DEBUG") + await asyncio.sleep(delay) + continue + except Exception as e: + self._log(f"🔍 检查端口时出错: {e}", "DEBUG") + await asyncio.sleep(delay) + continue + + self._log("✅ 端口3030已开启", "SUCCESS") + # 端口开启后再等待一下确保服务完全启动 + await asyncio.sleep(3) + return True + + except Exception as e: + self._log(f"🔍 检查DLL服务时出错: {e}", "DEBUG") + await asyncio.sleep(delay) + + self._log("❌ DLL服务等待超时", "ERROR") + return False async def get_all_connected_users(self) -> Dict[str, Dict]: """获取所有已连接的千牛账号 @@ -1936,19 +2361,25 @@ class QianNiuListenerForGUI: # 通过HTTP API调用 response = await self._http_api_call("GetAllUser", {}) - if not response: - logger.error("获取连接账号失败:API返回为空") + # 区分API调用失败(None)和返回空结果({}) + if response is None: + logger.error("获取连接账号失败:API调用失败") return {} logger.debug(f"获取连接账号响应: {response}") # 检查返回数据格式 if isinstance(response, dict): - # 直接返回账号信息字典 + # 直接返回账号信息字典(可能为空) logger.info(f"成功获取到 {len(response)} 个已连接账号") + if len(response) == 0: + logger.warning("API调用成功但未返回任何账号信息,可能原因:") + logger.warning("1. 千牛客户端未启动") + logger.warning("2. 千牛客户端已启动但未登录任何账号") + logger.warning("3. DLL服务与千牛客户端连接异常") return response else: - logger.error(f"获取连接账号失败:返回数据格式错误 - {response}") + logger.error(f"获取连接账号失败:返回数据格式错误 - {type(response)}: {response}") return {} except Exception as e: @@ -1956,83 +2387,128 @@ class QianNiuListenerForGUI: return {} async def _http_api_call(self, post_name: str, data_obj: Dict[str, Any]) -> Dict[str, Any]: - """按官方文档以 form-data 方式调用 QianNiu/Api 接口""" + """按官方文档以 form-data 方式调用 QianNiu/Api 接口(改进版)""" form = aiohttp.FormData() form.add_field("post", post_name) form.add_field("data", json.dumps(data_obj)) logger.debug(f"[TB-DIAG] HTTP CALL -> {post_name} url={QN_HTTP_API_URL} data={data_obj}") + try: async with aiohttp.ClientSession() as session: async with session.post(QN_HTTP_API_URL, data=form, timeout=26) as resp: text = await resp.text() logger.debug(f"[TB-DIAG] HTTP RESP <- {post_name} status={resp.status} body={text[:300]}") + if resp.status != 200: logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用失败: status={resp.status}") - return {} + # 返回None而不是空字典,便于区分真正的空结果和错误 + return None + try: - return json.loads(text) - except Exception: - logger.error(f"[TB-DIAG] HTTP RESP JSON decode error on {post_name}") - return {} + result = json.loads(text) + # 即使结果为空也是成功的API调用 + logger.debug(f"[TB-DIAG] HTTP API {post_name} 调用成功,返回数据类型: {type(result)}") + return result if result is not None else {} + except json.JSONDecodeError as e: + logger.error(f"[TB-DIAG] HTTP RESP JSON decode error on {post_name}: {e}") + return None + + except asyncio.TimeoutError: + logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用超时") + return None + except aiohttp.ClientError as e: + logger.error(f"[QianNiuClient] HTTP接口 {post_name} 网络错误: {e}") + return None except Exception as e: logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用异常: {e}") - return {} + return None - async def start_listening(self): - """启动监听的主方法""" + # 新增供给架构调用 ---- 测试中 + async def start_listening_with_store(self, store_id, exe_token=None): + """带店铺ID的监听启动方法(单连接多店铺架构)""" try: - self._log("🔵 开始千牛平台连接流程", "INFO") + self._log("🔵 开始千牛平台连接流程(单连接多店铺架构)", "INFO") + self.store_id = store_id # 保存店铺ID + self.exe_token = exe_token # 保存用户令牌 - # New. 获取第一个可用账号 - self.user_nick = await self._get_first_user() - if not self.user_nick: - self._log("❌ 无法获取可用账号", "ERROR") - return False + # 1. 先创建临时千牛客户端用于启动DLL服务 + temp_client = QianNiuClient() + self._log("🔵 步骤1: 启动DLL服务...", "INFO") - # 1. 创建千牛客户端 - self.qn_client = QianNiuClient(user_nick=self.user_nick) - self._log("✅ 千牛客户端创建成功", "DEBUG") - - # 2. 测试DLL加载和启动(与main方法一致) - self._log("🔵 步骤1: 测试DLL加载和启动...", "INFO") - if not await self.qn_client._start_sainiu_service(): + # 2. 启动DLL服务(必须在所有API调用之前) + if not await temp_client._start_sainiu_service(): self._log("❌ DLL服务启动失败", "ERROR") return False self._log("✅ DLL服务启动成功", "SUCCESS") - # 3. 测试WebSocket连接(与main方法一致) - self._log("🔵 步骤2: 测试WebSocket连接...", "INFO") + # 3. 等待DLL服务完全就绪 + self._log("🔵 等待DLL服务完全就绪...", "INFO") + await asyncio.sleep(5) # 增加等待时间确保服务完全启动 + + # 4. 验证DLL服务是否可用(仅检查端口) + if not await self._wait_for_dll_ready(): + self._log("❌ DLL服务未能完全就绪", "ERROR") + return False + self._log("✅ DLL服务已完全就绪", "SUCCESS") + + # 5. 创建正式的千牛客户端(先不获取用户信息) + self._log("🔵 步骤2: 创建千牛客户端...", "INFO") + self.qn_client = QianNiuClient() + # 复用已启动的DLL服务 + self.qn_client.sainiu_service = temp_client.sainiu_service + self._log("✅ 千牛客户端创建成功", "DEBUG") + + # 6. 建立WebSocket连接并完成完整初始化流程 + self._log("🔵 步骤3: 建立WebSocket连接并初始化...", "INFO") if not await self.qn_client._connect_websocket(): self._log("❌ WebSocket连接失败", "ERROR") return False - self._log("✅ WebSocket连接成功", "SUCCESS") + self._log("✅ WebSocket连接和初始化成功", "SUCCESS") - # 4. 创建消息处理器 - self.message_handler = TestMessageHandler(self.qn_client) + # 7. 现在可以安全地获取已连接的账号(初始化完成后) + self._log("🔵 步骤4: 获取可用账号...", "INFO") + self.user_nick = await self._get_first_user() + if not self.user_nick: + self._log("❌ 无法获取可用账号", "ERROR") + return False + self._log(f"✅ 获取到账号: {self.user_nick}", "SUCCESS") + + # 更新客户端的用户信息 + self.qn_client.user_nick = self.user_nick + + # 8. 创建消息处理器 + self._log("🔵 步骤5: 创建消息处理器...", "INFO") + self.message_handler = TestMessageHandler(self.qn_client, store_id, exe_token) self._log("✅ 消息处理器创建成功", "DEBUG") - # 5. 初始化AI服务 + # 9. 初始化AI服务 + self._log("🔵 步骤6: 初始化AI服务...", "INFO") success = await self.message_handler.initialize() if not success: self._log("❌ AI服务初始化失败", "ERROR") return False self._log("✅ AI服务初始化成功", "SUCCESS") - # 6. 开始监听消息 - self._log("🔵 步骤3: 开始监听消息...", "INFO") + # 10. 注册到全局管理器 + qn_manager = QianNiuWebsocketManager() + shop_key = f"千牛:{store_id}" + qn_manager.on_connect(shop_key, self.qn_client, store_id=store_id, exe_token=exe_token) + self._log(f"✅ 已注册千牛连接: {shop_key}", "SUCCESS") + + # 10. 开始监听消息 + self._log("🔵 步骤7: 开始监听消息...", "INFO") self.stop_event = asyncio.Event() self.running = True - print(self.user_nick) - - # 7. 启动监听任务 + # 11. 启动监听任务 listen_task, heartbeat_task = await self.qn_client.start_listening( self.message_handler.handle_message ) self.tasks.extend([listen_task, heartbeat_task]) self._log("✅ 监听任务启动成功", "SUCCESS") - # 8. 等待任务完成或停止信号 + # 12. 等待任务完成或停止信号 + self._log("🎉 千牛平台已完全启动,开始监听消息", "SUCCESS") try: await asyncio.gather(*self.tasks, return_exceptions=True) except asyncio.CancelledError: @@ -2042,7 +2518,6 @@ class QianNiuListenerForGUI: import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") - self._log("🎉 千牛平台消息监听已启动", "SUCCESS") return True except Exception as e: @@ -2051,77 +2526,6 @@ class QianNiuListenerForGUI: self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return False - def stop_listening(self): - """停止监听""" - try: - self._log("🔵 开始停止监听...", "INFO") - self.running = False - - # 取消所有任务 - for task in self.tasks: - if not task.done(): - task.cancel() - self.tasks.clear() - - # 关闭千牛客户端 - if self.qn_client: - self._log("🔵 关闭千牛客户端...", "DEBUG") - asyncio.create_task(self.qn_client.close()) - - # 关闭消息处理器 - if self.message_handler: - self._log("🔵 关闭消息处理器...", "DEBUG") - asyncio.create_task(self.message_handler.close()) - - # 设置停止事件 - if self.stop_event: - self.stop_event.set() - - self._log("✅ 监听已停止", "SUCCESS") - - except Exception as e: - self._log(f"❌ 停止监听时出现错误: {e}", "ERROR") - import traceback - self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") - - def is_running(self): - """检查是否正在运行""" - return self.running - - def set_log_signal(self, signal): - """设置日志信号""" - self.log_signal = signal - """用于GUI集成的千牛监听包装器类""" - - def __init__(self, log_callback=None): - """初始化千牛监听包装器""" - self.qn_client = None - self.message_handler = None - self.running = False - self.stop_event = None - self.log_callback = log_callback - self.log_signal = None # 添加日志信号属性 - self.tasks = [] # 新增:任务列表 - - def _log(self, message, log_type="INFO"): - """内部日志方法""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - log_entry = f"[{timestamp}] [{log_type}] {message}" - - if hasattr(self, 'log_signal') and self.log_signal: - self.log_signal.emit(message, log_type) - elif self.log_callback: - self.log_callback(message, log_type) - else: - color_map = { - "ERROR": "\033[91m", - "WARNING": "\033[93m", - "SUCCESS": "\033[92m", - "DEBUG": "\033[96m", - } - color = color_map.get(log_type, "") - print(f"{color}[{timestamp}] [{log_type}] {message}\033[0m") - async def start_listening(self, user_nick): """启动监听的主方法""" try: @@ -2229,53 +2633,37 @@ class QianNiuListenerForGUI: self.log_signal = signal - -async def main(): +async def main(store_id=None): """主测试函数""" logger.info("=== 赛牛接口本地测试开始 ===") print("=== 断点A: 测试开始 ===") - # 创建千牛客户端 - qn_client = QianNiuClient() - message_handler = None # 提前定义变量 + # 如果没有提供store_id,使用默认值 + if store_id is None: + store_id = STORE_ID + + listener = QianNiuListenerForGUI() try: - # 1. 测试DLL加载和启动 - print("\n📋 步骤1: 测试DLL加载和启动...") - if not await qn_client._start_sainiu_service(): - print("❌ DLL服务启动失败") + success = await listener.start_listening_with_store(store_id) + + if not success: + print('启动监听失败') return False - print("✅ DLL服务启动成功") - # 2. 测试WebSocket连接(包含授权和初始化) - print("\n📋 步骤2: 测试WebSocket连接...") - if not await qn_client._connect_websocket(): - print("❌ WebSocket连接失败") + # 获取message_handler实例以便后续操作 + message_handler = listener.message_handler + if message_handler is None: + print("❌ 消息处理器未正确初始化") return False - print("✅ WebSocket连接成功") - # 3. 创建消息处理器 - message_handler = TestMessageHandler(qn_client) - - # 4. 初始化消息处理器和AI服务 - print("\n📋 步骤3: 初始化AI服务...") - if not await message_handler.initialize(): - logger.error("AI服务初始化失败") - print("❌ AI服务初始化失败") - return - print("✅ AI服务初始化成功") - - # 5. 开始监听消息 - print("\n📋 步骤4: 开始监听消息...") - listen_task, heartbeat_task = await qn_client.start_listening(message_handler.handle_message) - - # 6. 测试发送消息功能 + # 测试发送消息功能 print("\n📋 步骤5: 测试发送消息功能...") test_nick = "test_buyer" test_message = "这是一条测试消息" print(f"=== 准备发送测试消息给 {test_nick} ===") - success = await qn_client.send_message(test_nick, test_message) + success = await listener.qn_client.send_message(test_nick, test_message) if success: print("✅ 测试消息发送成功") @@ -2286,8 +2674,9 @@ async def main(): logger.info("测试程序已启动,按Ctrl+C停止...") print("=== 测试程序运行中,等待消息... ===") - # 等待任务完成或用户中断 - await asyncio.gather(listen_task, heartbeat_task, return_exceptions=True) + # 等待停止信号 + while listener.is_running(): + await asyncio.sleep(1) except KeyboardInterrupt: logger.info("用户中断测试") @@ -2295,28 +2684,82 @@ async def main(): except Exception as e: logger.error(f"测试异常: {e}") print(f"=== 断点E: 测试异常 - {e} ===") + import traceback + print(f"错误详情: {traceback.format_exc()}") finally: - if message_handler: # 检查变量是否已定义 - await message_handler.close() # 关闭消息处理器和AI服务 - await qn_client.close() + listener.stop_listening() logger.info("=== 赛牛接口本地测试结束 ===") print("=== 断点F: 测试结束 ===") - - - -async def test_auth(): - """测试完整的集成连接""" - print("开始测试完整的赛牛集成连接...") - - sainiu_service = SaiNiuService() - if sainiu_service.load_dll(): - print("DLL加载成功") - else: - print("DLL加载失败") + # # 创建千牛客户端 + # qn_client = QianNiuClient() + # message_handler = None # 提前定义变量 + # + # try: + # # 1. 测试DLL加载和启动 + # print("\n📋 步骤1: 测试DLL加载和启动...") + # if not await qn_client._start_sainiu_service(): + # print("❌ DLL服务启动失败") + # return False + # print("✅ DLL服务启动成功") + # + # # 2. 测试WebSocket连接(包含授权和初始化) + # print("\n📋 步骤2: 测试WebSocket连接...") + # if not await qn_client._connect_websocket(): + # print("❌ WebSocket连接失败") + # return False + # print("✅ WebSocket连接成功") + # + # # 3. 创建消息处理器 + # message_handler = TestMessageHandler(qn_client) + # + # # 4. 初始化消息处理器和AI服务 + # print("\n📋 步骤3: 初始化AI服务...") + # if not await message_handler.initialize(): + # logger.error("AI服务初始化失败") + # print("❌ AI服务初始化失败") + # return + # print("✅ AI服务初始化成功") + # + # # 5. 开始监听消息 + # print("\n📋 步骤4: 开始监听消息...") + # listen_task, heartbeat_task = await qn_client.start_listening(message_handler.handle_message) + # + # # 6. 测试发送消息功能 + # print("\n📋 步骤5: 测试发送消息功能...") + # test_nick = "test_buyer" + # test_message = "这是一条测试消息" + # + # print(f"=== 准备发送测试消息给 {test_nick} ===") + # success = await qn_client.send_message(test_nick, test_message) + # + # if success: + # print("✅ 测试消息发送成功") + # else: + # print("❌ 测试消息发送失败") + # + # # 保持运行,直到用户中断 + # logger.info("测试程序已启动,按Ctrl+C停止...") + # print("=== 测试程序运行中,等待消息... ===") + # + # # 等待任务完成或用户中断 + # await asyncio.gather(listen_task, heartbeat_task, return_exceptions=True) + # + # except KeyboardInterrupt: + # logger.info("用户中断测试") + # print("=== 断点D: 用户中断测试 ===") + # except Exception as e: + # logger.error(f"测试异常: {e}") + # print(f"=== 断点E: 测试异常 - {e} ===") + # finally: + # if message_handler: # 检查变量是否已定义 + # await message_handler.close() # 关闭消息处理器和AI服务 + # await qn_client.close() + # logger.info("=== 赛牛接口本地测试结束 ===") + # print("=== 断点F: 测试结束 ===") if __name__ == "__main__": # 运行测试 - asyncio.run(main()) - # asyncio.run(test_auth()) \ No newline at end of file + asyncio.run(main(store_id="4c4025e3-8702-42fc-bdc2-671e335c0ff7")) + # asyncio.run(test_auth()) diff --git a/WebSocket/BackendClient.py b/WebSocket/BackendClient.py index 1d14ff4..eca840f 100644 --- a/WebSocket/BackendClient.py +++ b/WebSocket/BackendClient.py @@ -44,7 +44,6 @@ class BackendClient: if self.loop and self.loop.is_running(): asyncio.run_coroutine_threadsafe(self._close(), self.loop) - if self.thread and self.thread.is_alive(): self.thread.join(timeout=3) @@ -388,7 +387,7 @@ class BackendClient: if recv_pin and store_id: # 根据store_id动态确定平台类型 platform_type = self._get_platform_by_store_id(store_id) - + if platform_type == "京东": self._forward_to_jd(store_id, recv_pin, content) elif platform_type == "抖音": @@ -430,16 +429,17 @@ class BackendClient: if not entry: print(f"[JD Forward] 未找到连接: {shop_key}") return - + platform_info = (entry or {}).get('platform') or {} ws = platform_info.get('ws') aid = platform_info.get('aid') pin_zj = platform_info.get('pin_zj') vender_id = platform_info.get('vender_id') loop = platform_info.get('loop') - - print(f"[JD Forward] shop_key={shop_key} has_ws={bool(ws)} aid={aid} pin_zj={pin_zj} vender_id={vender_id} has_loop={bool(loop)} recv_pin={recv_pin}") - + + print( + f"[JD Forward] shop_key={shop_key} has_ws={bool(ws)} aid={aid} pin_zj={pin_zj} vender_id={vender_id} has_loop={bool(loop)} recv_pin={recv_pin}") + if ws and aid and pin_zj and vender_id and loop and content: async def _send(): import hashlib as _hashlib @@ -463,7 +463,7 @@ class BackendClient: } } await ws.send(_json.dumps(msg)) - + import asyncio as _asyncio _future = _asyncio.run_coroutine_threadsafe(_send(), loop) try: @@ -487,17 +487,18 @@ class BackendClient: dy_mgr = DouYinWebsocketManager() shop_key = f"抖音:{store_id}" entry = dy_mgr.get_connection(shop_key) - + if not entry: print(f"[DY Forward] 未找到连接: {shop_key}") return - + platform_info = entry.get('platform', {}) douyin_bot = platform_info.get('douyin_bot') message_handler = platform_info.get('message_handler') - - print(f"[DY Forward] shop_key={shop_key} has_bot={bool(douyin_bot)} has_handler={bool(message_handler)} recv_pin={recv_pin}") - + + print( + f"[DY Forward] shop_key={shop_key} has_bot={bool(douyin_bot)} has_handler={bool(message_handler)} recv_pin={recv_pin}") + if douyin_bot and message_handler and content: # 在消息处理器的事件循环中发送消息 def send_in_loop(): @@ -523,17 +524,17 @@ class BackendClient: print(f"[DY Forward] 事件循环不可用") except Exception as e: print(f"[DY Forward] 发送过程异常: {e}") - + # 在新线程中执行发送操作 import threading send_thread = threading.Thread(target=send_in_loop, daemon=True) send_thread.start() - + else: print("[DY Forward] 条件不足,未转发:", { - 'has_bot': bool(douyin_bot), - 'has_handler': bool(message_handler), + 'has_bot': bool(douyin_bot), + 'has_handler': bool(message_handler), 'has_content': bool(content) }) except Exception as e: @@ -543,7 +544,8 @@ class BackendClient: """转发消息到千牛平台""" try: # TODO: 实现千牛平台的消息转发逻辑 - print(f"[QN Forward] 千牛平台消息转发功能待实现: store_id={store_id}, recv_pin={recv_pin}, content={content}") + print( + f"[QN Forward] 千牛平台消息转发功能待实现: store_id={store_id}, recv_pin={recv_pin}, content={content}") except Exception as e: print(f"[QN Forward] 转发失败: {e}") @@ -554,17 +556,18 @@ class BackendClient: pdd_mgr = PDDWSManager() shop_key = f"拼多多:{store_id}" entry = pdd_mgr.get_connection(shop_key) - + if not entry: print(f"[PDD Forward] 未找到连接: {shop_key}") return - + platform_info = entry.get('platform', {}) pdd_instance = platform_info.get('pdd_instance') loop = platform_info.get('loop') - - print(f"[PDD Forward] shop_key={shop_key} has_pdd_instance={bool(pdd_instance)} has_loop={bool(loop)} recv_pin={recv_pin}") - + + print( + f"[PDD Forward] shop_key={shop_key} has_pdd_instance={bool(pdd_instance)} has_loop={bool(loop)} recv_pin={recv_pin}") + if pdd_instance and loop and content: # 在拼多多实例的事件循环中发送消息 def send_in_loop(): @@ -585,17 +588,17 @@ class BackendClient: print(f"[PDD Forward] 转发执行失败: {fe}") except Exception as e: print(f"[PDD Forward] 发送过程异常: {e}") - + # 在新线程中执行发送操作 import threading send_thread = threading.Thread(target=send_in_loop, daemon=True) send_thread.start() - + else: print("[PDD Forward] 条件不足,未转发:", { - 'has_pdd_instance': bool(pdd_instance), - 'has_loop': bool(loop), + 'has_pdd_instance': bool(pdd_instance), + 'has_loop': bool(loop), 'has_content': bool(content) }) except Exception as e: @@ -608,24 +611,24 @@ class BackendClient: pdd_mgr = PDDWSManager() shop_key = f"拼多多:{store_id}" entry = pdd_mgr.get_connection(shop_key) - + if not entry: print(f"[PDD Transfer] 未找到拼多多连接: {shop_key}") return - + platform_info = entry.get('platform', {}) pdd_instance = platform_info.get('pdd_instance') loop = platform_info.get('loop') - + print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}") - + if pdd_instance and loop: # 设置目标客服ID并执行转接 def transfer_in_loop(): try: # 设置目标客服ID pdd_instance.csid = customer_service_id - + # 在事件循环中执行转接 future = asyncio.run_coroutine_threadsafe( pdd_instance.handle_transfer_message({ @@ -634,7 +637,7 @@ class BackendClient: }), loop ) - + # 等待转接结果 try: result = future.result(timeout=15) # 转接可能需要更长时间 @@ -646,77 +649,24 @@ class BackendClient: print(f"[PDD Transfer] 转接执行失败: {fe}") except Exception as e: print(f"[PDD Transfer] 转接过程异常: {e}") - + # 在新线程中执行转接操作 import threading transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True) transfer_thread.start() - + else: print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}") except Exception as e: print(f"[PDD Transfer] 拼多多转接失败: {e}") - def _transfer_to_jd(self, customer_service_id: str, user_id: str, store_id: str): - """执行京东平台转接操作""" + def _transfer_to_qianniu(self, customer_service_id: str, user_id: str, store_id: str): + """执行千牛平台转接操作""" try: - from Utils.JD.JdUtils import WebsocketManager as JDWSManager - jd_mgr = JDWSManager() - shop_key = f"京东:{store_id}" - entry = jd_mgr.get_connection(shop_key) - - if not entry: - print(f"[JD Transfer] 未找到京东连接: {shop_key}") - return - - platform_info = entry.get('platform', {}) - ws = platform_info.get('ws') - aid = platform_info.get('aid') - pin_zj = platform_info.get('pin_zj') - loop = platform_info.get('loop') - - print(f"[JD Transfer] 找到京东连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}") - print(f"[JD Transfer] 连接参数: has_ws={bool(ws)}, aid={aid}, pin_zj={pin_zj}, has_loop={bool(loop)}") - - if ws and aid and pin_zj and loop: - # 在事件循环中执行转接 - def transfer_in_loop(): - try: - # 导入FixJdCookie类来调用转接方法 - from Utils.JD.JdUtils import FixJdCookie - jd_utils = FixJdCookie() - - # 在事件循环中执行转接 - future = asyncio.run_coroutine_threadsafe( - jd_utils.transfer_customer(ws, aid, user_id, pin_zj, customer_service_id), - loop - ) - - # 等待转接结果 - try: - result = future.result(timeout=10) # 京东转接超时时间 - if result: - print(f"[JD Transfer] ✅ 转接成功: user_id={user_id} -> cs_id={customer_service_id}") - else: - print(f"[JD Transfer] ❌ 转接失败: user_id={user_id}") - except Exception as fe: - print(f"[JD Transfer] 转接执行失败: {fe}") - except Exception as e: - print(f"[JD Transfer] 转接过程异常: {e}") - - # 在新线程中执行转接操作 - import threading - transfer_thread = threading.Thread(target=transfer_in_loop, daemon=True) - transfer_thread.start() - - else: - print("[JD Transfer] 条件不足,未转接:", - { - 'has_ws': bool(ws), 'has_aid': bool(aid), 'has_pin_zj': bool(pin_zj), - 'has_loop': bool(loop) - }) + # TODO: 实现千牛平台转接逻辑 + print(f"[QN Transfer] 千牛平台转接功能待实现: user_id={user_id}, cs_id={customer_service_id}") except Exception as e: - print(f"[JD Transfer] 京东转接失败: {e}") + print(f"[QN Transfer] 千牛转接失败: {e}") def _handle_transfer(self, message: Dict[str, Any]): """处理转接消息""" @@ -731,15 +681,16 @@ class BackendClient: # 根据店铺ID确定平台类型并执行转接 try: platform_type = self._get_platform_by_store_id(store_id) - + if platform_type == "京东": - self._transfer_to_jd(customer_service_id, user_id, store_id) + # 京东转接逻辑 - 待实现 + print(f"[JD Transfer] 京东平台转接功能待实现") elif platform_type == "抖音": # 抖音转接逻辑 - 待实现 print(f"[DY Transfer] 抖音平台转接功能待实现") elif platform_type == "千牛": - # 千牛转接逻辑 - 待实现 - print(f"[QN Transfer] 千牛平台转接功能待实现") + # 千牛转接逻辑 + self._transfer_to_qianniu(customer_service_id, user_id, store_id) elif platform_type == "拼多多": self._transfer_to_pdd(customer_service_id, user_id, store_id) else: @@ -777,7 +728,7 @@ class BackendClient: platform_name = message.get('platform_name', '') content = message.get('content', '') data = message.get('data', {}) - + # 判断是拼多多登录参数还是普通Cookie if platform_name == "拼多多" and content == "pdd_login_params" and data.get('login_params'): # 拼多多登录参数模式 @@ -809,7 +760,7 @@ class BackendClient: nickname = staff.get('nickname', '') print(f" - {nickname} ({pin})") - if self.customers_callback: # 假设客服列表更新也触发客服列表回调 + if self.customers_callback: # 假设客服列表更新也触发客服列表回调 self.customers_callback(staff_list, store_id) # ==================== 辅助方法 ==================== diff --git a/WebSocket/backend_singleton.py b/WebSocket/backend_singleton.py index b8ceed4..083c69a 100644 --- a/WebSocket/backend_singleton.py +++ b/WebSocket/backend_singleton.py @@ -9,6 +9,7 @@ from WebSocket.BackendClient import BackendClient from Utils.JD.JdUtils import JDListenerForGUI as JDListenerForGUI_WS from Utils.Dy.DyUtils import DouYinListenerForGUI as DYListenerForGUI_WS from Utils.Pdd.PddUtils import PddListenerForGUI as PDDListenerForGUI_WS +from Utils.QianNiu.QianNiuUtils import QianNiuListenerForGUI as QNListenerForGUI_WS _backend_client = None @@ -123,20 +124,50 @@ class WebSocketManager: return False def _handle_platform_login(self, platform_name: str, store_id: str, cookies: str): - """处理平台登录逻辑""" + """处理平台登录请求""" try: - if platform_name == "京东" and store_id and cookies: - self._start_jd_listener(store_id, cookies) - elif platform_name == "抖音" and store_id and cookies: - self._start_douyin_listener(store_id, cookies) - elif platform_name == "千牛" and store_id and cookies: + # 平台名称映射 + platform_map = { + "淘宝": "千牛", + "QIANNIU": "千牛", + "京东": "京东", + "JD": "京东", + "抖音": "抖音", + "DY": "抖音", + "DOUYIN": "抖音", + "拼多多": "拼多多", + "PDD": "拼多多", + "PINDUODUO": "拼多多" + } + + # 标准化平台名称 + normalized_platform = platform_map.get(platform_name.upper(), platform_name) + + self._log(f"处理平台登录: {platform_name} -> {normalized_platform}, 店铺={store_id}", "INFO") + + # 🔧 关键修改:确保每个平台都能独立处理自己的登录请求 + if normalized_platform == "千牛": + if cookies == "login_success": + self._log("⚠️ 千牛平台收到空cookies,但允许启动监听器", "WARNING") + cookies = "" # 清空cookies,千牛不需要真实cookies self._start_qianniu_listener(store_id, cookies) - elif platform_name == "拼多多" and store_id and cookies: + + elif normalized_platform == "京东": + self._start_jd_listener(store_id, cookies) + + elif normalized_platform == "抖音": + self._start_douyin_listener(store_id, cookies) + + elif normalized_platform == "拼多多": self._start_pdd_listener(store_id, cookies) + else: - self._log(f"不支持的平台或参数不全: {platform_name}", "WARNING") + self._log(f"❌ 不支持的平台: {platform_name}", "ERROR") + except Exception as e: - self._log(f"启动平台监听失败: {e}", "ERROR") + self._log(f"处理平台登录失败: {e}", "ERROR") + import traceback + self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") def _start_jd_listener(self, store_id: str, cookies: str): """启动京东平台监听""" @@ -199,7 +230,7 @@ class WebSocketManager: except json.JSONDecodeError as e: self._log(f"❌ Cookie JSON解析失败: {e}", "ERROR") return False - + result = asyncio.run(listener.start_with_cookies(store_id=store_id, cookie_dict=cookie_dict)) return result except Exception as e: @@ -250,13 +281,64 @@ class WebSocketManager: self._log(f"失败状态下报抖音平台连接状态也失败: {send_e}", "ERROR") def _start_qianniu_listener(self, store_id: str, cookies: str): - """启动千牛平台监听""" + """启动千牛平台监听(单连接多店铺架构)""" try: - # 这里可以添加千牛监听逻辑 - self._log("千牛平台监听功能待实现", "INFO") + # 获取用户token(从后端客户端获取) + exe_token = None + if self.backend_client: + exe_token = getattr(self.backend_client, 'token', None) + + if not exe_token: + self._log("❌ 缺少exe_token,无法启动千牛监听器", "ERROR") + return + + def _runner(): + try: + listener = QNListenerForGUI_WS() + # 千牛平台使用新架构:传递store_id和exe_token + asyncio.run(listener.start_listening_with_store(store_id, exe_token)) + except Exception as e: + self._log(f"千牛监听器运行异常: {e}", "ERROR") + + # 在新线程中启动监听器 + thread = Thread(target=_runner, daemon=True) + thread.start() + + # 保存监听器引用 + self.platform_listeners[f"千牛:{store_id}"] = { + 'thread': thread, + 'platform': '千牛', + 'store_id': store_id, + 'exe_token': exe_token + } + + # 上报连接状态给后端 + if self.backend_client: + try: + self.backend_client.send_message({ + "type": "connect_message", + "store_id": store_id, + "status": True + }) + except Exception as e: + self._log(f"上报连接状态失败: {e}", "WARNING") + + self._log("已启动千牛平台监听(单连接多店铺架构)", "SUCCESS") + except Exception as e: self._log(f"启动千牛平台监听失败: {e}", "ERROR") + # 确保失败时也上报状态 + if self.backend_client: + try: + self.backend_client.send_message({ + "type": "connect_message", + "store_id": store_id, + "status": False + }) + except Exception as send_e: + self._log(f"失败状态下报千牛平台连接状态也失败: {send_e}", "ERROR") + def _start_pdd_listener(self, store_id: str, data: str): """启动拼多多平台监听""" try: