# Source: shuidrop_gui/Utils/Pdd/PddUtils.py
# (3066 lines, 107 KiB, Python)
# Last change: 2025-09-12 20:42:00 +08:00
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project magua_project
@File PddUtils.py
@IDE PyCharm
@Author lz
@Date 2025/7/17 16:44
"""
import os
import threading
from concurrent.futures import ThreadPoolExecutor
import base64
import json
import random
import time
import traceback
import websockets
import requests
import asyncio
import execjs
from datetime import datetime
from typing import Dict, Any, Optional, Callable
# 新增导入,用于登录功能
import PIL, numpy as np, cv2
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5, AES
from Crypto.Util.Padding import pad
from pathlib import Path
from loguru import logger
# Last change: 2025-09-12 20:42:00 +08:00
from Utils.message_models import PlatformMessage
# ===== 登录相关类集成开始 =====
class Track:
    """Slider drag-trajectory generator."""

    @staticmethod
    def get_track(distance):
        """Return a prefix of a recorded drag trajectory for ``distance``.

        The requested distance is truncated to an int, a random overshoot of
        7..11 is added, and 0.8 is appended so the target lines up with the
        ``n.8`` x values of the recorded samples.

        :param distance: requested slide distance (anything ``int()`` accepts)
        :return: list of fresh ``[x, y, timestamp_ms]`` lists. When the target
            equals a recorded x, every sample with ``x <= target`` is
            returned unchanged; otherwise the samples up to the nearest
            recorded x are returned and the x of the last one is overwritten
            with the target.
        """
        target = int(distance) + random.randint(7, 11) + 0.8

        # Recorded samples; rebuilt on every call so the in-place edit of the
        # last point below never leaks into a later call.
        samples = [
            [0.8, 178.4, 1733388719067], [4.8, 179.4, 1733388719169],
            [6.8, 179.4, 1733388719184], [8.8, 179.4, 1733388719250],
            [8.8, 179.4, 1733388719266], [9.8, 180.4, 1733388719762],
            [10.8, 180.4, 1733388719914], [11.8, 180.4, 1733388719930],
            [11.8, 180.4, 1733388719954], [12.8, 180.4, 1733388719982],
            [12.8, 180.4, 1733388720030], [13.8, 180.4, 1733388720058],
            [14.8, 180.4, 1733388720090], [15.8, 180.4, 1733388720110],
            [16.8, 180.4, 1733388720170], [17.8, 180.4, 1733388720202],
            [19.8, 180.4, 1733388720218], [22.8, 181.4, 1733388720258],
            [23.8, 181.4, 1733388720319], [24.8, 181.4, 1733388720428],
            [24.8, 182.4, 1733388720458], [26.8, 182.4, 1733388720478],
            [28.8, 182.4, 1733388720495], [30.8, 182.4, 1733388720511],
            [32.8, 183.4, 1733388720527], [33.8, 183.4, 1733388720558],
            [35.8, 183.4, 1733388720574], [36.8, 184.4, 1733388720591],
            [38.8, 184.4, 1733388720619], [39.8, 184.4, 1733388720662],
            [40.8, 184.4, 1733388720686], [41.8, 184.4, 1733388720710],
            [42.8, 184.4, 1733388720742], [43.8, 184.4, 1733388720758],
            [45.8, 184.4, 1733388720783], [46.8, 184.4, 1733388720798],
            [48.8, 184.4, 1733388720831], [49.8, 184.4, 1733388720850],
            [50.8, 184.4, 1733388720874], [52.8, 184.4, 1733388720922],
            [53.8, 184.4, 1733388720958], [55.8, 184.4, 1733388720974],
            [55.8, 184.4, 1733388720990], [56.8, 184.4, 1733388721010],
            [57.8, 184.4, 1733388721051], [58.8, 184.4, 1733388721074],
            [58.8, 185.4, 1733388721090], [59.8, 185.4, 1733388721106],
            [60.8, 185.4, 1733388721214], [60.8, 185.4, 1733388721270],
            [61.8, 185.4, 1733388721361], [63.8, 186.4, 1733388721374],
            [68.8, 186.4, 1733388721390], [71.8, 187.4, 1733388721406],
            [72.8, 187.4, 1733388721422], [73.8, 187.4, 1733388721442],
            [74.8, 187.4, 1733388721602], [75.8, 187.4, 1733388721650],
            [79.8, 187.4, 1733388721674], [81.8, 187.4, 1733388721690],
            [83.8, 188.4, 1733388721726], [83.8, 188.4, 1733388722055],
            [91.8, 188.4, 1733388722241], [92.8, 188.4, 1733388722346],
            [92.8, 188.4, 1733388722494], [94.8, 188.4, 1733388722511],
            [97.8, 188.4, 1733388722527], [101.8, 189.4, 1733388722543],
            [102.8, 189.4, 1733388722566], [103.8, 189.4, 1733388722666],
            [104.8, 189.4, 1733388722722], [106.8, 190.4, 1733388722742],
            [108.8, 190.4, 1733388722759], [112.8, 190.4, 1733388722775],
            [115.8, 191.4, 1733388722791], [116.8, 191.4, 1733388722807],
            [118.8, 192.4, 1733388722911], [119.8, 192.4, 1733388722931],
            [120.8, 192.4, 1733388722962], [123.8, 192.4, 1733388722987],
            [124.8, 192.4, 1733388723006], [125.8, 192.4, 1733388723023],
            [127.8, 192.4, 1733388723098], [128.8, 192.4, 1733388723186],
            [129.8, 193.4, 1733388723258], [132.8, 193.4, 1733388723278],
            [135.8, 193.4, 1733388723294], [137.8, 194.4, 1733388723318],
            [138.8, 194.4, 1733388723562], [140.8, 194.4, 1733388723582],
            [144.8, 195.4, 1733388723599], [145.8, 195.4, 1733388723622],
            [147.8, 196.4, 1733388723662], [148.8, 196.4, 1733388723678],
            [149.8, 196.4, 1733388723703], [152.8, 196.4, 1733388723719],
            [153.8, 196.4, 1733388723774], [154.8, 196.4, 1733388723846],
            [155.8, 196.4, 1733388723886], [157.8, 196.4, 1733388723903],
            [161.8, 197.4, 1733388723918], [166.8, 197.4, 1733388723934],
            [167.8, 197.4, 1733388723998], [168.8, 197.4, 1733388724026],
            [168.8, 197.4, 1733388724123], [171.8, 198.4, 1733388724139],
            [175.8, 199.4, 1733388724154], [177.8, 199.4, 1733388724170],
            [179.8, 199.4, 1733388724186], [181.8, 199.4, 1733388724202],
            [184.8, 199.4, 1733388724218], [185.8, 199.4, 1733388724234],
            [186.8, 199.4, 1733388724255], [187.8, 199.4, 1733388724278],
            [188.8, 199.4, 1733388724294], [188.8, 199.4, 1733388724326],
            [189.8, 199.4, 1733388724374], [191.8, 199.4, 1733388724399],
            [192.8, 200.4, 1733388724444], [193.8, 200.4, 1733388724486],
            [194.8, 200.4, 1733388724834], [195.8, 200.4, 1733388724850],
            [196.8, 200.4, 1733388724866], [198.8, 200.4, 1733388724882],
            [199.8, 200.4, 1733388724898], [200.8, 201.4, 1733388725242],
            [204.8, 201.4, 1733388725276], [206.8, 201.4, 1733388725301],
            [207.8, 201.4, 1733388725332], [208.8, 201.4, 1733388725522],
            [211.8, 201.4, 1733388725547], [212.8, 201.4, 1733388725586],
            [212.8, 201.4, 1733388725838], [214.8, 201.4, 1733388725854],
            [219.8, 202.4, 1733388725878], [220.8, 202.4, 1733388725894],
            [222.8, 202.4, 1733388726427], [227.8, 202.4, 1733388726446],
            [228.8, 202.4, 1733388726462], [230.8, 203.4, 1733388726642],
            [232.8, 203.4, 1733388726658], [234.8, 203.4, 1733388726675],
            [236.8, 203.4, 1733388726690], [237.8, 203.4, 1733388726742],
            [238.8, 203.4, 1733388726934], [240.8, 203.4, 1733388726950],
            [243.8, 203.4, 1733388726966], [245.8, 203.4, 1733388726982],
            [247.8, 203.4, 1733388727118], [248.8, 203.4, 1733388727174],
            [248.8, 204.4, 1733388727190], [250.8, 204.4, 1733388727214],
            [252.8, 204.4, 1733388727286], [252.8, 204.4, 1733388727302],
            [253.8, 204.4, 1733388727322], [254.8, 204.4, 1733388727358],
            [256.8, 204.4, 1733388727398],
        ]

        recorded_xs = [sample[0] for sample in samples]

        # Exact hit on a recorded x: hand back the untouched prefix.
        if target in recorded_xs:
            return [sample for sample in samples if sample[0] <= target]

        # Otherwise pick the recorded x nearest to the target (first one wins
        # on ties), cut the trajectory there and pin the last x to the target.
        nearest_x = min(recorded_xs, key=lambda x: abs(x - target))
        prefix = [sample for sample in samples if sample[0] <= nearest_x]
        prefix[-1][0] = target
        return prefix
class ImgDistance:
    """Slider-captcha gap locator based on OpenCV edge/template matching."""

    def __init__(self, bg, tp):
        """Store the obfuscated background (``bg``) and slider-piece (``tp``) image strings."""
        self.bg = bg
        self.tp = tp

    @staticmethod
    def imshow(img, winname='test', delay=0):
        """Show ``img`` in an OpenCV window, wait ``delay`` ms (0 = until a key), then close all windows."""
        cv2.imshow(winname, img)
        cv2.waitKey(delay)
        cv2.destroyAllWindows()

    @staticmethod
    def pil_to_cv2(img):
        """Convert a PIL image to an OpenCV image.

        :param img: PIL image
        :return: ``numpy.ndarray`` produced by an RGB -> BGR conversion
        """
        return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)

    @staticmethod
    def bytes_to_cv2(img):
        """Decode encoded image bytes into an OpenCV image.

        :param img: encoded image data, ``bytes``
        :return: ``numpy.ndarray`` decoded with flag 1 (colour)
        """
        # View the raw bytes as a 1-D uint8 buffer, then let OpenCV decode it.
        img_buffer_np = np.frombuffer(img, dtype=np.uint8)
        return cv2.imdecode(img_buffer_np, 1)

    def cv2_open(self, img, flag=None):
        """Normalise any supported image input to an OpenCV ``numpy.ndarray``.

        :param img: ``bytes``, ``str``/``Path`` file path, ``numpy.ndarray``
            or PIL image
        :param flag: optional ``cv2.cvtColor`` conversion code
            (e.g. ``cv2.COLOR_BGR2GRAY``); default ``None`` = no conversion
        :return: OpenCV image
        :raises ValueError: if the input type is not supported
        """
        if isinstance(img, bytes):
            img = self.bytes_to_cv2(img)
        elif isinstance(img, (str, Path)):
            img = cv2.imread(str(img))
        elif isinstance(img, np.ndarray):
            pass
        else:
            # ``import PIL`` alone does not load the ``Image`` submodule, so
            # ``PIL.Image`` may not exist as an attribute; import it
            # explicitly before the isinstance check.
            from PIL import Image as pil_image
            if isinstance(img, pil_image.Image):
                img = self.pil_to_cv2(img)
            else:
                raise ValueError(f'输入的图片类型无法解析: {type(img)}')
        if flag is not None:
            img = cv2.cvtColor(img, flag)
        return img

    def get_distance(self, bg, tp, im_show=False, save_path=None):
        """Locate the slider piece inside the background image.

        :param bg: background image (path, ``Path``, bytes, ndarray or PIL image)
        :param tp: slider-piece image (same accepted types)
        :param im_show: show the annotated result in a window; default False
        :param save_path: where to write the annotated background; default None
        :return: x coordinate of the best template match (left edge)
        """
        bg_img = self.cv2_open(bg)
        tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)

        # Pyramid mean-shift filtering on the background before edge detection.
        bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)

        # Edge maps of both images are what actually get matched.
        tp_gray = cv2.Canny(tp_gray, 255, 255)
        bg_gray = cv2.Canny(bg_shift, 255, 255)

        result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
        _, _, _, max_loc = cv2.minMaxLoc(result)

        if save_path or im_show:
            # Draw the matched rectangle on a fresh copy of the background.
            tp_height, tp_width = tp_gray.shape[:2]
            x, y = max_loc
            _x, _y = x + tp_width, y + tp_height
            bg_img = self.cv2_open(bg)
            cv2.rectangle(bg_img, (x, y), (_x, _y), (0, 0, 255), 2)
            if save_path:
                cv2.imwrite(save_path, bg_img)
            if im_show:
                self.imshow(bg_img)
        return max_loc[0]

    @staticmethod
    def decrypt_img(imgstr):
        """De-obfuscate an image string into its base64 payload.

        Input is consumed in groups of 8 characters; each character is mapped
        through a 128-entry lookup table to a 5-bit value, the 8 values are
        packed into 5 bytes, and the bytes are bit-rotated before being
        emitted as characters. Filler markers and the
        ``data:image/png;base64,`` prefix are then stripped.

        :param imgstr: obfuscated string; characters must have ``ord`` < 128
            and the length must be a multiple of 8, otherwise ``IndexError``
            is raised
        :return: the remaining (base64) text
        """
        res = []
        # Lookup table indexed by ord(char); -1 marks characters outside the
        # alphabet (note that ``31 & -1`` still yields 31).
        s = [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
             -1,
             -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 24, 3, -1, 20, -1, 17,
             8,
             -1, 30, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 12, 22, 10, -1, -1, 15, 14, 6, -1, 5, -1, -1, 7,
             18,
             -1, 25, 9, -1, 28, -1, 2, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 21, -1, 31, 13, 16, -1, 26, -1, 27,
             -1, 0,
             19, -1, 11, 4, -1, -1, 23, -1, 29, -1, -1, -1, -1, -1, -1]
        for r in range(0, len(imgstr), 8):
            o = s[ord(imgstr[r])]
            i = s[ord(imgstr[r + 1])]
            a = s[ord(imgstr[r + 2])]
            c = s[ord(imgstr[r + 3])]
            u = s[ord(imgstr[r + 4])]
            d = s[ord(imgstr[r + 5])]
            f = s[ord(imgstr[r + 6])]
            # Pack eight 5-bit values into five bytes.
            p = (31 & o) << 3 | (31 & i) >> 2
            h = (3 & i) << 6 | (31 & a) << 1 | (31 & c) >> 4
            m = (15 & c) << 4 | (31 & u) >> 1
            v = (1 & u) << 7 | (31 & d) << 2 | (31 & f) >> 3
            g = (7 & f) << 5 | 31 & s[ord(imgstr[r + 7])]
            # Rotate the 40-bit group before emitting it as characters.
            res.append(chr(((31 & p) << 3 | h >> 5)))
            res.append(chr((31 & h) << 3 | m >> 5))
            res.append(chr((31 & m) << 3 | v >> 5))
            res.append(chr((31 & v) << 3 | g >> 5))
            res.append(chr((31 & g) << 3 | p >> 5))
        y = ''.join(res)
        for marker in ['#', '@?', '*&%', '<$|>']:
            y = y.replace(marker, '')
        return y.replace('data:image/png;base64,', '')

    def main(self, save_path=None):
        """Decode both stored images and return the gap x position.

        :param save_path: optional path for the annotated background image
        :return: x coordinate returned by :meth:`get_distance`
        """
        return self.get_distance(
            bg=base64.urlsafe_b64decode(self.decrypt_img(self.bg)),
            tp=base64.urlsafe_b64decode(self.decrypt_img(self.tp)),
            save_path=save_path
        )
class AutiContent:
    """anti_content generator backed by an (optional) execjs context."""

    def __init__(self):
        """Prepare ``self.ctx``.

        If ``static/js/auti_content.js`` exists the JS context is left
        disabled (``None``) and the pure-Python fallback is used; if it does
        not exist, a small built-in JS snippet is compiled instead. Any
        failure leaves ``self.ctx`` as ``None``.
        """
        self.ctx = None
        try:
            js_dir = "static/js"
            js_file = os.path.join(js_dir, "auti_content.js")
            if not os.path.exists(js_file):
                logger.warning("auti_content.js文件不存在使用fallback实现")
                # Built-in replacement JS used when the real file is absent.
                fallback_js = """
