# -*- coding: utf-8 -*- # python let's go # 编辑人:kris思成 import asyncio import hashlib import traceback from datetime import datetime import aiohttp import jsonpath import websockets from loguru import logger import requests import json import time import threading import uuid import os # 定义持久化数据类 class WebsocketManager: _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._store = {} cls._instance._lock = threading.RLock() return cls._instance def on_connect(self, shop_key, ws, **kwargs): """完全保持原有数据结构""" with self._lock: entry = self._store.setdefault(shop_key, { 'platform': None, 'customers': [], 'user_assignments': {} }) entry['platform'] = { 'ws': ws, # 注意:这里存储的是强引用 'last_heartbeat': datetime.now(), **kwargs } return entry def get_connection(self, shop_key): with self._lock: return self._store.get(shop_key) def remove_connection(self, shop_key): with self._lock: if shop_key in self._store: del self._store[shop_key] class JDBackendService: """京东后端服务调用类(已废弃:使用单后端连接 BackendClient 代替)""" def __init__(self, *args, **kwargs): self.current_store_id = "" async def connect(self, store_id, *args, **kwargs): try: self.current_store_id = str(store_id or "") except Exception: self.current_store_id = "" return True async def send_message_to_backend(self, platform_message): """改为通过单后端连接发送,需携带store_id""" try: from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() if not backend: return None # 从platform_message中构造统一上行结构,并附加store_id body = platform_message.get('body', {}) if isinstance(platform_message, dict) else {} sender_pin = platform_message.get('from', {}).get('pin', '') if isinstance(platform_message, dict) else '' # 优先取消息内的store_id,其次取body内,再次退回当前会话store_id store_id = (platform_message.get('store_id') or body.get('store_id') or self.current_store_id or '') body_type = body.get('type', 'text') content_for_backend = body.get('content', '') if body_type in ('image', 'video'): # 统一从常见字段取URL content_for_backend = ( body.get('url') or body.get('imageUrl') or body.get('videoUrl') or body.get('picUrl') or content_for_backend ) # 检测文本中的商品卡片/订单卡片 if body_type == 'text' and isinstance(content_for_backend, str): try: import re as _re text = content_for_backend.strip() # 商品卡片:JD商品URL if _re.search(r"https?://item(\.m)?\.jd\.com/\d+\.html(\?.*)?", text): body_type = 'product_card' else: # 订单卡片多样式匹配 # 1) 订单在前:咨询/查询/订单号:[,,]? 商品(ID|id|号|编号): m1 = _re.search( r"(?:咨询订单号|查询订单号|订单号)\s*[::]\s*(\d+)[,,]?\s*商品(?:ID|id|号|编号)\s*[::]\s*(\d+)", text) # 2) 商品在前:商品(ID|id|号|编号):[,,]? (咨询/查询)?订单号: m2 = _re.search( r"商品(?:ID|id|号|编号)\s*[::]\s*(\d+)[,,]?\s*(?:咨询订单号|查询订单号|订单号)\s*[::]\s*(\d+)", text) if m1 or m2: body_type = 'order_card' if m1: order_number, product_id = m1.group(1), m1.group(2) else: product_id, order_number = m2.group(1), m2.group(2) # 归一化 content content_for_backend = f"商品id:{product_id} 订单号:{order_number}" except Exception: pass msg = { 'type': 'message', 'content': content_for_backend, 'msg_type': body_type, 'sender': {'id': sender_pin}, 'store_id': store_id } backend.send_message(msg) return None except Exception: return None class FixJdCookie: def __init__(self, log_callback=None): # 定义一些常用参数 super().__init__() # 继承一些父类的初始化参数 self.ws_manager = WebsocketManager() self.log_callback = log_callback # 存储日志回调 # 新增后端服务实例 self.backend_service = JDBackendService() self.backend_connected = False # 新增重连参数 self.reconnect_attempts = 0 self.max_reconnect_attempts = 10 # 最大重连次数 self.base_reconnect_delay = 1.0 # 基础重连延迟 self.max_reconnect_delay = 60.0 # 最大重连延迟 self.reconnect_backoff = 1.5 # 退避系数 # 🔥 存储认证信息,用于文件上传 self.cookies_str = None self.current_aid = None self.current_pin_zj = None # 🔥 启动时清理过期临时文件 self._cleanup_old_temp_files(max_age_hours=24) def _log(self, message, log_type="INFO"): """内部日志方法""" if self.log_callback: self.log_callback(message, log_type) else: print(f"[{log_type}] {message}") def get_config(self, cookies_str): """获取配置""" headers = { "cookie": cookies_str, "user-agent": "Mozilla/5.0", "referer": "https://dongdong.jd.com/", } response = requests.get("https://dongdong.jd.com/workbench/checkin.json", headers=headers, params={"version": "2.6.3", "client": "openweb"}) return response.json() async def init_wss(self, ws, aid, pin_zj): """初始化 socket""" await self.send_heartbeat(ws, aid, pin_zj) print("开始监听初始化") # 🔧 修复:生成唯一设备ID,避免多端互踢 import uuid unique_device_id = f"shuidrop_gui_{uuid.uuid4().hex[:16]}" auth = { "id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(), "aid": aid, "lang": "zh_CN", "timestamp": int(time.time() * 1000), "type": "auth", "body": {"presence": 1, "clientVersion": "2.6.3"}, "to": {"app": "im.waiter"}, "from": {"app": "im.waiter", "pin": pin_zj, "clientType": "comet", "dvc": unique_device_id} } print(f"[DEBUG] 使用唯一设备ID: {unique_device_id}") await ws.send(json.dumps(auth)) async def waiter_status_switch(self, ws, aid, pin_zj): """设置接待状态""" message = { "id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(), "aid": aid, "lang": "zh_CN", "timestamp": int(time.time() * 1000), "from": { "app": "im.waiter", "pin": pin_zj, "art": "customerGroupMsg", "clientType": "comet" }, "type": "waiter_status_switch", "body": { "s": 1 } } await ws.send(json.dumps(message)) async def get_all_customer(self, ws, aid, pin_zj): """异步获取商家组织架构(客服列表)""" id_str = hashlib.md5(str(int(time.time() * 1000) + 300).encode()).hexdigest() message = { "id": id_str, "aid": aid, "lang": "zh_CN", "timestamp": int(time.time() * 1000), "from": { "app": "im.waiter", "pin": pin_zj, "art": "customerGroupMsg", "clientType": "comet" }, "type": "org_new", "body": {"param": pin_zj, "paramtype": "ByPin"} } await ws.send(json.dumps(message)) while True: response = await ws.recv() data = json.loads(response) if data.get("id") == id_str and data.get("body", {}).get("groupname") == "商家组织架构": group = data.get("body", {}).get("group") if group: try: # 与旧实现保持一致的数据路径 self.customer_list = group[0].get("group")[0].get("users") except Exception: self.customer_list = [] return async def send_staff_list_to_backend(self, store_id): """发送客服列表到后端(通过统一后端连接)""" try: if not hasattr(self, 'customer_list') or not self.customer_list: self._log("⚠️ 客服列表为空", "WARNING") return False # 转换客服数据格式 staff_infos = [] for staff in self.customer_list: staff_info = { "staff_id": staff.get("pin", ""), "name": staff.get("nickname", ""), "status": staff.get("status", 0), "department": staff.get("department", ""), "online": staff.get("online", True) } staff_infos.append(staff_info) # 通过后端统一连接发送 try: from WebSocket.backend_singleton import get_backend_client backend_client = get_backend_client() if not backend_client or not getattr(backend_client, 'is_connected', False): self._log("❌ 统一后端连接不可用", "ERROR") return False message = { "type": "staff_list", "content": "客服列表更新", "data": { "staff_list": staff_infos, "total_count": len(staff_infos) }, "store_id": str(store_id) } backend_client.send_message(message) self._log(f"✅ 成功发送客服列表到后端,共 {len(staff_infos)} 个客服", "SUCCESS") return True except Exception as e: self._log(f"❌ 发送客服列表到后端失败: {e}", "ERROR") return False except Exception as e: self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR") return False async def transfer_customer(self, ws, aid, pin, pin_zj, chat_name): """异步客服转接 在发送的消息为客服转接的关键词的时候""" message = { "id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(), "aid": aid, "lang": "zh_CN", "timestamp": int(time.time() * 1000), "from": { "app": "im.waiter", "pin": pin_zj, "art": "customerGroupMsg", "clientType": "comet" }, "type": "chat_transfer_partern", "body": { "customer": pin, "cappId": "im.customer", "waiter": chat_name, "reason": "", "ext": {"pid": ""}, "pid": "" } } try: await ws.send(json.dumps(message)) return True except Exception: traceback.print_exc() return False async def send_message(self, ws, pin, aid, pin_zj, vender_id, content, msg_type="text"): """异步发送消息 - 支持文本/图片/视频 Args: ws: WebSocket连接 pin: 客户pin aid: 账号aid pin_zj: 客服pin vender_id: 商家ID content: 消息内容(文本内容或URL) msg_type: 消息类型 (text/image/video) """ try: # 根据消息类型调用不同的发送方法 if msg_type == "image": return await self.send_image_message(ws, pin, aid, pin_zj, vender_id, content) elif msg_type == "video": return await self.send_video_message(ws, pin, aid, pin_zj, vender_id, content) else: # 文本消息(默认) print('本地发送文本消息') message = { "ver": "4.3", "type": "chat_message", "from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"}, "to": {"app": "im.customer", "pin": pin}, "id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(), "lang": "zh_CN", "aid": aid, "timestamp": int(time.time() * 1000), "readFlag": 0, "body": { "content": content, "translated": False, "param": {"cusVenderId": vender_id}, "type": "text" } } await ws.send(json.dumps(message)) logger.info(f"消息已经发送到客户端[info] {pin}: {content[:20]} ...") except websockets.ConnectionClosed: logger.error('本地发送消息失败 连接关闭') raise except Exception as e: # 同时这里也要及时进行raise抛出 这样比较好让系统可以看出 异常了可以抛出信息不至于后续被认为 logger.error(f"消息发送过程中出现特殊异常异常信息为: {e}") raise def _get_temp_directory(self) -> str: """智能选择临时文件目录(优先级降级策略) 优先级: 1. 环境变量 SHUIDROP_TEMP_DIR(用户自定义) 2. 应用程序所在目录/temp(如果有写权限且不在Program Files) 3. 系统临时目录(兜底方案) Returns: str: 临时目录路径 """ import tempfile import sys try: # 优先级1:用户自定义环境变量 custom_temp = os.getenv('SHUIDROP_TEMP_DIR') if custom_temp: custom_temp = os.path.join(custom_temp, "shuidrop_jd_temp_uploads") try: os.makedirs(custom_temp, exist_ok=True) # 测试写权限 test_file = os.path.join(custom_temp, ".write_test") with open(test_file, 'w') as f: f.write("test") os.remove(test_file) self._log(f"✅ [JD路径] 使用自定义临时目录: {custom_temp}", "INFO") return custom_temp except Exception as e: self._log(f"⚠️ [JD路径] 自定义目录不可用: {e}", "WARNING") # 优先级2:应用程序所在目录(避免占用C盘) try: # 获取可执行文件路径 if getattr(sys, 'frozen', False): # 打包后 app_dir = os.path.dirname(sys.executable) else: # 开发环境 app_dir = os.path.dirname(os.path.abspath(__file__)) # 检查是否在 Program Files(需要管理员权限) if 'Program Files' not in app_dir and 'Program Files (x86)' not in app_dir: app_temp = os.path.join(app_dir, "temp_uploads", "jd") os.makedirs(app_temp, exist_ok=True) # 测试写权限 test_file = os.path.join(app_temp, ".write_test") with open(test_file, 'w') as f: f.write("test") os.remove(test_file) self._log(f"✅ [JD路径] 使用应用程序目录: {app_temp}", "INFO") return app_temp else: self._log(f"ℹ️ [JD路径] 应用在Program Files,跳过应用目录", "DEBUG") except Exception as e: self._log(f"⚠️ [JD路径] 应用目录不可用: {e}", "DEBUG") # 优先级3:系统临时目录(兜底方案) system_temp = os.path.join(tempfile.gettempdir(), "shuidrop_jd_temp_uploads") os.makedirs(system_temp, exist_ok=True) self._log(f"✅ [JD路径] 使用系统临时目录: {system_temp}", "INFO") return system_temp except Exception as e: # 最终兜底 self._log(f"❌ [JD路径] 所有路径策略失败,使用默认: {e}", "ERROR") import tempfile return os.path.join(tempfile.gettempdir(), "shuidrop_jd_temp_uploads") def _get_file_extension(self, url: str, default_ext: str) -> str: """智能提取文件扩展名 Args: url: 文件URL default_ext: 默认扩展名(jpg/mp4) Returns: str: 文件扩展名 """ try: # 移除查询参数 url_without_params = url.split('?')[0] # 检查是否有有效的文件扩展名 if '.' in url_without_params: parts = url_without_params.split('.') ext = parts[-1].lower() # 验证扩展名是否合法(只包含字母数字) if ext and len(ext) <= 5 and ext.isalnum(): # 常见图片/视频扩展名 valid_exts = ['jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv'] if ext in valid_exts: return ext # 如果无法提取有效扩展名,使用默认值 return default_ext except Exception: return default_ext def _cleanup_old_temp_files(self, max_age_hours=24): """清理过期的临时文件(24小时以上) Args: max_age_hours: 文件最大保留时间(小时) """ try: # 使用智能路径选择 temp_dir = self._get_temp_directory() if not os.path.exists(temp_dir): return now = time.time() max_age_seconds = max_age_hours * 3600 cleaned_count = 0 # 遍历临时目录中的文件 for filename in os.listdir(temp_dir): file_path = os.path.join(temp_dir, filename) # 只处理文件,跳过目录 if not os.path.isfile(file_path): continue try: # 检查文件年龄 file_mtime = os.path.getmtime(file_path) file_age = now - file_mtime if file_age > max_age_seconds: file_size = os.path.getsize(file_path) os.remove(file_path) cleaned_count += 1 self._log(f"🗑️ [JD清理] 删除过期文件: {filename} ({file_size} bytes, {file_age/3600:.1f}小时前)", "DEBUG") except Exception as e: self._log(f"⚠️ [JD清理] 删除文件失败 {filename}: {e}", "DEBUG") if cleaned_count > 0: self._log(f"✅ [JD清理] 已清理 {cleaned_count} 个过期临时文件", "INFO") except Exception as e: self._log(f"⚠️ [JD清理] 临时文件清理失败: {e}", "DEBUG") async def download_file(self, url: str, save_dir: str = None, max_retries: int = 3) -> str: """下载外部文件到本地(带重试机制 + 智能路径选择) Args: url: 文件URL save_dir: 保存目录(None时自动选择最佳路径) max_retries: 最大重试次数 Returns: str: 本地文件路径 """ # 使用智能路径选择策略 if save_dir is None: save_dir = self._get_temp_directory() # 确保目录存在 os.makedirs(save_dir, exist_ok=True) # 智能提取文件扩展名 default_ext = 'jpg' ext = self._get_file_extension(url, default_ext) # 生成唯一文件名 file_name = f"jd_download_{uuid.uuid4().hex[:12]}.{ext}" save_path = os.path.join(save_dir, file_name) # 重试机制 for attempt in range(max_retries): try: if attempt > 0: self._log(f"🔄 [JD下载] 第{attempt + 1}次重试下载...", "INFO") else: self._log(f"📥 [JD下载] 开始下载: {url[:100]}...", "INFO") # 使用线程池下载 loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: requests.get(url, timeout=60, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) ) # 检查HTTP状态 if response.status_code != 200: self._log(f"❌ [JD下载] HTTP状态码: {response.status_code}", "ERROR") if attempt < max_retries - 1: await asyncio.sleep(2) continue raise Exception(f"下载失败,HTTP状态码: {response.status_code}") # 获取文件数据 file_data = response.content file_size_kb = len(file_data) // 1024 # 检查文件大小(限制50MB) if file_size_kb > 51200: self._log(f"❌ [JD下载] 文件过大: {file_size_kb}KB,超过50MB限制", "ERROR") raise Exception(f"文件过大: {file_size_kb}KB") # 检查文件是否为空 if file_size_kb == 0: self._log(f"❌ [JD下载] 下载的文件为空", "ERROR") if attempt < max_retries - 1: await asyncio.sleep(1) continue raise Exception("下载的文件为空") # 写入临时文件 with open(save_path, 'wb') as f: f.write(file_data) self._log(f"✅ [JD下载] 下载成功,大小: {file_size_kb}KB,文件: {file_name}", "SUCCESS") return save_path except requests.exceptions.RequestException as e: self._log(f"❌ [JD下载] 网络请求失败: {e}", "ERROR") if attempt < max_retries - 1: await asyncio.sleep(2) continue raise except Exception as e: self._log(f"❌ [JD下载] 下载失败: {e}", "ERROR") if attempt < max_retries - 1: await asyncio.sleep(1) continue raise # 所有重试都失败 raise Exception(f"下载失败:已重试{max_retries}次") async def upload_file_to_jd(self, file_path: str, file_type: str) -> dict: """上传文件到京东服务器 Args: file_path: 本地文件路径 file_type: 文件类型 (image/video) Returns: dict: {path: 京东URL, width: 宽度, height: 高度} """ try: if not self.cookies_str or not self.current_aid or not self.current_pin_zj: raise Exception("缺少必要的认证信息(cookies/aid/pin)") self._log(f"📤 开始上传文件到京东: {file_path}", "INFO") # 读取文件内容 with open(file_path, 'rb') as f: file_content = f.read() # 根据文件类型选择API和MIME类型 if file_type == "image" or any(ext in file_path.lower() for ext in ['.png', '.jpg', '.jpeg', '.gif']): url = "https://imio.jd.com/uploadfile/file/uploadImg.action" mime_type = 'image/jpeg' elif file_type == "video" or '.mp4' in file_path.lower(): url = "https://imio.jd.com/uploadfile/file/uploadFile.action" mime_type = 'video/mp4' else: raise Exception(f"不支持的文件类型: {file_path}") # 准备请求 headers = { "authority": "imio.jd.com", "accept": "*/*", "accept-language": "zh-CN,zh;q=0.9", "cache-control": "no-cache", "origin": "https://dongdong.jd.com", "pragma": "no-cache", "referer": "https://dongdong.jd.com/", "sec-ch-ua": '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", "cookie": self.cookies_str } files = { 'upload': (os.path.basename(file_path), file_content, mime_type) } data = { 'httpsEnable': 'true', 'clientType': 'comet', 'appId': 'im.waiter', 'pin': self.current_pin_zj, 'aid': self.current_aid } # 同步请求(在异步上下文中使用 run_in_executor) loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: requests.post(url, headers=headers, files=files, data=data, timeout=30) ) result = response.json() if result.get('path'): self._log(f"✅ 文件上传成功: {result.get('path')}", "SUCCESS") return result else: raise Exception(f"上传失败: {result}") except Exception as e: self._log(f"❌ 文件上传失败: {e}", "ERROR") raise async def get_video_thumbnail(self, video_path: str) -> dict: """提取视频封面并上传 Args: video_path: 视频文件路径 Returns: dict: {path: 封面URL, width: 宽度, height: 高度} """ import cv2 try: self._log(f"🎬 开始提取视频封面: {video_path}", "INFO") # 提取视频第一帧 cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) frame_position = int(0 * fps) # 第0秒 cap.set(cv2.CAP_PROP_POS_FRAMES, frame_position) ret, frame = cap.read() cap.release() if not ret: raise Exception("无法读取视频帧") # 编码为JPG success, encoded_image = cv2.imencode('.jpg', frame) if not success: raise Exception("无法编码图像") thumbnail_content = encoded_image.tobytes() # 上传封面(使用图片上传接口) headers = { "authority": "imio.jd.com", "accept": "*/*", "accept-language": "zh-CN,zh;q=0.9", "cache-control": "no-cache", "origin": "https://dongdong.jd.com", "pragma": "no-cache", "referer": "https://dongdong.jd.com/", "sec-ch-ua": '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36", "cookie": self.cookies_str } url = "https://imio.jd.com/uploadfile/file/uploadImg.action" files = { 'upload': ("thumbnail.jpg", thumbnail_content, 'image/jpeg') } data = { 'httpsEnable': 'true', 'clientType': 'comet', 'appId': 'im.waiter', 'pin': self.current_pin_zj, 'aid': self.current_aid } # 同步请求 loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: requests.post(url, headers=headers, files=files, data=data, timeout=30) ) result = response.json() if result.get('path'): self._log(f"✅ 视频封面上传成功: {result.get('path')}", "SUCCESS") return result else: raise Exception(f"封面上传失败: {result}") except Exception as e: self._log(f"❌ 视频封面提取失败: {e}", "ERROR") raise async def send_image_message(self, ws, pin: str, aid: str, pin_zj: str, vender_id: str, image_url: str): """发送图片消息 Args: ws: WebSocket连接 pin: 客户pin aid: 账号aid pin_zj: 客服pin vender_id: 商家ID image_url: 图片URL(外部URL,需要下载后上传) """ temp_file = None try: self._log(f"📷 开始发送图片消息: {image_url}", "INFO") # 1. 下载图片 temp_file = await self.download_file(image_url) # 2. 上传到京东服务器 upload_result = await self.upload_file_to_jd(temp_file, "image") # 3. 发送图片消息 message = { "ver": "4.3", "type": "chat_message", "from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"}, "to": {"app": "im.customer", "pin": pin}, "id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(), "lang": "zh_CN", "aid": aid, "timestamp": int(time.time() * 1000), "readFlag": 0, "body": { "height": upload_result.get("height", 0), "width": upload_result.get("width", 0), "url": upload_result.get("path"), "translated": "", "param": {"cusVenderId": vender_id}, "type": "image" } } await ws.send(json.dumps(message)) self._log(f"✅ 图片消息发送成功: {pin}", "SUCCESS") except Exception as e: self._log(f"❌ 图片消息发送失败: {e}", "ERROR") raise finally: # 清理临时文件 if temp_file and os.path.exists(temp_file): try: os.remove(temp_file) self._log(f"🗑️ 已清理临时文件: {temp_file}", "DEBUG") except Exception: pass async def send_video_message(self, ws, pin: str, aid: str, pin_zj: str, vender_id: str, video_url: str): """发送视频消息 Args: ws: WebSocket连接 pin: 客户pin aid: 账号aid pin_zj: 客服pin vender_id: 商家ID video_url: 视频URL(外部URL,需要下载后上传) """ temp_video = None try: self._log(f"🎥 开始发送视频消息: {video_url}", "INFO") # 1. 下载视频 temp_video = await self.download_file(video_url) # 2. 提取并上传封面 thumbnail_result = await self.get_video_thumbnail(temp_video) # 3. 上传视频 await asyncio.sleep(2) # 等待2秒,避免请求过快 video_result = await self.upload_file_to_jd(temp_video, "video") # 4. 发送视频消息 message = { "ver": "4.3", "type": "chat_message", "from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"}, "to": {"app": "im.customer", "pin": pin}, "id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(), "lang": "zh_CN", "aid": aid, "timestamp": int(time.time() * 1000), "readFlag": 0, "body": { "desc": "", "duration": 6, "param": {}, "reupload": "false", "size": os.path.getsize(temp_video), "thumbHeight": thumbnail_result.get("height", 0), "thumbWidth": thumbnail_result.get("width", 0), "thumbnail": thumbnail_result.get("path"), "url": video_result.get("path"), "type": "video" } } await ws.send(json.dumps(message)) self._log(f"✅ 视频消息发送成功: {pin}", "SUCCESS") except Exception as e: self._log(f"❌ 视频消息发送失败: {e}", "ERROR") raise finally: # 清理临时文件 if temp_video and os.path.exists(temp_video): try: os.remove(temp_video) self._log(f"🗑️ 已清理临时视频: {temp_video}", "DEBUG") except Exception: pass def get_userinfo(self, response_text): """获取用户 pin 并存入 pins 列表""" pin = jsonpath.jsonpath(json.loads(response_text), "$..from.pin") return pin[0] if pin else None def is_merchants(self, store: object, response): """判断消息是否来自顾客""" send = response['from']['pin'] # 补充: 方法lower()是转化为小写 upper()是转化为大写 title()是每个单词首字母大写其余小写 if send.lower() == "KLD测试".lower(): return False return True async def send_heartbeat(self, ws, aid, pin_zj): """发送心跳包""" msg = { "id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(), "type": "client_heartbeat", "aid": aid, "ver": "4.1", "lang": "zh_CN", "from": {"pin": pin_zj, "app": "im.waiter", "art": "customerGroupMsg", "clientType": "comet"} } await ws.send(json.dumps(msg, separators=(',', ':'))) # 使用最简心跳包模式 来节约部分性能 减少传输压力 async def heartbeat_loop(self, websocket, aid, pin_zj): """独立的心跳循环""" """ 优化心跳 循环 新增改进 1. 心跳间隔动态调整 2. 异常重试机制 3. 心跳超时控制 4. 状态监控 """ retry_count = 0 max_retries = 3 base_interval = 3.0 # 基础间隔1s backoff_factor = 1.5 # 退避系数 timeout = 10.0 while True: try: start_time = time.monotonic() # 使用websocket原生的ping/pong机制 pong_waiter = await websocket.ping() await asyncio.wait_for(pong_waiter, timeout=timeout) # 如果需要发送自定义心跳包 await self.send_heartbeat(websocket, aid, pin_zj) # 计算剩余等待时间 elapsed = time.monotonic() - start_time sleep_time = max(0.0, base_interval - elapsed) if sleep_time > 0: await asyncio.sleep(sleep_time) retry_count = 0 # 重置重试计数 except asyncio.TimeoutError: logger.warning(f'心跳超时,已用时: {time.monotonic() - start_time:.2f}秒') retry_count += 1 except websockets.ConnectionClosed: logger.error("连接关闭,心跳已停止") break except Exception as e: logger.error(f"心跳异常: {e}", exc_info=True) retry_count += 1 if retry_count >= max_retries: logger.error(f"心跳连续失败{retry_count}次,终止循环") break # 退避策略 if retry_count > 0: backoff_time = min( base_interval * (backoff_factor ** (retry_count - 1)), 60.0 # 最大等待60秒 ) logger.debug(f"心跳失败,等待{backoff_time:.2f}秒后重试") await asyncio.sleep(backoff_time) ''' 整理重连方法 ''' async def calculate_reconnect_delay(self): """计算指数退避的重连延迟时间""" delay = self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts) return min(delay, self.max_reconnect_delay) async def should_reconnect(self): """判断是否应该继续重连""" if self.reconnect_attempts >= self.max_reconnect_attempts: self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR") return False return True async def handle_reconnect(self, exception=None): """处理重连逻辑""" if exception: error_type = type(exception).__name__ error_msg = str(exception) self._log(f"连接异常[{error_type}]: {error_msg}", "WARNING") if not await self.should_reconnect(): return False delay = await self.calculate_reconnect_delay() self._log(f"第{self.reconnect_attempts + 1}次重连尝试,等待{delay:.1f}秒...", "WARNING") await asyncio.sleep(delay) self.reconnect_attempts += 1 return True ''' 后端服务调用方法 ''' async def connect_backend_service(self, store_id): """连接后端AI服务""" try: success = await self.backend_service.connect(store_id) if success: self.backend_connected = True self._log("✅ 后端AI服务连接成功", "SUCCESS") return success except Exception as e: self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR") return False async def get_ai_reply_from_backend(self, platform_message): """从后端服务获取AI回复""" # 首先检查后端服务连接状态 if not self.backend_connected: # 尝试重新连接 store_id = str(platform_message.get('body', {}).get('chatinfo', {}).get('venderId', '')) if store_id: self.backend_connected = await self.connect_backend_service(store_id) if not self.backend_connected: self._log("❌ 后端服务未连接,尝试重新连接失败,使用本地AI回复", "WARNING") # 降级到本地AI回复 customer_sid = platform_message.get('body', {}).get('chatinfo', {}).get('sid', '') customer_message = platform_message.get('body', {}).get('content', '') return try: # 推送给后端,由后端异步回传AI消息到GUI;此处不进行本地立即回复 await self.backend_service.send_message_to_backend(platform_message) return None except Exception as e: self._log(f"❌ 获取AI回复失败: {e},使用本地AI", "ERROR") customer_sid = platform_message.get('body', {}).get('chatinfo', {}).get('sid', '') customer_message = platform_message.get('body', {}).get('content', '') return async def process_incoming_message(self, response, ws, aid, pin_zj, vender_id, store): """处理接收到的消息""" try: # 解析消息 json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response # 验证消息格式 if not all([ json_resp, json_resp.get('type') == "chat_message", json_resp.get('ver') == "4.2", isinstance(json_resp.get("body"), dict) ]): return # 过滤非客户消息 pin = self.get_userinfo(response) if not self.is_merchants(store, json_resp): return # 提取消息内容 message_type = json_resp['body']['type'] if message_type == 'text': customer_message = json_resp['body']['content'] elif message_type == 'image': customer_message = f"[图片] {json_resp['body'].get('url') or json_resp['body'].get('imageUrl') or json_resp['body'].get('picUrl') or ''}" elif message_type == 'video': customer_message = f"[视频] {json_resp['body'].get('url') or json_resp['body'].get('videoUrl') or ''}" else: return self._log(f"📩 收到客户消息: {pin}: {customer_message[:100]}...", "INFO") # 从后端服务获取AI回复(单连接模式:仅转交,不本地立即回复) ai_result = await self.get_ai_reply_from_backend(json_resp) if isinstance(ai_result, str) and ai_result.strip(): # 发送回复消息(仅当本地生成/降级时) await self.send_message( ws=ws, pin=pin, aid=aid, pin_zj=pin_zj, vender_id=vender_id, content=ai_result ) self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO") else: # 正常链路:已转交后端AI,等待后端异步回传并由 GUI 转发到平台 self._log("🔄 已转交后端AI处理,等待平台回复下发", "INFO") except json.JSONDecodeError: self._log("❌ 消息JSON解析失败", "ERROR") except Exception as e: self._log(f"❌ 消息处理失败: {e}", "ERROR") async def message_monitoring(self, cookies_str, aid, pin_zj, vender_id, store, sleep_time=0.5, stop_event=None): print("✅ DEBUG 进入message_monitoring方法") print(f"参数验证: cookies={bool(cookies_str)}, aid={aid}, pin_zj={pin_zj}") # 🔥 保存认证信息,用于文件上传 self.cookies_str = cookies_str self.current_aid = aid self.current_pin_zj = pin_zj # 连接后端AI服务 - 使用店铺ID或venderId store_id = str(store.get('id', '')) or str(vender_id) self._log(f"🔗 尝试连接后端服务,店铺ID: {store_id}", "DEBUG") backend_connected = await self.connect_backend_service(store_id) if not backend_connected: self._log("⚠️ 后端服务连接失败,将使用本地AI回复", "WARNING") else: self._log("✅ 后端服务连接成功", "SUCCESS") stop_event = stop_event or asyncio.Event() # 如果外部没传,自己创建 uri = "wss://dongdong.jd.com/workbench/websocket" headers = {"cookie": cookies_str} self._log(f"🔵 开始连接WebSocket: {uri}", "INFO") # 充值重连计数器 self.reconnect_attempts = 0 while not stop_event.is_set(): try: self._log(f"🔄 尝试连接WebSocket", "INFO") async with websockets.connect(uri, additional_headers=headers, ping_interval=6, ping_timeout=10, close_timeout=1, max_queue=1024) as ws: # 连接成功,重置重连计数器 self.reconnect_attempts = 0 self._log("✅ WebSocket-JD连接成功", "SUCCESS") await self.init_wss(ws, aid, pin_zj) # === 验证连接成功的核心指标 === # print(f"✅ 连接状态: open={ws.open}, closed={ws.closed}") print(f"🖥️ 服务端地址: {ws.remote_address}") # 连接成功后,获取并上报一次客服列表(最小改动恢复) try: await self.get_all_customer(ws, aid, pin_zj) await self.send_staff_list_to_backend(store.get('id', '')) except Exception as e: self._log(f"❌ 获取或发送客服列表失败: {e}", "ERROR") # --- 注册连接信息到全局管理 shop_key = f"京东:{store['id']}" loop = asyncio.get_running_loop() entry = self.ws_manager.on_connect( shop_key=shop_key, ws=ws, vender_id=vender_id, aid=aid, pin_zj=pin_zj, platform="京东", loop=loop, cookies_str=cookies_str # 🔥 传递cookies用于文件上传 ) await self.waiter_status_switch(ws=ws, aid=aid, pin_zj=pin_zj) heartbeat_task = asyncio.create_task(self.heartbeat_loop(ws, aid, pin_zj)) message_tasks = set() try: while not stop_event.is_set(): # 检查是否收到中止信号 try: print(f"等待监听消息-{datetime.now()}") response = await asyncio.wait_for(ws.recv(), timeout=1) print(f"原始消息类型:{type(response)}, 消息体为: {response}") # 🔧 修复:检测被踢下线消息 json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response # 检查是否为server_msg类型且code=5(被踢下线) if json_resp.get("type") == "server_msg": body = json_resp.get("body", {}) if body.get("code") == 5: msgtext = body.get("msgtext", "账号在其他设备登录") self._log(f"⚠️ 收到被踢下线消息: {msgtext}", "WARNING") self._log("⚠️ 检测到多端登录冲突,请确保:", "WARNING") self._log(" 1. 关闭网页版京东咚咚", "WARNING") self._log(" 2. 关闭其他客户端", "WARNING") self._log(" 3. 确认只有本客户端连接", "WARNING") # 通知GUI显示弹窗 try: from WebSocket.backend_singleton import get_websocket_manager ws_manager = get_websocket_manager() if ws_manager: # 从platform_listeners获取店铺名称 store_name = "京东店铺" for key, info in ws_manager.platform_listeners.items(): if info.get('store_id') == store: store_name = info.get('store_name', '') or "京东店铺" break # 传递 store_id 参数,用于通知后端 ws_manager.notify_platform_kicked("京东", store_name, msgtext, store_id=store) except Exception as notify_error: self._log(f"通知GUI失败: {notify_error}", "ERROR") # 不再自动重连,等待用户处理 stop_event.set() break await self.process_incoming_message(response, ws, aid, pin_zj, vender_id, store) # print(json_resp) ver = json_resp.get("ver") print(f"版本{ver}") except asyncio.TimeoutError: continue except websockets.ConnectionClosed: # 连接关闭 跳出内层循环进行重连 break except Exception as e: self._log(f"消息处理异常: {e}", "ERROR") continue await asyncio.sleep(sleep_time) ### ##{'ver': '4.2', 'mid': 366566937, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01C5XRGQHGNJBSFYWWA7RF54QXYHF2N34TM32NGLZV6YAAPPNKWLAIQ2MZV25T4QZ2TJE4HRF6UZ2L3THX7I2BLLB37YVAWKR2BYGRRPA01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01C5XRGQHGNJBSFYWWA7RF54QXYHF2N34TM32NGLZV6YAAPPNKWLAIQ2MZV25T4QZ2TJE4HRF6UZ2L3THX7I2BLLB37YVAWKR2BYGRRPA01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '你好', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a'}, 'type': 'chat_message', 'clientTime': 1755076542320, 'datetime': '2025-08-13 17:15:42', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'bc3c42a706fa4fa482ff565304c7dfbb', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755076542671} # {'ver': '4.2', 'mid': 366566966, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10052172306055', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '538fc3761474ea693812ceed4b39639c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd014TJZZMUXFLXYGJJJ4M3HY3PTU4PBA22SPDIXDWOGZ7XTUTOWTY4LU3VWN6OKDOJPJOVDINMCJXCSPSG5X2K6KWHNQJQOROB57LDG5EY01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10052172306055', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '538fc3761474ea693812ceed4b39639c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd014TJZZMUXFLXYGJJJ4M3HY3PTU4PBA22SPDIXDWOGZ7XTUTOWTY4LU3VWN6OKDOJPJOVDINMCJXCSPSG5X2K6KWHNQJQOROB57LDG5EY01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '你好啊', 'sid': '538fc3761474ea693812ceed4b39639c'}, 'type': 'chat_message', 'clientTime': 1755592905140, 'datetime': '2025-08-19 16:41:45', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'd31d369f17f24b05abbe2b7c334f340e', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755592905232} # {'ver': '4.2', 'mid': 366566984, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '您好', 'sid': '5b5e044c73ee243b7e67eeca5b11393c'}, 'type': 'chat_message', 'clientTime': 1755595443735, 'datetime': '2025-08-19 17:24:03', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': '800deaf802f9436b98937bb084ee2a56', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755595443839} # {'ver': '4.2', 'mid': 366566986, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '我先自己看看谢谢您', 'sid': '5b5e044c73ee243b7e67eeca5b11393c'}, 'type': 'chat_message', 'clientTime': 1755595460153, 'datetime': '2025-08-19 17:24:20', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'a3d4de984f2d4811952f8ba872850bc2', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755595460274} except Exception as e: logger.error(f"接收对应监控消息时候产生特殊错误 , 错误信息为{e}") finally: # 清理资源 heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass self._log("🔄 连接断开,准备重连", "INFO") except websockets.ConnectionClosed as e: self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING") if not await self.handle_reconnect(e): break except (websockets.WebSocketException, aiohttp.ClientError, OSError) as e: self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING") if not await self.handle_reconnect(e): break except Exception as e: self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR") if not await self.handle_reconnect(e): break # 关闭后端服务连接(JDBackendService没有close方法,跳过) if self.backend_connected: self.backend_connected = False self._log("🛑 消息监听已停止", "INFO") # 新增: GUI集成包装器类 # class JDListenerForGUI: # """用于GUI集成的JD监听包装器 ()""" # # def __init__(self, log_callback=None): # self.fix_jd_util = FixJdCookie(log_callback) # self.log_callback = log_callback # self.running = False # self.stop_event = None # self.username = None # self.password = None # # def _log(self, message, log_type="INFO"): # """处理日志输出""" # if self.log_callback: # self.log_callback(message, log_type) # else: # print(f"[{log_type}] {message}") # # async def start_listening(self, username, password): # """启动监听的主方法""" # try: # # 存储用户名和密码 # self.username = username # self.password = password # # self._log("🔵 开始JD平台连接流程", "INFO") # print("🔵 开始JD平台连接流程 - 调试断点1") # # # 1. 获取店铺信息 # self._log("🔵 步骤2: 获取店铺信息...", "INFO") # print("🔵 步骤2: 获取店铺信息... - 调试断点2") # store_today = self.fix_jd_util.get_today_store() # if not store_today: # self._log("❌ 未找到店铺信息", "ERROR") # print("❌ 未找到店铺信息") # return False # self._log(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}", "SUCCESS") # print(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}") # # # 2. 连接后端服务获取cookie # self._log("🔵 步骤3: 连接后端服务获取cookie...", "INFO") # store_id = str(store_today.get('id', '')) # # jd_cookie = await self.fix_jd_util.connect_backend_service(store_id, username, password) # # if not jd_cookie or not jd_cookie.get('status'): # self._log("❌ 从后端服务获取cookie失败", "ERROR") # if jd_cookie and jd_cookie.get('verify_link'): # self._log(f"❌ 需要验证登录,验证链接: {jd_cookie.get('verify_link')}", "ERROR") # return False # # self._log("✅ 从后端服务获取cookie成功", "SUCCESS") # cookies_str = jd_cookie.get('cookie') # self._log(f"📦 获取到cookie: {cookies_str[:50] + '...' if cookies_str else '无'}", "DEBUG") # # # 3. 获取配置信息 # self._log("🔵 步骤4: 获取配置信息...", "INFO") # config = None # for i in range(3): # try: # config = self.fix_jd_util.get_config(cookies_str) # if config and config.get('data'): # self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS") # break # else: # self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING") # except Exception as e: # self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING") # await asyncio.sleep(3) # # if not config or not config.get('data'): # self._log("获取配置失败", "ERROR") # return False # # # 4. 提取必要参数 # self._log("🔵 步骤5: 提取配置参数...", "INFO") # aid = config["data"].get("aid") # vender_id = config["data"].get("venderId") # pin_zj = config["data"].get("pin", "").lower() # # if not all([aid, vender_id, pin_zj]): # self._log("❌ 登录信息不完整", "ERROR") # return False # # self._log(f"获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}", "INFO") # # # 5. 启动监听 # self._log("🔵 步骤6: 启动消息监听...", "INFO") # self.stop_event = asyncio.Event() # self.running = True # # self._log("🎉开始监听JD平台消息...", "SUCCESS") # # # 调用实际的监听方法 # await self.fix_jd_util.message_monitoring( # cookies_str=cookies_str, # aid=aid, # pin_zj=pin_zj, # vender_id=vender_id, # store=store_today, # stop_event=self.stop_event, # username=username, # password=password # ) # # return True # # except Exception as e: # self._log(f"监听过程中出现严重错误: {str(e)}", "ERROR") # import traceback # self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") # return False # # def stop_listening(self): # """停止监听""" # if self.stop_event: # self.stop_event.set() # self.running = False # self._log("JD监听已停止", "INFO") # 新增: GUI集成包装器类 class JDListenerForGUI: """用于GUI集成的JD监听包装器 ()""" def __init__(self, log_callback=None): self.fix_jd_util = FixJdCookie(log_callback) self.log_callback = log_callback self.running = False self.stop_event = None def _log(self, message, log_type="INFO"): """处理日志输出""" if self.log_callback: self.log_callback(message, log_type) else: print(f"[{log_type}] {message}") async def start_listening(self, username, password): """启动监听的主方法""" try: self._log("🔵 开始JD平台连接流程", "INFO") print("🔵 开始JD平台连接流程 - 调试断点1") # 1. 获取店铺信息 self._log("🔵 步骤2: 获取店铺信息...", "INFO") print("🔵 步骤2: 获取店铺信息... - 调试断点2") store_today = self.fix_jd_util.get_today_store() if not store_today: self._log("❌ 未找到店铺信息", "ERROR") print("❌ 未找到店铺信息") return False self._log(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}", "SUCCESS") print(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}") cookie_str = store_today.get('platform_cookie') self._log(f"📦 当前存储的cookie: {cookie_str[:50] + '...' if cookie_str else '无'}", "DEBUG") # 2. 获取或更新cookie - 在这里设置断点② self._log("🔵 步骤3: 获取或更新JD cookie...", "INFO") # 2. 获取或更新cookie jd_login_cookie = self.fix_jd_util.get_cookies( username=username, password=password, cookies_str=cookie_str ) if not jd_login_cookie['status']: fail_status = jd_login_cookie.get('verify_link', None) if fail_status: self._log(f"❌ JD登录失败: , 失败类型为二次验证(手机验证码) 对应url:{fail_status}", "ERROR") else: # 表示没有返回verify_url 未知错误 self._log(f"❌ JD登录失败: , 失败类型为:{fail_status} 未知错误 请单独测试login方法") return False self._log("✅ JD登录成功", "SUCCESS") self._log(f"📦 获取到cookie: {jd_login_cookie['cookie'][:50] + '...'}", "DEBUG") self._log("🔵 步骤4: 获取配置信息...", "INFO") # 3. 获取配置信息 config = None for i in range(3): try: config = self.fix_jd_util.get_config(jd_login_cookie['cookie']) if config and config.get('data'): self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS") break else: self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING") except Exception as e: self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING") await asyncio.sleep(3) if not config or not config.get('data'): self._log("获取配置失败", "ERROR") return False # 4. 提取必要参数 self._log("🔵 步骤5: 提取配置参数...", "INFO") aid = config["data"].get("aid") vender_id = config["data"].get("venderId") pin_zj = config["data"].get("pin", "").lower() if not all([aid, vender_id, pin_zj]): self._log("❌ 登录信息不完整", "ERROR") return False self._log(f"获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}", "INFO") # 5. 启动监听 self._log("🔵 步骤6: 启动消息监听...", "INFO") self.stop_event = asyncio.Event() self.running = True self._log("🎉开始监听JD平台消息...", "SUCCESS") # 调用实际的监听方法 await self.fix_jd_util.message_monitoring( cookies_str=jd_login_cookie['cookie'], aid=aid, pin_zj=pin_zj, vender_id=vender_id, store=store_today, stop_event=self.stop_event ) return True except Exception as e: self._log(f"监听过程中出现严重错误: {str(e)}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return False async def start_with_cookies(self, store_id: str, cookies: str): """使用下发的cookies与store_id直接建立JD平台WS并开始监听""" try: self._log("🔵 [JD] 收到后端登录指令,开始使用cookies连接平台", "INFO") # 获取平台配置 config = None for i in range(3): try: config = self.fix_jd_util.get_config(cookies) if config and config.get('data'): self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS") break else: self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING") except Exception as e: self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING") await asyncio.sleep(3) if not config or not config.get('data'): self._log("获取配置失败", "ERROR") return False aid = config["data"].get("aid") vender_id = config["data"].get("venderId") pin_zj = config["data"].get("pin", "").lower() if not all([aid, vender_id, pin_zj]): self._log("❌ 登录信息不完整", "ERROR") return False # 建立与后端的AI通道(确保使用GUI的store_id) await self.fix_jd_util.connect_backend_service(store_id) # 启动监听 self.stop_event = asyncio.Event() self.running = True store = {'id': store_id} self._log("🎉 [JD] 开始监听平台消息", "SUCCESS") await self.fix_jd_util.message_monitoring( cookies_str=cookies, aid=aid, pin_zj=pin_zj, vender_id=vender_id, store=store, stop_event=self.stop_event ) return True except Exception as e: self._log(f"[JD] 监听过程中出现错误: {str(e)}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return False def stop_listening(self): """停止监听""" if self.stop_event: self.stop_event.set() self.running = False self._log("JD监听已停止", "INFO") async def main(): username = "KLD测试" password = "kld168168" fix_jd_util = FixJdCookie() store_today = fix_jd_util.get_today_store() # 检查店铺信息 if not store_today: logger.error("❌ 未找到店铺信息") return store_id = str(store_today.get('id', '')) logger.info(f"✅ 获取到店铺信息: {store_id}") try: # 1. 直接连接后端服务获取 cookie logger.info("🔵 步骤1: 连接后端服务获取 cookie...") jd_cookie = await fix_jd_util.connect_backend_service(store_id, username, password) print(f"完整的cookie数据: {jd_cookie}") if not jd_cookie or not jd_cookie.get('status'): logger.error("❌ 从后端服务获取 cookie 失败") if jd_cookie and jd_cookie.get('verify_link'): logger.error(f"❌ 需要验证登录,验证链接: {jd_cookie.get('verify_link')}") return cookies_str = jd_cookie.get('cookie') logger.info("✅ 从后端服务获取 cookie 成功") logger.debug(f"📦 获取到 cookie: {cookies_str[:50] + '...' if cookies_str else '无'}") # 测试cookie后端生成的有效性 # cookies_str = "shshshfpa=112082d7-6f59-f35d-093a-e4c035938ab8-1754961318; shshshfpx=112082d7-6f59-f35d-093a-e4c035938ab8-1754961318; __jdu=17549613198151684402245; user-key=13d41f1d-5d8a-447e-a3d1-19c964e2fa13; __jdv=76161171|direct|-|none|-|1756259208627; areaId=18; ipLoc-djd=18-1482-48942-49052; pinId=EifU7rHmf-gwaaxnNveDFw; _tp=%2Bw%2Fs5xFMS9g0SkUd93EGxqh4USt1M5LwIyzTR4TbgCM%3D; _pst=KLD%E6%B5%8B%E8%AF%95; pin=KLD%E6%B5%8B%E8%AF%95; unick=u40d47u5ckctnz; PCSYCityID=CN_430000_430100_0; ceshi3.com=000; sdtoken=AAbEsBpEIOVjqTAKCQtvQu17tqE2au0-pftQOStCUKRw9JX4gfXHESNiN_EgrTvDv8qS5IZHipleuQxIX9JXWojJS8sKju6Vh1Qqt5LjakfSeWqqAOL-HhXeBn9m; shshshfpb=BApXS1gvPG_xA1izHnR4d6bvzjbeOVTtiBhbXFDdg9xJ1Mh6uh462; 3AB9D23F7A4B3CSS=jdd03OFWZGTHWHQYLKSX5BEWHXMTLYAHXEEXC7HBOIZDBIPLMQZT5LTHKICALRZU5ZDL6X6T3NMHRNSJJDX6BTEI6AVJS5IAAAAMZDDKTS5YAAAAACJZVYQP7FYJA3UX; _gia_d=1; wlfstk_smdl=0whbpvk7ixj27lbykeeb8rh6iulon3fo; __jda=95931165.17549613198151684402245.1754961320.1757039732.1757053789.21; __jdb=95931165.30.17549613198151684402245|21.1757053789; __jdc=95931165; 3AB9D23F7A4B3C9B=OFWZGTHWHQYLKSX5BEWHXMTLYAHXEEXC7HBOIZDBIPLMQZT5LTHKICALRZU5ZDL6X6T3NMHRNSJJDX6BTEI6AVJS5I; TrackID=1C24wlDX-QPSyJRaB_1YHqCLhBW6qIo2stht_nwl5g9fGI-lJpP-CZT3TaCr9QVppcvhilhcCe1VhgXMKGWao6Fd3wJ5bQhJ9w9VwWcYOySsXfbCTQbteFWevVN1ZQYp9; thor=4E136F9D9458703D01BE17544D30601F9649F79B89E5FC150CA91054C788CAA1C82670080466F219573AE7FD6EB9ABDF9F52D520671373DAD721CC3B78613FABC99ADA1FAC8E92CDC42F5131682B1F008727F1BA49783B055AFED9349D0B79E53A51F059A1DDE3FC181DD38D1B388D829CE8ADD775D0D30C38A8CAD0519DCD0C; flash=3_KFKaImBVn2spH8stTd9wjKlZQTxgYcZCPXXP_axvJMphR3w29aJNU2c2qPReKxWHRX1lzJ7MfD9iQmHQI-2cKp0dYzs6YsH9eDyB3lQxuu6MtkM8jCiBynVSdRBnr21oDrLKGMeYG6yYlcEsAsbe8OC-yKO69758MJYyMZd_4soV; light_key=AASBKE7rOxgWQziEhC_QY6yakCROyWTrRIF9K9uCpw_IcR8gGNaL7IM6AQuVa-3pJoC9wTze; logining=1; rita=A1EA9FF92ADE7FC61C825E83F126B9E97EF9243BEED9B77E4F7110D6081254A8EEAA66B26BFA00E08CBD8B0C88DD3D292CAD14839A50184501755B761A11F679F63D9DAA76E6785799D2F78AE378F76F32E05C1914C1132995B15CC5F79AFB9314A9D6FE7911DAFE1D958906C016E724" # 2. 获取配置信息 logger.info("🔵 步骤2: 获取配置信息...") config = None for i in range(3): try: config = fix_jd_util.get_config(cookies_str) if config and config.get('data'): logger.info(f"✅ 第{i + 1}次尝试获取配置成功") break else: logger.warning(f"⚠️ 第{i + 1}次尝试获取配置返回空数据") except Exception as e: logger.error(f"获取配置异常({i + 1}/3): {e}") if i == 2: return await asyncio.sleep(3) # 使用异步等待 if not config or not config.get('data'): logger.error("❌ 获取配置失败") return # 3. 提取必要参数 logger.info("🔵 步骤3: 提取配置参数...") aid = config["data"].get("aid") vender_id = config["data"].get("venderId") pin_zj = config["data"].get("pin", "").lower() if not all([aid, vender_id, pin_zj]): logger.error("❌ 登录信息不完整,需要重新登录") return logger.info(f"✅ 获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}") # 4. 启动监听 logger.info("🔵 步骤4: 启动消息监听...") stop_event = asyncio.Event() try: await fix_jd_util.message_monitoring( cookies_str=cookies_str, aid=aid, pin_zj=pin_zj, vender_id=vender_id, store=store_today, stop_event=stop_event, username=username, password=password ) except Exception as e: logger.error(f"❌ 监听过程中出现错误: {e}") import traceback logger.error(f"错误详情: {traceback.format_exc()}") except Exception as e: logger.error(f"❌ 主程序执行过程中出现错误: {e}") import traceback logger.error(f"错误详情: {traceback.format_exc()}") if __name__ == '__main__': asyncio.run(main()) # asyncio.run(new_test_login())