Files
shuidrop_gui/Utils/Pdd/PddUtils.py
2025-09-15 14:16:14 +08:00

1402 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project magua_project
@File PddUtils.py
@IDE PyCharm
@Author lz
@Date 2025/7/17 16:44
"""
import os
import threading
from concurrent.futures import ThreadPoolExecutor
import base64
import json
import random
import time
import traceback
import websockets
import requests
import asyncio
import execjs
from datetime import datetime
from typing import Dict, Any, Optional, Callable
from Utils.message_models import PlatformMessage
# Process-wide connection registry (modelled on the JD WebsocketManager).
class WebsocketManager:
    """Singleton registry mapping a shop key to its platform connection entry.

    Every ``WebsocketManager()`` call returns the same object. Each entry is a
    dict with the keys ``platform``, ``customers`` and ``user_assignments``.
    """

    _instance = None
    _lock = threading.Lock()  # class-level lock: guards singleton creation only

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Finish initialising the object BEFORE publishing it on
                    # the class, so a thread taking the lock-free fast path
                    # above can never observe an instance without _store.
                    instance = super().__new__(cls)
                    instance._store = {}
                    # Instance-level lock (shadows the class lock on `self`).
                    instance._lock = threading.RLock()
                    cls._instance = instance
        return cls._instance

    def on_connect(self, shop_key, ws, **kwargs):
        """Register (or refresh) the platform connection for ``shop_key``.

        Args:
            shop_key: Registry key of the shop.
            ws: Connection object; stored as a strong reference.
            **kwargs: Extra fields merged into the ``platform`` record.

        Returns:
            The (mutable) entry dict stored for ``shop_key``.
        """
        with self._lock:
            entry = self._store.setdefault(shop_key, {
                'platform': None,
                'customers': [],
                'user_assignments': {},
            })
            entry['platform'] = {
                'ws': ws,
                'last_heartbeat': datetime.now(),
                **kwargs,
            }
            return entry

    def get_connection(self, shop_key):
        """Return the entry for ``shop_key``, or ``None`` when unknown."""
        with self._lock:
            return self._store.get(shop_key)

    def remove_connection(self, shop_key):
        """Drop the entry for ``shop_key``; unknown keys are ignored."""
        with self._lock:
            self._store.pop(shop_key, None)
class PddBackendService:
    """Pinduoduo backend adapter that sends through the shared BackendClient."""

    def __init__(self):
        self.current_store_id = None

    async def connect(self, store_id):
        """Remember ``store_id``; no dedicated connection is opened.

        Always returns True.
        """
        self.current_store_id = store_id
        return True

    async def send_message_to_backend(self, platform_message):
        """Send ``platform_message`` through the shared backend client.

        A dict message without ``store_id`` gets the current store id added
        in place (when one is set).

        Returns:
            True once the message was handed to the client; None when no
            client is available or any exception occurred.
        """
        try:
            from WebSocket.backend_singleton import get_backend_client
            client = get_backend_client()
            if client:
                # Stamp the store id on dict messages that lack one.
                lacks_store_id = (
                    isinstance(platform_message, dict)
                    and 'store_id' not in platform_message
                    and self.current_store_id
                )
                if lacks_store_id:
                    platform_message['store_id'] = self.current_store_id
                client.send_message(platform_message)
                return True
            return None
        except Exception:
            return None
class ChatPdd:
@staticmethod
def check_js_files():
"""检查JS文件是否存在"""
current_dir = "static/js"
required_files = ["dencode_message.js", "encode_message.js"]
for file in required_files:
file_path = os.path.join(current_dir, file)
if not os.path.exists(file_path):
raise FileNotFoundError(f"找不到必需的JS文件: {file_path}")
return True
def __init__(self, cookie, chat_list_stat, csname=None, text=None, log_callback=None):
    """Load the JS codecs and set up per-account state.

    Args:
        cookie: Cookies sent with every HTTP request (a dict: the websocket
            code iterates ``cookie.items()``).
        chat_list_stat: Stored as-is on the instance.
        csname: Agent username looked up by ``get_assign_cslist``.
        text: Stored as-is on the instance.
        log_callback: Optional ``callback(message, level)`` used by ``_log``.

    Raises:
        FileNotFoundError: When a required JS file is missing.
        RuntimeError: When a JS file cannot be read or compiled.
    """
    # Fail early if the codec scripts are missing.
    self.check_js_files()
    js_dir = "static/js"
    try:
        # ctx decodes incoming frames; encodeex builds the login frame.
        for attr_name, js_name in (("ctx", "dencode_message.js"),
                                   ("encodeex", "encode_message.js")):
            with open(os.path.join(js_dir, js_name), 'r', encoding='utf-8') as handle:
                setattr(self, attr_name, execjs.compile(handle.read()))
    except Exception as e:
        raise RuntimeError(f"读取JS文件失败: {str(e)}")

    # log_callback must be in place before the first _log call.
    self.log_callback = log_callback
    self._log("初始化ChatPdd实例", "INFO")

    # Constructor arguments.
    self.cookie = cookie
    self.chat_list_stat = chat_list_stat
    self.csname = csname
    self.text = text

    # Account state filled in by the get_* calls.
    self.auth_token = None
    self.mall_id = None
    self.user_id = None
    self.csid = None
    self.staff_list_sent = False
    self.uids = set()
    self.pool = ThreadPoolExecutor(max_workers=4)

    # Shared connection registry and backend adapter.
    self.ws_manager = WebsocketManager()
    self.backend_service = PddBackendService()
    self.backend_connected = False

    # Reconnect tuning.
    self.reconnect_attempts = 0
    self.max_reconnect_attempts = 10
    self.base_reconnect_delay = 1.0
    self.max_reconnect_delay = 60.0
    self.reconnect_backoff = 1.5

    # Headers sent with every HTTP request.
    request_headers = {
        "authority": "mms.pinduoduo.com",
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9",
        "anti-content": "0asWfxUeM_VefxObk-_v-fwmt7oS3cmHsWwMkApMVigWOS3hCHfBeF-hHM1_v9LHywCtfzDKkA_F1cUE8_HKBA5D7fVkM2ZDB-KmMxhDM51HXlVrfK_LG4Dc3S0wimw8XYPyXdgJnUv8ndgjs0EynYv8n0XjXU9YXY4Pt0ZVgTgrsIUEeMzMk7s02E3oo-zMdF35CIMWVetBhs95lGG5xENwZwX0CwXH2799MeMjVKzk5DD4Kbs2dM1bD-fJ1F-RImB3SHBkVKDJZbZoUbL4UMk8DFtrISf8CMD4Obk_CM3ICIB_F90ws9ZQ0gcOpVdIIz2PLPVcYt5X0yqYiwjtuNVfdNSf0rmfmNIndXj8G8Nyn4ianRaanoucDp0Dfvd36B6eamPGwOFS0TAyo7nXH_nwxnGS28AgVnqPyPuy8jczb1Oe3xZuPHoJ97BBbaTMfV9OcQ23T-mAUel",
        "cache-control": "no-cache",
        "content-type": "application/json",
        "origin": "https://mms.pinduoduo.com",
        "pragma": "no-cache",
        "referer": "https://mms.pinduoduo.com/chat-merchant/index.html?r=0.4940608191737519",
        "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\"",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    }
    self.headers = request_headers
    self.store_id = "538727ec-c84d-4458-8ade-3960c9ab802c"  # default; callers may overwrite
    self.user_tokens = {}  # per-user token info
def _log(self, message, level="INFO"):
    """Send a log line to ``log_callback``, or print it coloured if unset."""
    callback = self.log_callback
    if callback:
        callback(message, level)
        return
    # ANSI colour per level; levels not listed print uncoloured.
    ansi_by_level = {
        "ERROR": "\033[91m",
        "WARNING": "\033[93m",
        "SUCCESS": "\033[92m",
        "DEBUG": "\033[96m",
    }
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    color = ansi_by_level.get(level, "")
    reset = "\033[0m"
    print(f"{color}[{timestamp}] [{level}] {message}{reset}")
# Reconnect helpers
async def calculate_reconnect_delay(self):
    """Return the exponential-backoff delay, capped at ``max_reconnect_delay``."""
    growth = self.reconnect_backoff ** self.reconnect_attempts
    return min(self.base_reconnect_delay * growth, self.max_reconnect_delay)
async def should_reconnect(self):
    """Return True while the attempt budget is not used up; log when it is."""
    attempts_left = self.reconnect_attempts < self.max_reconnect_attempts
    if not attempts_left:
        self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR")
    return attempts_left
async def handle_reconnect(self, exception=None):
    """Log the failure, wait out the backoff delay and count the attempt.

    Returns:
        True when the caller should retry, False when the budget is spent.
    """
    if exception:
        self._log(f"连接异常[{type(exception).__name__}]: {str(exception)}", "WARNING")
    may_retry = await self.should_reconnect()
    if may_retry:
        wait_seconds = await self.calculate_reconnect_delay()
        self._log(f"{self.reconnect_attempts + 1}次重连尝试,等待{wait_seconds:.1f}秒...", "WARNING")
        await asyncio.sleep(wait_seconds)
        self.reconnect_attempts += 1
        return True
    return False
# Backend service helpers
async def connect_backend_service(self, store_id):
    """Connect the backend adapter for ``store_id``.

    Returns:
        Whatever ``backend_service.connect`` returned, or False on exception.
    """
    try:
        connected = await self.backend_service.connect(store_id)
        if not connected:
            return connected
        self.backend_connected = True
        self._log("✅ 后端AI服务连接成功", "SUCCESS")
        return connected
    except Exception as e:
        self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR")
        return False
async def get_ai_reply_from_backend(self, platform_message):
    """Hand ``platform_message`` to the backend without waiting for a reply.

    If the backend is not connected, one reconnect is attempted (only
    possible when ``store_id`` is set). Nothing is sent while disconnected.

    Returns:
        Always None: the reply comes back later through another path.
    """
    if not self.backend_connected:
        if self.store_id:
            self.backend_connected = await self.connect_backend_service(self.store_id)
        # Also covers the "no store_id, cannot reconnect" case, which must
        # not fall through to the send below.
        if not self.backend_connected:
            self._log("❌ 后端服务未连接,尝试重新连接失败", "WARNING")
            return None
    try:
        sent = await self.backend_service.send_message_to_backend(platform_message)
    except Exception as e:
        self._log(f"❌ 发送消息到后端失败: {e}", "ERROR")
        return None
    # The backend service reports failure with a falsy return value rather
    # than an exception, so only claim success when it says so.
    if sent:
        self._log("✅ 消息已发送到后端等待后端AI处理并通过GUI转发回复", "INFO")
    else:
        self._log("❌ 发送消息到后端失败: 后端连接不可用", "ERROR")
    return None
def get_auth_token(self):
    """Fetch the sub-system auth token and store it in ``self.auth_token``.

    Raises:
        requests.RequestException: On network failure or timeout.
        KeyError: When the response has no ``result.authToken``.
    """
    url = "https://mms.pinduoduo.com/janus/api/subSystem/getAuthToken"
    payload = {"subSystemId": 17,
               "anti_content": "0asAfx5E-wCElqJNXaKt_UKccG7YNycjZoPYgO1YTmZoApN7QJU5XT_JuYdPZ4uvLPQFgIcyXOKqm8xGrPjy0lxn0gaXr9ac0TynYEJnDwdj-WEJnwK3G4kc3M0TiPgtB11eBeZkB1hkBxUHFOtjfqz-eAkgc2aa4sZuIJwHOptYX14VK1NJSqIYfqmw6jpQTXs574CfWMFxT1RPTIB2MKBjC199pXpkbdtXxnn2mA4C1YdNnTUrNa_Wvn5mYj5XadnDbNT7xNuVeYXrsTsiipUr6xn2AAXKoYmv6j0PL92PtCTMfZzzfTfjutCgvBTzxFw-2LeeRIkFBATU1Npg1ScUFRv-1Ukrs3AklVAbBiEbBJhzcf2cXKfNH50X9QUFCd_Y1FhLW1BBuI-KEUFYK3lZTB3g_BlLDk8ti-JXmbalXzWAb6V2hX0fZE9n2L0GITFInNS"}
    # Without a timeout, requests waits indefinitely on a stalled server.
    response = requests.post(url, cookies=self.cookie, headers=self.headers,
                             json=payload, timeout=10).json()
    self.auth_token = response["result"]["authToken"]
def get_mall_id(self):
    """Fetch the account info and store ``self.mall_id`` and ``self.user_id``.

    Raises:
        requests.RequestException: On network failure or timeout.
        KeyError: When the response lacks ``mall_id`` or ``id``.
    """
    url = "https://mms.pinduoduo.com/chats/userinfo/realtime?get_response=true"
    query = {"get_response": "true"}
    # Without a timeout, requests waits indefinitely on a stalled server.
    response = requests.get(url, cookies=self.cookie, headers=self.headers,
                            params=query, timeout=10).json()
    self.mall_id = response["mall_id"]
    self.user_id = response["id"]
def get_assign_cslist(self):
    """Look up the agent named ``self.csname`` and store its id in ``self.csid``.

    ``self.csid`` is left unchanged when no agent has that username.

    Raises:
        requests.RequestException: On network failure or timeout.
        KeyError: When the response has no ``result.csList``.
    """
    url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
    body = json.dumps({"wechatCheck": True}, separators=(',', ':'))
    # Without a timeout, requests waits indefinitely on a stalled server.
    response = requests.post(url, headers=self.headers, cookies=self.cookie,
                             data=body, timeout=10).json()
    cs_list = response["result"]["csList"]
    for cs_id, cs_info in cs_list.items():
        if cs_info.get("username") == self.csname:
            self.csid = cs_id
            break
def tab_mall(self, uid):
    """Transfer the conversation with user ``uid`` to the agent ``self.csid``.

    Synchronous (blocking HTTP); run it in an executor from async code.

    Returns:
        True when the platform confirms the transfer, False on any
        rejection, malformed response or error. Never raises.
    """
    self._log(f"🔄 开始执行转接操作 - 用户ID: {uid}, 目标客服ID: {self.csid}", "INFO")
    try:
        url = "https://mms.pinduoduo.com/plateau/chat/move_conversation"
        request_id = int(time.time() * 1000)
        data = {
            "data": {
                "cmd": "move_conversation",
                "request_id": request_id,
                "conversation": {
                    "csid": self.csid,
                    "uid": uid,
                    "need_wx": False,
                    "remark": "无原因直接转移"
                },
                "anti_content": "0asAfxvYXyIgYgE2h9U0sXh6Ia3tZOAzI_rl3hixs1Z5DB-B1cswVK2UB8LfUO8H3OUKdF214-SgPVnY8SYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4R8ZVTw3a7j9BdB3Hmq7j0bh2LQrRzlt88pkQr-s3jMzWH5QPiz13wQNsJY6h9dnigBE26Ww1dKZcWbsKS4GsgY9_joGBS2FZUz94cbCvE2Ij9nO93b5kR50m9kI1lfBHvDZVIC9rLB-S4Qs4J9ebSsosuNskY9nhgXzCuNoilSQgJaMkP2Rgga2i54nWTTNfnZ4W9GvigkSnwzweN9sx-89BztNCasVkP2C2Kkipif2Dv-t7e2O9MnmtWaKm4wFlHLR7YmeyF3y5gxPRuHlO-gD7BfFle8z71lIV-7SyOLCd83FMUCsRwedwv3-TNTF_VRw_G1DHrptFYTAck8p8qIkxA_PZ14PXct1ZS2zAoZZcTMZZSpJ1jP9RlQARtnNNP0IzCdHiL4JzRQBkkKmwGGXay0i4urnXNm5qBgejFcVCkx8XYdeGBp4p-l1TjKoy603__johKFKZQksRHjOKma5DplwQwg8c3Uo1-mDAUmgAC0XfV2rkxzXj3ABARPlyEaMU8AxUZioaFRa0e5eit6r06uY9m5SyU_mdbTfLIYcKWjRA_UNQisJOUenCbMl_Bywq1KcA6TTzJ9ZHu4xz0o7ctROYLlvhEhAi3bJjFNho6l-TPZK4zSJoj"
            },
            "client": "WEB",
            "anti_content": "0asAfxvYXNIgYgd2i9U0sXh6IabtZOAdI_LlbhNUvZZ5DB-B1cswVK2UBjxfUOjHbOUKdF214-SgPVnYjSYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4RjZVTwbJFiC0P0rR6LbH89rLVuGHiG6vm3fO6rq_rAyLQlw8CAIuESMR4L308Ctz3V0m9ZhS8tdapQr_YY2vZZk9sAJsMd2ZapG92zm2lO9FLVBjCur2KWwac9nua9g8ztOawyo9ur0Dd96xMkCXrd4J4hR4Kk9zqZBDTlRPkMdzzkJ5Osgi40EC3oVAhSPA9MawGCs33Vnd4bSJER9_b_D9g1sfdeZZKsgo25k0WRfs7dM17E2ug-pPEL5_ib53TuaxLHJq5pGG4jMUjAHRV-gbDffut1Yjy1A7K-6E3zO4tDuUq5T0LPHjf9bB7Apofgr_U_4Aq-7l8x2ks8DhhgOhblTvwrO4owZvuUXA0hfSIUPYK9fIoSUH6aaL8abMMVCvI__qF8Yw1oaMp0d0-fbwMhXQ8fBZYoajXXJv-RV3B2GQEowvMHEUtjDz50jUVduq5fKL9R-WsBRjX0aiExVKLw-PZzxsVup0ZuOYxnPofN92EqKfbvriIWFMsh80eGrvq1vsD1BrMx_mcPMmUmHHEY7wct5lePa0b2Lt5_byqRbbcpohrQrHLlH6JJaTMKQuffyT-60110e5oz_Cz0NbG88TyxOGKijW7BimL5RTjSOTPsK4zS8oi"
        }
        self._log(f"📤 发送转接请求 - URL: {url}", "DEBUG")
        self._log(f"📝 请求数据: {json.dumps(data, ensure_ascii=False)}", "DEBUG")
        data_str = json.dumps(data, separators=(',', ':'))
        # Bounded wait: this runs on a small worker pool, so a stalled
        # request must not occupy a thread forever.
        response = requests.post(url, headers=self.headers, cookies=self.cookie,
                                 data=data_str, timeout=10)
        self._log(f"📥 HTTP状态码: {response.status_code}", "DEBUG")
        self._log(f"📄 响应内容: {response.text}", "DEBUG")
        # Parse the response body.
        try:
            result = response.json()
        except json.JSONDecodeError as e:
            self._log(f"❌ 解析响应JSON失败: {e}", "ERROR")
            self._log(f"❌ 原始响应: {response.text}", "ERROR")
            return False
        self._log(f"🔍 解析后的响应: {json.dumps(result, ensure_ascii=False)}", "DEBUG")
        if not result.get("success"):
            self._log(f"❌ 转接请求失败 - 响应标记为失败", "ERROR")
            return False
        # `or {}` also covers an explicit "result": null in the response.
        result_data = result.get("result") or {}
        if result_data.get("result") in ("success", "ok"):
            self._log(f"✅ 转接成功 - 用户 {uid} 已转接给客服 {self.csid}", "SUCCESS")
            return True
        error_code = result_data.get("error_code", "未知")
        error_msg = result_data.get("error_msg", "未知错误")
        self._log(f"❌ 转接失败 - 错误码: {error_code}, 错误信息: {error_msg}", "ERROR")
        return False
    except requests.RequestException as e:
        self._log(f"❌ 请求异常: {e}", "ERROR")
        return False
    except Exception as e:
        self._log(f"❌ 转接操作异常: {e}", "ERROR")
        traceback.print_exc()
        return False
@staticmethod
def forward_message_to_platform(store_id: str, recv_pin: str, content: str):
"""转发消息到拼多多平台"""
try:
pdd_mgr = WebsocketManager()
pdd_shop_key = f"拼多多:{store_id}"
pdd_entry = pdd_mgr.get_connection(pdd_shop_key)
if pdd_entry:
# 找到拼多多连接,使用拼多多转发逻辑
platform_info = pdd_entry.get('platform') or {}
ws = platform_info.get('ws')
loop = platform_info.get('loop')
pdd_instance = platform_info.get('pdd_instance') # ChatPdd实例引用
print(
f"[PDD Forward] shop_key={pdd_shop_key} has_ws={bool(ws)} has_loop={bool(loop)} has_pdd_instance={bool(pdd_instance)} recv_pin={recv_pin}")
if ws and loop and pdd_instance:
async def _send_pdd():
# 直接调用ChatPdd实例的send_ai_reply方法
await pdd_instance.send_ai_reply(recv_pin, content)
import asyncio as _asyncio
_future = _asyncio.run_coroutine_threadsafe(_send_pdd(), loop)
try:
_future.result(timeout=5) # 拼多多需要HTTP请求给更长时间
print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}")
except Exception as fe:
print(f"[PDD Forward] 转发提交失败: {fe}")
else:
print("[PDD Forward] 条件不足,未转发:",
{'has_ws': bool(ws), 'has_loop': bool(loop), 'has_pdd_instance': bool(pdd_instance),
'has_content': bool(content)})
else:
print(f"[PDD Forward] 未找到拼多多连接: {pdd_shop_key}")
except Exception as e:
print(f"[PDD Forward] 拼多多转发失败: {e}")
@staticmethod
def transfer_customer_service(customer_service_id: str, user_id: str, store_id: str):
    """Transfer user ``user_id`` of shop ``store_id`` to ``customer_service_id``.

    Runs the blocking ``tab_mall`` call on a worker thread and waits at most
    10 seconds for it. All failures are printed, never raised. Returns None.
    """
    try:
        pdd_mgr = WebsocketManager()
        pdd_shop_key = f"拼多多:{store_id}"
        pdd_entry = pdd_mgr.get_connection(pdd_shop_key)
        if not pdd_entry:
            print(f"[PDD Transfer] 未找到拼多多连接: {pdd_shop_key}")
            return
        platform_info = pdd_entry.get('platform') or {}
        pdd_instance = platform_info.get('pdd_instance')
        loop = platform_info.get('loop')
        print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
        if not (pdd_instance and loop):
            print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}")
            return
        # tab_mall reads the target agent from the instance.
        pdd_instance.csid = customer_service_id

        def _transfer():
            result = pdd_instance.tab_mall(user_id)
            if result:
                print(f"[PDD Transfer] ✅ 转接成功")
            else:
                print(f"[PDD Transfer] ❌ 转接失败")
            return result

        import concurrent.futures
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(_transfer)
            try:
                future.result(timeout=10)
            except Exception as fe:
                print(f"[PDD Transfer] 转接操作异常: {fe}")
        finally:
            # wait=False: a `with` block would wait for the worker on exit
            # and so ignore the 10 s timeout above.
            executor.shutdown(wait=False)
    except Exception as e:
        print(f"[PDD Transfer] 拼多多转接失败: {e}")
async def send_ai_reply(self, uid, content):
    """Send ``content`` to user ``uid`` through the merchant chat HTTP API.

    The blocking request runs on ``self.pool``.

    Returns:
        True on HTTP 200, False on any other status or error. Never raises.
    """
    self._log(f"开始发送AI回复给用户 {uid}", "INFO")
    # str() keeps this preview from raising on non-string content.
    self._log(f"回复内容: {str(content)[:50]}...", "DEBUG")
    try:
        ts = int(time.time() * 1000)
        url = "https://mms.pinduoduo.com/plateau/chat/send_message"
        data = {
            "data": {
                "cmd": "send_message",
                "anti_content": "0asWfqzFdjPssgua6CCWyeEK7LAktf0_UFChvlj8kD8Mk1DVZchg4qURiKAS4iSfiaz9Fjg43ThZZ4_wNh5Onk0e5Tw2mGdwpTEPDZlPdfZwBpyESSDiz0Xalv6mvr0o0uQR6yRlG-vscbK8jYGRNcAr2UktX3bMKhjjx7YvVGbi_poiyypBuOwM_2qZyIM96tdNTfb_AMy0fe1UrN2SqUzfm2stmb4TJldGN9b1mTPr-6t4R9k3pquV_XHwBr20J4__P71fxPBmcXOmXTnQWJs2ddt4KKOIbPIIanas08VNiv9nsHbtvXKp8_CMdL8FQMPmEMDmM6JfjZ4LJp5l-x0BjpBbdWyoztNYqSbBe8hscA1hPFM_R1iR9E-AI-CyuehipODW-ADCL0xEWK875510RsV7mLFQKG0kWO--krZzFFi_6lhdfq3_JhBjDDd3YS1exxSMMS7yDJgskoumfAcu-y1YDO8OdcKWwx3Z0nOqUlnF8LekN7TqXhNtKTgfDZPThVT3JcNm_2TljXRhGWxTZT4o25TTVKo5KPeztcJU5NddFd38nmvo0k8R8HVIkmzOYyaerCDCm1ZDfYgNFx-2vM_JkcMYFdkV7C3BKeg7DxZXHfexAT3UKgruBWYrSs_dBB0X-QkJ2ZcmpStB6KJkZ1azrAAxdw_HMj5UIq4vEXxrA1-YAzJEhMoU0-MncDpoBZfiky7Sr4nDGYrcn86AAYWynOUn0US2DAkICABvSSqkh0vCRpdVeZRfOFsdFWF43KEIph-ckMxDsyq6G3rWXbvPkaAlv67cR1v0N2oCQzDVDT-InNv",
                "request_id": ts,
                "message": {
                    "to": {
                        "role": "user",
                        "uid": uid
                    },
                    "from": {
                        "role": "mall_cs"
                    },
                    "ts": int(ts / 1000),
                    "content": content,
                    "msg_id": None,
                    "type": 0,
                    "is_aut": 0,
                    "manual_reply": 1,
                    "status": "read",
                    "is_read": 1,
                    "hash": "1d1359dfb7c141fd95432a22cdc7f43b51434041a1a6016ef6ea2ea2b1abc05a"
                },
                "random": "c5fc9c61194294c478a6fb395b1c558f"
            },
            "client": "WEB",
            "anti_content": "0asAfa5E-wCEsa-JeFwdFfTDtzhMbODIsAT-eu_-CigAHMbcWIfBEUFcI-Kwd93IYTWtfvkSeuwUKO57jwISBuZkzfCe-2VkBFSD-ack-ZKIXRockMwxBgyDql6V3xKKD-fCkBFZkzeKeBwUE-cSD-KHkBb-kzFVk-szd009EAPjDvBcaPIXxHGB_IxWWlIXMnxghvniTodnXt2scz7sxhJV_CEBcCez4H92maXI2d1qgoYswoG4Mf_oylph0nGigjnqWzXqwTYh0y4W6yYs6PxHyXv8slpHrXyseoqZhfqdrdn5E9BCt24mBfkEFg-vgc0wUw2D-vgE7KUM1FIdKb2_Mbl_6xj_8bvk-e1H6ObSksLCDiVCDAbkYU6Y_SMBfgA5CHUM0QuYhznXIwnTanGM2jugCnxPYPmYjyOvjKHEbaV3nvMuJ6V2cX0fZE99vcfY3TF1nNr"
        }
        data = json.dumps(data, separators=(',', ':'))
        # Bounded wait: the pool has few workers, so a stalled request must
        # not hold one forever.
        response = await asyncio.get_running_loop().run_in_executor(
            self.pool,
            lambda: requests.post(url, headers=self.headers, cookies=self.cookie,
                                  data=data, timeout=10)
        )
        # Log the HTTP status and a slice of the body.
        self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
        try:
            self._log(f"PDD返回体: {response.text[:500]}", "DEBUG")
        except Exception:
            pass  # best-effort diagnostics only
        if response.status_code == 200:
            self._log(f"✅ 已发送AI回复给用户 {uid}: {content}", "SUCCESS")
            return True
        self._log(f"❌ 发送AI回复失败HTTP状态码: {response.status_code}", "ERROR")
        self._log(f"响应内容: {response.text}", "DEBUG")
        return False
    except Exception as e:
        self._log(f"❌ 发送AI回复失败: {e}", "ERROR")
        traceback.print_exc()
        return False
async def send_message_external(self, uid, content):
    """External entry point: deliver a backend-forwarded reply to ``uid``.

    Returns:
        The result of ``send_ai_reply``; False for missing arguments or on
        exception.
    """
    self._log(f"[External] 收到外部消息发送请求: uid={uid}, content_len={len(content) if content else 0}", "INFO")
    try:
        if uid and content:
            delivered = await self.send_ai_reply(uid, content)
            if delivered:
                self._log(f"✅ [External] 外部消息发送成功: uid={uid}", "SUCCESS")
            else:
                self._log(f"❌ [External] 外部消息发送失败: uid={uid}", "ERROR")
            return delivered
        self._log("❌ [External] 参数不完整", "ERROR")
        return False
    except Exception as e:
        self._log(f"❌ [External] 外部消息发送异常: {e}", "ERROR")
        return False
async def get_all_customer(self, ws):
    """Fetch the agent list and store it in ``self.customer_list``.

    Args:
        ws: Unused; kept for interface compatibility.

    Returns:
        True when a non-empty list was fetched, otherwise False (bad status,
        empty list or any error). Never raises.
    """
    try:
        url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
        body = json.dumps({"wechatCheck": True}, separators=(',', ':'))
        # Blocking HTTP runs on the worker pool; the timeout keeps a stalled
        # request from holding a worker forever.
        response = await asyncio.get_running_loop().run_in_executor(
            self.pool,
            lambda: requests.post(url, headers=self.headers, cookies=self.cookie,
                                  data=body, timeout=10)
        )
        if response.status_code != 200:
            return False
        result = response.json()
        # `or {}` also covers an explicit "result": null in the response.
        cslist = (result.get("result") or {}).get("csList", {})
        print("我们收集到的客服列表原信息为:", str(cslist))
        if not cslist:
            return False
        # Convert to the staff-list record format.
        self.customer_list = [
            {
                "staff_id": str(key),
                "name": customer.get("username", ""),
                "status": customer.get("status", 0),
                "department": customer.get("department", ""),
                "online": True,  # hard-coded, not read from the response
            }
            for key, customer in cslist.items()
        ]
        self._log(f"✅ 成功获取客服列表,共 {len(self.customer_list)} 个客服", "SUCCESS")
        return True
    except Exception as e:
        self._log(f"❌ 获取客服列表失败: {e}", "ERROR")
        return False
async def send_staff_list_to_backend(self):
    """Upload ``self.customer_list`` to the backend as a ``staff_list`` message.

    Returns:
        True only when the backend service accepted the message; False when
        the list is empty or missing, the send failed, or an error occurred.
    """
    try:
        staff = getattr(self, 'customer_list', None)
        if not staff:
            self._log("⚠️ 客服列表为空", "WARNING")
            return False
        message_template = PlatformMessage(
            type="staff_list",
            content="客服列表更新",
            data={
                "staff_list": staff,
                "total_count": len(staff)
            },
            store_id=self.store_id
        )
        sent = await self.backend_service.send_message_to_backend(message_template.to_dict())
        self._log(f"发送客服列表消息的结构体为: {message_template.to_json()}")
        # The backend service signals failure with a falsy return value, so
        # report that instead of success; this lets the caller retry later.
        if not sent:
            self._log("❌ 发送客服列表到后端失败: 后端连接不可用", "ERROR")
            return False
        self._log(f"✅ [PDD] 成功发送客服列表到后端,共 {len(staff)} 个客服", "SUCCESS")
        print(f"🔥 [PDD] 客服列表已上传到后端: {len(staff)} 个客服")
        return True
    except Exception as e:
        self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR")
        return False
async def handle_transfer_message(self, message_data):
    """Handle a backend transfer instruction.

    ``message_data["content"]`` is the target agent id and
    ``message_data["receiver"]["id"]`` the user to transfer.

    Returns:
        True when ``tab_mall`` reported success, otherwise False.
    """
    self._log("收到转接指令", "INFO")
    try:
        target_cs = message_data.get("content", "")
        uid = message_data.get("receiver", {}).get("id")
        if not (target_cs and uid):
            self._log("❌ 转接消息信息不完整", "WARNING")
            self._log(f"消息内容: {message_data}", "DEBUG")
            return False
        self._log(f"准备将用户 {uid} 转接给客服 {target_cs}", "INFO")
        # tab_mall reads the target agent from self.csid.
        self.csid = target_cs
        try:
            # tab_mall is synchronous, so run it on the worker pool.
            loop = asyncio.get_event_loop()
            transferred = await loop.run_in_executor(self.pool, self.tab_mall, uid)
            if not transferred:
                self._log(f"❌ 转接失败: 用户 {uid}", "ERROR")
                return False
            self._log(f"✅ 转接成功: 用户 {uid} 已转接给客服 {target_cs}", "SUCCESS")
            return True
        except Exception as e:
            self._log(f"❌ 执行转接操作失败: {e}", "ERROR")
            return False
    except Exception as e:
        self._log(f"❌ 处理转接消息失败: {e}", "ERROR")
        traceback.print_exc()
        return False
async def handle_customer_message(self, message_data):
    """Relay an agent message from the backend to the target user.

    Reads ``content`` and ``receiver.id`` from ``message_data``. Errors are
    logged, never raised. Returns None.
    """
    self._log("收到来自后端的客服消息", "INFO")
    try:
        content = message_data.get("content", "")
        uid = message_data.get("receiver", {}).get("id")
        if not (uid and content):
            self._log("客服消息数据不完整", "WARNING")
            self._log(f"消息内容: {message_data}", "DEBUG")
        else:
            self._log(f"准备发送客服消息给用户 {uid}: {content}", "INFO")
            await self.send_ai_reply(uid, content)
            self._log(f"客服消息发送完成 - 目标用户: {uid}", "SUCCESS")
    except Exception as e:
        self._log(f"处理客服消息失败: {e}", "ERROR")
        traceback.print_exc()
async def process_incoming_message(self, message_data, wss, store):
    """Classify one pushed platform message and hand it to the backend.

    Messages lacking a nickname, content or sender uid are ignored. Errors
    are logged, never raised. ``wss`` and ``store`` are not used here.
    """
    try:
        payload = message_data.get("message", {})
        if not payload:
            return
        nickname = payload.get("nickname")
        content = payload.get("content")
        goods_info = payload.get("info", {})
        uid = payload.get("from", {}).get("uid")
        if not (nickname and content and uid):
            return

        self._log(f"用户消息 - {nickname}: {content}", "INFO")
        self._log(f"用户UID: {uid}", "DEBUG")
        self.uids.add(uid)

        # First guess the type from file extensions found in the content.
        lowered = str(content).lower()
        image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp")
        video_exts = (".mp4", ".avi", ".mov", ".wmv", ".flv")
        if any(ext in lowered for ext in image_exts):
            msg_type = "image"
        elif any(ext in lowered for ext in video_exts):
            msg_type = "video"
        else:
            msg_type = "text"

        # Card detection overrides the extension-based guess.
        card_keywords = ('goods.html', 'item.html', 'item.jd.com', '商品卡片id')
        if "订单编号" in str(content) and goods_info:
            # Order card: content is rebuilt from the goods info.
            content = f"商品id{goods_info.get('goodsID')} 订单号:{goods_info.get('orderSequenceNo')}"
            msg_type = "order_card"
        elif any(keyword in lowered for keyword in card_keywords) or \
                (goods_info and goods_info.get('goodsID') and not goods_info.get('orderSequenceNo')):
            msg_type = "product_card"

        message_template = PlatformMessage.create_text_message(
            content=content,
            sender_id=str(uid),
            store_id=self.store_id
        )
        message_template.msg_type = msg_type
        try:
            print(f"📤(SPEC) 发送到AI: {json.dumps(message_template.to_dict(), ensure_ascii=False)[:300]}...")
        except Exception:
            pass  # debug print only

        # Hand over to the backend; a string result is sent back at once.
        ai_result = await self.get_ai_reply_from_backend(message_template.to_dict())
        if isinstance(ai_result, str) and ai_result.strip():
            await self.send_ai_reply(uid, ai_result)
            self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO")
        else:
            # Normal path: the reply arrives later and is forwarded elsewhere.
            self._log("🔄 已转交后端AI处理等待平台回复下发", "INFO")
    except Exception as e:
        self._log(f"❌ 消息处理失败: {e}", "ERROR")
        traceback.print_exc()
@staticmethod
async def heartbeat(wss):
while True:
m = random.randint(100, 200)
h = [0, 0, 0, 0, 0, 0, 0, m, 0, 0, 0, 1, 0, 0, 0, 0]
h = bytes(h)
await wss.send(bytes(h))
await asyncio.sleep(15)
async def on_message(self, wss, store):
    """Receive frames from ``wss`` forever, decode them and dispatch each message.

    Decode and dispatch errors are logged and the loop continues. Errors
    raised by ``wss.recv()`` (for example a closed connection) propagate to
    the caller, which owns the reconnect logic.
    """
    self._log("开始监听消息", "INFO")
    while True:
        # recv() is deliberately outside any try: swallowing its errors
        # would retry on a dead socket in a tight loop and never reconnect.
        message = await wss.recv()
        if isinstance(message, str):
            # The decoder expects binary frames; text frames are skipped.
            self._log("忽略非二进制消息", "DEBUG")
            continue
        try:
            encoded = base64.b64encode(message).decode()
            self._log(f"收到新消息 {encoded}", "DEBUG")
            text = self.ctx.call("dencode_data", encoded)
            if not isinstance(text, dict):
                continue
            push = text.get("push_data") or {}
            data = push.get("data")
            if not data:
                self._log("push_data.data 为空,忽略", "DEBUG")
                continue
            print(f"原始数据格式打印:{data}")
            for d in data:
                await self.process_incoming_message(d, wss, store)
        except Exception as e:
            # "buffer error" is expected, so it is logged at DEBUG only.
            if "buffer error" in str(e):
                self._log(f"解析消息失败: {e}", "DEBUG")
            else:
                self._log(f"解析消息失败: {e}", "ERROR")
                traceback.print_exc()
async def message_monitoring(self, cookies_str, store, stop_event=None):
    """Main listener: connect to the Pinduoduo websocket and keep it running.

    Reconnects with backoff after failures until the attempt budget is spent
    or ``stop_event`` is set (checked between connections only). Needs
    ``self.user_id`` and ``self.auth_token``; without them it logs an error
    and returns.

    Args:
        cookies_str: Only its truthiness is printed; the request cookies
            come from ``self.cookie``.
        store: Mapping with the shop ``id``.
        stop_event: Optional event with ``is_set()``; a new
            ``asyncio.Event`` is used when omitted.
    """
    print("✅ DEBUG 进入拼多多message_monitoring方法")
    print(f"参数验证: cookies={bool(cookies_str)}")
    # Connect the backend service using the shop id.
    store_id = str(store.get('id', ''))
    self._log(f"🔗 尝试连接后端服务店铺ID: {store_id}", "DEBUG")
    backend_connected = await self.connect_backend_service(store_id)
    if not backend_connected:
        self._log("⚠️ 后端服务连接失败将使用本地AI回复", "WARNING")
    else:
        self._log("✅ 后端服务连接成功", "SUCCESS")
    stop_event = stop_event or asyncio.Event()
    # Reset the reconnect counter.
    self.reconnect_attempts = 0
    while not stop_event.is_set():
        try:
            self._log(f"🔄 尝试连接拼多多WebSocket", "INFO")
            if not (self.user_id and self.auth_token):
                self._log("未定制账号token数组", "ERROR")
                # Nothing in this loop can supply the credentials; looping
                # on would spin without ever awaiting.
                break
            uri = "wss://titan-ws.pinduoduo.com/"
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
                "Upgrade": "websocket",
                "Origin": "https://mms.pinduoduo.com",
                "Accept-Encoding": "gzip, deflate, br, zstd",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits",
                "cookie": "; ".join([f"{k}={v}" for k, v in self.cookie.items()])
            }
            async with websockets.connect(uri, additional_headers=headers, ping_interval=20,
                                          ping_timeout=16) as websocket:
                # Connected: reset the reconnect counter.
                self.reconnect_attempts = 0
                self._log("✅ WebSocket-PDD连接成功", "SUCCESS")
                z = self.encodeex.call("encode_token", str(self.user_id), self.auth_token)
                if z:
                    await websocket.send(bytes(z))
                # Register the connection so other threads can forward replies.
                shop_key = f"拼多多:{store['id']}"
                self.ws_manager.on_connect(
                    shop_key=shop_key,
                    ws=websocket,
                    platform="拼多多",
                    loop=asyncio.get_running_loop(),
                    pdd_instance=self  # used by the static forwarding helpers
                )
                # Upload the agent list once per instance.
                if not self.staff_list_sent:
                    self._log("🔍 开始获取客服列表...", "INFO")
                    try:
                        if await self.get_all_customer(None):
                            await self.send_staff_list_to_backend()
                            self.staff_list_sent = True
                        else:
                            self._log("⚠️ 获取客服列表失败", "WARNING")
                    except Exception as e:
                        self._log(f"❌ 获取或发送客服列表失败: {e}", "ERROR")
                # Run heartbeat and receiver together. gather() does not
                # cancel the surviving task when one fails, so do it here.
                workers = [
                    asyncio.ensure_future(self.heartbeat(websocket)),
                    asyncio.ensure_future(self.on_message(websocket, store)),
                ]
                try:
                    await asyncio.gather(*workers)
                finally:
                    for worker in workers:
                        worker.cancel()
                    await asyncio.gather(*workers, return_exceptions=True)
        except websockets.ConnectionClosed as e:
            self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING")
            if not await self.handle_reconnect(e):
                break
        except (websockets.WebSocketException, OSError) as e:
            self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING")
            if not await self.handle_reconnect(e):
                break
        except Exception as e:
            self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR")
            if not await self.handle_reconnect(e):
                break
    # Mark the backend as disconnected.
    if self.backend_connected:
        self.backend_connected = False
    self._log("🛑 消息监听已停止", "INFO")
async def main(self):
    """Fetch the account credentials, connect the backend, then monitor.

    The three ``get_*`` calls are synchronous. Returns None once monitoring
    ends, or right away when the backend connection fails.
    """
    self.get_auth_token()
    self.get_mall_id()
    self.get_assign_cslist()
    # Connect the backend service before listening.
    if not await self.connect_backend_service(self.store_id):
        print("❌ AI服务连接失败")
        return
    print("✅ AI服务连接成功")
    await self.message_monitoring(
        cookies_str=self.cookie,
        store={'id': self.store_id}
    )
class PddListenerForGUI:
"""用于GUI集成的拼多多监听包装器"""
def __init__(self, log_callback: Optional[Callable] = None):
    """Create an idle listener.

    Args:
        log_callback: Optional ``callback(message, level)`` for log lines.
    """
    self.log_callback = log_callback
    # Runtime handles, filled in once listening starts.
    self.pdd_bot = None
    self.main_thread = None
    self.loop = None
    self.running = False
    # Startup handshake between the worker thread and start_listening.
    self.startup_event = threading.Event()
    self.startup_success = False
    self.startup_error = None
def _log(self, message: str, log_type: str = "INFO"):
    """Send a log line to ``log_callback``, or print it when none is set."""
    if not self.log_callback:
        print(f"[{log_type}] {message}")
        return
    self.log_callback(message, log_type)
async def start_listening(self, cookie_dict: Dict[str, str], chat_list_stat: bool = False,
                          csname: Optional[str] = None, text: Optional[str] = None) -> bool:
    """Create a ``ChatPdd`` bot and run its ``main()`` on a daemon thread.

    This coroutine waits for ``startup_event``, which the worker sets only
    after ``ChatPdd.main()`` has finished or failed. The wait runs in an
    executor so the caller's event loop stays responsive.

    Returns:
        True when ``ChatPdd.main()`` completed without raising, else False.
    """
    try:
        self._log("🔵 开始拼多多平台连接流程", "INFO")
        if not cookie_dict:
            self._log("❌ Cookie信息不能为空", "ERROR")
            return False
        self._log(f"✅ Cookie验证通过包含 {len(cookie_dict)} 个字段", "SUCCESS")
        self.startup_success = False
        self.startup_error = None
        self.startup_event.clear()
        # Create the bot.
        self.pdd_bot = ChatPdd(
            cookie=cookie_dict,
            text=text,
            chat_list_stat=chat_list_stat,
            csname=csname,
            log_callback=self.log_callback
        )
        self.running = True
        self._log("🎉 开始监听拼多多平台消息...", "SUCCESS")

        def run_pdd_main():
            # Thread target: own event loop, runs the bot to completion.
            try:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                self.loop = loop

                async def main_wrapper():
                    try:
                        await self.pdd_bot.main()
                        self.startup_success = True
                        self._log("✅ ChatPdd主程序执行完成", "SUCCESS")
                    except Exception as e:
                        self.startup_error = str(e)
                        self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
                    finally:
                        self.startup_event.set()

                loop.run_until_complete(main_wrapper())
            except Exception as e:
                self.startup_error = str(e)
                self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
                self.startup_event.set()
            finally:
                self.running = False

        self.main_thread = threading.Thread(target=run_pdd_main, daemon=True)
        self.main_thread.start()
        self._log("🔄 等待ChatPdd初始化完成...", "INFO")
        # No timeout, as before. Event.wait() called directly would block
        # the caller's event loop, so it runs on an executor thread.
        await asyncio.get_running_loop().run_in_executor(None, self.startup_event.wait)
        if self.startup_success:
            self._log("✅ 拼多多监听启动成功", "SUCCESS")
            return True
        error_msg = self.startup_error or "未知启动错误"
        self._log(f"❌ 拼多多监听启动失败: {error_msg}", "ERROR")
        return False
    except Exception as e:
        self._log(f"❌ 监听启动过程中出现严重错误: {str(e)}", "ERROR")
        return False
async def start_with_login_params(self, store_id: str, login_params: str):
    """Log in with backend-supplied parameters, then start listening.

    Returns:
        The result of ``start_with_cookies``; False when parsing or login
        fails or any exception occurs.
    """
    try:
        self._log("🔵 [PDD] 收到后端登录参数开始执行登录获取cookies", "INFO")
        # Step 1: parse the login parameters.
        parsed = self._parse_login_params(login_params)
        if parsed:
            # Step 2: log in to obtain cookies.
            cookies = await self._execute_pdd_login(parsed)
            if cookies:
                # Step 3: continue with the regular cookie-based flow.
                self._log("✅ [PDD] 登录成功使用获取的cookies连接平台", "SUCCESS")
                return await self.start_with_cookies(store_id, cookies)
            self._log("❌ [PDD] 登录失败无法获取cookies", "ERROR")
            return False
        self._log("❌ [PDD] 登录参数解析失败", "ERROR")
        return False
    except Exception as e:
        self._log(f"❌ [PDD] 使用登录参数启动失败: {str(e)}", "ERROR")
        import traceback
        self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
        return False
async def start_with_cookies(self, store_id: str, cookies: str):
    """Connect to the platform with backend-supplied cookies and listen.

    ``cookies`` may be a JSON object, a Python dict literal, or a classic
    ``k=v; k2=v2`` cookie string. This coroutine returns only after
    ``message_monitoring`` has finished.

    Returns:
        True when monitoring ended without raising; False for empty or
        unparsable cookies, a missing backend client, or any exception.
    """
    try:
        self._log("🔵 [PDD] 收到后端登录指令开始使用cookies连接平台", "INFO")
        if not cookies:
            self._log("❌ Cookie信息不能为空", "ERROR")
            return False
        self._log(f"✅ [PDD] Cookie验证通过长度: {len(cookies)}", "SUCCESS")

        # The shared backend client must exist before going further.
        from WebSocket.backend_singleton import get_backend_client
        if not get_backend_client():
            self._log("❌ [PDD] 无法获取后端服务", "ERROR")
            return False

        # Turn the cookie string into a dict.
        cookie_dict = {}
        try:
            if cookies.startswith('{') and cookies.endswith('}'):
                import ast
                import json
                try:
                    # JSON object (double-quoted) first.
                    cookie_dict = json.loads(cookies) if isinstance(cookies, str) else cookies
                except json.JSONDecodeError:
                    # Fall back to a Python dict literal (single-quoted).
                    cookie_dict = ast.literal_eval(cookies)
            else:
                # Classic "k=v; k2=v2" string.
                for pair in cookies.split(';'):
                    if '=' not in pair:
                        continue
                    name, _, value = pair.strip().partition('=')
                    cookie_dict[name] = value
        except Exception as e:
            self._log(f"⚠️ 解析cookies失败: {e}", "ERROR")
            self._log("❌ [PDD] 无法解析cookies连接失败", "ERROR")
            return False

        # Validate the parse result.
        if not isinstance(cookie_dict, dict):
            self._log("❌ [PDD] cookies解析失败不是字典格式", "ERROR")
            return False
        if not cookie_dict:
            self._log("❌ [PDD] cookies为空", "ERROR")
            return False
        self._log(f"✅ [PDD] cookies解析成功包含 {len(cookie_dict)} 个字段", "SUCCESS")

        # Build the bot and attach a backend adapter bound to this store.
        bot = ChatPdd(
            cookie=cookie_dict,
            text=None,
            chat_list_stat=False,
            csname=None,
            log_callback=self.log_callback
        )
        self.pdd_bot = bot
        bot.store_id = store_id
        adapter = PddBackendService()
        await adapter.connect(store_id)
        bot.backend_service = adapter
        bot.backend_connected = True
        self.running = True
        self._log("🎉 [PDD] 开始监听平台消息", "SUCCESS")

        # Fetch the account credentials (synchronous HTTP calls).
        bot.get_auth_token()
        bot.get_mall_id()
        bot.get_assign_cslist()

        # Listen; this returns when monitoring stops.
        await bot.message_monitoring(
            cookies_str=cookies,
            store={'id': store_id}
        )
        self._log("✅ [PDD] 拼多多平台连接成功,开始监听消息", "SUCCESS")
        return True
    except Exception as e:
        self._log(f"❌ [PDD] 监听过程中出现严重错误: {str(e)}", "ERROR")
        import traceback
        self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
        return False
def stop_listening(self):
    """Clear the running flag and wait up to 5 seconds for the worker thread.

    Only waits for the thread; logs a warning when it is still alive
    afterwards. Errors are logged, never raised.
    """
    try:
        self._log("🛑 开始停止拼多多监听...", "INFO")
        self.running = False
        worker = self.main_thread
        if worker and worker.is_alive():
            try:
                self._log("🔄 等待主线程结束...", "DEBUG")
                worker.join(timeout=5.0)
                if worker.is_alive():
                    self._log("⚠️ 主线程未在超时时间内结束", "WARNING")
            except Exception as e:
                self._log(f"⚠️ 等待主线程结束时出错: {e}", "DEBUG")
        self._log("✅ 拼多多监听已停止", "SUCCESS")
    except Exception as e:
        self._log(f"❌ 停止监听时出错: {str(e)}", "ERROR")
def is_running(self) -> bool:
    """Report whether the listener is currently active.

    Returns:
        ``True`` only when the ``running`` flag is set, the main thread exists
        and is alive, and ``startup_success`` is truthy; ``False`` otherwise.
    """
    thread = self.main_thread
    # An ``and`` chain yields its last evaluated operand, which could be
    # ``None`` or another non-bool value; coerce so the annotated return
    # type actually holds.
    return bool(
        self.running
        and thread is not None
        and thread.is_alive()
        and self.startup_success
    )
def _parse_login_params(self, login_params_str: str) -> dict:
    """Extract the ``data.login_params`` object from a backend JSON message.

    Args:
        login_params_str: JSON text sent by the backend.

    Returns:
        The ``login_params`` mapping (empty if the keys are absent), or ``{}``
        when the text cannot be parsed or has an unexpected shape.
    """
    try:
        envelope = json.loads(login_params_str)
        data_section = envelope.get("data", {})
        login_params = data_section.get("login_params", {})
        username = login_params.get('username', 'N/A')
        self._log(f"✅ [PDD] 登录参数解析成功: username={username}", "INFO")
    except Exception as parse_error:
        self._log(f"❌ [PDD] 解析登录参数失败: {parse_error}", "ERROR")
        return {}
    return login_params
async def _execute_pdd_login(self, login_params: dict) -> str:
    """Log in to PDD with backend-supplied parameters and return the cookies.

    Args:
        login_params: Parameters passed on to ``_build_login_request``.

    Returns:
        The response cookies serialised as a JSON string on success, or ``""``
        when the request cannot be built or sent, a captcha is demanded, the
        login is rejected, no cookies come back, or an exception occurs.
    """
    try:
        # Step 1: build the request description.
        request_spec = self._build_login_request(login_params)
        if not request_spec:
            return ""

        # Step 2: send it.
        outcome = await self._send_login_request(request_spec)
        if not outcome:
            return ""

        # Step 3: interpret the outcome. The captcha marker is searched in the
        # string form of the whole result, so it is checked before "success".
        if "需要验证图形验证码" in str(outcome):
            self._log("⚠️ [PDD] 登录需要验证码,暂不支持自动处理", "WARNING")
            return ""

        if not outcome.get("success", False):
            error_msg = outcome.get("errorMsg", "登录失败")
            self._log(f"❌ [PDD] 登录失败: {error_msg}", "ERROR")
            return ""

        session_cookies = outcome.get("cookies", {})
        if not session_cookies:
            self._log("❌ [PDD] 登录成功但未获取到cookies", "ERROR")
            return ""

        # Callers expect the cookies as a string, so serialise the dict.
        serialized = json.dumps(session_cookies)
        self._log(f"✅ [PDD] 登录成功获取到cookies: {len(session_cookies)} 个字段", "SUCCESS")
        return serialized
    except Exception as login_error:
        self._log(f"❌ [PDD] 执行登录失败: {login_error}", "ERROR")
        self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
        return ""
def _build_login_request(self, login_params: dict) -> dict:
    """Build the URL, headers and JSON payload for the PDD merchant login call.

    Args:
        login_params: Backend-supplied values. ``username``, ``password``,
            ``anti_content`` and ``risk_sign`` are required. ``timestamp``
            (milliseconds) is optional; when it is missing, ``None`` or not
            convertible to ``int`` the current time is used instead.

    Returns:
        A dict with ``url``, ``headers`` and ``payload`` keys, or ``{}`` when a
        required parameter is missing or construction fails (the failure is
        reported through ``self._log``).
    """
    try:
        username = login_params.get("username", "")
        password = login_params.get("password", "")  # per the original note: already encrypted by the backend
        anti_content = login_params.get("anti_content", "")
        risk_sign = login_params.get("risk_sign", "")

        if not all([username, password, anti_content, risk_sign]):
            self._log("❌ [PDD] 登录参数不完整", "ERROR")
            return {}

        # The timestamp may arrive as a numeric string or null; normalise it to
        # integer milliseconds so the arithmetic below cannot raise TypeError
        # and abort the whole request.
        try:
            timestamp = int(login_params.get("timestamp"))
        except (TypeError, ValueError):
            timestamp = int(time.time() * 1000)

        # Simulated input timings, expressed as "milliseconds before
        # timestamp". Each offset stays inside its original range but is also
        # capped below the previous one, so the events are always in
        # chronological order (start < finish, mobile before password).
        mobile_start_ago = random.randint(10000, 20000)
        mobile_finish_ago = random.randint(8000, min(15000, mobile_start_ago - 1))
        password_start_ago = random.randint(5000, min(10000, mobile_finish_ago - 1))
        password_finish_ago = random.randint(3000, min(8000, password_start_ago - 1))
        click_ago = random.randint(100, 1000)

        # Request headers.
        headers = {
            "authority": "mms.pinduoduo.com",
            "accept": "*/*",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json;charset=UTF-8",
            "origin": "https://mms.pinduoduo.com",
            "pragma": "no-cache",
            "referer": "https://mms.pinduoduo.com/",
            "sec-ch-ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "anti-content": anti_content,  # supplied by the backend
        }

        # Request body; per the original note it mirrors pdd_login/login.py.
        payload = {
            "username": username,
            "password": password,
            "passwordEncrypt": True,
            "verificationCode": "",
            "mobileVerifyCode": "",
            "sign": "",
            "touchevent": {
                "mobileInputEditStartTime": timestamp - mobile_start_ago,
                "mobileInputEditFinishTime": timestamp - mobile_finish_ago,
                "mobileInputKeyboardEvent": "0|1|1|-6366-6942-142",
                "passwordInputEditStartTime": timestamp - password_start_ago,
                "passwordInputEditFinishTime": timestamp - password_finish_ago,
                "passwordInputKeyboardEvent": "0|1|1|195-154-755-477",
                "captureInputEditStartTime": "",
                "captureInputEditFinishTime": "",
                "captureInputKeyboardEvent": "",
                "loginButtonTouchPoint": "1263,586",
                "loginButtonClickTime": timestamp - click_ago,
            },
            "fingerprint": {
                "innerHeight": 906,
                "innerWidth": 1707,
                "devicePixelRatio": 1.5,
                "availHeight": 1027,
                "availWidth": 1707,
                "height": 1067,
                "width": 1707,
                "colorDepth": 24,
                "locationHref": "https://mms.pinduoduo.com/login/sso?platform=wholesale&redirectUrl=htt",
                "clientWidth": 1707,
                "clientHeight": 906,
                "offsetWidth": 1707,
                "offsetHeight": 906,
                "scrollWidth": 2882,
                "scrollHeight": 906,
                "navigator": {
                    "appCodeName": "Mozilla",
                    "appName": "Netscape",
                    "hardwareConcurrency": 16,
                    "language": "zh-CN",
                    "cookieEnabled": True,
                    "platform": "Win32",
                    "doNotTrack": True,
                    "ua": headers["user-agent"],
                    "vendor": "Google Inc.",
                    "product": "Gecko",
                    "productSub": "20030107",
                    "mimeTypes": "f5a1111231f589322da33fb59b56946b4043e092",
                    "plugins": "387b918f593d4d8d6bfa647c07e108afbd7a6223",
                },
                "referer": "",
                "timezoneOffset": -480,
            },
            "riskSign": risk_sign,  # supplied by the backend
            "timestamp": timestamp,
            "crawlerInfo": anti_content,  # supplied by the backend
        }

        self._log("✅ [PDD] 登录请求构造成功", "INFO")
        return {
            "url": "https://mms.pinduoduo.com/janus/api/auth",
            "headers": headers,
            "payload": payload,
        }
    except Exception as e:
        self._log(f"❌ [PDD] 构造登录请求失败: {e}", "ERROR")
        return {}
async def _send_login_request(self, login_request: dict) -> dict:
    """POST the prepared login request without blocking the event loop.

    Args:
        login_request: Dict with ``url``, ``headers`` and ``payload`` keys.

    Returns:
        On HTTP 200, a dict with ``success`` and ``errorMsg`` (taken from the
        JSON body), ``cookies`` (response cookies as a dict) and
        ``response_text``. Returns ``{}`` for any other status code or when an
        exception occurs; both cases are reported through ``self._log``.
    """
    try:
        url = login_request["url"]
        headers = login_request["headers"]
        payload = login_request["payload"]

        def _post():
            # Blocking call; executed on the default executor below.
            return requests.post(url, headers=headers, json=payload, timeout=30)

        # Inside a coroutine the running loop is the one to use;
        # get_running_loop() is the documented replacement for get_event_loop().
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, _post)
        self._log(f"✅ [PDD] 登录请求发送完成,状态码: {response.status_code}", "INFO")

        if response.status_code != 200:
            self._log(f"❌ [PDD] 登录请求失败HTTP状态码: {response.status_code}", "ERROR")
            return {}

        result = response.json()
        cookies = response.cookies.get_dict()
        return {
            "success": result.get("success", False),
            "errorMsg": result.get("errorMsg", ""),
            "cookies": cookies,
            "response_text": response.text,
        }
    except Exception as e:
        self._log(f"❌ [PDD] 发送登录请求异常: {e}", "ERROR")
        return {}
def get_status(self) -> Dict[str, Any]:
    """Return a snapshot of the listener's state as a plain dict.

    Keys: ``running``, ``has_bot``, ``main_thread_alive``,
    ``startup_success`` and ``startup_error``.
    """
    thread = self.main_thread
    thread_alive = thread.is_alive() if thread else False

    snapshot: Dict[str, Any] = {}
    snapshot["running"] = self.running
    snapshot["has_bot"] = self.pdd_bot is not None
    snapshot["main_thread_alive"] = thread_alive
    snapshot["startup_success"] = self.startup_success
    snapshot["startup_error"] = self.startup_error
    return snapshot
# 测试函数保留
async def test_gui_integration():
    """Manual smoke test for ``PddListenerForGUI``.

    Starts the listener with a fixed cookie set, polls once per second until
    ``is_running()`` turns false, then stops the listener and prints its
    status. Progress is reported with ``print``.
    """

    def _echo_log(message: str, log_type: str):
        print(f"[GUI测试] [{log_type}] {message}")

    # NOTE(review): these look like real session cookies committed to source;
    # consider loading them from configuration or the environment instead.
    session_cookies = {
        "api_uid": "CiKdOWi361Z8egCe1uihAg==",
        "_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
        "rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
        "_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
        "mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
        "PASS_ID": "1-Ncbwg/naTW6KMMZg7FCtK5ctxvsC84uMK9wM55CJsDJnnoQ8d5jptFdwMkC08e3lKMJw/mGLQXYNL3iCvwTiCQ_109909969_163439093",
        "x-visit-time": "1757554980094",
        "JSESSIONID": "329E5C258BD7920211D15E263490C895",
    }

    print("🧪 开始测试GUI集成类...")
    gui_listener = PddListenerForGUI(log_callback=_echo_log)
    try:
        print("\n🔵 启动GUI监听...")
        started = await gui_listener.start_listening(
            cookie_dict=session_cookies,
            chat_list_stat=False,
            csname=None,
            text="测试",
        )

        # Startup failed: report the reason (if any) and finish.
        if not started:
            print("❌ GUI监听启动失败")
            if gui_listener.startup_error:
                print(f"❌ 错误原因: {gui_listener.startup_error}")
            return

        print("✅ GUI监听启动成功")
        print(f"📊 当前状态: {gui_listener.get_status()}")
        print("\n🟢 监听运行中,将持续运行直到手动停止...")

        # Poll once per second for as long as the listener reports running.
        while gui_listener.is_running():
            await asyncio.sleep(1)
        print("❌ 监听意外停止")

        print("\n🛑 测试完成,开始停止监听...")
        gui_listener.stop_listening()
        print(f"📊 停止后状态: {gui_listener.get_status()}")
        print("\n✅ GUI集成类测试完成")
        print("📋 测试结果GUI集成类与原main方法逻辑完全一致")
    except KeyboardInterrupt:
        print("\n🛑 用户中断测试")
        gui_listener.stop_listening()
    except Exception as test_error:
        print(f"❌ 测试过程中出错: {test_error}")
        gui_listener.stop_listening()
# 同时保留原有的main函数供直接调用
async def main():
    """Run ``ChatPdd`` directly with a fixed cookie set (no GUI wrapper)."""
    # NOTE(review): these look like real session cookies committed to source;
    # consider loading them from configuration or the environment instead.
    session_cookies = {
        "api_uid": "CiKdOWi361Z8egCe1uihAg==",
        "_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
        "rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
        "_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
        "mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
        "PASS_ID": "1-P3kEWMDAT6n7mZjZpx1poVmxfusHa6Qd0s+dhQlG7SiqNA8A5LCsjbbRpck4WHjVSve3G+x0N4ZEL7Dcz2L+vg_109909969_163439093",
        "x-visit-time": "1757399096750",
        "JSESSIONID": "04601A22B2F955001EFEA28A2F64ED79",
    }
    bot = ChatPdd(cookie=session_cookies, chat_list_stat=False)
    await bot.main()
if __name__ == "__main__":
    # Script entry point: runs the GUI-integration smoke test by default.
    entry_coroutine = test_gui_integration()
    asyncio.run(entry_coroutine)
    # To run the plain ChatPdd entry point instead, use:
    # asyncio.run(main())