Files
shuidrop_gui/Utils/JD/JdUtils.py

982 lines
49 KiB
Python
Raw Normal View History

2025-09-12 20:42:00 +08:00
# -*- coding: utf-8 -*-
# python let's go
# 编辑人:kris思成
import asyncio
import hashlib
import traceback
from datetime import datetime
import aiohttp
import jsonpath
import websockets
from loguru import logger
import requests
import json
import time
import threading
# 定义持久化数据类
class WebsocketManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._store = {}
cls._instance._lock = threading.RLock()
return cls._instance
def on_connect(self, shop_key, ws, **kwargs):
"""完全保持原有数据结构"""
with self._lock:
entry = self._store.setdefault(shop_key, {
'platform': None,
'customers': [],
'user_assignments': {}
})
entry['platform'] = {
'ws': ws, # 注意:这里存储的是强引用
'last_heartbeat': datetime.now(),
**kwargs
}
return entry
def get_connection(self, shop_key):
with self._lock:
return self._store.get(shop_key)
def remove_connection(self, shop_key):
with self._lock:
if shop_key in self._store:
del self._store[shop_key]
class JDBackendService:
"""京东后端服务调用类(已废弃:使用单后端连接 BackendClient 代替)"""
def __init__(self, *args, **kwargs):
self.current_store_id = ""
async def connect(self, store_id, *args, **kwargs):
try:
self.current_store_id = str(store_id or "")
except Exception:
self.current_store_id = ""
return True
async def send_message_to_backend(self, platform_message):
"""改为通过单后端连接发送需携带store_id"""
try:
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if not backend:
return None
# 从platform_message中构造统一上行结构并附加store_id
body = platform_message.get('body', {}) if isinstance(platform_message, dict) else {}
sender_pin = platform_message.get('from', {}).get('pin', '') if isinstance(platform_message, dict) else ''
# 优先取消息内的store_id其次取body内再次退回当前会话store_id
store_id = (platform_message.get('store_id')
or body.get('store_id')
or self.current_store_id
or '')
body_type = body.get('type', 'text')
content_for_backend = body.get('content', '')
if body_type in ('image', 'video'):
# 统一从常见字段取URL
content_for_backend = (
body.get('url')
or body.get('imageUrl')
or body.get('videoUrl')
or body.get('picUrl')
or content_for_backend
)
# 检测文本中的商品卡片/订单卡片
if body_type == 'text' and isinstance(content_for_backend, str):
try:
import re as _re
text = content_for_backend.strip()
# 商品卡片JD商品URL
if _re.search(r"https?://item(\.m)?\.jd\.com/\d+\.html(\?.*)?", text):
body_type = 'product_card'
else:
# 订单卡片多样式匹配
# 1) 订单在前:咨询/查询/订单号:<order>[,]? 商品(ID|id|号|编号)<product>
m1 = _re.search(
r"(?:咨询订单号|查询订单号|订单号)\s*[:]\s*(\d+)[,]?\s*商品(?:ID|id|号|编号)\s*[:]\s*(\d+)",
text)
# 2) 商品在前:商品(ID|id|号|编号)<product>[,]? (咨询/查询)?订单号:<order>
m2 = _re.search(
r"商品(?:ID|id|号|编号)\s*[:]\s*(\d+)[,]?\s*(?:咨询订单号|查询订单号|订单号)\s*[:]\s*(\d+)",
text)
if m1 or m2:
body_type = 'order_card'
if m1:
order_number, product_id = m1.group(1), m1.group(2)
else:
product_id, order_number = m2.group(1), m2.group(2)
# 归一化 content
content_for_backend = f"商品id{product_id} 订单号:{order_number}"
except Exception:
pass
msg = {
'type': 'message',
'content': content_for_backend,
'msg_type': body_type,
'sender': {'id': sender_pin},
'store_id': store_id
}
backend.send_message(msg)
return None
except Exception:
return None
class FixJdCookie:
def __init__(self, log_callback=None):
# 定义一些常用参数
super().__init__() # 继承一些父类的初始化参数
self.ws_manager = WebsocketManager()
self.log_callback = log_callback # 存储日志回调
# 新增后端服务实例
self.backend_service = JDBackendService()
self.backend_connected = False
# 新增重连参数
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10 # 最大重连次数
self.base_reconnect_delay = 1.0 # 基础重连延迟
self.max_reconnect_delay = 60.0 # 最大重连延迟
self.reconnect_backoff = 1.5 # 退避系数
def _log(self, message, log_type="INFO"):
"""内部日志方法"""
if self.log_callback:
self.log_callback(message, log_type)
else:
print(f"[{log_type}] {message}")
def get_config(self, cookies_str):
"""获取配置"""
headers = {
"cookie": cookies_str,
"user-agent": "Mozilla/5.0",
"referer": "https://dongdong.jd.com/",
}
response = requests.get("https://dongdong.jd.com/workbench/checkin.json", headers=headers,
params={"version": "2.6.3", "client": "openweb"})
return response.json()
async def init_wss(self, ws, aid, pin_zj):
"""初始化 socket"""
await self.send_heartbeat(ws, aid, pin_zj)
print("开始监听初始化")
auth = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"aid": aid,
"lang": "zh_CN",
"timestamp": int(time.time() * 1000),
"type": "auth",
"body": {"presence": 1, "clientVersion": "2.6.3"},
"to": {"app": "im.waiter"},
"from": {"app": "im.waiter", "pin": pin_zj, "clientType": "comet", "dvc": "device1234"}
}
await ws.send(json.dumps(auth))
async def waiter_status_switch(self, ws, aid, pin_zj):
"""设置接待状态"""
message = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"aid": aid,
"lang": "zh_CN",
"timestamp": int(time.time() * 1000),
"from": {
"app": "im.waiter",
"pin": pin_zj,
"art": "customerGroupMsg",
"clientType": "comet"
},
"type": "waiter_status_switch",
"body": {
"s": 1
}
}
await ws.send(json.dumps(message))
async def transfer_customer(self, ws, aid, pin, pin_zj, chat_name):
"""异步客服转接 在发送的消息为客服转接的关键词的时候"""
message = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"aid": aid,
"lang": "zh_CN",
"timestamp": int(time.time() * 1000),
"from": {
"app": "im.waiter", "pin": pin_zj, "art": "customerGroupMsg", "clientType": "comet"
},
"type": "chat_transfer_partern",
"body": {
"customer": pin, "cappId": "im.customer", "waiter": chat_name, "reason": "",
"ext": {"pid": ""}, "pid": ""
}
}
try:
await ws.send(json.dumps(message))
return True
except Exception:
traceback.print_exc()
return False
async def send_message(self, ws, pin, aid, pin_zj, vender_id, content):
"""异步发送单条消息"""
try:
print('本地发送消息')
message = {
"ver": "4.3",
"type": "chat_message",
"from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"},
"to": {"app": "im.customer", "pin": pin},
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"lang": "zh_CN",
"aid": aid,
"timestamp": int(time.time() * 1000),
"readFlag": 0,
"body": {
"content": content,
"translated": False,
"param": {"cusVenderId": vender_id},
"type": "text"
}
}
await ws.send(json.dumps(message))
logger.info(f"消息已经发送到客户端[info] {pin}: {content[:20]} ...")
except websockets.ConnectionClosed:
logger.error('本地发送消息失败 连接关闭')
raise
except Exception as e:
# 同时这里也要及时进行raise抛出 这样比较好让系统可以看出 异常了可以抛出信息不至于后续被认为
logger.error(f"消息发送过程中出现特殊异常异常信息为: {e}")
raise
def get_userinfo(self, response_text):
"""获取用户 pin 并存入 pins 列表"""
pin = jsonpath.jsonpath(json.loads(response_text), "$..from.pin")
return pin[0] if pin else None
def is_merchants(self, store: object, response):
"""判断消息是否来自顾客"""
send = response['from']['pin']
# 补充: 方法lower()是转化为小写 upper()是转化为大写 title()是每个单词首字母大写其余小写
if send.lower() == "KLD测试".lower():
return False
return True
async def send_heartbeat(self, ws, aid, pin_zj):
"""发送心跳包"""
msg = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"type": "client_heartbeat",
"aid": aid,
"ver": "4.1",
"lang": "zh_CN",
"from": {"pin": pin_zj, "app": "im.waiter", "art": "customerGroupMsg", "clientType": "comet"}
}
await ws.send(json.dumps(msg, separators=(',', ':'))) # 使用最简心跳包模式 来节约部分性能 减少传输压力
async def heartbeat_loop(self, websocket, aid, pin_zj):
"""独立的心跳循环"""
"""
优化心跳 循环 新增改进
1. 心跳间隔动态调整
2. 异常重试机制
3. 心跳超时控制
4. 状态监控
"""
retry_count = 0
max_retries = 3
base_interval = 3.0 # 基础间隔1s
backoff_factor = 1.5 # 退避系数
timeout = 10.0
while True:
try:
start_time = time.monotonic()
# 使用websocket原生的ping/pong机制
pong_waiter = await websocket.ping()
await asyncio.wait_for(pong_waiter, timeout=timeout)
# 如果需要发送自定义心跳包
await self.send_heartbeat(websocket, aid, pin_zj)
# 计算剩余等待时间
elapsed = time.monotonic() - start_time
sleep_time = max(0.0, base_interval - elapsed)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
retry_count = 0 # 重置重试计数
except asyncio.TimeoutError:
logger.warning(f'心跳超时,已用时: {time.monotonic() - start_time:.2f}')
retry_count += 1
except websockets.ConnectionClosed:
logger.error("连接关闭,心跳已停止")
break
except Exception as e:
logger.error(f"心跳异常: {e}", exc_info=True)
retry_count += 1
if retry_count >= max_retries:
logger.error(f"心跳连续失败{retry_count}次,终止循环")
break
# 退避策略
if retry_count > 0:
backoff_time = min(
base_interval * (backoff_factor ** (retry_count - 1)),
60.0 # 最大等待60秒
)
logger.debug(f"心跳失败,等待{backoff_time:.2f}秒后重试")
await asyncio.sleep(backoff_time)
''' 整理重连方法 '''
async def calculate_reconnect_delay(self):
"""计算指数退避的重连延迟时间"""
delay = self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts)
return min(delay, self.max_reconnect_delay)
async def should_reconnect(self):
"""判断是否应该继续重连"""
if self.reconnect_attempts >= self.max_reconnect_attempts:
self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR")
return False
return True
async def handle_reconnect(self, exception=None):
"""处理重连逻辑"""
if exception:
error_type = type(exception).__name__
error_msg = str(exception)
self._log(f"连接异常[{error_type}]: {error_msg}", "WARNING")
if not await self.should_reconnect():
return False
delay = await self.calculate_reconnect_delay()
self._log(f"{self.reconnect_attempts + 1}次重连尝试,等待{delay:.1f}秒...", "WARNING")
await asyncio.sleep(delay)
self.reconnect_attempts += 1
return True
''' 后端服务调用方法 '''
async def connect_backend_service(self, store_id):
"""连接后端AI服务"""
try:
success = await self.backend_service.connect(store_id)
if success:
self.backend_connected = True
self._log("✅ 后端AI服务连接成功", "SUCCESS")
return success
except Exception as e:
self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR")
return False
async def get_ai_reply_from_backend(self, platform_message):
"""从后端服务获取AI回复"""
# 首先检查后端服务连接状态
if not self.backend_connected:
# 尝试重新连接
store_id = str(platform_message.get('body', {}).get('chatinfo', {}).get('venderId', ''))
if store_id:
self.backend_connected = await self.connect_backend_service(store_id)
if not self.backend_connected:
self._log("❌ 后端服务未连接尝试重新连接失败使用本地AI回复", "WARNING")
# 降级到本地AI回复
customer_sid = platform_message.get('body', {}).get('chatinfo', {}).get('sid', '')
customer_message = platform_message.get('body', {}).get('content', '')
return
try:
# 推送给后端由后端异步回传AI消息到GUI此处不进行本地立即回复
await self.backend_service.send_message_to_backend(platform_message)
return None
except Exception as e:
self._log(f"❌ 获取AI回复失败: {e}使用本地AI", "ERROR")
customer_sid = platform_message.get('body', {}).get('chatinfo', {}).get('sid', '')
customer_message = platform_message.get('body', {}).get('content', '')
return
async def process_incoming_message(self, response, ws, aid, pin_zj, vender_id, store):
"""处理接收到的消息"""
try:
# 解析消息
json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response
# 验证消息格式
if not all([
json_resp,
json_resp.get('type') == "chat_message",
json_resp.get('ver') == "4.2",
isinstance(json_resp.get("body"), dict)
]):
return
# 过滤非客户消息
pin = self.get_userinfo(response)
if not self.is_merchants(store, json_resp):
return
# 提取消息内容
message_type = json_resp['body']['type']
if message_type == 'text':
customer_message = json_resp['body']['content']
elif message_type == 'image':
customer_message = f"[图片] {json_resp['body'].get('url') or json_resp['body'].get('imageUrl') or json_resp['body'].get('picUrl') or ''}"
elif message_type == 'video':
customer_message = f"[视频] {json_resp['body'].get('url') or json_resp['body'].get('videoUrl') or ''}"
else:
return
self._log(f"📩 收到客户消息: {pin}: {customer_message[:100]}...", "INFO")
# 从后端服务获取AI回复单连接模式仅转交不本地立即回复
ai_result = await self.get_ai_reply_from_backend(json_resp)
if isinstance(ai_result, str) and ai_result.strip():
# 发送回复消息(仅当本地生成/降级时)
await self.send_message(
ws=ws, pin=pin, aid=aid,
pin_zj=pin_zj, vender_id=vender_id,
content=ai_result
)
self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO")
else:
# 正常链路已转交后端AI等待后端异步回传并由 GUI 转发到平台
self._log("🔄 已转交后端AI处理等待平台回复下发", "INFO")
except json.JSONDecodeError:
self._log("❌ 消息JSON解析失败", "ERROR")
except Exception as e:
self._log(f"❌ 消息处理失败: {e}", "ERROR")
async def message_monitoring(self, cookies_str, aid, pin_zj, vender_id, store, sleep_time=0.5, stop_event=None):
print("✅ DEBUG 进入message_monitoring方法")
print(f"参数验证: cookies={bool(cookies_str)}, aid={aid}, pin_zj={pin_zj}")
# 连接后端AI服务 - 使用店铺ID或venderId
store_id = str(store.get('id', '')) or str(vender_id)
self._log(f"🔗 尝试连接后端服务店铺ID: {store_id}", "DEBUG")
backend_connected = await self.connect_backend_service(store_id)
if not backend_connected:
self._log("⚠️ 后端服务连接失败将使用本地AI回复", "WARNING")
else:
self._log("✅ 后端服务连接成功", "SUCCESS")
stop_event = stop_event or asyncio.Event() # 如果外部没传,自己创建
uri = "wss://dongdong.jd.com/workbench/websocket"
headers = {"cookie": cookies_str}
self._log(f"🔵 开始连接WebSocket: {uri}", "INFO")
# 充值重连计数器
self.reconnect_attempts = 0
while not stop_event.is_set():
try:
self._log(f"🔄 尝试连接WebSocket", "INFO")
async with websockets.connect(uri, additional_headers=headers, ping_interval=6, ping_timeout=10, close_timeout=1, max_queue=1024) as ws:
# 连接成功,重置重连计数器
self.reconnect_attempts = 0
self._log("✅ WebSocket-JD连接成功", "SUCCESS")
await self.init_wss(ws, aid, pin_zj)
# === 验证连接成功的核心指标 ===
# print(f"✅ 连接状态: open={ws.open}, closed={ws.closed}")
print(f"🖥️ 服务端地址: {ws.remote_address}")
# --- 注册连接信息到全局管理
shop_key = f"京东:{store['id']}"
loop = asyncio.get_running_loop()
entry = self.ws_manager.on_connect(
shop_key=shop_key,
ws=ws,
vender_id=vender_id,
aid=aid,
pin_zj=pin_zj,
platform="京东",
loop=loop
)
await self.waiter_status_switch(ws=ws, aid=aid, pin_zj=pin_zj)
heartbeat_task = asyncio.create_task(self.heartbeat_loop(ws, aid, pin_zj))
message_tasks = set()
try:
while not stop_event.is_set(): # 检查是否收到中止信号
try:
print(f"等待监听消息-{datetime.now()}")
response = await asyncio.wait_for(ws.recv(), timeout=1)
print(f"原始消息类型:{type(response)}, 消息体为: {response}")
await self.process_incoming_message(response, ws, aid, pin_zj, vender_id, store)
# 安全解析消息
json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response
print(json_resp)
ver = json_resp.get("ver")
print(f"版本{ver}")
except asyncio.TimeoutError:
continue
except websockets.ConnectionClosed:
# 连接关闭 跳出内层循环进行重连
break
except Exception as e:
self._log(f"消息处理异常: {e}", "ERROR")
continue
await asyncio.sleep(sleep_time)
###
##{'ver': '4.2', 'mid': 366566937, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01C5XRGQHGNJBSFYWWA7RF54QXYHF2N34TM32NGLZV6YAAPPNKWLAIQ2MZV25T4QZ2TJE4HRF6UZ2L3THX7I2BLLB37YVAWKR2BYGRRPA01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01C5XRGQHGNJBSFYWWA7RF54QXYHF2N34TM32NGLZV6YAAPPNKWLAIQ2MZV25T4QZ2TJE4HRF6UZ2L3THX7I2BLLB37YVAWKR2BYGRRPA01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '你好', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a'}, 'type': 'chat_message', 'clientTime': 1755076542320, 'datetime': '2025-08-13 17:15:42', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'bc3c42a706fa4fa482ff565304c7dfbb', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755076542671}
# {'ver': '4.2', 'mid': 366566966, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10052172306055', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '538fc3761474ea693812ceed4b39639c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd014TJZZMUXFLXYGJJJ4M3HY3PTU4PBA22SPDIXDWOGZ7XTUTOWTY4LU3VWN6OKDOJPJOVDINMCJXCSPSG5X2K6KWHNQJQOROB57LDG5EY01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10052172306055', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '538fc3761474ea693812ceed4b39639c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd014TJZZMUXFLXYGJJJ4M3HY3PTU4PBA22SPDIXDWOGZ7XTUTOWTY4LU3VWN6OKDOJPJOVDINMCJXCSPSG5X2K6KWHNQJQOROB57LDG5EY01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '你好啊', 'sid': '538fc3761474ea693812ceed4b39639c'}, 'type': 'chat_message', 'clientTime': 1755592905140, 'datetime': '2025-08-19 16:41:45', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'd31d369f17f24b05abbe2b7c334f340e', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755592905232}
# {'ver': '4.2', 'mid': 366566984, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '您好', 'sid': '5b5e044c73ee243b7e67eeca5b11393c'}, 'type': 'chat_message', 'clientTime': 1755595443735, 'datetime': '2025-08-19 17:24:03', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': '800deaf802f9436b98937bb084ee2a56', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755595443839}
# {'ver': '4.2', 'mid': 366566986, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '我先自己看看谢谢您', 'sid': '5b5e044c73ee243b7e67eeca5b11393c'}, 'type': 'chat_message', 'clientTime': 1755595460153, 'datetime': '2025-08-19 17:24:20', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'a3d4de984f2d4811952f8ba872850bc2', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755595460274}
except Exception as e:
logger.error(f"接收对应监控消息时候产生特殊错误 , 错误信息为{e}")
finally:
# 清理资源
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
self._log("🔄 连接断开,准备重连", "INFO")
except websockets.ConnectionClosed as e:
self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING")
if not await self.handle_reconnect(e):
break
except (websockets.WebSocketException, aiohttp.ClientError, OSError) as e:
self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING")
if not await self.handle_reconnect(e):
break
except Exception as e:
self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR")
if not await self.handle_reconnect(e):
break
# 关闭后端服务连接
if self.backend_connected:
await self.backend_service.close()
self.backend_connected = False
self._log("🛑 消息监听已停止", "INFO")
# 新增: GUI集成包装器类
# class JDListenerForGUI:
# """用于GUI集成的JD监听包装器 ()"""
#
# def __init__(self, log_callback=None):
# self.fix_jd_util = FixJdCookie(log_callback)
# self.log_callback = log_callback
# self.running = False
# self.stop_event = None
# self.username = None
# self.password = None
#
# def _log(self, message, log_type="INFO"):
# """处理日志输出"""
# if self.log_callback:
# self.log_callback(message, log_type)
# else:
# print(f"[{log_type}] {message}")
#
# async def start_listening(self, username, password):
# """启动监听的主方法"""
# try:
# # 存储用户名和密码
# self.username = username
# self.password = password
#
# self._log("🔵 开始JD平台连接流程", "INFO")
# print("🔵 开始JD平台连接流程 - 调试断点1")
#
# # 1. 获取店铺信息
# self._log("🔵 步骤2: 获取店铺信息...", "INFO")
# print("🔵 步骤2: 获取店铺信息... - 调试断点2")
# store_today = self.fix_jd_util.get_today_store()
# if not store_today:
# self._log("❌ 未找到店铺信息", "ERROR")
# print("❌ 未找到店铺信息")
# return False
# self._log(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}", "SUCCESS")
# print(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}")
#
# # 2. 连接后端服务获取cookie
# self._log("🔵 步骤3: 连接后端服务获取cookie...", "INFO")
# store_id = str(store_today.get('id', ''))
#
# jd_cookie = await self.fix_jd_util.connect_backend_service(store_id, username, password)
#
# if not jd_cookie or not jd_cookie.get('status'):
# self._log("❌ 从后端服务获取cookie失败", "ERROR")
# if jd_cookie and jd_cookie.get('verify_link'):
# self._log(f"❌ 需要验证登录,验证链接: {jd_cookie.get('verify_link')}", "ERROR")
# return False
#
# self._log("✅ 从后端服务获取cookie成功", "SUCCESS")
# cookies_str = jd_cookie.get('cookie')
# self._log(f"📦 获取到cookie: {cookies_str[:50] + '...' if cookies_str else '无'}", "DEBUG")
#
# # 3. 获取配置信息
# self._log("🔵 步骤4: 获取配置信息...", "INFO")
# config = None
# for i in range(3):
# try:
# config = self.fix_jd_util.get_config(cookies_str)
# if config and config.get('data'):
# self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS")
# break
# else:
# self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING")
# except Exception as e:
# self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING")
# await asyncio.sleep(3)
#
# if not config or not config.get('data'):
# self._log("获取配置失败", "ERROR")
# return False
#
# # 4. 提取必要参数
# self._log("🔵 步骤5: 提取配置参数...", "INFO")
# aid = config["data"].get("aid")
# vender_id = config["data"].get("venderId")
# pin_zj = config["data"].get("pin", "").lower()
#
# if not all([aid, vender_id, pin_zj]):
# self._log("❌ 登录信息不完整", "ERROR")
# return False
#
# self._log(f"获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}", "INFO")
#
# # 5. 启动监听
# self._log("🔵 步骤6: 启动消息监听...", "INFO")
# self.stop_event = asyncio.Event()
# self.running = True
#
# self._log("🎉开始监听JD平台消息...", "SUCCESS")
#
# # 调用实际的监听方法
# await self.fix_jd_util.message_monitoring(
# cookies_str=cookies_str,
# aid=aid,
# pin_zj=pin_zj,
# vender_id=vender_id,
# store=store_today,
# stop_event=self.stop_event,
# username=username,
# password=password
# )
#
# return True
#
# except Exception as e:
# self._log(f"监听过程中出现严重错误: {str(e)}", "ERROR")
# import traceback
# self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
# return False
#
# def stop_listening(self):
# """停止监听"""
# if self.stop_event:
# self.stop_event.set()
# self.running = False
# self._log("JD监听已停止", "INFO")
# 新增: GUI集成包装器类
class JDListenerForGUI:
"""用于GUI集成的JD监听包装器 ()"""
def __init__(self, log_callback=None):
self.fix_jd_util = FixJdCookie(log_callback)
self.log_callback = log_callback
self.running = False
self.stop_event = None
def _log(self, message, log_type="INFO"):
"""处理日志输出"""
if self.log_callback:
self.log_callback(message, log_type)
else:
print(f"[{log_type}] {message}")
async def start_listening(self, username, password):
"""启动监听的主方法"""
try:
self._log("🔵 开始JD平台连接流程", "INFO")
print("🔵 开始JD平台连接流程 - 调试断点1")
# 1. 获取店铺信息
self._log("🔵 步骤2: 获取店铺信息...", "INFO")
print("🔵 步骤2: 获取店铺信息... - 调试断点2")
store_today = self.fix_jd_util.get_today_store()
if not store_today:
self._log("❌ 未找到店铺信息", "ERROR")
print("❌ 未找到店铺信息")
return False
self._log(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}", "SUCCESS")
print(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}")
cookie_str = store_today.get('platform_cookie')
self._log(f"📦 当前存储的cookie: {cookie_str[:50] + '...' if cookie_str else ''}", "DEBUG")
# 2. 获取或更新cookie - 在这里设置断点②
self._log("🔵 步骤3: 获取或更新JD cookie...", "INFO")
# 2. 获取或更新cookie
jd_login_cookie = self.fix_jd_util.get_cookies(
username=username,
password=password,
cookies_str=cookie_str
)
if not jd_login_cookie['status']:
fail_status = jd_login_cookie.get('verify_link', None)
if fail_status:
self._log(f"❌ JD登录失败: , 失败类型为二次验证(手机验证码) 对应url:{fail_status}", "ERROR")
else:
# 表示没有返回verify_url 未知错误
self._log(f"❌ JD登录失败: , 失败类型为:{fail_status} 未知错误 请单独测试login方法")
return False
self._log("✅ JD登录成功", "SUCCESS")
self._log(f"📦 获取到cookie: {jd_login_cookie['cookie'][:50] + '...'}", "DEBUG")
self._log("🔵 步骤4: 获取配置信息...", "INFO")
# 3. 获取配置信息
config = None
for i in range(3):
try:
config = self.fix_jd_util.get_config(jd_login_cookie['cookie'])
if config and config.get('data'):
self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS")
break
else:
self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING")
except Exception as e:
self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING")
await asyncio.sleep(3)
if not config or not config.get('data'):
self._log("获取配置失败", "ERROR")
return False
# 4. 提取必要参数
self._log("🔵 步骤5: 提取配置参数...", "INFO")
aid = config["data"].get("aid")
vender_id = config["data"].get("venderId")
pin_zj = config["data"].get("pin", "").lower()
if not all([aid, vender_id, pin_zj]):
self._log("❌ 登录信息不完整", "ERROR")
return False
self._log(f"获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}", "INFO")
# 5. 启动监听
self._log("🔵 步骤6: 启动消息监听...", "INFO")
self.stop_event = asyncio.Event()
self.running = True
self._log("🎉开始监听JD平台消息...", "SUCCESS")
# 调用实际的监听方法
await self.fix_jd_util.message_monitoring(
cookies_str=jd_login_cookie['cookie'],
aid=aid,
pin_zj=pin_zj,
vender_id=vender_id,
store=store_today,
stop_event=self.stop_event
)
return True
except Exception as e:
self._log(f"监听过程中出现严重错误: {str(e)}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
async def start_with_cookies(self, store_id: str, cookies: str):
"""使用下发的cookies与store_id直接建立JD平台WS并开始监听"""
try:
self._log("🔵 [JD] 收到后端登录指令开始使用cookies连接平台", "INFO")
# 获取平台配置
config = None
for i in range(3):
try:
config = self.fix_jd_util.get_config(cookies)
if config and config.get('data'):
self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS")
break
else:
self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING")
except Exception as e:
self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING")
await asyncio.sleep(3)
if not config or not config.get('data'):
self._log("获取配置失败", "ERROR")
return False
aid = config["data"].get("aid")
vender_id = config["data"].get("venderId")
pin_zj = config["data"].get("pin", "").lower()
if not all([aid, vender_id, pin_zj]):
self._log("❌ 登录信息不完整", "ERROR")
return False
# 建立与后端的AI通道确保使用GUI的store_id
await self.fix_jd_util.connect_backend_service(store_id)
# 启动监听
self.stop_event = asyncio.Event()
self.running = True
store = {'id': store_id}
self._log("🎉 [JD] 开始监听平台消息", "SUCCESS")
await self.fix_jd_util.message_monitoring(
cookies_str=cookies,
aid=aid,
pin_zj=pin_zj,
vender_id=vender_id,
store=store,
stop_event=self.stop_event
)
return True
except Exception as e:
self._log(f"[JD] 监听过程中出现错误: {str(e)}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
def stop_listening(self):
"""停止监听"""
if self.stop_event:
self.stop_event.set()
self.running = False
self._log("JD监听已停止", "INFO")
async def main():
username = "KLD测试"
password = "kld168168"
fix_jd_util = FixJdCookie()
store_today = fix_jd_util.get_today_store()
# 检查店铺信息
if not store_today:
logger.error("❌ 未找到店铺信息")
return
store_id = str(store_today.get('id', ''))
logger.info(f"✅ 获取到店铺信息: {store_id}")
try:
# 1. 直接连接后端服务获取 cookie
logger.info("🔵 步骤1: 连接后端服务获取 cookie...")
jd_cookie = await fix_jd_util.connect_backend_service(store_id, username, password)
print(f"完整的cookie数据: {jd_cookie}")
if not jd_cookie or not jd_cookie.get('status'):
logger.error("❌ 从后端服务获取 cookie 失败")
if jd_cookie and jd_cookie.get('verify_link'):
logger.error(f"❌ 需要验证登录,验证链接: {jd_cookie.get('verify_link')}")
return
cookies_str = jd_cookie.get('cookie')
logger.info("✅ 从后端服务获取 cookie 成功")
logger.debug(f"📦 获取到 cookie: {cookies_str[:50] + '...' if cookies_str else ''}")
# 测试cookie后端生成的有效性
# cookies_str = "shshshfpa=112082d7-6f59-f35d-093a-e4c035938ab8-1754961318; shshshfpx=112082d7-6f59-f35d-093a-e4c035938ab8-1754961318; __jdu=17549613198151684402245; user-key=13d41f1d-5d8a-447e-a3d1-19c964e2fa13; __jdv=76161171|direct|-|none|-|1756259208627; areaId=18; ipLoc-djd=18-1482-48942-49052; pinId=EifU7rHmf-gwaaxnNveDFw; _tp=%2Bw%2Fs5xFMS9g0SkUd93EGxqh4USt1M5LwIyzTR4TbgCM%3D; _pst=KLD%E6%B5%8B%E8%AF%95; pin=KLD%E6%B5%8B%E8%AF%95; unick=u40d47u5ckctnz; PCSYCityID=CN_430000_430100_0; ceshi3.com=000; sdtoken=AAbEsBpEIOVjqTAKCQtvQu17tqE2au0-pftQOStCUKRw9JX4gfXHESNiN_EgrTvDv8qS5IZHipleuQxIX9JXWojJS8sKju6Vh1Qqt5LjakfSeWqqAOL-HhXeBn9m; shshshfpb=BApXS1gvPG_xA1izHnR4d6bvzjbeOVTtiBhbXFDdg9xJ1Mh6uh462; 3AB9D23F7A4B3CSS=jdd03OFWZGTHWHQYLKSX5BEWHXMTLYAHXEEXC7HBOIZDBIPLMQZT5LTHKICALRZU5ZDL6X6T3NMHRNSJJDX6BTEI6AVJS5IAAAAMZDDKTS5YAAAAACJZVYQP7FYJA3UX; _gia_d=1; wlfstk_smdl=0whbpvk7ixj27lbykeeb8rh6iulon3fo; __jda=95931165.17549613198151684402245.1754961320.1757039732.1757053789.21; __jdb=95931165.30.17549613198151684402245|21.1757053789; __jdc=95931165; 3AB9D23F7A4B3C9B=OFWZGTHWHQYLKSX5BEWHXMTLYAHXEEXC7HBOIZDBIPLMQZT5LTHKICALRZU5ZDL6X6T3NMHRNSJJDX6BTEI6AVJS5I; TrackID=1C24wlDX-QPSyJRaB_1YHqCLhBW6qIo2stht_nwl5g9fGI-lJpP-CZT3TaCr9QVppcvhilhcCe1VhgXMKGWao6Fd3wJ5bQhJ9w9VwWcYOySsXfbCTQbteFWevVN1ZQYp9; thor=4E136F9D9458703D01BE17544D30601F9649F79B89E5FC150CA91054C788CAA1C82670080466F219573AE7FD6EB9ABDF9F52D520671373DAD721CC3B78613FABC99ADA1FAC8E92CDC42F5131682B1F008727F1BA49783B055AFED9349D0B79E53A51F059A1DDE3FC181DD38D1B388D829CE8ADD775D0D30C38A8CAD0519DCD0C; flash=3_KFKaImBVn2spH8stTd9wjKlZQTxgYcZCPXXP_axvJMphR3w29aJNU2c2qPReKxWHRX1lzJ7MfD9iQmHQI-2cKp0dYzs6YsH9eDyB3lQxuu6MtkM8jCiBynVSdRBnr21oDrLKGMeYG6yYlcEsAsbe8OC-yKO69758MJYyMZd_4soV; light_key=AASBKE7rOxgWQziEhC_QY6yakCROyWTrRIF9K9uCpw_IcR8gGNaL7IM6AQuVa-3pJoC9wTze; logining=1; rita=A1EA9FF92ADE7FC61C825E83F126B9E97EF9243BEED9B77E4F7110D6081254A8EEAA66B26BFA00E08CBD8B0C88DD3D292CAD14839A50184501755B761A11F679F63D9DAA76E6785799D2F78AE378F76F32E05C1914C1132995B15CC5F79AFB9314A9D6FE7911DAFE1D958906C016E724"
# 2. 获取配置信息
logger.info("🔵 步骤2: 获取配置信息...")
config = None
for i in range(3):
try:
config = fix_jd_util.get_config(cookies_str)
if config and config.get('data'):
logger.info(f"✅ 第{i + 1}次尝试获取配置成功")
break
else:
logger.warning(f"⚠️ 第{i + 1}次尝试获取配置返回空数据")
except Exception as e:
logger.error(f"获取配置异常({i + 1}/3): {e}")
if i == 2:
return
await asyncio.sleep(3) # 使用异步等待
if not config or not config.get('data'):
logger.error("❌ 获取配置失败")
return
# 3. 提取必要参数
logger.info("🔵 步骤3: 提取配置参数...")
aid = config["data"].get("aid")
vender_id = config["data"].get("venderId")
pin_zj = config["data"].get("pin", "").lower()
if not all([aid, vender_id, pin_zj]):
logger.error("❌ 登录信息不完整,需要重新登录")
return
logger.info(f"✅ 获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}")
# 4. 启动监听
logger.info("🔵 步骤4: 启动消息监听...")
stop_event = asyncio.Event()
try:
await fix_jd_util.message_monitoring(
cookies_str=cookies_str,
aid=aid,
pin_zj=pin_zj,
vender_id=vender_id,
store=store_today,
stop_event=stop_event,
username=username,
password=password
)
except Exception as e:
logger.error(f"❌ 监听过程中出现错误: {e}")
import traceback
logger.error(f"错误详情: {traceback.format_exc()}")
except Exception as e:
logger.error(f"❌ 主程序执行过程中出现错误: {e}")
import traceback
logger.error(f"错误详情: {traceback.format_exc()}")
if __name__ == '__main__':
asyncio.run(main())
# asyncio.run(new_test_login())