#!/usr/bin/env python # -*- coding: UTF-8 -*- """ @Project :magua_project @File :PddUtils.py @IDE :PyCharm @Author :lz @Date :2025/7/17 16:44 """ import os import threading from concurrent.futures import ThreadPoolExecutor import base64 import json import random import time import traceback import websockets import requests import asyncio import execjs from datetime import datetime from typing import Dict, Any, Optional, Callable from Utils.message_models import PlatformMessage # 定义持久化数据类 - 参考京东的WebsocketManager class WebsocketManager: _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._store = {} cls._instance._lock = threading.RLock() return cls._instance def on_connect(self, shop_key, ws, **kwargs): """完全保持原有数据结构""" with self._lock: entry = self._store.setdefault(shop_key, { 'platform': None, 'customers': [], 'user_assignments': {} }) entry['platform'] = { 'ws': ws, # 注意:这里存储的是强引用 'last_heartbeat': datetime.now(), **kwargs } return entry def get_connection(self, shop_key): with self._lock: return self._store.get(shop_key) def remove_connection(self, shop_key): with self._lock: if shop_key in self._store: del self._store[shop_key] class PddBackendService: """拼多多后端服务调用类(新版:使用统一后端连接 BackendClient)""" def __init__(self): self.current_store_id = None async def connect(self, store_id): """连接后端服务(使用统一连接,无需独立连接)""" self.current_store_id = store_id return True async def send_message_to_backend(self, platform_message): """改为通过单后端连接发送,需携带store_id""" try: from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() if not backend: return None # 确保消息包含store_id if isinstance(platform_message, dict): if 'store_id' not in platform_message and self.current_store_id: platform_message['store_id'] = self.current_store_id # 通过统一后端连接发送 backend.send_message(platform_message) return True except Exception: return None class ChatPdd: @staticmethod def check_js_files(): """检查JS文件是否存在""" current_dir = "static/js" required_files = ["dencode_message.js", "encode_message.js"] for file in required_files: file_path = os.path.join(current_dir, file) if not os.path.exists(file_path): raise FileNotFoundError(f"找不到必需的JS文件: {file_path}") return True def __init__(self, cookie, chat_list_stat, csname=None, text=None, log_callback=None): # 检查JS文件 self.check_js_files() # 获取JS文件路径 current_dir = "static/js" dencode_js_path = os.path.join(current_dir, "dencode_message.js") encode_js_path = os.path.join(current_dir, "encode_message.js") # 读取JS文件 try: with open(dencode_js_path, 'r', encoding='utf-8') as f: jscode = f.read() self.ctx = execjs.compile(jscode) with open(encode_js_path, 'r', encoding='utf-8') as f: ejscode = f.read() self.encodeex = execjs.compile(ejscode) except Exception as e: raise RuntimeError(f"读取JS文件失败: {str(e)}") # 首先设置log_callback,然后才能使用_log方法 self.log_callback = log_callback self._log("初始化ChatPdd实例", "INFO") self.chat_list_stat = chat_list_stat self.text = text self.cookie = cookie self.auth_token = None self.mall_id = None self.user_id = None self.csname = csname self.csid = None self.staff_list_sent = False self.uids = set() self.pool = ThreadPoolExecutor(max_workers=4) # WebSocket管理器 self.ws_manager = WebsocketManager() # 后端服务实例 self.backend_service = PddBackendService() self.backend_connected = False # 重连参数 self.reconnect_attempts = 0 self.max_reconnect_attempts = 10 self.base_reconnect_delay = 1.0 self.max_reconnect_delay = 60.0 self.reconnect_backoff = 1.5 self.headers = { "authority": "mms.pinduoduo.com", "accept": "*/*", "accept-language": "zh-CN,zh;q=0.9", "anti-content": "0asWfxUeM_VefxObk-_v-fwmt7oS3cmHsWwMkApMVigWOS3hCHfBeF-hHM1_v9LHywCtfzDKkA_F1cUE8_HKBA5D7fVkM2ZDB-KmMxhDM51HXlVrfK_LG4Dc3S0wimw8XYPyXdgJnUv8ndgjs0EynYv8n0XjXU9YXY4Pt0ZVgTgrsIUEeMzMk7s02E3oo-zMdF35CIMWVetBhs95lGG5xENwZwX0CwXH2799MeMjVKzk5DD4Kbs2dM1bD-fJ1F-RImB3SHBkVKDJZbZoUbL4UMk8DFtrISf8CMD4Obk_CM3ICIB_F90ws9ZQ0gcOpVdIIz2PLPVcYt5X0yqYiwjtuNVfdNSf0rmfmNIndXj8G8Nyn4ianRaanoucDp0Dfvd36B6eamPGwOFS0TAyo7nXH_nwxnGS28AgVnqPyPuy8jczb1Oe3xZuPHoJ97BBbaTMfV9OcQ23T-mAUel", "cache-control": "no-cache", "content-type": "application/json", "origin": "https://mms.pinduoduo.com", "pragma": "no-cache", "referer": "https://mms.pinduoduo.com/chat-merchant/index.html?r=0.4940608191737519", "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } self.store_id = "538727ec-c84d-4458-8ade-3960c9ab802c" # 可以根据需要修改 self.user_tokens = {} # 存储用户token信息 def _log(self, message, level="INFO"): """内部日志方法""" if self.log_callback: self.log_callback(message, level) else: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") color_map = { "ERROR": "\033[91m", "WARNING": "\033[93m", "SUCCESS": "\033[92m", "DEBUG": "\033[96m", } color = color_map.get(level, "") reset = "\033[0m" print(f"{color}[{timestamp}] [{level}] {message}{reset}") # 重连机制方法 async def calculate_reconnect_delay(self): """计算指数退避的重连延迟时间""" delay = self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts) return min(delay, self.max_reconnect_delay) async def should_reconnect(self): """判断是否应该继续重连""" if self.reconnect_attempts >= self.max_reconnect_attempts: self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR") return False return True async def handle_reconnect(self, exception=None): """处理重连逻辑""" if exception: error_type = type(exception).__name__ error_msg = str(exception) self._log(f"连接异常[{error_type}]: {error_msg}", "WARNING") if not await self.should_reconnect(): return False delay = await self.calculate_reconnect_delay() self._log(f"第{self.reconnect_attempts + 1}次重连尝试,等待{delay:.1f}秒...", "WARNING") await asyncio.sleep(delay) self.reconnect_attempts += 1 return True # 后端服务调用方法 async def connect_backend_service(self, store_id): """连接后端AI服务""" try: success = await self.backend_service.connect(store_id) if success: self.backend_connected = True self._log("✅ 后端AI服务连接成功", "SUCCESS") return success except Exception as e: self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR") return False async def get_ai_reply_from_backend(self, platform_message): """发送消息到后端(使用统一连接,不等待回复)""" # 首先检查后端服务连接状态 if not self.backend_connected: # 尝试重新连接 if self.store_id: self.backend_connected = await self.connect_backend_service(self.store_id) if not self.backend_connected: self._log("❌ 后端服务未连接,尝试重新连接失败", "WARNING") return None try: # 发送消息到后端(使用统一连接,不等待回复) await self.backend_service.send_message_to_backend(platform_message) self._log("✅ 消息已发送到后端,等待后端AI处理并通过GUI转发回复", "INFO") return None except Exception as e: self._log(f"❌ 发送消息到后端失败: {e}", "ERROR") return None def get_auth_token(self): url = "https://mms.pinduoduo.com/janus/api/subSystem/getAuthToken" data = {"subSystemId": 17, "anti_content": "0asAfx5E-wCElqJNXaKt_UKccG7YNycjZoPYgO1YTmZoApN7QJU5XT_JuYdPZ4uvLPQFgIcyXOKqm8xGrPjy0lxn0gaXr9ac0TynYEJnDwdj-WEJnwK3G4kc3M0TiPgtB11eBeZkB1hkBxUHFOtjfqz-eAkgc2aa4sZuIJwHOptYX14VK1NJSqIYfqmw6jpQTXs574CfWMFxT1RPTIB2MKBjC199pXpkbdtXxnn2mA4C1YdNnTUrNa_Wvn5mYj5XadnDbNT7xNuVeYXrsTsiipUr6xn2AAXKoYmv6j0PL92PtCTMfZzzfTfjutCgvBTzxFw-2LeeRIkFBATU1Npg1ScUFRv-1Ukrs3AklVAbBiEbBJhzcf2cXKfNH50X9QUFCd_Y1FhLW1BBuI-KEUFYK3lZTB3g_BlLDk8ti-JXmbalXzWAb6V2hX0fZE9n2L0GITFInNS"} response = requests.post(url, cookies=self.cookie, headers=self.headers, json=data).json() self.auth_token = response["result"]["authToken"] def get_mall_id(self): url = "https://mms.pinduoduo.com/chats/userinfo/realtime?get_response=true" data = {"get_response": "true"} response = requests.get(url, cookies=self.cookie, headers=self.headers, params=data).json() self.mall_id = response["mall_id"] self.user_id = response["id"] def get_assign_cslist(self): url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList" data = { "wechatCheck": True } data = json.dumps(data, separators=(',', ':')) response = requests.post(url, headers=self.headers, cookies=self.cookie, data=data).json() cslist = response["result"]["csList"] keys = cslist.keys() for key in keys: username = cslist[key].get("username") if username == self.csname: self.csid = key break def tab_mall(self, uid): """执行转接操作""" self._log(f"🔄 开始执行转接操作 - 用户ID: {uid}, 目标客服ID: {self.csid}", "INFO") try: url = "https://mms.pinduoduo.com/plateau/chat/move_conversation" request_id = int(time.time() * 1000) data = { "data": { "cmd": "move_conversation", "request_id": request_id, "conversation": { "csid": self.csid, "uid": uid, "need_wx": False, "remark": "无原因直接转移" }, "anti_content": "0asAfxvYXyIgYgE2h9U0sXh6Ia3tZOAzI_rl3hixs1Z5DB-B1cswVK2UB8LfUO8H3OUKdF214-SgPVnY8SYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4R8ZVTw3a7j9BdB3Hmq7j0bh2LQrRzlt88pkQr-s3jMzWH5QPiz13wQNsJY6h9dnigBE26Ww1dKZcWbsKS4GsgY9_joGBS2FZUz94cbCvE2Ij9nO93b5kR50m9kI1lfBHvDZVIC9rLB-S4Qs4J9ebSsosuNskY9nhgXzCuNoilSQgJaMkP2Rgga2i54nWTTNfnZ4W9GvigkSnwzweN9sx-89BztNCasVkP2C2Kkipif2Dv-t7e2O9MnmtWaKm4wFlHLR7YmeyF3y5gxPRuHlO-gD7BfFle8z71lIV-7SyOLCd83FMUCsRwedwv3-TNTF_VRw_G1DHrptFYTAck8p8qIkxA_PZ14PXct1ZS2zAoZZcTMZZSpJ1jP9RlQARtnNNP0IzCdHiL4JzRQBkkKmwGGXay0i4urnXNm5qBgejFcVCkx8XYdeGBp4p-l1TjKoy603__johKFKZQksRHjOKma5DplwQwg8c3Uo1-mDAUmgAC0XfV2rkxzXj3ABARPlyEaMU8AxUZioaFRa0e5eit6r06uY9m5SyU_mdbTfLIYcKWjRA_UNQisJOUenCbMl_Bywq1KcA6TTzJ9ZHu4xz0o7ctROYLlvhEhAi3bJjFNho6l-TPZK4zSJoj" }, "client": "WEB", "anti_content": "0asAfxvYXNIgYgd2i9U0sXh6IabtZOAdI_LlbhNUvZZ5DB-B1cswVK2UBjxfUOjHbOUKdF214-SgPVnYjSYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4RjZVTwbJFiC0P0rR6LbH89rLVuGHiG6vm3fO6rq_rAyLQlw8CAIuESMR4L308Ctz3V0m9ZhS8tdapQr_YY2vZZk9sAJsMd2ZapG92zm2lO9FLVBjCur2KWwac9nua9g8ztOawyo9ur0Dd96xMkCXrd4J4hR4Kk9zqZBDTlRPkMdzzkJ5Osgi40EC3oVAhSPA9MawGCs33Vnd4bSJER9_b_D9g1sfdeZZKsgo25k0WRfs7dM17E2ug-pPEL5_ib53TuaxLHJq5pGG4jMUjAHRV-gbDffut1Yjy1A7K-6E3zO4tDuUq5T0LPHjf9bB7Apofgr_U_4Aq-7l8x2ks8DhhgOhblTvwrO4owZvuUXA0hfSIUPYK9fIoSUH6aaL8abMMVCvI__qF8Yw1oaMp0d0-fbwMhXQ8fBZYoajXXJv-RV3B2GQEowvMHEUtjDz50jUVduq5fKL9R-WsBRjX0aiExVKLw-PZzxsVup0ZuOYxnPofN92EqKfbvriIWFMsh80eGrvq1vsD1BrMx_mcPMmUmHHEY7wct5lePa0b2Lt5_byqRbbcpohrQrHLlH6JJaTMKQuffyT-60110e5oz_Cz0NbG88TyxOGKijW7BimL5RTjSOTPsK4zS8oi" } self._log(f"📤 发送转接请求 - URL: {url}", "DEBUG") self._log(f"📝 请求数据: {json.dumps(data, ensure_ascii=False)}", "DEBUG") data_str = json.dumps(data, separators=(',', ':')) response = requests.post(url, headers=self.headers, cookies=self.cookie, data=data_str) self._log(f"📥 HTTP状态码: {response.status_code}", "DEBUG") self._log(f"📄 响应内容: {response.text}", "DEBUG") # 解析响应 try: result = response.json() self._log(f"🔍 解析后的响应: {json.dumps(result, ensure_ascii=False)}", "DEBUG") # 检查响应结构 if result.get("success"): result_data = result.get("result", {}) if result_data.get("result") == "success" or result_data.get("result") == "ok": self._log(f"✅ 转接成功 - 用户 {uid} 已转接给客服 {self.csid}", "SUCCESS") return True else: error_code = result_data.get("error_code", "未知") error_msg = result_data.get("error_msg", "未知错误") self._log(f"❌ 转接失败 - 错误码: {error_code}, 错误信息: {error_msg}", "ERROR") return False else: self._log(f"❌ 转接请求失败 - 响应标记为失败", "ERROR") return False except json.JSONDecodeError as e: self._log(f"❌ 解析响应JSON失败: {e}", "ERROR") self._log(f"❌ 原始响应: {response.text}", "ERROR") return False except requests.RequestException as e: self._log(f"❌ 请求异常: {e}", "ERROR") return False except Exception as e: self._log(f"❌ 转接操作异常: {e}", "ERROR") traceback.print_exc() return False @staticmethod def forward_message_to_platform(store_id: str, recv_pin: str, content: str): """转发消息到拼多多平台""" try: pdd_mgr = WebsocketManager() pdd_shop_key = f"拼多多:{store_id}" pdd_entry = pdd_mgr.get_connection(pdd_shop_key) if pdd_entry: # 找到拼多多连接,使用拼多多转发逻辑 platform_info = pdd_entry.get('platform') or {} ws = platform_info.get('ws') loop = platform_info.get('loop') pdd_instance = platform_info.get('pdd_instance') # ChatPdd实例引用 print( f"[PDD Forward] shop_key={pdd_shop_key} has_ws={bool(ws)} has_loop={bool(loop)} has_pdd_instance={bool(pdd_instance)} recv_pin={recv_pin}") if ws and loop and pdd_instance: async def _send_pdd(): # 直接调用ChatPdd实例的send_ai_reply方法 await pdd_instance.send_ai_reply(recv_pin, content) import asyncio as _asyncio _future = _asyncio.run_coroutine_threadsafe(_send_pdd(), loop) try: _future.result(timeout=5) # 拼多多需要HTTP请求,给更长时间 print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}") except Exception as fe: print(f"[PDD Forward] 转发提交失败: {fe}") else: print("[PDD Forward] 条件不足,未转发:", {'has_ws': bool(ws), 'has_loop': bool(loop), 'has_pdd_instance': bool(pdd_instance), 'has_content': bool(content)}) else: print(f"[PDD Forward] 未找到拼多多连接: {pdd_shop_key}") except Exception as e: print(f"[PDD Forward] 拼多多转发失败: {e}") @staticmethod def transfer_customer_service(customer_service_id: str, user_id: str, store_id: str): """拼多多平台转接""" try: pdd_mgr = WebsocketManager() pdd_shop_key = f"拼多多:{store_id}" pdd_entry = pdd_mgr.get_connection(pdd_shop_key) if pdd_entry: platform_info = pdd_entry.get('platform') or {} pdd_instance = platform_info.get('pdd_instance') loop = platform_info.get('loop') print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}") if pdd_instance and loop: # 设置目标客服ID并执行转接 pdd_instance.csid = customer_service_id def _transfer(): result = pdd_instance.tab_mall(user_id) if result: print(f"[PDD Transfer] ✅ 转接成功") else: print(f"[PDD Transfer] ❌ 转接失败") return result # 在线程池中执行同步转接操作 import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(_transfer) try: future.result(timeout=10) # 转接操作可能需要更长时间 except Exception as fe: print(f"[PDD Transfer] 转接操作异常: {fe}") else: print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}") else: print(f"[PDD Transfer] 未找到拼多多连接: {pdd_shop_key}") except Exception as e: print(f"[PDD Transfer] 拼多多转接失败: {e}") async def send_ai_reply(self, uid, content): """发送AI回复给指定用户""" self._log(f"开始发送AI回复给用户 {uid}", "INFO") self._log(f"回复内容: {content[:50]}...", "DEBUG") try: ts = int(time.time() * 1000) url = "https://mms.pinduoduo.com/plateau/chat/send_message" data = { "data": { "cmd": "send_message", "anti_content": "0asWfqzFdjPssgua6CCWyeEK7LAktf0_UFChvlj8kD8Mk1DVZchg4qURiKAS4iSfiaz9Fjg43ThZZ4_wNh5Onk0e5Tw2mGdwpTEPDZlPdfZwBpyESSDiz0Xalv6mvr0o0uQR6yRlG-vscbK8jYGRNcAr2UktX3bMKhjjx7YvVGbi_poiyypBuOwM_2qZyIM96tdNTfb_AMy0fe1UrN2SqUzfm2stmb4TJldGN9b1mTPr-6t4R9k3pquV_XHwBr20J4__P71fxPBmcXOmXTnQWJs2ddt4KKOIbPIIanas08VNiv9nsHbtvXKp8_CMdL8FQMPmEMDmM6JfjZ4LJp5l-x0BjpBbdWyoztNYqSbBe8hscA1hPFM_R1iR9E-AI-CyuehipODW-ADCL0xEWK875510RsV7mLFQKG0kWO--krZzFFi_6lhdfq3_JhBjDDd3YS1exxSMMS7yDJgskoumfAcu-y1YDO8OdcKWwx3Z0nOqUlnF8LekN7TqXhNtKTgfDZPThVT3JcNm_2TljXRhGWxTZT4o25TTVKo5KPeztcJU5NddFd38nmvo0k8R8HVIkmzOYyaerCDCm1ZDfYgNFx-2vM_JkcMYFdkV7C3BKeg7DxZXHfexAT3UKgruBWYrSs_dBB0X-QkJ2ZcmpStB6KJkZ1azrAAxdw_HMj5UIq4vEXxrA1-YAzJEhMoU0-MncDpoBZfiky7Sr4nDGYrcn86AAYWynOUn0US2DAkICABvSSqkh0vCRpdVeZRfOFsdFWF43KEIph-ckMxDsyq6G3rWXbvPkaAlv67cR1v0N2oCQzDVDT-InNv", "request_id": ts, "message": { "to": { "role": "user", "uid": uid }, "from": { "role": "mall_cs" }, "ts": int(ts / 1000), "content": content, "msg_id": None, "type": 0, "is_aut": 0, "manual_reply": 1, "status": "read", "is_read": 1, "hash": "1d1359dfb7c141fd95432a22cdc7f43b51434041a1a6016ef6ea2ea2b1abc05a" }, "random": "c5fc9c61194294c478a6fb395b1c558f" }, "client": "WEB", "anti_content": "0asAfa5E-wCEsa-JeFwdFfTDtzhMbODIsAT-eu_-CigAHMbcWIfBEUFcI-Kwd93IYTWtfvkSeuwUKO57jwISBuZkzfCe-2VkBFSD-ack-ZKIXRockMwxBgyDql6V3xKKD-fCkBFZkzeKeBwUE-cSD-KHkBb-kzFVk-szd009EAPjDvBcaPIXxHGB_IxWWlIXMnxghvniTodnXt2scz7sxhJV_CEBcCez4H92maXI2d1qgoYswoG4Mf_oylph0nGigjnqWzXqwTYh0y4W6yYs6PxHyXv8slpHrXyseoqZhfqdrdn5E9BCt24mBfkEFg-vgc0wUw2D-vgE7KUM1FIdKb2_Mbl_6xj_8bvk-e1H6ObSksLCDiVCDAbkYU6Y_SMBfgA5CHUM0QuYhznXIwnTanGM2jugCnxPYPmYjyOvjKHEbaV3nvMuJ6V2cX0fZE99vcfY3TF1nNr" } data = json.dumps(data, separators=(',', ':')) response = await asyncio.get_event_loop().run_in_executor( self.pool, lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data) ) # 打印HTTP状态与返回体片段 self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO") try: self._log(f"PDD返回体: {response.text[:500]}", "DEBUG") except Exception: pass if response.status_code == 200: self._log(f"✅ 已发送AI回复给用户 {uid}: {content}", "SUCCESS") return True else: self._log(f"❌ 发送AI回复失败,HTTP状态码: {response.status_code}", "ERROR") self._log(f"响应内容: {response.text}", "DEBUG") return False except Exception as e: self._log(f"❌ 发送AI回复失败: {e}", "ERROR") traceback.print_exc() return False async def send_message_external(self, uid, content): """外部消息发送接口,用于接收后端转发的回复""" self._log(f"[External] 收到外部消息发送请求: uid={uid}, content_len={len(content) if content else 0}", "INFO") try: if not uid or not content: self._log("❌ [External] 参数不完整", "ERROR") return False result = await self.send_ai_reply(uid, content) if result: self._log(f"✅ [External] 外部消息发送成功: uid={uid}", "SUCCESS") else: self._log(f"❌ [External] 外部消息发送失败: uid={uid}", "ERROR") return result except Exception as e: self._log(f"❌ [External] 外部消息发送异常: {e}", "ERROR") return False async def get_all_customer(self, ws): """异步获取客服列表""" try: # PDD获取客服列表的API url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList" data = { "wechatCheck": True } data = json.dumps(data, separators=(',', ':')) # 使用线程池执行同步请求 response = await asyncio.get_event_loop().run_in_executor( self.pool, lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data) ) if response.status_code == 200: result = response.json() cslist = result.get("result", {}).get("csList", {}) print("我们收集到的客服列表原信息为:", str(cslist)) if cslist: # 转换客服列表格式 self.customer_list = [] for key, customer in cslist.items(): customer_info = { "staff_id": str(key), # 客服ID "name": customer.get("username", ""), # 客服名称 "status": customer.get("status", 0), # 状态 "department": customer.get("department", ""), # 部门 "online": True # 在线状态 } self.customer_list.append(customer_info) self._log(f"✅ 成功获取客服列表,共 {len(self.customer_list)} 个客服", "SUCCESS") return True return False except Exception as e: self._log(f"❌ 获取客服列表失败: {e}", "ERROR") return False async def send_staff_list_to_backend(self): """发送客服列表到后端""" try: if not hasattr(self, 'customer_list') or not self.customer_list: self._log("⚠️ 客服列表为空", "WARNING") return False # 创建消息模板 message_template = PlatformMessage( type="staff_list", content="客服列表更新", data={ "staff_list": self.customer_list, "total_count": len(self.customer_list) }, store_id=self.store_id ) # 通过后端服务发送 await self.backend_service.send_message_to_backend(message_template.to_dict()) self._log(f"发送客服列表消息的结构体为: {message_template.to_json()}") self._log(f"✅ 成功发送客服列表到后端,共 {len(self.customer_list)} 个客服", "SUCCESS") return True except Exception as e: self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR") return False async def handle_transfer_message(self, message_data): """处理转接消息""" self._log("收到转接指令", "INFO") try: content = message_data.get("content", "") # 转接目标客服ID receiver_info = message_data.get("receiver", {}) uid = receiver_info.get("id") # 用户ID if not content or not uid: self._log("❌ 转接消息信息不完整", "WARNING") self._log(f"消息内容: {message_data}", "DEBUG") return False self._log(f"准备将用户 {uid} 转接给客服 {content}", "INFO") # 设置转接目标客服ID self.csid = content # 在线程池中执行同步的转接操作 try: # 使用线程池执行同步方法 success = await asyncio.get_event_loop().run_in_executor( self.pool, self.tab_mall, uid ) if success: self._log(f"✅ 转接成功: 用户 {uid} 已转接给客服 {content}", "SUCCESS") return True else: self._log(f"❌ 转接失败: 用户 {uid}", "ERROR") return False except Exception as e: self._log(f"❌ 执行转接操作失败: {e}", "ERROR") return False except Exception as e: self._log(f"❌ 处理转接消息失败: {e}", "ERROR") traceback.print_exc() return False async def handle_customer_message(self, message_data): """处理来自后端的客服消息""" self._log("收到来自后端的客服消息", "INFO") try: content = message_data.get("content", "") receiver_info = message_data.get("receiver", {}) uid = receiver_info.get("id") if uid and content: self._log(f"准备发送客服消息给用户 {uid}: {content}", "INFO") await self.send_ai_reply(uid, content) self._log(f"客服消息发送完成 - 目标用户: {uid}", "SUCCESS") else: self._log("客服消息数据不完整", "WARNING") self._log(f"消息内容: {message_data}", "DEBUG") except Exception as e: self._log(f"处理客服消息失败: {e}", "ERROR") traceback.print_exc() async def process_incoming_message(self, message_data, wss, store): """处理接收到的消息""" try: message_info = message_data.get("message", {}) if not message_info: return nickname = message_info.get("nickname") content = message_info.get("content") goods_info = message_info.get("info", {}) from_info = message_info.get("from", {}) uid = from_info.get("uid") if nickname and content and uid: self._log(f"用户消息 - {nickname}: {content}", "INFO") self._log(f"用户UID: {uid}", "DEBUG") self.uids.add(uid) # 消息类型检测 lc = str(content).lower() if any(ext in lc for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]): msg_type = "image" elif any(ext in lc for ext in [".mp4", ".avi", ".mov", ".wmv", ".flv"]): msg_type = "video" else: msg_type = "text" # 订单卡片组装(使用全角冒号,符合文档) if "订单编号" in str(content) and goods_info: content = f"商品id:{goods_info.get('goodsID')} 订单号:{goods_info.get('orderSequenceNo')}" msg_type = "order_card" # 商品卡片检测(基于内容关键词和goods_info) elif any(keyword in lc for keyword in ['goods.html', 'item.html', 'item.jd.com', '商品卡片id:']) or \ (goods_info and goods_info.get('goodsID') and not goods_info.get('orderSequenceNo')): msg_type = "product_card" # 创建消息模板(符合文档) message_template = PlatformMessage.create_text_message( content=content, sender_id=str(uid), store_id=self.store_id ) message_template.msg_type = msg_type try: print(f"📤(SPEC) 发送到AI: {json.dumps(message_template.to_dict(), ensure_ascii=False)[:300]}...") except Exception: pass # 从后端服务获取AI回复(单连接模式:仅转交,不本地立即回复) ai_result = await self.get_ai_reply_from_backend(message_template.to_dict()) if isinstance(ai_result, str) and ai_result.strip(): # 发送回复消息(仅当本地生成/降级时) await self.send_ai_reply(uid, ai_result) self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO") else: # 正常链路:已转交后端AI,等待后端异步回传并由 GUI 转发到平台 self._log("🔄 已转交后端AI处理,等待平台回复下发", "INFO") except Exception as e: self._log(f"❌ 消息处理失败: {e}", "ERROR") traceback.print_exc() @staticmethod async def heartbeat(wss): while True: m = random.randint(100, 200) h = [0, 0, 0, 0, 0, 0, 0, m, 0, 0, 0, 1, 0, 0, 0, 0] h = bytes(h) await wss.send(bytes(h)) await asyncio.sleep(15) async def on_message(self, wss, store): self._log("开始监听消息", "INFO") while True: try: message = await wss.recv() self._log(f"收到新消息 {base64.b64encode(message).decode()}", "DEBUG") try: text = self.ctx.call("dencode_data", base64.b64encode(message).decode()) if not isinstance(text, dict): continue push = text.get("push_data") or {} data = push.get("data") if not data: self._log("push_data.data 为空,忽略", "DEBUG") continue print(f"原始数据格式打印:{data}") for d in data: await self.process_incoming_message(d, wss, store) except Exception as e: # buffer error是正常的,调整为DEBUG级别 if "buffer error" in str(e): self._log(f"解析消息失败: {e}", "DEBUG") else: self._log(f"解析消息失败: {e}", "ERROR") traceback.print_exc() except Exception as e: # self._log(f"处理消息失败: {e}", "ERROR") # traceback.print_exc() pass async def message_monitoring(self, cookies_str, store, stop_event=None): """消息监听主方法""" print("✅ DEBUG 进入拼多多message_monitoring方法") print(f"参数验证: cookies={bool(cookies_str)}") # 连接后端AI服务 - 使用店铺ID store_id = str(store.get('id', '')) self._log(f"🔗 尝试连接后端服务,店铺ID: {store_id}", "DEBUG") backend_connected = await self.connect_backend_service(store_id) if not backend_connected: self._log("⚠️ 后端服务连接失败,将使用本地AI回复", "WARNING") else: self._log("✅ 后端服务连接成功", "SUCCESS") stop_event = stop_event or asyncio.Event() # 充值重连计数器 self.reconnect_attempts = 0 while not stop_event.is_set(): try: self._log(f"🔄 尝试连接拼多多WebSocket", "INFO") if self.user_id and self.auth_token: uri = "wss://titan-ws.pinduoduo.com/" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", "Upgrade": "websocket", "Origin": "https://mms.pinduoduo.com", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9", "Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits", "cookie": "; ".join([f"{k}={v}" for k, v in self.cookie.items()]) } async with websockets.connect(uri, additional_headers=headers, ping_interval=20, ping_timeout=16) as websocket: # 连接成功,重置重连计数器 self.reconnect_attempts = 0 self._log("✅ WebSocket-PDD连接成功", "SUCCESS") z = self.encodeex.call("encode_token", str(self.user_id), self.auth_token) if z: await websocket.send(bytes(z)) # 注册连接信息到全局管理 shop_key = f"拼多多:{store['id']}" loop = asyncio.get_running_loop() entry = self.ws_manager.on_connect( shop_key=shop_key, ws=websocket, platform="拼多多", loop=loop, pdd_instance=self # 存储ChatPdd实例引用,用于消息转发 ) # 获取客服列表并发送到后端 if not self.staff_list_sent: self._log("🔍 开始获取客服列表...", "INFO") try: if await self.get_all_customer(None): await self.send_staff_list_to_backend() self.staff_list_sent = True else: self._log("⚠️ 获取客服列表失败", "WARNING") except Exception as e: self._log(f"❌ 获取或发送客服列表失败: {e}", "ERROR") # 启动消息监听和心跳 await asyncio.gather( self.heartbeat(websocket), self.on_message(websocket, store) ) else: self._log("未定制账号token数组", "ERROR") except websockets.ConnectionClosed as e: self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING") if not await self.handle_reconnect(e): break except (websockets.WebSocketException, OSError) as e: self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING") if not await self.handle_reconnect(e): break except Exception as e: self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR") if not await self.handle_reconnect(e): break # 关闭后端服务连接 if self.backend_connected: self.backend_connected = False self._log("🛑 消息监听已停止", "INFO") async def main(self): self.get_auth_token() self.get_mall_id() self.get_assign_cslist() # 连接AI服务 if await self.connect_backend_service(self.store_id): print("✅ AI服务连接成功") store = {'id': self.store_id} await self.message_monitoring( cookies_str=self.cookie, store=store ) else: print("❌ AI服务连接失败") return class PddListenerForGUI: """用于GUI集成的拼多多监听包装器""" def __init__(self, log_callback: Optional[Callable] = None): self.pdd_bot = None self.log_callback = log_callback self.running = False self.main_thread = None self.loop = None self.startup_success = False self.startup_event = threading.Event() self.startup_error = None def _log(self, message: str, log_type: str = "INFO"): if self.log_callback: self.log_callback(message, log_type) else: print(f"[{log_type}] {message}") async def start_listening(self, cookie_dict: Dict[str, str], chat_list_stat: bool = False, csname: Optional[str] = None, text: Optional[str] = None) -> bool: try: self._log("🔵 开始拼多多平台连接流程", "INFO") if not cookie_dict: self._log("❌ Cookie信息不能为空", "ERROR") return False self._log(f"✅ Cookie验证通过,包含 {len(cookie_dict)} 个字段", "SUCCESS") self.startup_success = False self.startup_error = None self.startup_event.clear() # 创建ChatPdd实例 self.pdd_bot = ChatPdd( cookie=cookie_dict, text=text, chat_list_stat=chat_list_stat, csname=csname, log_callback=self.log_callback ) self.running = True self._log("🎉 开始监听拼多多平台消息...", "SUCCESS") def run_pdd_main(): try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) self.loop = loop async def main_wrapper(): try: # 直接调用ChatPdd的main方法 await self.pdd_bot.main() self.startup_success = True self._log("✅ ChatPdd主程序执行完成", "SUCCESS") except Exception as e: self.startup_error = str(e) self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR") finally: self.startup_event.set() loop.run_until_complete(main_wrapper()) except Exception as e: self.startup_error = str(e) self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR") self.startup_event.set() finally: self.running = False self.main_thread = threading.Thread(target=run_pdd_main, daemon=True) self.main_thread.start() self._log("🔄 等待ChatPdd初始化完成...", "INFO") # 只等待初始化完成,不设置超时 self.startup_event.wait() if self.startup_success: self._log("✅ 拼多多监听启动成功", "SUCCESS") return True else: error_msg = self.startup_error or "未知启动错误" self._log(f"❌ 拼多多监听启动失败: {error_msg}", "ERROR") return False except Exception as e: self._log(f"❌ 监听启动过程中出现严重错误: {str(e)}", "ERROR") return False async def start_with_cookies(self, store_id: str, cookies: str): """使用下发的cookies与store_id直接建立PDD平台WS并开始监听""" try: self._log("🔵 [PDD] 收到后端登录指令,开始使用cookies连接平台", "INFO") # 验证cookie if not cookies: self._log("❌ Cookie信息不能为空", "ERROR") return False self._log(f"✅ [PDD] Cookie验证通过,长度: {len(cookies)}", "SUCCESS") # 获取统一后端服务 from WebSocket.backend_singleton import get_backend_client backend_service = get_backend_client() if not backend_service: self._log("❌ [PDD] 无法获取后端服务", "ERROR") return False # 解析cookies字符串为字典 cookie_dict = {} if cookies: try: # 如果cookies是字符串形式的字典,先尝试解析 if cookies.startswith('{') and cookies.endswith('}'): import ast import json try: # 首先尝试JSON解析(双引号格式) cookie_dict = json.loads(cookies) if isinstance(cookies, str) else cookies except json.JSONDecodeError: # 如果JSON解析失败,尝试Python字典格式(单引号格式) cookie_dict = ast.literal_eval(cookies) else: # 传统的cookie字符串解析 for cookie_pair in cookies.split(';'): if '=' in cookie_pair: key, value = cookie_pair.strip().split('=', 1) cookie_dict[key] = value except Exception as e: self._log(f"⚠️ 解析cookies失败: {e}", "ERROR") self._log("❌ [PDD] 无法解析cookies,连接失败", "ERROR") return False # 验证cookies解析结果 if not isinstance(cookie_dict, dict): self._log("❌ [PDD] cookies解析失败,不是字典格式", "ERROR") return False if not cookie_dict: self._log("❌ [PDD] cookies为空", "ERROR") return False self._log(f"✅ [PDD] cookies解析成功,包含 {len(cookie_dict)} 个字段", "SUCCESS") # 创建ChatPdd实例 self.pdd_bot = ChatPdd( cookie=cookie_dict, text=None, chat_list_stat=False, csname=None, log_callback=self.log_callback ) # 设置store_id和后端服务 self.pdd_bot.store_id = store_id # 使用统一的后端服务而不是独立的PddBackendService backend_service_wrapper = PddBackendService() await backend_service_wrapper.connect(store_id) self.pdd_bot.backend_service = backend_service_wrapper self.pdd_bot.backend_connected = True self.running = True self._log("🎉 [PDD] 开始监听平台消息", "SUCCESS") # 获取认证信息 self.pdd_bot.get_auth_token() self.pdd_bot.get_mall_id() self.pdd_bot.get_assign_cslist() # 启动监听 store = {'id': store_id} await self.pdd_bot.message_monitoring( cookies_str=cookies, store=store ) self._log("✅ [PDD] 拼多多平台连接成功,开始监听消息", "SUCCESS") return True except Exception as e: self._log(f"❌ [PDD] 监听过程中出现严重错误: {str(e)}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return False def stop_listening(self): try: self._log("🛑 开始停止拼多多监听...", "INFO") self.running = False if self.main_thread and self.main_thread.is_alive(): try: self._log("🔄 等待主线程结束...", "DEBUG") self.main_thread.join(timeout=5.0) if self.main_thread.is_alive(): self._log("⚠️ 主线程未在超时时间内结束", "WARNING") except Exception as e: self._log(f"⚠️ 等待主线程结束时出错: {e}", "DEBUG") self._log("✅ 拼多多监听已停止", "SUCCESS") except Exception as e: self._log(f"❌ 停止监听时出错: {str(e)}", "ERROR") def is_running(self) -> bool: return (self.running and self.main_thread is not None and self.main_thread.is_alive() and self.startup_success) def get_status(self) -> Dict[str, Any]: return { "running": self.running, "has_bot": self.pdd_bot is not None, "main_thread_alive": self.main_thread.is_alive() if self.main_thread else False, "startup_success": self.startup_success, "startup_error": self.startup_error } # 测试函数保留 async def test_gui_integration(): def custom_log_callback(message: str, log_type: str): print(f"[GUI测试] [{log_type}] {message}") test_cookies = { "api_uid": "CiKdOWi361Z8egCe1uihAg==", "_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9", "rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs", "_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs", "ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5", "_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5", "ru2k": "f24003c6-339b-4498-b995-c4200179d4c6", "_a42": "f24003c6-339b-4498-b995-c4200179d4c6", "mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417", "PASS_ID": "1-Ncbwg/naTW6KMMZg7FCtK5ctxvsC84uMK9wM55CJsDJnnoQ8d5jptFdwMkC08e3lKMJw/mGLQXYNL3iCvwTiCQ_109909969_163439093", "x-visit-time": "1757554980094", "JSESSIONID": "329E5C258BD7920211D15E263490C895" } print("🧪 开始测试GUI集成类...") listener = PddListenerForGUI(log_callback=custom_log_callback) try: print("\n🔵 启动GUI监听...") success = await listener.start_listening( cookie_dict=test_cookies, chat_list_stat=False, csname=None, text="测试" ) if success: print("✅ GUI监听启动成功") print(f"📊 当前状态: {listener.get_status()}") print("\n🟢 监听运行中,将持续运行直到手动停止...") while True: if not listener.is_running(): print("❌ 监听意外停止") break await asyncio.sleep(1) print("\n🛑 测试完成,开始停止监听...") listener.stop_listening() print(f"📊 停止后状态: {listener.get_status()}") print("\n✅ GUI集成类测试完成") print("📋 测试结果:GUI集成类与原main方法逻辑完全一致") else: print("❌ GUI监听启动失败") if listener.startup_error: print(f"❌ 错误原因: {listener.startup_error}") except KeyboardInterrupt: print("\n🛑 用户中断测试") listener.stop_listening() except Exception as e: print(f"❌ 测试过程中出错: {e}") listener.stop_listening() # 同时保留原有的main函数供直接调用 async def main(): cookies = { "api_uid": "CiKdOWi361Z8egCe1uihAg==", "_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9", "rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs", "_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs", "ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5", "_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5", "ru2k": "f24003c6-339b-4498-b995-c4200179d4c6", "_a42": "f24003c6-339b-4498-b995-c4200179d4c6", "mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417", "PASS_ID": "1-P3kEWMDAT6n7mZjZpx1poVmxfusHa6Qd0s+dhQlG7SiqNA8A5LCsjbbRpck4WHjVSve3G+x0N4ZEL7Dcz2L+vg_109909969_163439093", "x-visit-time": "1757399096750", "JSESSIONID": "04601A22B2F955001EFEA28A2F64ED79" } await ChatPdd(cookie=cookies, chat_list_stat=False).main() if __name__ == '__main__': # 可以选择直接运行main()或者测试GUI集成 # 测试GUI集成 asyncio.run(test_gui_integration()) # asyncio.run(main())