Files
shuidrop_gui/Utils/JD/JdUtils.py
2025-09-12 20:42:00 +08:00

982 lines
49 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# python let's go
# 编辑人:kris思成
import asyncio
import hashlib
import traceback
from datetime import datetime
import aiohttp
import jsonpath
import websockets
from loguru import logger
import requests
import json
import time
import threading
# 定义持久化数据类
class WebsocketManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._store = {}
cls._instance._lock = threading.RLock()
return cls._instance
def on_connect(self, shop_key, ws, **kwargs):
"""完全保持原有数据结构"""
with self._lock:
entry = self._store.setdefault(shop_key, {
'platform': None,
'customers': [],
'user_assignments': {}
})
entry['platform'] = {
'ws': ws, # 注意:这里存储的是强引用
'last_heartbeat': datetime.now(),
**kwargs
}
return entry
def get_connection(self, shop_key):
with self._lock:
return self._store.get(shop_key)
def remove_connection(self, shop_key):
with self._lock:
if shop_key in self._store:
del self._store[shop_key]
class JDBackendService:
"""京东后端服务调用类(已废弃:使用单后端连接 BackendClient 代替)"""
def __init__(self, *args, **kwargs):
self.current_store_id = ""
async def connect(self, store_id, *args, **kwargs):
try:
self.current_store_id = str(store_id or "")
except Exception:
self.current_store_id = ""
return True
async def send_message_to_backend(self, platform_message):
"""改为通过单后端连接发送需携带store_id"""
try:
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if not backend:
return None
# 从platform_message中构造统一上行结构并附加store_id
body = platform_message.get('body', {}) if isinstance(platform_message, dict) else {}
sender_pin = platform_message.get('from', {}).get('pin', '') if isinstance(platform_message, dict) else ''
# 优先取消息内的store_id其次取body内再次退回当前会话store_id
store_id = (platform_message.get('store_id')
or body.get('store_id')
or self.current_store_id
or '')
body_type = body.get('type', 'text')
content_for_backend = body.get('content', '')
if body_type in ('image', 'video'):
# 统一从常见字段取URL
content_for_backend = (
body.get('url')
or body.get('imageUrl')
or body.get('videoUrl')
or body.get('picUrl')
or content_for_backend
)
# 检测文本中的商品卡片/订单卡片
if body_type == 'text' and isinstance(content_for_backend, str):
try:
import re as _re
text = content_for_backend.strip()
# 商品卡片JD商品URL
if _re.search(r"https?://item(\.m)?\.jd\.com/\d+\.html(\?.*)?", text):
body_type = 'product_card'
else:
# 订单卡片多样式匹配
# 1) 订单在前:咨询/查询/订单号:<order>[,]? 商品(ID|id|号|编号)<product>
m1 = _re.search(
r"(?:咨询订单号|查询订单号|订单号)\s*[:]\s*(\d+)[,]?\s*商品(?:ID|id|号|编号)\s*[:]\s*(\d+)",
text)
# 2) 商品在前:商品(ID|id|号|编号)<product>[,]? (咨询/查询)?订单号:<order>
m2 = _re.search(
r"商品(?:ID|id|号|编号)\s*[:]\s*(\d+)[,]?\s*(?:咨询订单号|查询订单号|订单号)\s*[:]\s*(\d+)",
text)
if m1 or m2:
body_type = 'order_card'
if m1:
order_number, product_id = m1.group(1), m1.group(2)
else:
product_id, order_number = m2.group(1), m2.group(2)
# 归一化 content
content_for_backend = f"商品id{product_id} 订单号:{order_number}"
except Exception:
pass
msg = {
'type': 'message',
'content': content_for_backend,
'msg_type': body_type,
'sender': {'id': sender_pin},
'store_id': store_id
}
backend.send_message(msg)
return None
except Exception:
return None
class FixJdCookie:
def __init__(self, log_callback=None):
# 定义一些常用参数
super().__init__() # 继承一些父类的初始化参数
self.ws_manager = WebsocketManager()
self.log_callback = log_callback # 存储日志回调
# 新增后端服务实例
self.backend_service = JDBackendService()
self.backend_connected = False
# 新增重连参数
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10 # 最大重连次数
self.base_reconnect_delay = 1.0 # 基础重连延迟
self.max_reconnect_delay = 60.0 # 最大重连延迟
self.reconnect_backoff = 1.5 # 退避系数
def _log(self, message, log_type="INFO"):
"""内部日志方法"""
if self.log_callback:
self.log_callback(message, log_type)
else:
print(f"[{log_type}] {message}")
def get_config(self, cookies_str):
"""获取配置"""
headers = {
"cookie": cookies_str,
"user-agent": "Mozilla/5.0",
"referer": "https://dongdong.jd.com/",
}
response = requests.get("https://dongdong.jd.com/workbench/checkin.json", headers=headers,
params={"version": "2.6.3", "client": "openweb"})
return response.json()
async def init_wss(self, ws, aid, pin_zj):
"""初始化 socket"""
await self.send_heartbeat(ws, aid, pin_zj)
print("开始监听初始化")
auth = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"aid": aid,
"lang": "zh_CN",
"timestamp": int(time.time() * 1000),
"type": "auth",
"body": {"presence": 1, "clientVersion": "2.6.3"},
"to": {"app": "im.waiter"},
"from": {"app": "im.waiter", "pin": pin_zj, "clientType": "comet", "dvc": "device1234"}
}
await ws.send(json.dumps(auth))
async def waiter_status_switch(self, ws, aid, pin_zj):
"""设置接待状态"""
message = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"aid": aid,
"lang": "zh_CN",
"timestamp": int(time.time() * 1000),
"from": {
"app": "im.waiter",
"pin": pin_zj,
"art": "customerGroupMsg",
"clientType": "comet"
},
"type": "waiter_status_switch",
"body": {
"s": 1
}
}
await ws.send(json.dumps(message))
async def transfer_customer(self, ws, aid, pin, pin_zj, chat_name):
"""异步客服转接 在发送的消息为客服转接的关键词的时候"""
message = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"aid": aid,
"lang": "zh_CN",
"timestamp": int(time.time() * 1000),
"from": {
"app": "im.waiter", "pin": pin_zj, "art": "customerGroupMsg", "clientType": "comet"
},
"type": "chat_transfer_partern",
"body": {
"customer": pin, "cappId": "im.customer", "waiter": chat_name, "reason": "",
"ext": {"pid": ""}, "pid": ""
}
}
try:
await ws.send(json.dumps(message))
return True
except Exception:
traceback.print_exc()
return False
async def send_message(self, ws, pin, aid, pin_zj, vender_id, content):
"""异步发送单条消息"""
try:
print('本地发送消息')
message = {
"ver": "4.3",
"type": "chat_message",
"from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"},
"to": {"app": "im.customer", "pin": pin},
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"lang": "zh_CN",
"aid": aid,
"timestamp": int(time.time() * 1000),
"readFlag": 0,
"body": {
"content": content,
"translated": False,
"param": {"cusVenderId": vender_id},
"type": "text"
}
}
await ws.send(json.dumps(message))
logger.info(f"消息已经发送到客户端[info] {pin}: {content[:20]} ...")
except websockets.ConnectionClosed:
logger.error('本地发送消息失败 连接关闭')
raise
except Exception as e:
# 同时这里也要及时进行raise抛出 这样比较好让系统可以看出 异常了可以抛出信息不至于后续被认为
logger.error(f"消息发送过程中出现特殊异常异常信息为: {e}")
raise
def get_userinfo(self, response_text):
"""获取用户 pin 并存入 pins 列表"""
pin = jsonpath.jsonpath(json.loads(response_text), "$..from.pin")
return pin[0] if pin else None
def is_merchants(self, store: object, response):
"""判断消息是否来自顾客"""
send = response['from']['pin']
# 补充: 方法lower()是转化为小写 upper()是转化为大写 title()是每个单词首字母大写其余小写
if send.lower() == "KLD测试".lower():
return False
return True
async def send_heartbeat(self, ws, aid, pin_zj):
"""发送心跳包"""
msg = {
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
"type": "client_heartbeat",
"aid": aid,
"ver": "4.1",
"lang": "zh_CN",
"from": {"pin": pin_zj, "app": "im.waiter", "art": "customerGroupMsg", "clientType": "comet"}
}
await ws.send(json.dumps(msg, separators=(',', ':'))) # 使用最简心跳包模式 来节约部分性能 减少传输压力
async def heartbeat_loop(self, websocket, aid, pin_zj):
"""独立的心跳循环"""
"""
优化心跳 循环 新增改进
1. 心跳间隔动态调整
2. 异常重试机制
3. 心跳超时控制
4. 状态监控
"""
retry_count = 0
max_retries = 3
base_interval = 3.0 # 基础间隔1s
backoff_factor = 1.5 # 退避系数
timeout = 10.0
while True:
try:
start_time = time.monotonic()
# 使用websocket原生的ping/pong机制
pong_waiter = await websocket.ping()
await asyncio.wait_for(pong_waiter, timeout=timeout)
# 如果需要发送自定义心跳包
await self.send_heartbeat(websocket, aid, pin_zj)
# 计算剩余等待时间
elapsed = time.monotonic() - start_time
sleep_time = max(0.0, base_interval - elapsed)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
retry_count = 0 # 重置重试计数
except asyncio.TimeoutError:
logger.warning(f'心跳超时,已用时: {time.monotonic() - start_time:.2f}')
retry_count += 1
except websockets.ConnectionClosed:
logger.error("连接关闭,心跳已停止")
break
except Exception as e:
logger.error(f"心跳异常: {e}", exc_info=True)
retry_count += 1
if retry_count >= max_retries:
logger.error(f"心跳连续失败{retry_count}次,终止循环")
break
# 退避策略
if retry_count > 0:
backoff_time = min(
base_interval * (backoff_factor ** (retry_count - 1)),
60.0 # 最大等待60秒
)
logger.debug(f"心跳失败,等待{backoff_time:.2f}秒后重试")
await asyncio.sleep(backoff_time)
''' 整理重连方法 '''
async def calculate_reconnect_delay(self):
"""计算指数退避的重连延迟时间"""
delay = self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts)
return min(delay, self.max_reconnect_delay)
async def should_reconnect(self):
"""判断是否应该继续重连"""
if self.reconnect_attempts >= self.max_reconnect_attempts:
self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR")
return False
return True
async def handle_reconnect(self, exception=None):
"""处理重连逻辑"""
if exception:
error_type = type(exception).__name__
error_msg = str(exception)
self._log(f"连接异常[{error_type}]: {error_msg}", "WARNING")
if not await self.should_reconnect():
return False
delay = await self.calculate_reconnect_delay()
self._log(f"{self.reconnect_attempts + 1}次重连尝试,等待{delay:.1f}秒...", "WARNING")
await asyncio.sleep(delay)
self.reconnect_attempts += 1
return True
''' 后端服务调用方法 '''
async def connect_backend_service(self, store_id):
"""连接后端AI服务"""
try:
success = await self.backend_service.connect(store_id)
if success:
self.backend_connected = True
self._log("✅ 后端AI服务连接成功", "SUCCESS")
return success
except Exception as e:
self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR")
return False
async def get_ai_reply_from_backend(self, platform_message):
"""从后端服务获取AI回复"""
# 首先检查后端服务连接状态
if not self.backend_connected:
# 尝试重新连接
store_id = str(platform_message.get('body', {}).get('chatinfo', {}).get('venderId', ''))
if store_id:
self.backend_connected = await self.connect_backend_service(store_id)
if not self.backend_connected:
self._log("❌ 后端服务未连接尝试重新连接失败使用本地AI回复", "WARNING")
# 降级到本地AI回复
customer_sid = platform_message.get('body', {}).get('chatinfo', {}).get('sid', '')
customer_message = platform_message.get('body', {}).get('content', '')
return
try:
# 推送给后端由后端异步回传AI消息到GUI此处不进行本地立即回复
await self.backend_service.send_message_to_backend(platform_message)
return None
except Exception as e:
self._log(f"❌ 获取AI回复失败: {e}使用本地AI", "ERROR")
customer_sid = platform_message.get('body', {}).get('chatinfo', {}).get('sid', '')
customer_message = platform_message.get('body', {}).get('content', '')
return
async def process_incoming_message(self, response, ws, aid, pin_zj, vender_id, store):
"""处理接收到的消息"""
try:
# 解析消息
json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response
# 验证消息格式
if not all([
json_resp,
json_resp.get('type') == "chat_message",
json_resp.get('ver') == "4.2",
isinstance(json_resp.get("body"), dict)
]):
return
# 过滤非客户消息
pin = self.get_userinfo(response)
if not self.is_merchants(store, json_resp):
return
# 提取消息内容
message_type = json_resp['body']['type']
if message_type == 'text':
customer_message = json_resp['body']['content']
elif message_type == 'image':
customer_message = f"[图片] {json_resp['body'].get('url') or json_resp['body'].get('imageUrl') or json_resp['body'].get('picUrl') or ''}"
elif message_type == 'video':
customer_message = f"[视频] {json_resp['body'].get('url') or json_resp['body'].get('videoUrl') or ''}"
else:
return
self._log(f"📩 收到客户消息: {pin}: {customer_message[:100]}...", "INFO")
# 从后端服务获取AI回复单连接模式仅转交不本地立即回复
ai_result = await self.get_ai_reply_from_backend(json_resp)
if isinstance(ai_result, str) and ai_result.strip():
# 发送回复消息(仅当本地生成/降级时)
await self.send_message(
ws=ws, pin=pin, aid=aid,
pin_zj=pin_zj, vender_id=vender_id,
content=ai_result
)
self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO")
else:
# 正常链路已转交后端AI等待后端异步回传并由 GUI 转发到平台
self._log("🔄 已转交后端AI处理等待平台回复下发", "INFO")
except json.JSONDecodeError:
self._log("❌ 消息JSON解析失败", "ERROR")
except Exception as e:
self._log(f"❌ 消息处理失败: {e}", "ERROR")
async def message_monitoring(self, cookies_str, aid, pin_zj, vender_id, store, sleep_time=0.5, stop_event=None):
print("✅ DEBUG 进入message_monitoring方法")
print(f"参数验证: cookies={bool(cookies_str)}, aid={aid}, pin_zj={pin_zj}")
# 连接后端AI服务 - 使用店铺ID或venderId
store_id = str(store.get('id', '')) or str(vender_id)
self._log(f"🔗 尝试连接后端服务店铺ID: {store_id}", "DEBUG")
backend_connected = await self.connect_backend_service(store_id)
if not backend_connected:
self._log("⚠️ 后端服务连接失败将使用本地AI回复", "WARNING")
else:
self._log("✅ 后端服务连接成功", "SUCCESS")
stop_event = stop_event or asyncio.Event() # 如果外部没传,自己创建
uri = "wss://dongdong.jd.com/workbench/websocket"
headers = {"cookie": cookies_str}
self._log(f"🔵 开始连接WebSocket: {uri}", "INFO")
# 充值重连计数器
self.reconnect_attempts = 0
while not stop_event.is_set():
try:
self._log(f"🔄 尝试连接WebSocket", "INFO")
async with websockets.connect(uri, additional_headers=headers, ping_interval=6, ping_timeout=10, close_timeout=1, max_queue=1024) as ws:
# 连接成功,重置重连计数器
self.reconnect_attempts = 0
self._log("✅ WebSocket-JD连接成功", "SUCCESS")
await self.init_wss(ws, aid, pin_zj)
# === 验证连接成功的核心指标 ===
# print(f"✅ 连接状态: open={ws.open}, closed={ws.closed}")
print(f"🖥️ 服务端地址: {ws.remote_address}")
# --- 注册连接信息到全局管理
shop_key = f"京东:{store['id']}"
loop = asyncio.get_running_loop()
entry = self.ws_manager.on_connect(
shop_key=shop_key,
ws=ws,
vender_id=vender_id,
aid=aid,
pin_zj=pin_zj,
platform="京东",
loop=loop
)
await self.waiter_status_switch(ws=ws, aid=aid, pin_zj=pin_zj)
heartbeat_task = asyncio.create_task(self.heartbeat_loop(ws, aid, pin_zj))
message_tasks = set()
try:
while not stop_event.is_set(): # 检查是否收到中止信号
try:
print(f"等待监听消息-{datetime.now()}")
response = await asyncio.wait_for(ws.recv(), timeout=1)
print(f"原始消息类型:{type(response)}, 消息体为: {response}")
await self.process_incoming_message(response, ws, aid, pin_zj, vender_id, store)
# 安全解析消息
json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response
print(json_resp)
ver = json_resp.get("ver")
print(f"版本{ver}")
except asyncio.TimeoutError:
continue
except websockets.ConnectionClosed:
# 连接关闭 跳出内层循环进行重连
break
except Exception as e:
self._log(f"消息处理异常: {e}", "ERROR")
continue
await asyncio.sleep(sleep_time)
###
##{'ver': '4.2', 'mid': 366566937, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01C5XRGQHGNJBSFYWWA7RF54QXYHF2N34TM32NGLZV6YAAPPNKWLAIQ2MZV25T4QZ2TJE4HRF6UZ2L3THX7I2BLLB37YVAWKR2BYGRRPA01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01C5XRGQHGNJBSFYWWA7RF54QXYHF2N34TM32NGLZV6YAAPPNKWLAIQ2MZV25T4QZ2TJE4HRF6UZ2L3THX7I2BLLB37YVAWKR2BYGRRPA01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '你好', 'sid': 'ce7f35b51a9c00ac898b0fe08608674a'}, 'type': 'chat_message', 'clientTime': 1755076542320, 'datetime': '2025-08-13 17:15:42', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'bc3c42a706fa4fa482ff565304c7dfbb', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755076542671}
# {'ver': '4.2', 'mid': 366566966, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10052172306055', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '538fc3761474ea693812ceed4b39639c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd014TJZZMUXFLXYGJJJ4M3HY3PTU4PBA22SPDIXDWOGZ7XTUTOWTY4LU3VWN6OKDOJPJOVDINMCJXCSPSG5X2K6KWHNQJQOROB57LDG5EY01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10052172306055', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '538fc3761474ea693812ceed4b39639c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd014TJZZMUXFLXYGJJJ4M3HY3PTU4PBA22SPDIXDWOGZ7XTUTOWTY4LU3VWN6OKDOJPJOVDINMCJXCSPSG5X2K6KWHNQJQOROB57LDG5EY01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '你好啊', 'sid': '538fc3761474ea693812ceed4b39639c'}, 'type': 'chat_message', 'clientTime': 1755592905140, 'datetime': '2025-08-19 16:41:45', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'd31d369f17f24b05abbe2b7c334f340e', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755592905232}
# {'ver': '4.2', 'mid': 366566984, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '您好', 'sid': '5b5e044c73ee243b7e67eeca5b11393c'}, 'type': 'chat_message', 'clientTime': 1755595443735, 'datetime': '2025-08-19 17:24:03', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': '800deaf802f9436b98937bb084ee2a56', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755595443839}
# {'ver': '4.2', 'mid': 366566986, 'body': {'chatinfo': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'param': {'venderId': '11961298', 'isJdSuperMarket': '0', 'pid': '10143502227300', 'source': 'jimitwo_service_smart_sdk', 'deviceNo': 'dd_dvc_aes_30EA91E824A2F36365F7B7193C10B76A8842E4E08C0DA687EC9BEB307FCF7195', 'label': 1, 'IMService': False, 'distinguishPersonJimi': 2, 'proVer': 'smart_android_15.2.0', 'sid': '5b5e044c73ee243b7e67eeca5b11393c', 'entry': 'sdk_item', 'leaveMsgTable': 1, 'venderName': 'TYBOY康路达数字科技专卖店', 'disputeId': -1, 'ddSessionType': '1', 'appId': 'im.waiter', 'systemVer': 'android_13_V2239A', 'eidtoken': 'jdd01ZC7NRD2EX45UN4Q5IZUQC4VLKXIKR7LWP2HC45VQRBDXEYHACT6U5KFBIAI52JBHYCO5BLMOHXTYUYU35RQPB2XA37HL4MPP7CXET7Q01234567', 'auctionType': '0', 'region': 'CN', 'verification': 'slide'}, 'type': 'text', 'requestData': {'entry': 'sdk_item', 'venderId': '11961298'}, 'content': '我先自己看看谢谢您', 'sid': '5b5e044c73ee243b7e67eeca5b11393c'}, 'type': 'chat_message', 'clientTime': 1755595460153, 'datetime': '2025-08-19 17:24:20', 'len': 0, 'from': {'app': 'im.customer', 'art': '', 'clientType': 'android', 'pin': 'jd_thpotntctwys'}, 'subType': 'text', 'id': 'a3d4de984f2d4811952f8ba872850bc2', 'to': {'app': 'im.waiter', 'clientType': 'comet', 'pin': 'KLD测试'}, 'lang': 'zh_CN', 'timestamp': 1755595460274}
except Exception as e:
logger.error(f"接收对应监控消息时候产生特殊错误 , 错误信息为{e}")
finally:
# 清理资源
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
self._log("🔄 连接断开,准备重连", "INFO")
except websockets.ConnectionClosed as e:
self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING")
if not await self.handle_reconnect(e):
break
except (websockets.WebSocketException, aiohttp.ClientError, OSError) as e:
self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING")
if not await self.handle_reconnect(e):
break
except Exception as e:
self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR")
if not await self.handle_reconnect(e):
break
# 关闭后端服务连接
if self.backend_connected:
await self.backend_service.close()
self.backend_connected = False
self._log("🛑 消息监听已停止", "INFO")
# 新增: GUI集成包装器类
# class JDListenerForGUI:
# """用于GUI集成的JD监听包装器 ()"""
#
# def __init__(self, log_callback=None):
# self.fix_jd_util = FixJdCookie(log_callback)
# self.log_callback = log_callback
# self.running = False
# self.stop_event = None
# self.username = None
# self.password = None
#
# def _log(self, message, log_type="INFO"):
# """处理日志输出"""
# if self.log_callback:
# self.log_callback(message, log_type)
# else:
# print(f"[{log_type}] {message}")
#
# async def start_listening(self, username, password):
# """启动监听的主方法"""
# try:
# # 存储用户名和密码
# self.username = username
# self.password = password
#
# self._log("🔵 开始JD平台连接流程", "INFO")
# print("🔵 开始JD平台连接流程 - 调试断点1")
#
# # 1. 获取店铺信息
# self._log("🔵 步骤2: 获取店铺信息...", "INFO")
# print("🔵 步骤2: 获取店铺信息... - 调试断点2")
# store_today = self.fix_jd_util.get_today_store()
# if not store_today:
# self._log("❌ 未找到店铺信息", "ERROR")
# print("❌ 未找到店铺信息")
# return False
# self._log(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}", "SUCCESS")
# print(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}")
#
# # 2. 连接后端服务获取cookie
# self._log("🔵 步骤3: 连接后端服务获取cookie...", "INFO")
# store_id = str(store_today.get('id', ''))
#
# jd_cookie = await self.fix_jd_util.connect_backend_service(store_id, username, password)
#
# if not jd_cookie or not jd_cookie.get('status'):
# self._log("❌ 从后端服务获取cookie失败", "ERROR")
# if jd_cookie and jd_cookie.get('verify_link'):
# self._log(f"❌ 需要验证登录,验证链接: {jd_cookie.get('verify_link')}", "ERROR")
# return False
#
# self._log("✅ 从后端服务获取cookie成功", "SUCCESS")
# cookies_str = jd_cookie.get('cookie')
# self._log(f"📦 获取到cookie: {cookies_str[:50] + '...' if cookies_str else '无'}", "DEBUG")
#
# # 3. 获取配置信息
# self._log("🔵 步骤4: 获取配置信息...", "INFO")
# config = None
# for i in range(3):
# try:
# config = self.fix_jd_util.get_config(cookies_str)
# if config and config.get('data'):
# self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS")
# break
# else:
# self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING")
# except Exception as e:
# self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING")
# await asyncio.sleep(3)
#
# if not config or not config.get('data'):
# self._log("获取配置失败", "ERROR")
# return False
#
# # 4. 提取必要参数
# self._log("🔵 步骤5: 提取配置参数...", "INFO")
# aid = config["data"].get("aid")
# vender_id = config["data"].get("venderId")
# pin_zj = config["data"].get("pin", "").lower()
#
# if not all([aid, vender_id, pin_zj]):
# self._log("❌ 登录信息不完整", "ERROR")
# return False
#
# self._log(f"获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}", "INFO")
#
# # 5. 启动监听
# self._log("🔵 步骤6: 启动消息监听...", "INFO")
# self.stop_event = asyncio.Event()
# self.running = True
#
# self._log("🎉开始监听JD平台消息...", "SUCCESS")
#
# # 调用实际的监听方法
# await self.fix_jd_util.message_monitoring(
# cookies_str=cookies_str,
# aid=aid,
# pin_zj=pin_zj,
# vender_id=vender_id,
# store=store_today,
# stop_event=self.stop_event,
# username=username,
# password=password
# )
#
# return True
#
# except Exception as e:
# self._log(f"监听过程中出现严重错误: {str(e)}", "ERROR")
# import traceback
# self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
# return False
#
# def stop_listening(self):
# """停止监听"""
# if self.stop_event:
# self.stop_event.set()
# self.running = False
# self._log("JD监听已停止", "INFO")
# 新增: GUI集成包装器类
class JDListenerForGUI:
"""用于GUI集成的JD监听包装器 ()"""
def __init__(self, log_callback=None):
self.fix_jd_util = FixJdCookie(log_callback)
self.log_callback = log_callback
self.running = False
self.stop_event = None
def _log(self, message, log_type="INFO"):
"""处理日志输出"""
if self.log_callback:
self.log_callback(message, log_type)
else:
print(f"[{log_type}] {message}")
async def start_listening(self, username, password):
"""启动监听的主方法"""
try:
self._log("🔵 开始JD平台连接流程", "INFO")
print("🔵 开始JD平台连接流程 - 调试断点1")
# 1. 获取店铺信息
self._log("🔵 步骤2: 获取店铺信息...", "INFO")
print("🔵 步骤2: 获取店铺信息... - 调试断点2")
store_today = self.fix_jd_util.get_today_store()
if not store_today:
self._log("❌ 未找到店铺信息", "ERROR")
print("❌ 未找到店铺信息")
return False
self._log(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}", "SUCCESS")
print(f"✅ 获取到店铺信息: {store_today.get("id", '未知')}")
cookie_str = store_today.get('platform_cookie')
self._log(f"📦 当前存储的cookie: {cookie_str[:50] + '...' if cookie_str else ''}", "DEBUG")
# 2. 获取或更新cookie - 在这里设置断点②
self._log("🔵 步骤3: 获取或更新JD cookie...", "INFO")
# 2. 获取或更新cookie
jd_login_cookie = self.fix_jd_util.get_cookies(
username=username,
password=password,
cookies_str=cookie_str
)
if not jd_login_cookie['status']:
fail_status = jd_login_cookie.get('verify_link', None)
if fail_status:
self._log(f"❌ JD登录失败: , 失败类型为二次验证(手机验证码) 对应url:{fail_status}", "ERROR")
else:
# 表示没有返回verify_url 未知错误
self._log(f"❌ JD登录失败: , 失败类型为:{fail_status} 未知错误 请单独测试login方法")
return False
self._log("✅ JD登录成功", "SUCCESS")
self._log(f"📦 获取到cookie: {jd_login_cookie['cookie'][:50] + '...'}", "DEBUG")
self._log("🔵 步骤4: 获取配置信息...", "INFO")
# 3. 获取配置信息
config = None
for i in range(3):
try:
config = self.fix_jd_util.get_config(jd_login_cookie['cookie'])
if config and config.get('data'):
self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS")
break
else:
self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING")
except Exception as e:
self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING")
await asyncio.sleep(3)
if not config or not config.get('data'):
self._log("获取配置失败", "ERROR")
return False
# 4. 提取必要参数
self._log("🔵 步骤5: 提取配置参数...", "INFO")
aid = config["data"].get("aid")
vender_id = config["data"].get("venderId")
pin_zj = config["data"].get("pin", "").lower()
if not all([aid, vender_id, pin_zj]):
self._log("❌ 登录信息不完整", "ERROR")
return False
self._log(f"获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}", "INFO")
# 5. 启动监听
self._log("🔵 步骤6: 启动消息监听...", "INFO")
self.stop_event = asyncio.Event()
self.running = True
self._log("🎉开始监听JD平台消息...", "SUCCESS")
# 调用实际的监听方法
await self.fix_jd_util.message_monitoring(
cookies_str=jd_login_cookie['cookie'],
aid=aid,
pin_zj=pin_zj,
vender_id=vender_id,
store=store_today,
stop_event=self.stop_event
)
return True
except Exception as e:
self._log(f"监听过程中出现严重错误: {str(e)}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
async def start_with_cookies(self, store_id: str, cookies: str):
"""使用下发的cookies与store_id直接建立JD平台WS并开始监听"""
try:
self._log("🔵 [JD] 收到后端登录指令开始使用cookies连接平台", "INFO")
# 获取平台配置
config = None
for i in range(3):
try:
config = self.fix_jd_util.get_config(cookies)
if config and config.get('data'):
self._log(f"✅ 第{i + 1}次尝试获取配置成功", "SUCCESS")
break
else:
self._log(f"⚠️ 第{i + 1}次尝试获取配置返回空数据", "WARNING")
except Exception as e:
self._log(f"获取配置异常({i + 1}/3): {str(e)}", "WARNING")
await asyncio.sleep(3)
if not config or not config.get('data'):
self._log("获取配置失败", "ERROR")
return False
aid = config["data"].get("aid")
vender_id = config["data"].get("venderId")
pin_zj = config["data"].get("pin", "").lower()
if not all([aid, vender_id, pin_zj]):
self._log("❌ 登录信息不完整", "ERROR")
return False
# 建立与后端的AI通道确保使用GUI的store_id
await self.fix_jd_util.connect_backend_service(store_id)
# 启动监听
self.stop_event = asyncio.Event()
self.running = True
store = {'id': store_id}
self._log("🎉 [JD] 开始监听平台消息", "SUCCESS")
await self.fix_jd_util.message_monitoring(
cookies_str=cookies,
aid=aid,
pin_zj=pin_zj,
vender_id=vender_id,
store=store,
stop_event=self.stop_event
)
return True
except Exception as e:
self._log(f"[JD] 监听过程中出现错误: {str(e)}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
def stop_listening(self):
"""停止监听"""
if self.stop_event:
self.stop_event.set()
self.running = False
self._log("JD监听已停止", "INFO")
async def main():
username = "KLD测试"
password = "kld168168"
fix_jd_util = FixJdCookie()
store_today = fix_jd_util.get_today_store()
# 检查店铺信息
if not store_today:
logger.error("❌ 未找到店铺信息")
return
store_id = str(store_today.get('id', ''))
logger.info(f"✅ 获取到店铺信息: {store_id}")
try:
# 1. 直接连接后端服务获取 cookie
logger.info("🔵 步骤1: 连接后端服务获取 cookie...")
jd_cookie = await fix_jd_util.connect_backend_service(store_id, username, password)
print(f"完整的cookie数据: {jd_cookie}")
if not jd_cookie or not jd_cookie.get('status'):
logger.error("❌ 从后端服务获取 cookie 失败")
if jd_cookie and jd_cookie.get('verify_link'):
logger.error(f"❌ 需要验证登录,验证链接: {jd_cookie.get('verify_link')}")
return
cookies_str = jd_cookie.get('cookie')
logger.info("✅ 从后端服务获取 cookie 成功")
logger.debug(f"📦 获取到 cookie: {cookies_str[:50] + '...' if cookies_str else ''}")
# 测试cookie后端生成的有效性
# cookies_str = "shshshfpa=112082d7-6f59-f35d-093a-e4c035938ab8-1754961318; shshshfpx=112082d7-6f59-f35d-093a-e4c035938ab8-1754961318; __jdu=17549613198151684402245; user-key=13d41f1d-5d8a-447e-a3d1-19c964e2fa13; __jdv=76161171|direct|-|none|-|1756259208627; areaId=18; ipLoc-djd=18-1482-48942-49052; pinId=EifU7rHmf-gwaaxnNveDFw; _tp=%2Bw%2Fs5xFMS9g0SkUd93EGxqh4USt1M5LwIyzTR4TbgCM%3D; _pst=KLD%E6%B5%8B%E8%AF%95; pin=KLD%E6%B5%8B%E8%AF%95; unick=u40d47u5ckctnz; PCSYCityID=CN_430000_430100_0; ceshi3.com=000; sdtoken=AAbEsBpEIOVjqTAKCQtvQu17tqE2au0-pftQOStCUKRw9JX4gfXHESNiN_EgrTvDv8qS5IZHipleuQxIX9JXWojJS8sKju6Vh1Qqt5LjakfSeWqqAOL-HhXeBn9m; shshshfpb=BApXS1gvPG_xA1izHnR4d6bvzjbeOVTtiBhbXFDdg9xJ1Mh6uh462; 3AB9D23F7A4B3CSS=jdd03OFWZGTHWHQYLKSX5BEWHXMTLYAHXEEXC7HBOIZDBIPLMQZT5LTHKICALRZU5ZDL6X6T3NMHRNSJJDX6BTEI6AVJS5IAAAAMZDDKTS5YAAAAACJZVYQP7FYJA3UX; _gia_d=1; wlfstk_smdl=0whbpvk7ixj27lbykeeb8rh6iulon3fo; __jda=95931165.17549613198151684402245.1754961320.1757039732.1757053789.21; __jdb=95931165.30.17549613198151684402245|21.1757053789; __jdc=95931165; 3AB9D23F7A4B3C9B=OFWZGTHWHQYLKSX5BEWHXMTLYAHXEEXC7HBOIZDBIPLMQZT5LTHKICALRZU5ZDL6X6T3NMHRNSJJDX6BTEI6AVJS5I; TrackID=1C24wlDX-QPSyJRaB_1YHqCLhBW6qIo2stht_nwl5g9fGI-lJpP-CZT3TaCr9QVppcvhilhcCe1VhgXMKGWao6Fd3wJ5bQhJ9w9VwWcYOySsXfbCTQbteFWevVN1ZQYp9; thor=4E136F9D9458703D01BE17544D30601F9649F79B89E5FC150CA91054C788CAA1C82670080466F219573AE7FD6EB9ABDF9F52D520671373DAD721CC3B78613FABC99ADA1FAC8E92CDC42F5131682B1F008727F1BA49783B055AFED9349D0B79E53A51F059A1DDE3FC181DD38D1B388D829CE8ADD775D0D30C38A8CAD0519DCD0C; flash=3_KFKaImBVn2spH8stTd9wjKlZQTxgYcZCPXXP_axvJMphR3w29aJNU2c2qPReKxWHRX1lzJ7MfD9iQmHQI-2cKp0dYzs6YsH9eDyB3lQxuu6MtkM8jCiBynVSdRBnr21oDrLKGMeYG6yYlcEsAsbe8OC-yKO69758MJYyMZd_4soV; light_key=AASBKE7rOxgWQziEhC_QY6yakCROyWTrRIF9K9uCpw_IcR8gGNaL7IM6AQuVa-3pJoC9wTze; logining=1; rita=A1EA9FF92ADE7FC61C825E83F126B9E97EF9243BEED9B77E4F7110D6081254A8EEAA66B26BFA00E08CBD8B0C88DD3D292CAD14839A50184501755B761A11F679F63D9DAA76E6785799D2F78AE378F76F32E05C1914C1132995B15CC5F79AFB9314A9D6FE7911DAFE1D958906C016E724"
# 2. 获取配置信息
logger.info("🔵 步骤2: 获取配置信息...")
config = None
for i in range(3):
try:
config = fix_jd_util.get_config(cookies_str)
if config and config.get('data'):
logger.info(f"✅ 第{i + 1}次尝试获取配置成功")
break
else:
logger.warning(f"⚠️ 第{i + 1}次尝试获取配置返回空数据")
except Exception as e:
logger.error(f"获取配置异常({i + 1}/3): {e}")
if i == 2:
return
await asyncio.sleep(3) # 使用异步等待
if not config or not config.get('data'):
logger.error("❌ 获取配置失败")
return
# 3. 提取必要参数
logger.info("🔵 步骤3: 提取配置参数...")
aid = config["data"].get("aid")
vender_id = config["data"].get("venderId")
pin_zj = config["data"].get("pin", "").lower()
if not all([aid, vender_id, pin_zj]):
logger.error("❌ 登录信息不完整,需要重新登录")
return
logger.info(f"✅ 获取到配置: aid={aid}, vender_id={vender_id}, pin_zj={pin_zj}")
# 4. 启动监听
logger.info("🔵 步骤4: 启动消息监听...")
stop_event = asyncio.Event()
try:
await fix_jd_util.message_monitoring(
cookies_str=cookies_str,
aid=aid,
pin_zj=pin_zj,
vender_id=vender_id,
store=store_today,
stop_event=stop_event,
username=username,
password=password
)
except Exception as e:
logger.error(f"❌ 监听过程中出现错误: {e}")
import traceback
logger.error(f"错误详情: {traceback.format_exc()}")
except Exception as e:
logger.error(f"❌ 主程序执行过程中出现错误: {e}")
import traceback
logger.error(f"错误详情: {traceback.format_exc()}")
if __name__ == '__main__':
asyncio.run(main())
# asyncio.run(new_test_login())