function auti_content() {
// 生成类似格式的anti_content
var chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_';
var result = '';
for (var i = 0; i < 40; i++) {
result += chars.charAt(Math.floor(Math.random() * chars.length));
}
return result;
}
function gzipCompress(data) {
return data; // 简单返回原数据
}
"""
                self.ctx = execjs.compile(fallback_js)
            else:
                # Loading the real JS file is deliberately disabled for now.
                logger.info("检测到auti_content.js文件但为避免编码问题使用fallback实现")
                self.ctx = None
        except Exception as e:
            logger.error(f"初始化AutiContent失败: {e}")
            self.ctx = None

    def get_auti_content(self):
        """Return an anti_content token.

        Uses the JS context when available and its output is pure ASCII;
        otherwise returns 40 random characters drawn from letters, digits,
        ``-`` and ``_``.
        """
        try:
            if self.ctx:
                token = self.ctx.call("auti_content")
                try:
                    # Reject JS output that contains non-ASCII characters.
                    token.encode('ascii')
                    logger.info(f"成功生成auti_content: {token[:20]}...")
                    return token
                except UnicodeEncodeError:
                    logger.warning("JS生成的auti_content包含特殊字符使用fallback")
                    raise Exception("包含特殊字符")
        except Exception as e:
            logger.error(f"调用auti_content函数失败: {e}")

        # Pure-Python fallback.
        import random
        import string
        alphabet = string.ascii_letters + string.digits + '-_'
        token = ''.join(random.choice(alphabet) for _ in range(40))
        logger.warning(f"使用fallback生成auti_content: {token}")
        return token

    def gzip_compress(self, data):
        """Run ``data`` through the JS ``gzipCompress`` function.

        Returns ``data`` unchanged when no JS context is available or the
        call fails.
        """
        try:
            if self.ctx:
                compressed = self.ctx.call("gzipCompress", data)
                logger.info("成功压缩数据")
                return compressed
        except Exception as e:
            logger.error(f"调用gzipCompress函数失败: {e}")

        logger.warning("使用fallback处理返回原数据")
        return data
class EncryptTool:
    """Encryption helpers (AES-CBC, RSA PKCS#1 v1.5, key/iv derivation)."""

    @staticmethod
    def aes_encrypt(text, key, iv):
        """AES-CBC encrypt ``text`` (padded to the block size) and return base64 text.

        :param text: plaintext ``str``
        :param key: key ``str`` (UTF-8 encoded before use)
        :param iv: iv ``str`` (UTF-8 encoded before use)
        """
        key_bytes = key.encode('utf-8')
        iv_bytes = iv.encode('utf-8')
        padded = pad(text.encode('utf-8'), AES.block_size)
        ciphertext = AES.new(key_bytes, AES.MODE_CBC, iv_bytes).encrypt(padded)
        return base64.b64encode(ciphertext).decode('utf-8')

    @staticmethod
    def rsa_encrypt(data):
        """Encrypt ``data`` with the embedded RSA public key; return base64 text."""
        public_key = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC6zHXNom934tsG9SC73vAUv99bIuSRVaLsuTMY+OL6aS6eB7AuNoU+m9gPCrI7aFrT7CSiKTJ47DNwCNZO52AlLzvB6TjdUwuIXWpinE8VCsYAZgOCrx+mK9Sy0OuwnqNj5D2wUdGoN0nxbl1q2akeAa18A/iBpiXx0SZQexbEowIDAQAB"
        imported_key = RSA.import_key(base64.b64decode(public_key))
        encryptor = PKCS1_v1_5.new(imported_key)
        raw = encryptor.encrypt(data.encode(encoding="utf-8"))
        return base64.b64encode(raw).decode("utf-8")

    @staticmethod
    def get_key_iv(salt):
        """Derive the AES key/iv pair from a 9-character ``salt``.

        The defaults are returned unchanged when ``salt`` is empty or not
        exactly 9 characters long, or when its first character is anything
        other than ``'c'`` or ``'d'``:

        * ``'c'``: iv = first 8 chars of the default iv + the 8 chars after
          the prefix.
        * ``'d'``: the 4 digits after the prefix index into the last 4
          chars; iv = first 8 chars of the default iv + the 4 selected chars
          + the last 4 chars.

        :return: dict with ``aes_key`` and ``aes_iv``
        """
        params = {'aes_key': r"bN3%cH2$H1@*jCo$", 'aes_iv': r"gl3-w^dN)3#h6E1%"}
        if not salt or len(salt) != 9:
            return params

        prefix, body = salt[:1], salt[1:]
        index_digits, tail = body[:4], body[4:]

        # NOTE(review): prefixes 'a'/'b' select the key slot in the original
        # logic but never modify it -- confirm this is intended.
        if prefix == 'c':
            params['aes_iv'] = params['aes_iv'][:8] + body
        elif prefix == 'd':
            picked = ''
            for position in range(4):
                picked += tail[int(index_digits[position])]
            params['aes_iv'] = params['aes_iv'][:8] + picked + tail
        return params
class PddLogin(AutiContent):
    """Core Pinduoduo merchant login flow (password login, slider captcha, SMS code)."""

    # Upper bound for both the "slider solved" and the "slider failed"
    # counters in login_with_params.
    _MAX_SLIDER_ROUNDS = 8

    # Recorded mouse movement sent as the first element of "mell" in the
    # captcha_collect payload; entries are [x, y, timestamp_ms].
    _IDLE_MOUSE_TRAIL = [
        [250.8, 187.4, 1733388711350], [248.8, 187.4, 1733388711366],
        [240.8, 187.4, 1733388711382], [233.8, 186.4, 1733388711398],
        [225.8, 185.4, 1733388711414], [222.8, 184.4, 1733388711430],
        [219.8, 184.4, 1733388711446], [216.8, 184.4, 1733388711462],
        [208.8, 183.4, 1733388711478], [202.8, 182.4, 1733388711494],
        [192.8, 182.4, 1733388711511], [168.8, 180.4, 1733388711534],
        [160.8, 180.4, 1733388711550], [159.8, 180.4, 1733388711630],
        [156.8, 180.4, 1733388711670], [152.8, 180.4, 1733388711686],
        [146.8, 180.4, 1733388711702], [136.8, 180.4, 1733388711718],
        [128.8, 180.4, 1733388711734], [124.8, 180.4, 1733388711750],
        [118.8, 180.4, 1733388711766], [110.8, 180.4, 1733388711782],
        [100.8, 180.4, 1733388711798], [91.8, 180.4, 1733388711814],
        [83.8, 180.4, 1733388711830], [75.8, 180.4, 1733388711846],
        [70.8, 180.4, 1733388711862], [68.8, 180.4, 1733388711878],
        [66.8, 180.4, 1733388711910], [64.8, 180.4, 1733388711926],
        [58.8, 180.4, 1733388711942], [51.8, 180.4, 1733388711959],
        [42.8, 180.4, 1733388711975], [35.8, 180.4, 1733388711994],
        [33.8, 180.4, 1733388712054], [32.8, 180.4, 1733388712278],
        [31.8, 182.4, 1733388712295], [30.8, 182.4, 1733388712310],
        [28.8, 182.4, 1733388712479], [27.8, 182.4, 1733388712502],
        [26.8, 182.4, 1733388712518], [24.8, 182.4, 1733388712550],
        [26.8, 182.4, 1733388712886], [31.8, 181.4, 1733388712902],
        [35.8, 180.4, 1733388712918], [37.8, 179.4, 1733388712934],
        [38.8, 179.4, 1733388712950], [44.8, 179.4, 1733388712966],
        [55.8, 179.4, 1733388712986], [88.8, 178.4, 1733388712999],
        [164.8, 172.4, 1733388713014], [255.8, 165.4, 1733388713030],
        [346.8, 157.4, 1733388713046], [360.8, 161.4, 1733388715672],
        [340.8, 161.4, 1733388715687], [320.8, 164.4, 1733388715702],
        [287.8, 168.4, 1733388715718], [240.8, 176.4, 1733388715735],
        [184.8, 182.4, 1733388715750], [144.8, 184.4, 1733388715766],
        [112.8, 186.4, 1733388715782], [98.8, 186.4, 1733388715798],
        [95.8, 186.4, 1733388715926], [93.8, 186.4, 1733388715942],
        [89.8, 187.4, 1733388715958], [79.8, 188.4, 1733388715983],
        [57.8, 192.4, 1733388715999], [44.8, 192.4, 1733388716015],
        [36.8, 192.4, 1733388716030], [11.8, 188.4, 1733388716073],
        [-0.2, 184.4, 1733388716134], [-1.2, 184.4, 1733388716191],
        [-2.2, 184.4, 1733388716206], [-2.2, 182.4, 1733388716318],
        [-0.2, 181.4, 1733388716342], [0.8, 179.4, 1733388716878],
        [2.8, 177.4, 1733388716894], [3.8, 177.4, 1733388716910],
        [4.8, 176.4, 1733388716926], [3.8, 176.4, 1733388717299],
        [2.8, 176.4, 1733388717646], [1.8, 176.4, 1733388717662],
        [0.8, 176.4, 1733388717699], [-0.2, 176.4, 1733388717838],
        [-0.2, 177.4, 1733388718115], [0.8, 177.4, 1733388718490],
    ]

    def __init__(self, log_callback=None):
        """Set up endpoint, default request headers and empty session state.

        :param log_callback: optional ``callable(message, level)``; when
            omitted, log lines are printed to stdout
        """
        super().__init__()
        self.login_api = "https://mms.pinduoduo.com/janus/api/auth"
        self.headers = {
            "authority": "apiv2.pinduoduo.net",
            "accept": "*/*",
            "accept-language": "zh-CN,zh;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json;charset=UTF-8",
            "origin": "https://mms.pinduoduo.com",
            "pragma": "no-cache",
            "referer": "https://mms.pinduoduo.com/",
            "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "cross-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        self.verify_auth_token = None
        self.cookies = dict()
        self.log_callback = log_callback

    def _log(self, message, level="INFO"):
        """Forward a log line to ``log_callback`` or print it."""
        if self.log_callback:
            self.log_callback(message, level)
        else:
            print(f"[{level}] {message}")

    def captcha_collect(self, salt, track_list):
        """Build the encrypted ``captcha_collect`` parameter for the slider check.

        :param salt: value returned by :meth:`vc_pre_ck_b`, used to derive
            the AES key/iv
        :param track_list: drag trajectory as ``[x, y, timestamp_ms]`` lists
        :return: base64 AES ciphertext of the (compressed) JSON payload
        """
        ts = int(round(time.time() * 1000))
        captcha_collect_data = json.dumps({
            "v": "a",
            "ts": ts + random.randint(5700, 5950),
            "t1": ts,
            "t2": ts + random.randint(15700, 19950),
            "tp": 3,
            "ua": self.headers["user-agent"],
            "rf": "",
            "platform": 1,
            "hl": "000000000001010",
            "sc": {
                "w": 1536,
                "h": 824
            },
            "ihs": 1,
            "imageSize": {
                "width": 272,
                "height": 198
            },
            "del": [
                [
                    0.8,
                    177.4,
                    1733388719033
                ]
            ],
            "mel": track_list,
            "uel": [track_list[-1]],
            "mell": [
                self._IDLE_MOUSE_TRAIL,
                track_list
            ]
        }, separators=(',', ':'))
        captcha_collect_gzip = self.gzip_compress(captcha_collect_data)
        aes_key_iv = EncryptTool.get_key_iv(salt=salt)
        return EncryptTool.aes_encrypt(captcha_collect_gzip, aes_key_iv['aes_key'], aes_key_iv['aes_iv'])

    def vc_pre_ck_b(self):
        """Request the salt used to derive the slider AES key/iv.

        Removes any ``anti-content`` header first, merges response cookies
        into the session and returns the ``salt`` field of the JSON reply.
        """
        self.headers.pop("anti-content", None)
        url = "https://apiv2.pinduoduo.net/api/phantom/vc_pre_ck_b"
        payload = {
            "anti_content": self.get_auti_content(),
            "verify_auth_token": self.verify_auth_token,
            "sdk_type": 3,
            "client_time": int(round(time.time() * 1000))
        }
        response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
        self.cookies.update(response.cookies.get_dict())
        return response.json().get("salt")

    def obtain_captcha(self):
        """Fetch the slider captcha images; returns the ``pictures`` field of the reply."""
        url = "https://apiv2.pinduoduo.net/api/phantom/obtain_captcha"
        payload = {
            "anti_content": self.get_auti_content(),
            "verify_auth_token": self.verify_auth_token,
            "captcha_collect": ""
        }
        response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
        self.cookies.update(response.cookies.get_dict())
        return response.json().get("pictures")

    def verify(self, captcha_collect, verify_code):
        """Submit the slider answer.

        :param captcha_collect: encrypted trajectory from :meth:`captcha_collect`
        :param verify_code: computed slide distance
        :return: the ``result`` field of the JSON reply
        """
        url = "https://apiv2.pinduoduo.net/api/phantom/user_verify"
        payload = {
            "verify_code": verify_code,
            "captcha_collect": captcha_collect,
            "verify_auth_token": self.verify_auth_token,
            "anti_content": self.get_auti_content()
        }
        response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
        self.cookies.update(response.cookies.get_dict())
        result = response.json().get("result")
        self._log(f"滑块验证结果: {response.text}")
        return result

    def request_verification_code(self, username, store_id):
        """Ask the platform to send an SMS login code and notify the backend.

        The code itself is not obtained here; the backend is expected to
        re-issue login parameters that contain it. Always returns ``None``.
        """
        self._log(f"开始请求验证码,用户名: {username}, 店铺ID: {store_id}", "INFO")
        self.headers["anti-content"] = self.get_auti_content()
        url = "https://mms.pinduoduo.com/janus/api/user/getLoginVerificationCode"
        payload = {
            "username": username,
            "crawlerInfo": self.get_auti_content()
        }
        response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
        self._log(f"发送验证码请求结果: {response.text}")
        # Tell the backend that a verification code is required.
        self._log("准备向后端发送验证码需求通知", "INFO")
        self._send_verification_needed_message(store_id)
        return None

    def _send_verification_needed_message(self, store_id):
        """Send a "verification code needed" message to the backend client (best effort)."""
        try:
            self._log(f"开始发送验证码需求通知店铺ID: {store_id}", "INFO")
            from WebSocket.backend_singleton import get_backend_client
            backend = get_backend_client()
            self._log(f"获取到后端客户端: {backend is not None}", "DEBUG")
            if backend:
                message = {
                    "type": "connect_message",
                    "store_id": store_id,
                    "status": False,
                    "content": "需要验证码"
                }
                self._log(f"准备发送验证码通知消息: {message}", "DEBUG")
                backend.send_message(message)
                self._log("✅ 成功向后端发送验证码需求通知", "SUCCESS")
            else:
                self._log("❌ 后端客户端为空,无法发送验证码需求通知", "ERROR")
        except Exception as e:
            self._log(f"❌ 发送验证码需求通知失败: {e}", "ERROR")
            import traceback
            self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")

    def _build_login_payload(self, login_params):
        """Assemble the JSON body for the login request from backend-issued parameters."""
        ts = login_params.get("timestamp", int(round(time.time() * 1000)))
        return {
            "username": login_params.get("username", ""),
            "password": login_params.get("password", ""),  # already encrypted by the backend
            "passwordEncrypt": True,
            "verificationCode": "",
            "mobileVerifyCode": login_params.get("code", ""),  # SMS code, if supplied
            "sign": "",
            "touchevent": {
                "mobileInputEditStartTime": ts - random.randint(10000, 20000),
                "mobileInputEditFinishTime": ts - random.randint(8000, 15000),
                "mobileInputKeyboardEvent": "0|1|1|-6366-6942-142",
                "passwordInputEditStartTime": ts - random.randint(5000, 10000),
                "passwordInputEditFinishTime": ts - random.randint(3000, 8000),
                "passwordInputKeyboardEvent": "0|1|1|195-154-755-477",
                "captureInputEditStartTime": "",
                "captureInputEditFinishTime": "",
                "captureInputKeyboardEvent": "",
                "loginButtonTouchPoint": "1263,586",
                "loginButtonClickTime": ts - random.randint(100, 1000)
            },
            "fingerprint": {
                "innerHeight": 906,
                "innerWidth": 1707,
                "devicePixelRatio": 1.5,
                "availHeight": 1027,
                "availWidth": 1707,
                "height": 1067,
                "width": 1707,
                "colorDepth": 24,
                "locationHref": "https://mms.pinduoduo.com/login/sso?platform=wholesale&redirectUrl=htt",
                "clientWidth": 1707,
                "clientHeight": 906,
                "offsetWidth": 1707,
                "offsetHeight": 906,
                "scrollWidth": 2882,
                "scrollHeight": 906,
                "navigator": {
                    "appCodeName": "Mozilla",
                    "appName": "Netscape",
                    "hardwareConcurrency": 16,
                    "language": "zh-CN",
                    "cookieEnabled": True,
                    "platform": "Win32",
                    "doNotTrack": True,
                    "ua": self.headers.get("user-agent"),
                    "vendor": "Google Inc.",
                    "product": "Gecko",
                    "productSub": "20030107",
                    "mimeTypes": "f5a1111231f589322da33fb59b56946b4043e092",
                    "plugins": "387b918f593d4d8d6bfa647c07e108afbd7a6223"
                },
                "referer": "",
                "timezoneOffset": -480
            },
            "riskSign": login_params.get("risk_sign", ""),  # already encrypted by the backend
            "timestamp": ts,
            "crawlerInfo": login_params.get("anti_content", "")
        }

    def _solve_slider_captcha(self, response):
        """Run one slider-captcha round for ``response``; return the verify result."""
        self.verify_auth_token = response.json().get("result").get("verifyAuthToken")
        salt = self.vc_pre_ck_b()  # ciphertext used to derive the AES key/iv
        pictures = self.obtain_captcha()  # [background, slider piece]
        # Scale the detected offset from the 320px image to the 272px widget
        # and add half of 48.75.
        distance = round((ImgDistance(bg=pictures[0], tp=pictures[1]).main() * (272 / 320)) + (48.75 / 2), 2)
        track_list = Track.get_track(distance=distance)
        captcha_collect = self.captcha_collect(salt=salt, track_list=track_list)
        return self.verify(captcha_collect=captcha_collect, verify_code=distance)

    def login_with_params(self, login_params, store_id=None, error_count=0, success_count=0):
        """Log in with backend-issued parameters.

        When the reply asks for a slider captcha, the captcha is solved and
        the login is retried. ``success_count`` counts solved sliders that
        were still followed by another captcha request and ``error_count``
        counts failed slider verifications; both are carried through every
        retry, so the retry chain ends once either one reaches 8.

        :param login_params: dict with ``username``, ``password``,
            ``anti_content``, ``risk_sign`` and optionally ``timestamp`` and
            ``code``
        :param store_id: store id forwarded with the "code needed" notice
        :param error_count: failed slider rounds so far
        :param success_count: solved slider rounds so far
        :return: the cookie dict on success, the string
            ``"need_verification_code"`` when an SMS code was requested, or
            ``False`` on failure
        """
        self._log("🚀 [PddLogin] 开始使用参数登录", "INFO")
        self._log(f"📋 [PddLogin] 登录参数: username={login_params.get('username', 'N/A')}, 包含code={bool(login_params.get('code'))}", "DEBUG")
        self.headers["anti-content"] = login_params.get("anti_content", "")

        payload = self._build_login_payload(login_params)
        response = requests.post(self.login_api, headers=self.headers, json=payload, cookies=self.cookies)
        self.cookies.update(response.cookies.get_dict())
        self._log(f"登录响应状态码: {response.status_code}")
        self._log(f"登录响应内容: {response.text}")

        # Diagnostic log only; the branching below decides the action.
        if "需要手机验证" in response.text:
            self._log("✅ 检测到需要手机验证的响应", "INFO")
        elif "需要验证图形验证码" in response.text:
            self._log("✅ 检测到需要图形验证码的响应", "INFO")
        else:
            self._log("⚠️ 未检测到验证码相关响应", "WARNING")

        if "需要验证图形验证码" in response.text:
            if self._solve_slider_captcha(response):
                self.cookies.update({
                    "msfe-pc-cookie-captcha-token": self.verify_auth_token
                })
                self.headers["verifyauthtoken"] = self.verify_auth_token
                success_count += 1
                # Solved 8 times and the captcha is still requested: give up.
                if success_count >= self._MAX_SLIDER_ROUNDS:
                    return False
            else:
                error_count += 1
                # Slider failed 8 times: give up.
                if error_count >= self._MAX_SLIDER_ROUNDS:
                    return False
            # Pass BOTH counters on; dropping one would reset it to 0 and
            # let alternating success/failure rounds retry without bound.
            return self.login_with_params(
                login_params=login_params,
                store_id=store_id,
                error_count=error_count,
                success_count=success_count,
            )

        if "需要手机验证" in response.text:
            if login_params.get("code"):
                # A code was supplied but verification is still required.
                self._log("验证码可能错误或过期", "WARNING")
                return False
            # First request for a code: trigger the SMS and tell the caller.
            self._log("检测到需要手机验证码,正在调用发送验证码方法", "INFO")
            username = login_params.get("username")
            self._log(f"为用户 {username} 发送验证码", "INFO")
            self.request_verification_code(username, store_id)
            return "need_verification_code"

        # Neither captcha nor SMS requested: success or a plain failure.
        response_data = response.json()
        if response_data.get("success"):
            self._log("登录成功", "SUCCESS")
            return self.cookies
        self._log(f"登录失败: {response_data.get('errorMsg', '未知错误')}", "ERROR")
        return False
# ===== 登录相关类集成结束 =====
# Last change: 2025-09-12 20:42:00 +08:00
# 定义持久化数据类 - 参考京东的WebsocketManager
class WebsocketManager:
    """Process-wide singleton registry of per-shop connection records."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is not None:
            return cls._instance
        with cls._lock:
            if cls._instance is None:
                created = super().__new__(cls)
                created._store = {}
                # Instance-level re-entrant lock guarding ``_store``.
                created._lock = threading.RLock()
                cls._instance = created
        return cls._instance

    def on_connect(self, shop_key, ws, **kwargs):
        """Register (or refresh) the platform connection for ``shop_key``.

        :param shop_key: key identifying the shop
        :param ws: connection object; stored by strong reference
        :param kwargs: extra fields merged into the ``platform`` record
        :return: the record for ``shop_key`` with keys ``platform``,
            ``customers`` and ``user_assignments``
        """
        with self._lock:
            blank_record = {
                'platform': None,
                'customers': [],
                'user_assignments': {}
            }
            record = self._store.setdefault(shop_key, blank_record)
            platform_info = {'ws': ws, 'last_heartbeat': datetime.now()}
            platform_info.update(kwargs)
            record['platform'] = platform_info
            return record

    def get_connection(self, shop_key):
        """Return the record for ``shop_key`` or ``None`` if there is none."""
        with self._lock:
            return self._store.get(shop_key)

    def remove_connection(self, shop_key):
        """Drop the record for ``shop_key``; a missing key is ignored."""
        with self._lock:
            self._store.pop(shop_key, None)
class PddBackendService:
    """Pinduoduo backend-service facade that sends through the shared backend client."""

    def __init__(self):
        # Store id added to outgoing dict messages that do not carry one.
        self.current_store_id = None

    async def connect(self, store_id):
        """Remember ``store_id``; no dedicated connection is opened. Always returns True."""
        self.current_store_id = store_id
        return True

    async def send_message_to_backend(self, platform_message):
        """Send ``platform_message`` through the shared backend client.

        A dict message without ``store_id`` gets the current store id added
        (when one is set).

        :return: ``True`` after the message was handed to the client,
            ``None`` when no client is available or any error occurred
        """
        # NOTE(review): every exception is swallowed here without logging.
        try:
            from WebSocket.backend_singleton import get_backend_client
            client = get_backend_client()
            if not client:
                return None

            needs_store_id = (
                isinstance(platform_message, dict)
                and 'store_id' not in platform_message
                and self.current_store_id
            )
            if needs_store_id:
                platform_message['store_id'] = self.current_store_id

            client.send_message(platform_message)
            return True
        except Exception:
            return None
class ChatPdd:
@staticmethod
def check_js_files():
"""检查JS文件是否存在"""
current_dir = "static/js"
required_files = ["dencode_message.js", "encode_message.js"]
for file in required_files:
file_path = os.path.join(current_dir, file)
if not os.path.exists(file_path):
raise FileNotFoundError(f"找不到必需的JS文件: {file_path}")
return True
def __init__(self, cookie, chat_list_stat, csname=None, text=None, log_callback=None):
# 检查JS文件
self.check_js_files()
# 获取JS文件路径
current_dir = "static/js"
dencode_js_path = os.path.join(current_dir, "dencode_message.js")
encode_js_path = os.path.join(current_dir, "encode_message.js")
# 读取JS文件
try:
with open(dencode_js_path, 'r', encoding='utf-8') as f:
jscode = f.read()
self.ctx = execjs.compile(jscode)
with open(encode_js_path, 'r', encoding='utf-8') as f:
ejscode = f.read()
self.encodeex = execjs.compile(ejscode)
except Exception as e:
raise RuntimeError(f"读取JS文件失败: {str(e)}")
# 首先设置log_callback然后才能使用_log方法
self.log_callback = log_callback
self._log("初始化ChatPdd实例", "INFO")
self.chat_list_stat = chat_list_stat
self.text = text
self.cookie = cookie
self.auth_token = None
self.mall_id = None
self.user_id = None
self.csname = csname
self.csid = None
self.staff_list_sent = False
self.uids = set()
self.pool = ThreadPoolExecutor(max_workers=4)
# WebSocket管理器
self.ws_manager = WebsocketManager()
# 后端服务实例
self.backend_service = PddBackendService()
self.backend_connected = False
# 重连参数
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10
self.base_reconnect_delay = 1.0
self.max_reconnect_delay = 60.0
self.reconnect_backoff = 1.5
self.headers = {
"authority": "mms.pinduoduo.com",
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"anti-content": "0asWfxUeM_VefxObk-_v-fwmt7oS3cmHsWwMkApMVigWOS3hCHfBeF-hHM1_v9LHywCtfzDKkA_F1cUE8_HKBA5D7fVkM2ZDB-KmMxhDM51HXlVrfK_LG4Dc3S0wimw8XYPyXdgJnUv8ndgjs0EynYv8n0XjXU9YXY4Pt0ZVgTgrsIUEeMzMk7s02E3oo-zMdF35CIMWVetBhs95lGG5xENwZwX0CwXH2799MeMjVKzk5DD4Kbs2dM1bD-fJ1F-RImB3SHBkVKDJZbZoUbL4UMk8DFtrISf8CMD4Obk_CM3ICIB_F90ws9ZQ0gcOpVdIIz2PLPVcYt5X0yqYiwjtuNVfdNSf0rmfmNIndXj8G8Nyn4ianRaanoucDp0Dfvd36B6eamPGwOFS0TAyo7nXH_nwxnGS28AgVnqPyPuy8jczb1Oe3xZuPHoJ97BBbaTMfV9OcQ23T-mAUel",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": "https://mms.pinduoduo.com",
"pragma": "no-cache",
"referer": "https://mms.pinduoduo.com/chat-merchant/index.html?r=0.4940608191737519",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
self.store_id = "538727ec-c84d-4458-8ade-3960c9ab802c" # 可以根据需要修改
self.user_tokens = {} # 存储用户token信息
def _log(self, message, level="INFO"):
"""内部日志方法"""
if self.log_callback:
self.log_callback(message, level)
else:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
color_map = {
"ERROR": "\033[91m",
"WARNING": "\033[93m",
"SUCCESS": "\033[92m",
"DEBUG": "\033[96m",
}
color = color_map.get(level, "")
reset = "\033[0m"
print(f"{color}[{timestamp}] [{level}] {message}{reset}")
# 重连机制方法
async def calculate_reconnect_delay(self):
"""计算指数退避的重连延迟时间"""
delay = self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts)
return min(delay, self.max_reconnect_delay)
async def should_reconnect(self):
"""判断是否应该继续重连"""
if self.reconnect_attempts >= self.max_reconnect_attempts:
self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR")
return False
return True
async def handle_reconnect(self, exception=None):
"""处理重连逻辑"""
if exception:
error_type = type(exception).__name__
error_msg = str(exception)
self._log(f"连接异常[{error_type}]: {error_msg}", "WARNING")
if not await self.should_reconnect():
return False
delay = await self.calculate_reconnect_delay()
self._log(f"{self.reconnect_attempts + 1}次重连尝试,等待{delay:.1f}秒...", "WARNING")
await asyncio.sleep(delay)
self.reconnect_attempts += 1
return True
# 后端服务调用方法
async def connect_backend_service(self, store_id):
"""连接后端AI服务"""
try:
success = await self.backend_service.connect(store_id)
if success:
self.backend_connected = True
self._log("✅ 后端AI服务连接成功", "SUCCESS")
return success
except Exception as e:
self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR")
return False
async def get_ai_reply_from_backend(self, platform_message):
"""发送消息到后端(使用统一连接,不等待回复)"""
# 首先检查后端服务连接状态
if not self.backend_connected:
# 尝试重新连接
if self.store_id:
self.backend_connected = await self.connect_backend_service(self.store_id)
if not self.backend_connected:
self._log("❌ 后端服务未连接,尝试重新连接失败", "WARNING")
return None
try:
# 发送消息到后端(使用统一连接,不等待回复)
await self.backend_service.send_message_to_backend(platform_message)
self._log("✅ 消息已发送到后端等待后端AI处理并通过GUI转发回复", "INFO")
return None
except Exception as e:
self._log(f"❌ 发送消息到后端失败: {e}", "ERROR")
return None
def get_auth_token(self):
    """Fetch the auth token for sub-system 17 and store it in ``self.auth_token``.

    Raises ``KeyError`` if the reply lacks ``result.authToken``.
    """
    endpoint = "https://mms.pinduoduo.com/janus/api/subSystem/getAuthToken"
    payload = {
        "subSystemId": 17,
        "anti_content": "0asAfx5E-wCElqJNXaKt_UKccG7YNycjZoPYgO1YTmZoApN7QJU5XT_JuYdPZ4uvLPQFgIcyXOKqm8xGrPjy0lxn0gaXr9ac0TynYEJnDwdj-WEJnwK3G4kc3M0TiPgtB11eBeZkB1hkBxUHFOtjfqz-eAkgc2aa4sZuIJwHOptYX14VK1NJSqIYfqmw6jpQTXs574CfWMFxT1RPTIB2MKBjC199pXpkbdtXxnn2mA4C1YdNnTUrNa_Wvn5mYj5XadnDbNT7xNuVeYXrsTsiipUr6xn2AAXKoYmv6j0PL92PtCTMfZzzfTfjutCgvBTzxFw-2LeeRIkFBATU1Npg1ScUFRv-1Ukrs3AklVAbBiEbBJhzcf2cXKfNH50X9QUFCd_Y1FhLW1BBuI-KEUFYK3lZTB3g_BlLDk8ti-JXmbalXzWAb6V2hX0fZE9n2L0GITFInNS",
    }
    reply = requests.post(endpoint, cookies=self.cookie, headers=self.headers, json=payload)
    body = reply.json()
    self.auth_token = body["result"]["authToken"]
def get_mall_id(self):
    """Fetch the current account profile and store its mall id and user id.

    Issues a GET to the realtime user-info endpoint and copies the response's
    ``mall_id`` and ``id`` fields to ``self.mall_id`` and ``self.user_id``.
    Nothing is returned; errors propagate to the caller.
    """
    endpoint = "https://mms.pinduoduo.com/chats/userinfo/realtime?get_response=true"
    query = {"get_response": "true"}
    reply = requests.get(endpoint, cookies=self.cookie, headers=self.headers, params=query)
    profile = reply.json()
    self.mall_id = profile["mall_id"]
    self.user_id = profile["id"]
def get_assign_cslist(self):
    """Find the agent whose username equals ``self.csname`` and store its id.

    Posts to the ``getAssignCsList`` endpoint and walks the returned
    ``csList`` mapping. The key of the first entry whose ``username`` matches
    ``self.csname`` is written to ``self.csid``. When nothing matches,
    ``self.csid`` is left untouched.
    """
    endpoint = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
    body = json.dumps({"wechatCheck": True}, separators=(',', ':'))
    reply = requests.post(endpoint, headers=self.headers, cookies=self.cookie, data=body).json()
    for cs_id, cs_info in reply["result"]["csList"].items():
        if cs_info.get("username") == self.csname:
            self.csid = cs_id
            break
def tab_mall(self, uid):
    """Move the conversation with ``uid`` to the agent stored in ``self.csid``.

    Posts a ``move_conversation`` command and interprets the JSON reply.

    Args:
        uid: Platform user id whose conversation is transferred.

    Returns:
        ``True`` only when the reply has a truthy ``success`` flag and its
        nested ``result.result`` is ``"success"`` or ``"ok"``. Every other
        outcome, including request errors and unparsable replies, yields
        ``False``.
    """
    self._log(f"🔄 开始执行转接操作 - 用户ID: {uid}, 目标客服ID: {self.csid}", "INFO")
    try:
        url = "https://mms.pinduoduo.com/plateau/chat/move_conversation"
        payload = {
            "data": {
                "cmd": "move_conversation",
                "request_id": int(time.time() * 1000),
                "conversation": {
                    "csid": self.csid,
                    "uid": uid,
                    "need_wx": False,
                    "remark": "无原因直接转移"
                },
                "anti_content": "0asAfxvYXyIgYgE2h9U0sXh6Ia3tZOAzI_rl3hixs1Z5DB-B1cswVK2UB8LfUO8H3OUKdF214-SgPVnY8SYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4R8ZVTw3a7j9BdB3Hmq7j0bh2LQrRzlt88pkQr-s3jMzWH5QPiz13wQNsJY6h9dnigBE26Ww1dKZcWbsKS4GsgY9_joGBS2FZUz94cbCvE2Ij9nO93b5kR50m9kI1lfBHvDZVIC9rLB-S4Qs4J9ebSsosuNskY9nhgXzCuNoilSQgJaMkP2Rgga2i54nWTTNfnZ4W9GvigkSnwzweN9sx-89BztNCasVkP2C2Kkipif2Dv-t7e2O9MnmtWaKm4wFlHLR7YmeyF3y5gxPRuHlO-gD7BfFle8z71lIV-7SyOLCd83FMUCsRwedwv3-TNTF_VRw_G1DHrptFYTAck8p8qIkxA_PZ14PXct1ZS2zAoZZcTMZZSpJ1jP9RlQARtnNNP0IzCdHiL4JzRQBkkKmwGGXay0i4urnXNm5qBgejFcVCkx8XYdeGBp4p-l1TjKoy603__johKFKZQksRHjOKma5DplwQwg8c3Uo1-mDAUmgAC0XfV2rkxzXj3ABARPlyEaMU8AxUZioaFRa0e5eit6r06uY9m5SyU_mdbTfLIYcKWjRA_UNQisJOUenCbMl_Bywq1KcA6TTzJ9ZHu4xz0o7ctROYLlvhEhAi3bJjFNho6l-TPZK4zSJoj"
            },
            "client": "WEB",
            "anti_content": "0asAfxvYXNIgYgd2i9U0sXh6IabtZOAdI_LlbhNUvZZ5DB-B1cswVK2UBjxfUOjHbOUKdF214-SgPVnYjSYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4RjZVTwbJFiC0P0rR6LbH89rLVuGHiG6vm3fO6rq_rAyLQlw8CAIuESMR4L308Ctz3V0m9ZhS8tdapQr_YY2vZZk9sAJsMd2ZapG92zm2lO9FLVBjCur2KWwac9nua9g8ztOawyo9ur0Dd96xMkCXrd4J4hR4Kk9zqZBDTlRPkMdzzkJ5Osgi40EC3oVAhSPA9MawGCs33Vnd4bSJER9_b_D9g1sfdeZZKsgo25k0WRfs7dM17E2ug-pPEL5_ib53TuaxLHJq5pGG4jMUjAHRV-gbDffut1Yjy1A7K-6E3zO4tDuUq5T0LPHjf9bB7Apofgr_U_4Aq-7l8x2ks8DhhgOhblTvwrO4owZvuUXA0hfSIUPYK9fIoSUH6aaL8abMMVCvI__qF8Yw1oaMp0d0-fbwMhXQ8fBZYoajXXJv-RV3B2GQEowvMHEUtjDz50jUVduq5fKL9R-WsBRjX0aiExVKLw-PZzxsVup0ZuOYxnPofN92EqKfbvriIWFMsh80eGrvq1vsD1BrMx_mcPMmUmHHEY7wct5lePa0b2Lt5_byqRbbcpohrQrHLlH6JJaTMKQuffyT-60110e5oz_Cz0NbG88TyxOGKijW7BimL5RTjSOTPsK4zS8oi"
        }
        self._log(f"📤 发送转接请求 - URL: {url}", "DEBUG")
        self._log(f"📝 请求数据: {json.dumps(payload, ensure_ascii=False)}", "DEBUG")

        # The wire format is compact JSON (no spaces after separators).
        wire_body = json.dumps(payload, separators=(',', ':'))
        response = requests.post(url, headers=self.headers, cookies=self.cookie, data=wire_body)
        self._log(f"📥 HTTP状态码: {response.status_code}", "DEBUG")
        self._log(f"📄 响应内容: {response.text}", "DEBUG")

        try:
            parsed = response.json()
            self._log(f"🔍 解析后的响应: {json.dumps(parsed, ensure_ascii=False)}", "DEBUG")

            if not parsed.get("success"):
                self._log(f"❌ 转接请求失败 - 响应标记为失败", "ERROR")
                return False

            outcome = parsed.get("result", {})
            if outcome.get("result") in ("success", "ok"):
                self._log(f"✅ 转接成功 - 用户 {uid} 已转接给客服 {self.csid}", "SUCCESS")
                return True

            error_code = outcome.get("error_code", "未知")
            error_msg = outcome.get("error_msg", "未知错误")
            self._log(f"❌ 转接失败 - 错误码: {error_code}, 错误信息: {error_msg}", "ERROR")
            return False
        except json.JSONDecodeError as decode_err:
            self._log(f"❌ 解析响应JSON失败: {decode_err}", "ERROR")
            self._log(f"❌ 原始响应: {response.text}", "ERROR")
            return False
    except requests.RequestException as request_err:
        self._log(f"❌ 请求异常: {request_err}", "ERROR")
        return False
    except Exception as err:
        self._log(f"❌ 转接操作异常: {err}", "ERROR")
        traceback.print_exc()
        return False
@staticmethod
def forward_message_to_platform(store_id: str, recv_pin: str, content: str):
    """Forward a reply to a Pinduoduo buyer through the store's live connection.

    Looks up the connection registered under ``拼多多:<store_id>`` and
    schedules ``send_ai_reply`` of the stored ChatPdd instance on that
    connection's event loop, waiting up to five seconds for it to finish.

    Args:
        store_id: Store whose connection should be used.
        recv_pin: Platform user id of the recipient.
        content: Text to send.

    Returns:
        ``None``. Every failure is printed rather than raised.
    """
    try:
        shop_key = f"拼多多:{store_id}"
        entry = WebsocketManager().get_connection(shop_key)
        if not entry:
            print(f"[PDD Forward] 未找到拼多多连接: {shop_key}")
            return

        platform_info = entry.get('platform') or {}
        ws = platform_info.get('ws')
        loop = platform_info.get('loop')
        pdd_instance = platform_info.get('pdd_instance')  # reference to the ChatPdd instance
        print(
            f"[PDD Forward] shop_key={shop_key} has_ws={bool(ws)} has_loop={bool(loop)} has_pdd_instance={bool(pdd_instance)} recv_pin={recv_pin}")

        if not (ws and loop and pdd_instance):
            print("[PDD Forward] 条件不足,未转发:",
                  {'has_ws': bool(ws), 'has_loop': bool(loop), 'has_pdd_instance': bool(pdd_instance),
                   'has_content': bool(content)})
            return

        async def _send_pdd():
            # Delegate to the ChatPdd instance that owns the connection.
            await pdd_instance.send_ai_reply(recv_pin, content)

        import asyncio as _asyncio
        pending = _asyncio.run_coroutine_threadsafe(_send_pdd(), loop)
        try:
            # The send performs an HTTP request, so allow a few seconds.
            pending.result(timeout=5)
            print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}")
        except Exception as submit_err:
            print(f"[PDD Forward] 转发提交失败: {submit_err}")
    except Exception as err:
        print(f"[PDD Forward] 拼多多转发失败: {err}")
@staticmethod
def transfer_customer_service(customer_service_id: str, user_id: str, store_id: str):
    """Transfer a buyer conversation to another agent on Pinduoduo.

    Looks up the connection registered under ``拼多多:<store_id>``, sets the
    target agent id on the stored ChatPdd instance and runs its synchronous
    ``tab_mall`` in a worker thread, waiting at most ten seconds.

    Args:
        customer_service_id: Id of the agent receiving the conversation.
        user_id: Platform user id of the buyer.
        store_id: Store whose connection should be used.

    Returns:
        ``None``. Every failure is printed rather than raised.
    """
    try:
        pdd_shop_key = f"拼多多:{store_id}"
        pdd_entry = WebsocketManager().get_connection(pdd_shop_key)
        if not pdd_entry:
            print(f"[PDD Transfer] 未找到拼多多连接: {pdd_shop_key}")
            return

        platform_info = pdd_entry.get('platform') or {}
        pdd_instance = platform_info.get('pdd_instance')
        loop = platform_info.get('loop')
        print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")

        # The loop is not used below, but it is still required so that the
        # preconditions stay the same as before.
        if not (pdd_instance and loop):
            print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}")
            return

        # tab_mall reads the target agent from the instance attribute.
        pdd_instance.csid = customer_service_id

        def _transfer():
            result = pdd_instance.tab_mall(user_id)
            if result:
                print(f"[PDD Transfer] ✅ 转接成功")
            else:
                print(f"[PDD Transfer] ❌ 转接失败")
            return result

        import concurrent.futures
        # Do not use the executor as a context manager here: its __exit__
        # calls shutdown(wait=True), which would block until the task ends
        # and make the timeout below meaningless.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(_transfer)
            try:
                future.result(timeout=10)
            except Exception as fe:
                print(f"[PDD Transfer] 转接操作异常: {fe}")
        finally:
            executor.shutdown(wait=False)
    except Exception as e:
        print(f"[PDD Transfer] 拼多多转接失败: {e}")
async def send_ai_reply(self, uid, content):
    """Send a text reply to one buyer through the ``send_message`` endpoint.

    The blocking HTTP POST runs in ``self.pool`` so the event loop is not
    held while the request is in flight.

    Args:
        uid: Platform user id of the recipient.
        content: Text of the reply.

    Returns:
        ``True`` when the endpoint answers with HTTP 200, ``False`` for any
        other status or when an exception occurs while sending.
    """
    self._log(f"开始发送AI回复给用户 {uid}", "INFO")
    self._log(f"回复内容: {content[:50]}...", "DEBUG")
    try:
        now_ms = int(time.time() * 1000)
        url = "https://mms.pinduoduo.com/plateau/chat/send_message"
        message = {
            "to": {
                "role": "user",
                "uid": uid
            },
            "from": {
                "role": "mall_cs"
            },
            "ts": int(now_ms / 1000),
            "content": content,
            "msg_id": None,
            "type": 0,
            "is_aut": 0,
            "manual_reply": 1,
            "status": "read",
            "is_read": 1,
            "hash": "1d1359dfb7c141fd95432a22cdc7f43b51434041a1a6016ef6ea2ea2b1abc05a"
        }
        payload = {
            "data": {
                "cmd": "send_message",
                "anti_content": "0asWfqzFdjPssgua6CCWyeEK7LAktf0_UFChvlj8kD8Mk1DVZchg4qURiKAS4iSfiaz9Fjg43ThZZ4_wNh5Onk0e5Tw2mGdwpTEPDZlPdfZwBpyESSDiz0Xalv6mvr0o0uQR6yRlG-vscbK8jYGRNcAr2UktX3bMKhjjx7YvVGbi_poiyypBuOwM_2qZyIM96tdNTfb_AMy0fe1UrN2SqUzfm2stmb4TJldGN9b1mTPr-6t4R9k3pquV_XHwBr20J4__P71fxPBmcXOmXTnQWJs2ddt4KKOIbPIIanas08VNiv9nsHbtvXKp8_CMdL8FQMPmEMDmM6JfjZ4LJp5l-x0BjpBbdWyoztNYqSbBe8hscA1hPFM_R1iR9E-AI-CyuehipODW-ADCL0xEWK875510RsV7mLFQKG0kWO--krZzFFi_6lhdfq3_JhBjDDd3YS1exxSMMS7yDJgskoumfAcu-y1YDO8OdcKWwx3Z0nOqUlnF8LekN7TqXhNtKTgfDZPThVT3JcNm_2TljXRhGWxTZT4o25TTVKo5KPeztcJU5NddFd38nmvo0k8R8HVIkmzOYyaerCDCm1ZDfYgNFx-2vM_JkcMYFdkV7C3BKeg7DxZXHfexAT3UKgruBWYrSs_dBB0X-QkJ2ZcmpStB6KJkZ1azrAAxdw_HMj5UIq4vEXxrA1-YAzJEhMoU0-MncDpoBZfiky7Sr4nDGYrcn86AAYWynOUn0US2DAkICABvSSqkh0vCRpdVeZRfOFsdFWF43KEIph-ckMxDsyq6G3rWXbvPkaAlv67cR1v0N2oCQzDVDT-InNv",
                "request_id": now_ms,
                "message": message,
                "random": "c5fc9c61194294c478a6fb395b1c558f"
            },
            "client": "WEB",
            "anti_content": "0asAfa5E-wCEsa-JeFwdFfTDtzhMbODIsAT-eu_-CigAHMbcWIfBEUFcI-Kwd93IYTWtfvkSeuwUKO57jwISBuZkzfCe-2VkBFSD-ack-ZKIXRockMwxBgyDql6V3xKKD-fCkBFZkzeKeBwUE-cSD-KHkBb-kzFVk-szd009EAPjDvBcaPIXxHGB_IxWWlIXMnxghvniTodnXt2scz7sxhJV_CEBcCez4H92maXI2d1qgoYswoG4Mf_oylph0nGigjnqWzXqwTYh0y4W6yYs6PxHyXv8slpHrXyseoqZhfqdrdn5E9BCt24mBfkEFg-vgc0wUw2D-vgE7KUM1FIdKb2_Mbl_6xj_8bvk-e1H6ObSksLCDiVCDAbkYU6Y_SMBfgA5CHUM0QuYhznXIwnTanGM2jugCnxPYPmYjyOvjKHEbaV3nvMuJ6V2cX0fZE99vcfY3TF1nNr"
        }
        wire_body = json.dumps(payload, separators=(',', ':'))

        def _post():
            return requests.post(url, headers=self.headers, cookies=self.cookie, data=wire_body)

        response = await asyncio.get_event_loop().run_in_executor(self.pool, _post)

        # Log the HTTP status and a fragment of the response body.
        self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
        try:
            self._log(f"PDD返回体: {response.text[:500]}", "DEBUG")
        except Exception:
            pass

        if response.status_code != 200:
            self._log(f"❌ 发送AI回复失败HTTP状态码: {response.status_code}", "ERROR")
            self._log(f"响应内容: {response.text}", "DEBUG")
            return False

        self._log(f"✅ 已发送AI回复给用户 {uid}: {content}", "SUCCESS")
        return True
    except Exception as err:
        self._log(f"❌ 发送AI回复失败: {err}", "ERROR")
        traceback.print_exc()
        return False
async def send_message_external(self, uid, content):
    """Send a reply that was forwarded from the backend.

    Validates the arguments and delegates to ``send_ai_reply``.

    Args:
        uid: Platform user id of the recipient.
        content: Text to send.

    Returns:
        ``False`` when either argument is empty or when sending raised;
        otherwise the value returned by ``send_ai_reply``.
    """
    self._log(f"[External] 收到外部消息发送请求: uid={uid}, content_len={len(content) if content else 0}", "INFO")
    try:
        if not uid or not content:
            self._log("❌ [External] 参数不完整", "ERROR")
            return False

        result = await self.send_ai_reply(uid, content)
        if result:
            self._log(f"✅ [External] 外部消息发送成功: uid={uid}", "SUCCESS")
        else:
            self._log(f"❌ [External] 外部消息发送失败: uid={uid}", "ERROR")
        return result
    except Exception as e:
        self._log(f"❌ [External] 外部消息发送异常: {e}", "ERROR")
        return False
async def get_all_customer(self, ws):
    """Fetch the agent list and cache it on ``self.customer_list``.

    The blocking HTTP POST runs in ``self.pool``. Each entry of the returned
    ``csList`` mapping becomes a dict with ``staff_id``, ``name``,
    ``status``, ``department`` and ``online`` keys.

    Args:
        ws: Not used by this method.

    Returns:
        ``True`` when a non-empty list was retrieved and stored; ``False``
        for a non-200 status, an empty list, or any exception.
    """
    try:
        url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
        wire_body = json.dumps({"wechatCheck": True}, separators=(',', ':'))

        def _post():
            return requests.post(url, headers=self.headers, cookies=self.cookie, data=wire_body)

        # The synchronous request is pushed onto the thread pool.
        response = await asyncio.get_event_loop().run_in_executor(self.pool, _post)
        if response.status_code != 200:
            return False

        cslist = response.json().get("result", {}).get("csList", {})
        print("我们收集到的客服列表原信息为:", str(cslist))
        if not cslist:
            return False

        # Convert the platform mapping into the flat list format.
        self.customer_list = []
        for cs_id, cs_info in cslist.items():
            self.customer_list.append({
                "staff_id": str(cs_id),
                "name": cs_info.get("username", ""),
                "status": cs_info.get("status", 0),
                "department": cs_info.get("department", ""),
                "online": True
            })
        self._log(f"✅ 成功获取客服列表,共 {len(self.customer_list)} 个客服", "SUCCESS")
        return True
    except Exception as err:
        self._log(f"❌ 获取客服列表失败: {err}", "ERROR")
        return False
async def send_staff_list_to_backend(self):
    """Upload the cached agent list to the backend.

    Wraps ``self.customer_list`` in a ``staff_list`` ``PlatformMessage`` and
    sends it through ``backend_service.send_message_to_backend``.

    Returns:
        ``True`` when the message was sent; ``False`` when the list is
        missing or empty, or when building or sending the message raised.
    """
    try:
        staff = getattr(self, 'customer_list', None)
        if not staff:
            self._log("⚠️ 客服列表为空", "WARNING")
            return False

        message_template = PlatformMessage(
            type="staff_list",
            content="客服列表更新",
            data={
                "staff_list": staff,
                "total_count": len(staff)
            },
            store_id=self.store_id
        )

        await self.backend_service.send_message_to_backend(message_template.to_dict())
        self._log(f"发送客服列表消息的结构体为: {message_template.to_json()}")

        self._log(f"✅ [PDD] 成功发送客服列表到后端,共 {len(staff)} 个客服", "SUCCESS")
        print(f"🔥 [PDD] 客服列表已上传到后端: {len(staff)} 个客服")

        return True
    except Exception as e:
        self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR")
        return False
async def handle_transfer_message(self, message_data):
    """Handle a transfer instruction received from the backend.

    Reads the target agent id from ``content`` and the buyer id from
    ``receiver.id``, stores the agent id on ``self.csid`` and runs the
    synchronous ``tab_mall`` in ``self.pool``.

    Args:
        message_data: Mapping with ``content`` and ``receiver`` entries.

    Returns:
        ``True`` when ``tab_mall`` reports success; ``False`` when the
        message is incomplete, the transfer fails, or an exception occurs.
    """
    self._log("收到转接指令", "INFO")
    try:
        target_cs = message_data.get("content", "")
        uid = message_data.get("receiver", {}).get("id")

        if not (target_cs and uid):
            self._log("❌ 转接消息信息不完整", "WARNING")
            self._log(f"消息内容: {message_data}", "DEBUG")
            return False

        self._log(f"准备将用户 {uid} 转接给客服 {target_cs}", "INFO")
        # tab_mall reads the target agent from the instance attribute.
        self.csid = target_cs

        try:
            loop = asyncio.get_event_loop()
            transferred = await loop.run_in_executor(self.pool, self.tab_mall, uid)
            if not transferred:
                self._log(f"❌ 转接失败: 用户 {uid}", "ERROR")
                return False
            self._log(f"✅ 转接成功: 用户 {uid} 已转接给客服 {target_cs}", "SUCCESS")
            return True
        except Exception as run_err:
            self._log(f"❌ 执行转接操作失败: {run_err}", "ERROR")
            return False
    except Exception as err:
        self._log(f"❌ 处理转接消息失败: {err}", "ERROR")
        traceback.print_exc()
        return False
async def handle_customer_message(self, message_data):
    """Deliver an agent message that arrived from the backend.

    Reads the text from ``content`` and the buyer id from ``receiver.id`` and
    sends it with ``send_ai_reply``. The outcome log reflects the value
    returned by ``send_ai_reply``, which reports failures by returning
    ``False`` rather than raising.

    Args:
        message_data: Mapping with ``content`` and ``receiver`` entries.

    Returns:
        ``None``. Failures are logged, not raised.
    """
    self._log("收到来自后端的客服消息", "INFO")
    try:
        content = message_data.get("content", "")
        receiver_info = message_data.get("receiver", {})
        uid = receiver_info.get("id")

        if not (uid and content):
            self._log("客服消息数据不完整", "WARNING")
            self._log(f"消息内容: {message_data}", "DEBUG")
            return

        self._log(f"准备发送客服消息给用户 {uid}: {content}", "INFO")
        delivered = await self.send_ai_reply(uid, content)
        if delivered:
            self._log(f"客服消息发送完成 - 目标用户: {uid}", "SUCCESS")
        else:
            # Previously a failed send was still reported as a success.
            self._log(f"客服消息发送失败 - 目标用户: {uid}", "ERROR")
    except Exception as e:
        self._log(f"处理客服消息失败: {e}", "ERROR")
        traceback.print_exc()
async def process_incoming_message(self, message_data, wss, store):
    """Classify one pushed buyer message and hand it to the backend.

    Messages without a nickname, content or sender uid are ignored. For the
    rest, the sender uid is recorded in ``self.uids``, a message type is
    derived from the content and the attached ``info`` dict, and a
    ``PlatformMessage`` is sent via ``get_ai_reply_from_backend``. If that
    call returns a non-blank string, it is sent back to the buyer directly.

    Args:
        message_data: One entry of the decoded push payload.
        wss: Not used by this method.
        store: Not used by this method.

    Returns:
        ``None``. Failures are logged, not raised.
    """
    try:
        message_info = message_data.get("message", {})
        if not message_info:
            return

        nickname = message_info.get("nickname")
        content = message_info.get("content")
        goods_info = message_info.get("info", {})
        uid = message_info.get("from", {}).get("uid")
        if not (nickname and content and uid):
            return

        self._log(f"用户消息 - {nickname}: {content}", "INFO")
        self._log(f"用户UID: {uid}", "DEBUG")
        self.uids.add(uid)

        # Base type is derived from file extensions found in the content.
        lowered = str(content).lower()
        image_exts = (".jpg", ".jpeg", ".png", ".gif", ".webp")
        video_exts = (".mp4", ".avi", ".mov", ".wmv", ".flv")
        if any(ext in lowered for ext in image_exts):
            msg_type = "image"
        elif any(ext in lowered for ext in video_exts):
            msg_type = "video"
        else:
            msg_type = "text"

        # Card detection overrides the base type. An order card also
        # replaces the content with a compact goods-id / order-number string.
        card_keywords = ('goods.html', 'item.html', 'item.jd.com', '商品卡片id')
        if "订单编号" in str(content) and goods_info:
            content = f"商品id:{goods_info.get('goodsID')} 订单号:{goods_info.get('orderSequenceNo')}"
            msg_type = "order_card"
        elif any(keyword in lowered for keyword in card_keywords) or \
                (goods_info and goods_info.get('goodsID') and not goods_info.get('orderSequenceNo')):
            msg_type = "product_card"

        outgoing = PlatformMessage.create_text_message(
            content=content,
            sender_id=str(uid),
            store_id=self.store_id
        )
        outgoing.msg_type = msg_type
        try:
            print(f"📤(SPEC) 发送到AI: {json.dumps(outgoing.to_dict(), ensure_ascii=False)[:300]}...")
        except Exception:
            pass

        # In the normal path the backend answers asynchronously, so the
        # call yields nothing to send from here.
        ai_result = await self.get_ai_reply_from_backend(outgoing.to_dict())
        if isinstance(ai_result, str) and ai_result.strip():
            await self.send_ai_reply(uid, ai_result)
            self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO")
        else:
            self._log("🔄 已转交后端AI处理,等待平台回复下发", "INFO")
    except Exception as err:
        self._log(f"❌ 消息处理失败: {err}", "ERROR")
        traceback.print_exc()
@staticmethod
async def heartbeat(wss):
while True:
m = random.randint(100, 200)
h = [0, 0, 0, 0, 0, 0, 0, m, 0, 0, 0, 1, 0, 0, 0, 0]
h = bytes(h)
await wss.send(bytes(h))
await asyncio.sleep(15)
async def on_message(self, wss, store):
    """Receive frames from the socket and dispatch every pushed message.

    Each binary frame is base64-encoded, decoded through the JS helper
    ``dencode_data`` and, when it carries ``push_data.data``, every entry is
    passed to ``process_incoming_message``.

    Errors raised by ``wss.recv()`` are deliberately not caught. A failing
    socket raises again on every call, so swallowing the error would leave
    this loop spinning and hide the disconnect from the caller's reconnect
    handling.

    Args:
        wss: Connection object exposing an awaitable ``recv()``.
        store: Passed through to ``process_incoming_message``.

    Raises:
        Exception: Whatever ``wss.recv()`` raises, e.g. when the connection
            is closed.
    """
    self._log("开始监听消息", "INFO")
    while True:
        message = await wss.recv()

        try:
            encoded = base64.b64encode(message).decode()
        except TypeError:
            # Non-bytes frames cannot be decoded; skip them quietly.
            self._log("收到非二进制消息,忽略", "DEBUG")
            continue
        self._log(f"收到新消息 {encoded}", "DEBUG")

        try:
            text = self.ctx.call("dencode_data", encoded)
            if not isinstance(text, dict):
                continue
            push = text.get("push_data") or {}
            data = push.get("data")
            if not data:
                self._log("push_data.data 为空,忽略", "DEBUG")
                continue
            print(f"原始数据格式打印:{data}")
            for d in data:
                await self.process_incoming_message(d, wss, store)
        except Exception as e:
            # "buffer error" is an expected decode outcome, so it is only
            # logged at DEBUG level.
            if "buffer error" in str(e):
                self._log(f"解析消息失败: {e}", "DEBUG")
            else:
                self._log(f"解析消息失败: {e}", "ERROR")
                traceback.print_exc()
async def message_monitoring(self, cookies_str, store, stop_event=None):
"""Main listening loop: connect to the Pinduoduo WebSocket and keep it alive.

Connects the backend service for ``store['id']``, then repeatedly opens the
WebSocket, sends the encoded login token, registers the connection with
``self.ws_manager``, uploads the agent list once, and runs ``heartbeat``
and ``on_message`` together. Connection errors go through
``handle_reconnect``; the loop ends when that returns ``False`` or when
``stop_event`` is set.

Args:
    cookies_str: Only used for a truthiness debug print here; the
        connection cookies come from ``self.cookie``.
    store: Mapping that provides the store ``id``.
    stop_event: Optional event that ends the loop when set. A fresh
        ``asyncio.Event`` is created when omitted.

NOTE(review): indentation was lost in this view, so the nesting below is
not visible. In particular, confirm against the original file whether the
final ``else:`` pairs with ``if z:`` or with
``if self.user_id and self.auth_token:``.
"""
print("✅ DEBUG 进入拼多多message_monitoring方法")
print(f"参数验证: cookies={bool(cookies_str)}")
# Connect the backend AI service, keyed by the store id.
store_id = str(store.get('id', ''))
self._log(f"🔗 尝试连接后端服务,店铺ID: {store_id}", "DEBUG")
backend_connected = await self.connect_backend_service(store_id)
if not backend_connected:
self._log("⚠️ 后端服务连接失败,将使用本地AI回复", "WARNING")
else:
self._log("✅ 后端服务连接成功", "SUCCESS")
stop_event = stop_event or asyncio.Event()
# Reset the reconnect counter before entering the loop.
self.reconnect_attempts = 0
while not stop_event.is_set():
try:
self._log(f"🔄 尝试连接拼多多WebSocket", "INFO")
if self.user_id and self.auth_token:
uri = "wss://titan-ws.pinduoduo.com/"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
"Upgrade": "websocket",
"Origin": "https://mms.pinduoduo.com",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh;q=0.9",
"Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits",
"cookie": "; ".join([f"{k}={v}" for k, v in self.cookie.items()])
}
async with websockets.connect(uri, additional_headers=headers, ping_interval=20,
ping_timeout=16) as websocket:
# Connected: reset the reconnect counter.
self.reconnect_attempts = 0
self._log("✅ WebSocket-PDD连接成功", "SUCCESS")
# The JS helper turns user id + auth token into the login frame.
z = self.encodeex.call("encode_token", str(self.user_id), self.auth_token)
if z:
await websocket.send(bytes(z))
# Register the connection with the global manager.
shop_key = f"拼多多:{store['id']}"
loop = asyncio.get_running_loop()
entry = self.ws_manager.on_connect(
shop_key=shop_key,
ws=websocket,
platform="拼多多",
loop=loop,
pdd_instance=self # keep a ChatPdd reference for message forwarding
)
# Fetch the agent list and send it to the backend (once).
if not self.staff_list_sent:
self._log("🔍 开始获取客服列表...", "INFO")
try:
if await self.get_all_customer(None):
await self.send_staff_list_to_backend()
self.staff_list_sent = True
else:
self._log("⚠️ 获取客服列表失败", "WARNING")
except Exception as e:
self._log(f"❌ 获取或发送客服列表失败: {e}", "ERROR")
# Run message listening and the heartbeat concurrently.
await asyncio.gather(
self.heartbeat(websocket),
self.on_message(websocket, store)
)
else:
# NOTE(review): this branch neither awaits nor breaks; depending on
# which `if` it belongs to, the loop may retry immediately — confirm.
self._log("未定制账号token数组", "ERROR")
except websockets.ConnectionClosed as e:
self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING")
if not await self.handle_reconnect(e):
break
except (websockets.WebSocketException, OSError) as e:
self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING")
if not await self.handle_reconnect(e):
break
except Exception as e:
self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR")
if not await self.handle_reconnect(e):
break
# Mark the backend connection as closed (only the flag is cleared here).
if self.backend_connected:
self.backend_connected = False
self._log("🛑 消息监听已停止", "INFO")
async def main(self):
    """Authenticate, connect the backend service and start listening.

    Fetches the auth token, mall id and agent list, then connects the
    backend service for ``self.store_id``. Message monitoring starts only
    when that connection succeeds; otherwise a failure line is printed.

    Returns:
        ``None``.
    """
    self.get_auth_token()
    self.get_mall_id()
    self.get_assign_cslist()

    # Connect the AI service before opening the platform socket.
    backend_ok = await self.connect_backend_service(self.store_id)
    if not backend_ok:
        print("❌ AI服务连接失败")
        return

    print("✅ AI服务连接成功")
    await self.message_monitoring(
        cookies_str=self.cookie,
        store={'id': self.store_id}
    )
class PddListenerForGUI:
    """Wrapper that lets the GUI start, stop and inspect a ChatPdd listener."""

    def __init__(self, log_callback: Optional[Callable] = None):
        """Create an idle listener.

        Args:
            log_callback: Optional ``callable(message, log_type)``. When it
                is omitted, log lines are printed instead.
        """
        self.pdd_bot = None  # ChatPdd instance, created when listening starts
        self.log_callback = log_callback
        self.running = False
        self.main_thread = None  # worker thread created by start_listening
        self.loop = None  # event loop owned by that worker thread
        self.startup_success = False
        self.startup_event = threading.Event()
        self.startup_error = None

    def _log(self, message: str, log_type: str = "INFO"):
        """Send a log line to the callback, or print it when none is set."""
        if self.log_callback:
            self.log_callback(message, log_type)
        else:
            print(f"[{log_type}] {message}")

    async def start_listening(self, cookie_dict: Dict[str, str], chat_list_stat: bool = False,
                              csname: Optional[str] = None, text: Optional[str] = None) -> bool:
        """Run ``ChatPdd.main()`` on a daemon thread with its own event loop.

        ``startup_event`` is set only after ``ChatPdd.main()`` has returned
        or raised, so this coroutine completes when the listener has
        finished, not when it has merely started.

        Args:
            cookie_dict: Login cookies passed to ``ChatPdd``.
            chat_list_stat: Passed through to ``ChatPdd``.
            csname: Passed through to ``ChatPdd``.
            text: Passed through to ``ChatPdd``.

        Returns:
            ``True`` when ``ChatPdd.main()`` completed without raising,
            ``False`` otherwise (including an empty ``cookie_dict``).
        """
        try:
            self._log("🔵 开始拼多多平台连接流程", "INFO")
            if not cookie_dict:
                self._log("❌ Cookie信息不能为空", "ERROR")
                return False
            self._log(f"✅ Cookie验证通过,包含 {len(cookie_dict)} 个字段", "SUCCESS")

            self.startup_success = False
            self.startup_error = None
            self.startup_event.clear()

            self.pdd_bot = ChatPdd(
                cookie=cookie_dict,
                text=text,
                chat_list_stat=chat_list_stat,
                csname=csname,
                log_callback=self.log_callback
            )
            self.running = True
            self._log("🎉 开始监听拼多多平台消息...", "SUCCESS")

            def run_pdd_main():
                loop = None
                try:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    self.loop = loop

                    async def main_wrapper():
                        try:
                            await self.pdd_bot.main()
                            self.startup_success = True
                            self._log("✅ ChatPdd主程序执行完成", "SUCCESS")
                        except Exception as e:
                            self.startup_error = str(e)
                            self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
                        finally:
                            self.startup_event.set()

                    loop.run_until_complete(main_wrapper())
                except Exception as e:
                    self.startup_error = str(e)
                    self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
                    self.startup_event.set()
                finally:
                    self.running = False
                    # Release the thread's loop; it was previously leaked.
                    if loop is not None:
                        loop.close()

            self.main_thread = threading.Thread(target=run_pdd_main, daemon=True)
            self.main_thread.start()
            self._log("🔄 等待ChatPdd初始化完成...", "INFO")

            # threading.Event.wait() is blocking. Waiting in an executor
            # keeps the caller's event loop responsive. No timeout is used.
            await asyncio.get_running_loop().run_in_executor(None, self.startup_event.wait)

            if self.startup_success:
                self._log("✅ 拼多多监听启动成功", "SUCCESS")
                return True
            error_msg = self.startup_error or "未知启动错误"
            self._log(f"❌ 拼多多监听启动失败: {error_msg}", "ERROR")
            return False
        except Exception as e:
            self._log(f"❌ 监听启动过程中出现严重错误: {str(e)}", "ERROR")
            return False

    async def start_with_login_params(self, store_id: str, login_params: str):
        """Log in with backend-issued parameters, then start listening.

        Args:
            store_id: Store the login belongs to.
            login_params: JSON text carrying ``data.login_params``.

        Returns:
            The string ``"need_verification_code"`` when the login asks for
            an SMS code; otherwise the result of ``start_with_cookies`` on a
            successful login, or ``False`` on any failure.
        """
        try:
            self._log("🔵 [PDD] 收到后端登录参数,开始执行登录获取cookies", "INFO")
            self._log(f"🔍 [PDD] 登录参数内容: {login_params[:100]}...", "DEBUG")

            # 1. Parse the login parameters.
            self._log("🔄 [PDD] 开始解析登录参数", "DEBUG")
            params_dict = self._parse_login_params(login_params)
            if not params_dict:
                self._log("❌ [PDD] 登录参数解析失败", "ERROR")
                return False
            self._log(f"✅ [PDD] 登录参数解析成功,用户名: {params_dict.get('username', 'N/A')}", "DEBUG")

            # 2. Perform the login through PddLogin.
            self._log("🔄 [PDD] 开始创建PddLogin实例", "DEBUG")
            pdd_login = PddLogin(log_callback=self.log_callback)
            self._log("✅ [PDD] PddLogin实例创建成功", "DEBUG")
            self._log("🔄 [PDD] 开始执行登录", "DEBUG")
            login_result = pdd_login.login_with_params(params_dict, store_id)
            # NOTE(review): on success this writes the session cookies to
            # the debug log — confirm that this is acceptable.
            self._log(f"📊 [PDD] 登录结果: {login_result}", "DEBUG")

            if login_result == "need_verification_code":
                self._log("⚠️ [PDD] 需要手机验证码,已通知后端,等待重新下发包含验证码的登录参数", "WARNING")
                # Returned unchanged so the caller can tell it from failure.
                return "need_verification_code"
            if not login_result:
                self._log("❌ [PDD] 登录失败", "ERROR")
                return False
            if isinstance(login_result, dict):
                self._log("✅ [PDD] 登录成功,使用获取的cookies连接平台", "SUCCESS")
                # start_with_cookies expects the cookies as a string.
                cookies_str = json.dumps(login_result)
                return await self.start_with_cookies(store_id, cookies_str)

            self._log("❌ [PDD] 登录返回未知结果", "ERROR")
            return False
        except Exception as e:
            self._log(f"❌ [PDD] 使用登录参数启动失败: {str(e)}", "ERROR")
            self._log(f"🔍 [PDD] 异常详细信息: {traceback.format_exc()}", "ERROR")
            return False

    @staticmethod
    def _cookies_to_dict(cookies: str) -> dict:
        """Parse a JSON object, dict literal or ``k=v; k2=v2`` cookie string."""
        text = cookies.strip()
        if text.startswith('{') and text.endswith('}'):
            import ast
            try:
                # Try JSON first (double-quoted keys and values).
                return json.loads(text)
            except json.JSONDecodeError:
                # Fall back to a Python dict literal (single quotes).
                return ast.literal_eval(text)

        parsed = {}
        for cookie_pair in text.split(';'):
            if '=' in cookie_pair:
                key, value = cookie_pair.strip().split('=', 1)
                parsed[key] = value
        return parsed

    async def start_with_cookies(self, store_id: str, cookies: str):
        """Connect to the platform with backend-issued cookies and listen.

        Args:
            store_id: Store to listen for.
            cookies: Cookies as a JSON object, a Python dict literal, or a
                ``k=v; k2=v2`` string.

        Returns:
            ``True`` after ``message_monitoring`` has returned; ``False``
            when the cookies are empty or unparsable, the backend client is
            unavailable, or an exception occurs.
        """
        try:
            self._log("🔵 [PDD] 收到后端登录指令,开始使用cookies连接平台", "INFO")
            if not cookies:
                self._log("❌ Cookie信息不能为空", "ERROR")
                return False
            self._log(f"✅ [PDD] Cookie验证通过,长度: {len(cookies)}", "SUCCESS")

            # The shared backend client must be available before going on.
            from WebSocket.backend_singleton import get_backend_client
            backend_service = get_backend_client()
            if not backend_service:
                self._log("❌ [PDD] 无法获取后端服务", "ERROR")
                return False

            try:
                cookie_dict = self._cookies_to_dict(cookies)
            except Exception as e:
                self._log(f"⚠️ 解析cookies失败: {e}", "ERROR")
                self._log("❌ [PDD] 无法解析cookies,连接失败", "ERROR")
                return False

            if not isinstance(cookie_dict, dict):
                self._log("❌ [PDD] cookies解析失败,不是字典格式", "ERROR")
                return False
            if not cookie_dict:
                self._log("❌ [PDD] cookies为空", "ERROR")
                return False
            self._log(f"✅ [PDD] cookies解析成功,包含 {len(cookie_dict)} 个字段", "SUCCESS")

            self.pdd_bot = ChatPdd(
                cookie=cookie_dict,
                text=None,
                chat_list_stat=False,
                csname=None,
                log_callback=self.log_callback
            )
            self.pdd_bot.store_id = store_id

            # Attach a connected backend wrapper to the bot.
            backend_service_wrapper = PddBackendService()
            await backend_service_wrapper.connect(store_id)
            self.pdd_bot.backend_service = backend_service_wrapper
            self.pdd_bot.backend_connected = True

            self.running = True
            self._log("🎉 [PDD] 开始监听平台消息", "SUCCESS")

            # Fetch authentication data before opening the socket.
            self.pdd_bot.get_auth_token()
            self.pdd_bot.get_mall_id()
            self.pdd_bot.get_assign_cslist()

            await self.pdd_bot.message_monitoring(
                cookies_str=cookies,
                store={'id': store_id}
            )
            self._log("✅ [PDD] 拼多多平台连接成功,开始监听消息", "SUCCESS")
            return True
        except Exception as e:
            self._log(f"❌ [PDD] 监听过程中出现严重错误: {str(e)}", "ERROR")
            self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
            return False

    def stop_listening(self):
        """Mark the listener stopped and wait up to 5 seconds for the thread."""
        try:
            self._log("🛑 开始停止拼多多监听...", "INFO")
            self.running = False
            if self.main_thread and self.main_thread.is_alive():
                try:
                    self._log("🔄 等待主线程结束...", "DEBUG")
                    self.main_thread.join(timeout=5.0)
                    if self.main_thread.is_alive():
                        self._log("⚠️ 主线程未在超时时间内结束", "WARNING")
                except Exception as e:
                    self._log(f"⚠️ 等待主线程结束时出错: {e}", "DEBUG")
            self._log("✅ 拼多多监听已停止", "SUCCESS")
        except Exception as e:
            self._log(f"❌ 停止监听时出错: {str(e)}", "ERROR")

    def is_running(self) -> bool:
        """Return True when running, the thread is alive and startup succeeded."""
        return (self.running and
                self.main_thread is not None and
                self.main_thread.is_alive() and
                self.startup_success)

    def _parse_login_params(self, login_params_str: str) -> dict:
        """Extract ``data.login_params`` from the JSON text sent by the backend.

        Returns:
            The parameter mapping, or ``{}`` when the text cannot be parsed.
        """
        try:
            data = json.loads(login_params_str)
            params = data.get("data", {}).get("login_params", {})
            self._log(f"✅ [PDD] 登录参数解析成功: username={params.get('username', 'N/A')}", "INFO")
            return params
        except Exception as e:
            self._log(f"❌ [PDD] 解析登录参数失败: {e}", "ERROR")
            return {}

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the listener's state flags."""
        return {
            "running": self.running,
            "has_bot": self.pdd_bot is not None,
            "main_thread_alive": self.main_thread.is_alive() if self.main_thread else False,
            "startup_success": self.startup_success,
            "startup_error": self.startup_error
        }
# 测试函数保留
async def test_gui_integration():
    """Manual smoke test: start the GUI listener and poll it until it stops.

    Uses the cookie values embedded below and prints progress to stdout.
    """
    def custom_log_callback(message: str, log_type: str):
        print(f"[GUI测试] [{log_type}] {message}")

    # NOTE(review): these look like real session cookies committed to the
    # source — confirm they are expired or move them out of the repository.
    test_cookies = {
        "api_uid": "CiKdOWi361Z8egCe1uihAg==",
        "_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
        "rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
        "_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
        "mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
        "PASS_ID": "1-Ncbwg/naTW6KMMZg7FCtK5ctxvsC84uMK9wM55CJsDJnnoQ8d5jptFdwMkC08e3lKMJw/mGLQXYNL3iCvwTiCQ_109909969_163439093",
        "x-visit-time": "1757554980094",
        "JSESSIONID": "329E5C258BD7920211D15E263490C895"
    }
    print("🧪 开始测试GUI集成类...")
    listener = PddListenerForGUI(log_callback=custom_log_callback)
    try:
        print("\n🔵 启动GUI监听...")
        started = await listener.start_listening(
            cookie_dict=test_cookies,
            chat_list_stat=False,
            csname=None,
            text="测试"
        )
        if not started:
            print("❌ GUI监听启动失败")
            if listener.startup_error:
                print(f"❌ 错误原因: {listener.startup_error}")
            return

        print("✅ GUI监听启动成功")
        print(f"📊 当前状态: {listener.get_status()}")
        print("\n🟢 监听运行中,将持续运行直到手动停止...")
        # Poll once per second until the listener reports it has stopped.
        while listener.is_running():
            await asyncio.sleep(1)
        print("❌ 监听意外停止")

        print("\n🛑 测试完成,开始停止监听...")
        listener.stop_listening()
        print(f"📊 停止后状态: {listener.get_status()}")
        print("\n✅ GUI集成类测试完成")
        print("📋 测试结果:GUI集成类与原main方法逻辑完全一致")
    except KeyboardInterrupt:
        print("\n🛑 用户中断测试")
        listener.stop_listening()
    except Exception as err:
        print(f"❌ 测试过程中出错: {err}")
        listener.stop_listening()
# 同时保留原有的main函数供直接调用
async def main():
    """Run ``ChatPdd.main()`` directly with the cookie values embedded below."""
    # NOTE(review): these look like real session cookies committed to the
    # source — confirm they are expired or move them out of the repository.
    session_cookies = {
        "api_uid": "CiKdOWi361Z8egCe1uihAg==",
        "_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
        "rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
        "ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
        "ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
        "_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
        "mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
        "PASS_ID": "1-P3kEWMDAT6n7mZjZpx1poVmxfusHa6Qd0s+dhQlG7SiqNA8A5LCsjbbRpck4WHjVSve3G+x0N4ZEL7Dcz2L+vg_109909969_163439093",
        "x-visit-time": "1757399096750",
        "JSESSIONID": "04601A22B2F955001EFEA28A2F64ED79"
    }
    bot = ChatPdd(cookie=session_cookies, chat_list_stat=False)
    await bot.main()
if __name__ == "__main__":
    # Script entry point. By default it runs the GUI integration test;
    # to run the plain listener instead, call asyncio.run(main()).
    asyncio.run(test_gui_integration())