Files
shuidrop_gui/Utils/Pdd/PddUtils.py

3258 lines
118 KiB
Python
Raw Normal View History

2025-09-12 20:42:00 +08:00
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project magua_project
@File PddUtils.py
@IDE PyCharm
@Author lz
@Date 2025/7/17 16:44
"""
import os
import sys
2025-09-12 20:42:00 +08:00
import threading
from concurrent.futures import ThreadPoolExecutor
import base64
import json
import random
import time
import traceback
import execjs
2025-09-12 20:42:00 +08:00
import websockets
import requests
import asyncio
# execjs 已移除 - 不再需要本地生成anti_content
2025-09-12 20:42:00 +08:00
from datetime import datetime
from typing import Dict, Any, Optional, Callable
# 新增导入,用于登录功能
import PIL, numpy as np, cv2
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5, AES
from Crypto.Util.Padding import pad
from pathlib import Path
from loguru import logger
2025-09-12 20:42:00 +08:00
from Utils.message_models import PlatformMessage
# ===== 登录相关类集成开始 =====
class Track:
"""滑块轨迹生成类"""
@staticmethod
def get_track(distance):
distance = int(distance) + random.randint(7, 11) + 0.8
track_list = [
[
0.8,
178.4,
1733388719067
],
[
4.8,
179.4,
1733388719169
],
[
6.8,
179.4,
1733388719184
],
[
8.8,
179.4,
1733388719250
],
[
8.8,
179.4,
1733388719266
],
[
9.8,
180.4,
1733388719762
],
[
10.8,
180.4,
1733388719914
],
[
11.8,
180.4,
1733388719930
],
[
11.8,
180.4,
1733388719954
],
[
12.8,
180.4,
1733388719982
],
[
12.8,
180.4,
1733388720030
],
[
13.8,
180.4,
1733388720058
],
[
14.8,
180.4,
1733388720090
],
[
15.8,
180.4,
1733388720110
],
[
16.8,
180.4,
1733388720170
],
[
17.8,
180.4,
1733388720202
],
[
19.8,
180.4,
1733388720218
],
[
22.8,
181.4,
1733388720258
],
[
23.8,
181.4,
1733388720319
],
[
24.8,
181.4,
1733388720428
],
[
24.8,
182.4,
1733388720458
],
[
26.8,
182.4,
1733388720478
],
[
28.8,
182.4,
1733388720495
],
[
30.8,
182.4,
1733388720511
],
[
32.8,
183.4,
1733388720527
],
[
33.8,
183.4,
1733388720558
],
[
35.8,
183.4,
1733388720574
],
[
36.8,
184.4,
1733388720591
],
[
38.8,
184.4,
1733388720619
],
[
39.8,
184.4,
1733388720662
],
[
40.8,
184.4,
1733388720686
],
[
41.8,
184.4,
1733388720710
],
[
42.8,
184.4,
1733388720742
],
[
43.8,
184.4,
1733388720758
],
[
45.8,
184.4,
1733388720783
],
[
46.8,
184.4,
1733388720798
],
[
48.8,
184.4,
1733388720831
],
[
49.8,
184.4,
1733388720850
],
[
50.8,
184.4,
1733388720874
],
[
52.8,
184.4,
1733388720922
],
[
53.8,
184.4,
1733388720958
],
[
55.8,
184.4,
1733388720974
],
[
55.8,
184.4,
1733388720990
],
[
56.8,
184.4,
1733388721010
],
[
57.8,
184.4,
1733388721051
],
[
58.8,
184.4,
1733388721074
],
[
58.8,
185.4,
1733388721090
],
[
59.8,
185.4,
1733388721106
],
[
60.8,
185.4,
1733388721214
],
[
60.8,
185.4,
1733388721270
],
[
61.8,
185.4,
1733388721361
],
[
63.8,
186.4,
1733388721374
],
[
68.8,
186.4,
1733388721390
],
[
71.8,
187.4,
1733388721406
],
[
72.8,
187.4,
1733388721422
],
[
73.8,
187.4,
1733388721442
],
[
74.8,
187.4,
1733388721602
],
[
75.8,
187.4,
1733388721650
],
[
79.8,
187.4,
1733388721674
],
[
81.8,
187.4,
1733388721690
],
[
83.8,
188.4,
1733388721726
],
[
83.8,
188.4,
1733388722055
],
[
91.8,
188.4,
1733388722241
],
[
92.8,
188.4,
1733388722346
],
[
92.8,
188.4,
1733388722494
],
[
94.8,
188.4,
1733388722511
],
[
97.8,
188.4,
1733388722527
],
[
101.8,
189.4,
1733388722543
],
[
102.8,
189.4,
1733388722566
],
[
103.8,
189.4,
1733388722666
],
[
104.8,
189.4,
1733388722722
],
[
106.8,
190.4,
1733388722742
],
[
108.8,
190.4,
1733388722759
],
[
112.8,
190.4,
1733388722775
],
[
115.8,
191.4,
1733388722791
],
[
116.8,
191.4,
1733388722807
],
[
118.8,
192.4,
1733388722911
],
[
119.8,
192.4,
1733388722931
],
[
120.8,
192.4,
1733388722962
],
[
123.8,
192.4,
1733388722987
],
[
124.8,
192.4,
1733388723006
],
[
125.8,
192.4,
1733388723023
],
[
127.8,
192.4,
1733388723098
],
[
128.8,
192.4,
1733388723186
],
[
129.8,
193.4,
1733388723258
],
[
132.8,
193.4,
1733388723278
],
[
135.8,
193.4,
1733388723294
],
[
137.8,
194.4,
1733388723318
],
[
138.8,
194.4,
1733388723562
],
[
140.8,
194.4,
1733388723582
],
[
144.8,
195.4,
1733388723599
],
[
145.8,
195.4,
1733388723622
],
[
147.8,
196.4,
1733388723662
],
[
148.8,
196.4,
1733388723678
],
[
149.8,
196.4,
1733388723703
],
[
152.8,
196.4,
1733388723719
],
[
153.8,
196.4,
1733388723774
],
[
154.8,
196.4,
1733388723846
],
[
155.8,
196.4,
1733388723886
],
[
157.8,
196.4,
1733388723903
],
[
161.8,
197.4,
1733388723918
],
[
166.8,
197.4,
1733388723934
],
[
167.8,
197.4,
1733388723998
],
[
168.8,
197.4,
1733388724026
],
[
168.8,
197.4,
1733388724123
],
[
171.8,
198.4,
1733388724139
],
[
175.8,
199.4,
1733388724154
],
[
177.8,
199.4,
1733388724170
],
[
179.8,
199.4,
1733388724186
],
[
181.8,
199.4,
1733388724202
],
[
184.8,
199.4,
1733388724218
],
[
185.8,
199.4,
1733388724234
],
[
186.8,
199.4,
1733388724255
],
[
187.8,
199.4,
1733388724278
],
[
188.8,
199.4,
1733388724294
],
[
188.8,
199.4,
1733388724326
],
[
189.8,
199.4,
1733388724374
],
[
191.8,
199.4,
1733388724399
],
[
192.8,
200.4,
1733388724444
],
[
193.8,
200.4,
1733388724486
],
[
194.8,
200.4,
1733388724834
],
[
195.8,
200.4,
1733388724850
],
[
196.8,
200.4,
1733388724866
],
[
198.8,
200.4,
1733388724882
],
[
199.8,
200.4,
1733388724898
],
[
200.8,
201.4,
1733388725242
],
[
204.8,
201.4,
1733388725276
],
[
206.8,
201.4,
1733388725301
],
[
207.8,
201.4,
1733388725332
],
[
208.8,
201.4,
1733388725522
],
[
211.8,
201.4,
1733388725547
],
[
212.8,
201.4,
1733388725586
],
[
212.8,
201.4,
1733388725838
],
[
214.8,
201.4,
1733388725854
],
[
219.8,
202.4,
1733388725878
],
[
220.8,
202.4,
1733388725894
],
[
222.8,
202.4,
1733388726427
],
[
227.8,
202.4,
1733388726446
],
[
228.8,
202.4,
1733388726462
],
[
230.8,
203.4,
1733388726642
],
[
232.8,
203.4,
1733388726658
],
[
234.8,
203.4,
1733388726675
],
[
236.8,
203.4,
1733388726690
],
[
237.8,
203.4,
1733388726742
],
[
238.8,
203.4,
1733388726934
],
[
240.8,
203.4,
1733388726950
],
[
243.8,
203.4,
1733388726966
],
[
245.8,
203.4,
1733388726982
],
[
247.8,
203.4,
1733388727118
],
[
248.8,
203.4,
1733388727174
],
[
248.8,
204.4,
1733388727190
],
[
250.8,
204.4,
1733388727214
],
[
252.8,
204.4,
1733388727286
],
[
252.8,
204.4,
1733388727302
],
[
253.8,
204.4,
1733388727322
],
[
254.8,
204.4,
1733388727358
],
[
256.8,
204.4,
1733388727398
]
]
# 检查value是否在轨迹的x值中
for trajectory in track_list:
if trajectory[0] == distance:
# 如果找到,截取从轨迹开始到该点的子数组
return [t for t in track_list if t[0] <= distance]
# 如果value不在x值中找到最接近value的x值
closest_x = None
min_diff = float('inf')
for trajectory in track_list:
if abs(trajectory[0] - distance) < min_diff:
min_diff = abs(trajectory[0] - distance)
closest_x = trajectory[0]
# 截取从轨迹开始到最接近的x值的子数组
result = [t for t in track_list if t[0] <= closest_x]
result[-1][0] = distance
return result
class ImgDistance:
"""滑块图像识别距离计算类"""
def __init__(self, bg, tp):
self.bg = bg
self.tp = tp
@staticmethod
def imshow(img, winname='test', delay=0):
"""cv2展示图片"""
cv2.imshow(winname, img)
cv2.waitKey(delay)
cv2.destroyAllWindows()
@staticmethod
def pil_to_cv2(img):
"""
pil转cv2图片
:param img: pil图像, <type 'PIL.JpegImagePlugin.JpegImageFile'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
img = cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
return img
@staticmethod
def bytes_to_cv2(img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
"""
统一输出图片格式为cv2图像, <type 'numpy.ndarray'>
:param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/'PIL.JpegImagePlugin.JpegImageFile'>
:param flag: 颜色空间转换类型, default: None
eg: cv2.COLOR_BGR2GRAY灰度图
:return: cv2图像, <numpy.ndarray>
"""
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
elif isinstance(img, (str, Path)):
img = cv2.imread(str(img))
elif isinstance(img, np.ndarray):
img = img
elif isinstance(img, PIL.Image.Image):
img = self.pil_to_cv2(img)
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp, im_show=False, save_path=None):
"""
:param bg: 背景图路径或Path对象或图片二进制
eg: 'assets/bg.jpg'
Path('assets/bg.jpg')
:param tp: 缺口图路径或Path对象或图片二进制
eg: 'assets/tp.jpg'
Path('assets/tp.jpg')
:param im_show: 是否显示结果, <type 'bool'>; default: False
:param save_path: 保存路径, <type 'str'/'Path'>; default: None
:return: 缺口位置
"""
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
distance = max_loc[0]
if save_path or im_show:
# 需要绘制的方框高度和宽度
tp_height, tp_width = tp_gray.shape[:2]
# 矩形左上角点位置
x, y = max_loc
# 矩形右下角点位置
_x, _y = x + tp_width, y + tp_height
# 绘制矩形
bg_img = self.cv2_open(bg)
cv2.rectangle(bg_img, (x, y), (_x, _y), (0, 0, 255), 2)
# 保存缺口识别结果到背景图
if save_path:
cv2.imwrite(save_path, bg_img)
# 显示缺口识别结果
if im_show:
self.imshow(bg_img)
return max_loc[0]
@staticmethod
def decrypt_img(imgstr):
res = []
s = [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 24, 3, -1, 20, -1, 17,
8,
-1, 30, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 12, 22, 10, -1, -1, 15, 14, 6, -1, 5, -1, -1, 7,
18,
-1, 25, 9, -1, 28, -1, 2, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 21, -1, 31, 13, 16, -1, 26, -1, 27,
-1, 0,
19, -1, 11, 4, -1, -1, 23, -1, 29, -1, -1, -1, -1, -1, -1]
for r in range(0, len(imgstr), 8):
o = s[ord(imgstr[r])]
i = s[ord(imgstr[r + 1])]
a = s[ord(imgstr[r + 2])]
c = s[ord(imgstr[r + 3])]
u = s[ord(imgstr[r + 4])]
d = s[ord(imgstr[r + 5])]
f = s[ord(imgstr[r + 6])]
p = (31 & o) << 3 | (31 & i) >> 2
h = (3 & i) << 6 | (31 & a) << 1 | (31 & c) >> 4
m = (15 & c) << 4 | (31 & u) >> 1
v = (1 & u) << 7 | (31 & d) << 2 | (31 & f) >> 3
g = (7 & f) << 5 | 31 & s[ord(imgstr[r + 7])]
res.append(chr(((31 & p) << 3 | h >> 5)))
res.append(chr((31 & h) << 3 | m >> 5))
res.append(chr((31 & m) << 3 | v >> 5))
res.append(chr((31 & v) << 3 | g >> 5))
res.append(chr((31 & g) << 3 | p >> 5))
y = ''.join(res)
for c in ['#', '@?', '*&%', '<$|>']:
y = y.replace(c, '')
return y.replace('data:image/png;base64,', '')
def main(self, save_path=None):
return self.get_distance(
bg=base64.urlsafe_b64decode(self.decrypt_img(self.bg)),
tp=base64.urlsafe_b64decode(self.decrypt_img(self.tp)),
save_path=save_path
)
# AutiContent类已移除 - 后端会提供所有必要的anti_content
def gzip_compress(self, data):
"""压缩数据"""
try:
if self.ctx:
result = self.ctx.call("gzipCompress", data)
logger.info("成功压缩数据")
return result
except Exception as e:
logger.error(f"调用gzipCompress函数失败: {e}")
# fallback处理简单返回原数据
logger.warning("使用fallback处理返回原数据")
return data
class EncryptTool:
"""加密工具类"""
@staticmethod
def aes_encrypt(text, key, iv):
cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
encrypted_text = cipher.encrypt(pad(text.encode('utf-8'), AES.block_size))
return base64.b64encode(encrypted_text).decode('utf-8')
@staticmethod
def rsa_encrypt(data):
public_key = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC6zHXNom934tsG9SC73vAUv99bIuSRVaLsuTMY+OL6aS6eB7AuNoU+m9gPCrI7aFrT7CSiKTJ47DNwCNZO52AlLzvB6TjdUwuIXWpinE8VCsYAZgOCrx+mK9Sy0OuwnqNj5D2wUdGoN0nxbl1q2akeAa18A/iBpiXx0SZQexbEowIDAQAB"
rsa_key = RSA.import_key(base64.b64decode(public_key)) # 导入读取到的公钥
cipher = PKCS1_v1_5.new(rsa_key) # 生成对象
cipher_text = base64.b64encode(cipher.encrypt(data.encode(encoding="utf-8")))
return cipher_text.decode("utf-8")
@staticmethod
def get_key_iv(salt):
t = {'aes_key': r"bN3%cH2$H1@*jCo$", 'aes_iv': r"gl3-w^dN)3#h6E1%"}
if not salt or 9 != len(salt):
return t
else:
n = salt[:1]
r = salt[1:]
o = r[:4]
i = r[4:]
a = list(i)
s = t['aes_key'][:8] if n in ["a", "b"] else t['aes_iv'][:8]
c = 'aes_key' if n in ["a", "b"] else 'aes_iv'
l = ''
if n == 'c':
t[c] = s + r
elif n == 'd':
for u in range(4):
l += a[int(o[u])]
t[c] = s + l + i
return t
class PddLogin:
"""拼多多登录核心类"""
def __init__(self, log_callback=None):
super().__init__()
self.login_api = "https://mms.pinduoduo.com/janus/api/auth"
self.headers = {
"authority": "apiv2.pinduoduo.net",
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json;charset=UTF-8",
"origin": "https://mms.pinduoduo.com",
"pragma": "no-cache",
"referer": "https://mms.pinduoduo.com/",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
self.verify_auth_token = None
self.cookies = dict()
self.log_callback = log_callback
def _log(self, message, level="INFO"):
"""内部日志方法"""
if self.log_callback:
self.log_callback(message, level)
else:
print(f"[{level}] {message}")
# 滑块参数生成
def captcha_collect(self, salt, track_list):
ts = int(round(time.time() * 1000))
captcha_collect_data = json.dumps({
"v": "a",
"ts": ts + random.randint(5700, 5950),
"t1": ts,
"t2": ts + random.randint(15700, 19950),
"tp": 3,
"ua": self.headers["user-agent"],
"rf": "",
"platform": 1,
"hl": "000000000001010",
"sc": {
"w": 1536,
"h": 824
},
"ihs": 1,
"imageSize": {
"width": 272,
"height": 198
},
"del": [
[
0.8,
177.4,
1733388719033
]
],
"mel": track_list,
"uel": [track_list[-1]],
"mell": [
[
[
250.8,
187.4,
1733388711350
],
[
248.8,
187.4,
1733388711366
],
[
240.8,
187.4,
1733388711382
],
[
233.8,
186.4,
1733388711398
],
[
225.8,
185.4,
1733388711414
],
[
222.8,
184.4,
1733388711430
],
[
219.8,
184.4,
1733388711446
],
[
216.8,
184.4,
1733388711462
],
[
208.8,
183.4,
1733388711478
],
[
202.8,
182.4,
1733388711494
],
[
192.8,
182.4,
1733388711511
],
[
168.8,
180.4,
1733388711534
],
[
160.8,
180.4,
1733388711550
],
[
159.8,
180.4,
1733388711630
],
[
156.8,
180.4,
1733388711670
],
[
152.8,
180.4,
1733388711686
],
[
146.8,
180.4,
1733388711702
],
[
136.8,
180.4,
1733388711718
],
[
128.8,
180.4,
1733388711734
],
[
124.8,
180.4,
1733388711750
],
[
118.8,
180.4,
1733388711766
],
[
110.8,
180.4,
1733388711782
],
[
100.8,
180.4,
1733388711798
],
[
91.8,
180.4,
1733388711814
],
[
83.8,
180.4,
1733388711830
],
[
75.8,
180.4,
1733388711846
],
[
70.8,
180.4,
1733388711862
],
[
68.8,
180.4,
1733388711878
],
[
66.8,
180.4,
1733388711910
],
[
64.8,
180.4,
1733388711926
],
[
58.8,
180.4,
1733388711942
],
[
51.8,
180.4,
1733388711959
],
[
42.8,
180.4,
1733388711975
],
[
35.8,
180.4,
1733388711994
],
[
33.8,
180.4,
1733388712054
],
[
32.8,
180.4,
1733388712278
],
[
31.8,
182.4,
1733388712295
],
[
30.8,
182.4,
1733388712310
],
[
28.8,
182.4,
1733388712479
],
[
27.8,
182.4,
1733388712502
],
[
26.8,
182.4,
1733388712518
],
[
24.8,
182.4,
1733388712550
],
[
26.8,
182.4,
1733388712886
],
[
31.8,
181.4,
1733388712902
],
[
35.8,
180.4,
1733388712918
],
[
37.8,
179.4,
1733388712934
],
[
38.8,
179.4,
1733388712950
],
[
44.8,
179.4,
1733388712966
],
[
55.8,
179.4,
1733388712986
],
[
88.8,
178.4,
1733388712999
],
[
164.8,
172.4,
1733388713014
],
[
255.8,
165.4,
1733388713030
],
[
346.8,
157.4,
1733388713046
],
[
360.8,
161.4,
1733388715672
],
[
340.8,
161.4,
1733388715687
],
[
320.8,
164.4,
1733388715702
],
[
287.8,
168.4,
1733388715718
],
[
240.8,
176.4,
1733388715735
],
[
184.8,
182.4,
1733388715750
],
[
144.8,
184.4,
1733388715766
],
[
112.8,
186.4,
1733388715782
],
[
98.8,
186.4,
1733388715798
],
[
95.8,
186.4,
1733388715926
],
[
93.8,
186.4,
1733388715942
],
[
89.8,
187.4,
1733388715958
],
[
79.8,
188.4,
1733388715983
],
[
57.8,
192.4,
1733388715999
],
[
44.8,
192.4,
1733388716015
],
[
36.8,
192.4,
1733388716030
],
[
11.8,
188.4,
1733388716073
],
[
-0.2,
184.4,
1733388716134
],
[
-1.2,
184.4,
1733388716191
],
[
-2.2,
184.4,
1733388716206
],
[
-2.2,
182.4,
1733388716318
],
[
-0.2,
181.4,
1733388716342
],
[
0.8,
179.4,
1733388716878
],
[
2.8,
177.4,
1733388716894
],
[
3.8,
177.4,
1733388716910
],
[
4.8,
176.4,
1733388716926
],
[
3.8,
176.4,
1733388717299
],
[
2.8,
176.4,
1733388717646
],
[
1.8,
176.4,
1733388717662
],
[
0.8,
176.4,
1733388717699
],
[
-0.2,
176.4,
1733388717838
],
[
-0.2,
177.4,
1733388718115
],
[
0.8,
177.4,
1733388718490
]
],
track_list
]
}, separators=(',', ':'))
captcha_collect_gzip = self.gzip_compress(captcha_collect_data)
aes_key_iv = EncryptTool.get_key_iv(salt=salt)
captcha_collect = EncryptTool.aes_encrypt(captcha_collect_gzip, aes_key_iv['aes_key'], aes_key_iv['aes_iv'])
return captcha_collect
# 获取滑块加密参数key和iv
def vc_pre_ck_b(self, backend_anti_content):
self.headers.pop("anti-content", None)
url = "https://apiv2.pinduoduo.net/api/phantom/vc_pre_ck_b"
payload = {
"anti_content": backend_anti_content,
"verify_auth_token": self.verify_auth_token,
"sdk_type": 3,
"client_time": int(round(time.time() * 1000))
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
return response.json().get("salt")
# 获取滑块图片
def obtain_captcha(self, backend_anti_content):
url = "https://apiv2.pinduoduo.net/api/phantom/obtain_captcha"
payload = {
"anti_content": backend_anti_content,
"verify_auth_token": self.verify_auth_token,
"captcha_collect": ""
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
return response.json().get("pictures")
# 滑块验证
def verify(self, captcha_collect, verify_code, backend_anti_content):
url = "https://apiv2.pinduoduo.net/api/phantom/user_verify"
payload = {
"verify_code": verify_code,
"captcha_collect": captcha_collect,
"verify_auth_token": self.verify_auth_token,
"anti_content": backend_anti_content
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
result = response.json().get("result")
self._log(f"滑块验证结果: {response.text}")
return result
# 发送验证码通知给后端,并获取验证码
2025-09-25 11:13:05 +08:00
def request_verification_code(self, username, store_id, backend_anti_content, phone_number=None):
"""向后端请求获取手机验证码"""
2025-09-25 11:13:05 +08:00
self._log(f"开始请求验证码,用户名: {username}, 店铺ID: {store_id}, 手机号: {phone_number}", "INFO")
# 使用后端下发的anti_content
anti_content = backend_anti_content
self._log(f"使用后端下发的anti_content", "DEBUG")
self.headers["anti-content"] = anti_content
url = "https://mms.pinduoduo.com/janus/api/user/getLoginVerificationCode"
payload = {
"username": username,
"crawlerInfo": anti_content
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self._log(f"发送验证码请求结果: {response.text}")
2025-09-25 11:13:05 +08:00
# 发送消息给后端,告知需要验证码(包含手机号)
self._log("准备向后端发送验证码需求通知", "INFO")
2025-09-25 11:13:05 +08:00
self._send_verification_needed_message(store_id, phone_number) # 🔥 传递手机号
# 这里需要等待后端重新下发包含验证码的登录参数
# 实际实现中这个方法会在接收到新的登录参数后被重新调用
return None
2025-09-25 11:13:05 +08:00
def _send_verification_needed_message(self, store_id, phone_number=None):
"""向后端发送需要验证码的通知"""
try:
2025-09-25 11:13:05 +08:00
self._log(f"开始发送验证码需求通知店铺ID: {store_id}, 手机号: {phone_number}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
self._log(f"获取到后端客户端: {backend is not None}", "DEBUG")
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": False,
2025-09-25 11:13:05 +08:00
"content": "需要验证码",
"phone_number": phone_number # 🔥 新增手机号字段
}
self._log(f"准备发送验证码通知消息: {message}", "DEBUG")
backend.send_message(message)
2025-09-25 11:13:05 +08:00
self._log("✅ 成功向后端发送验证码需求通知(含手机号)", "SUCCESS")
else:
self._log("❌ 后端客户端为空,无法发送验证码需求通知", "ERROR")
except Exception as e:
self._log(f"❌ 发送验证码需求通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _send_verification_error_message(self, store_id, error_msg):
"""向后端发送验证码错误的通知"""
try:
self._log(f"开始发送验证码错误通知店铺ID: {store_id}, 错误: {error_msg}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": False,
"content": error_msg
}
self._log(f"准备发送验证码错误消息: {message}", "DEBUG")
backend.send_message(message)
self._log("✅ 成功向后端发送验证码错误通知", "SUCCESS")
else:
self._log("❌ 后端客户端为空,无法发送验证码错误通知", "ERROR")
except Exception as e:
self._log(f"❌ 发送验证码错误通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _send_login_success_message(self, store_id):
"""向后端发送登录成功的通知"""
try:
self._log(f"开始发送登录成功通知店铺ID: {store_id}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": True # 登录成功
}
self._log(f"准备发送登录成功消息: {message}", "DEBUG")
backend.send_message(message)
self._log("✅ 成功向后端发送登录成功通知", "SUCCESS")
else:
self._log("❌ 获取后端客户端失败", "ERROR")
except Exception as e:
self._log(f"❌ 发送登录成功通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _send_login_failure_message(self, store_id, error_msg):
"""向后端发送登录失败的通知"""
try:
self._log(f"开始发送登录失败通知店铺ID: {store_id}, 错误: {error_msg}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": False, # 登录失败
"content": error_msg # 官方返回的失败原因
}
self._log(f"准备发送登录失败消息: {message}", "DEBUG")
backend.send_message(message)
self._log("✅ 成功向后端发送登录失败通知", "SUCCESS")
else:
self._log("❌ 获取后端客户端失败", "ERROR")
except Exception as e:
self._log(f"❌ 发送登录失败通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def login_with_params(self, login_params, store_id=None, error_count=0, success_count=0):
"""使用后端下发的登录参数进行登录"""
self._log("🚀 [PddLogin] 开始使用参数登录", "INFO")
# 检查验证码字段(兼容 code 和 verification_code
verification_code = login_params.get("verification_code") or login_params.get("code", "")
self._log(f"📋 [PddLogin] 登录参数: username={login_params.get('username', 'N/A')}, 包含验证码={bool(verification_code)}", "DEBUG")
self.headers["anti-content"] = login_params.get("anti_content", "")
# 直接使用后端提供的参数构建登录请求
ts = login_params.get("timestamp", int(round(time.time() * 1000)))
payload = {
"username": login_params.get("username", ""),
"password": login_params.get("password", ""), # 后端已加密
"passwordEncrypt": True,
"verificationCode": verification_code,
"mobileVerifyCode": "", # 使用兼容的验证码字段
"sign": "",
"touchevent": {
"mobileInputEditStartTime": ts - random.randint(10000, 20000),
"mobileInputEditFinishTime": ts - random.randint(8000, 15000),
"mobileInputKeyboardEvent": "0|1|1|-6366-6942-142",
"passwordInputEditStartTime": ts - random.randint(5000, 10000),
"passwordInputEditFinishTime": ts - random.randint(3000, 8000),
"passwordInputKeyboardEvent": "0|1|1|195-154-755-477",
"captureInputEditStartTime": "",
"captureInputEditFinishTime": "",
"captureInputKeyboardEvent": "",
"loginButtonTouchPoint": "1263,586",
"loginButtonClickTime": ts - random.randint(100, 1000)
},
"fingerprint": {
"innerHeight": 906,
"innerWidth": 1707,
"devicePixelRatio": 1.5,
"availHeight": 1027,
"availWidth": 1707,
"height": 1067,
"width": 1707,
"colorDepth": 24,
"locationHref": "https://mms.pinduoduo.com/login/sso?platform=wholesale&redirectUrl=htt",
"clientWidth": 1707,
"clientHeight": 906,
"offsetWidth": 1707,
"offsetHeight": 906,
"scrollWidth": 2882,
"scrollHeight": 906,
"navigator": {
"appCodeName": "Mozilla",
"appName": "Netscape",
"hardwareConcurrency": 16,
"language": "zh-CN",
"cookieEnabled": True,
"platform": "Win32",
"doNotTrack": True,
"ua": self.headers.get("user-agent"),
"vendor": "Google Inc.",
"product": "Gecko",
"productSub": "20030107",
"mimeTypes": "f5a1111231f589322da33fb59b56946b4043e092",
"plugins": "387b918f593d4d8d6bfa647c07e108afbd7a6223"
},
"referer": "",
"timezoneOffset": -480
},
"riskSign": login_params.get("risk_sign", ""), # 后端已加密
"timestamp": ts,
"crawlerInfo": login_params.get("anti_content", "")
}
# 添加详细的请求日志
self._log(f"🔍 [Debug] 登录请求URL: {self.login_api}", "DEBUG")
self._log(f"🔍 [Debug] 登录请求payload: {payload}", "DEBUG")
self._log(f"🔍 [Debug] 登录请求headers: {dict(self.headers)}", "DEBUG")
response = requests.post(self.login_api, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
2025-09-25 11:13:05 +08:00
# 🔥 增强日志输出 - 打印完整响应信息
self._log(f"登录响应状态码: {response.status_code}")
2025-09-25 11:13:05 +08:00
self._log(f"登录响应Headers: {dict(response.headers)}")
self._log(f"登录完整响应内容: {response.text}")
# 🔥 尝试解析JSON响应获取更多信息
response_json = None # 🔥 在外部定义变量
try:
response_json = response.json()
self._log(f"登录响应JSON结构: {json.dumps(response_json, ensure_ascii=False, indent=2)}")
# 特别关注result字段中可能包含的手机号信息
if "result" in response_json:
result_data = response_json["result"]
self._log(f"响应result字段详情: {json.dumps(result_data, ensure_ascii=False, indent=2)}")
# 查找可能的手机号字段
possible_phone_fields = ["phone", "mobile", "phoneNumber", "mobileNumber", "telephone", "tel", "cellphone", "handphone"]
for field in possible_phone_fields:
if field in result_data:
self._log(f"🔍 发现可能的手机号字段 {field}: {result_data[field]}")
except Exception as e:
self._log(f"解析响应JSON失败: {e}", "DEBUG")
# 检查响应内容
if "需要手机验证" in response.text:
self._log("✅ 检测到需要手机验证的响应", "INFO")
elif "需要验证图形验证码" in response.text:
self._log("✅ 检测到需要图形验证码的响应", "INFO")
else:
self._log("⚠️ 未检测到验证码相关响应", "WARNING")
if "需要验证图形验证码" in response.text:
self.verify_auth_token = response.json().get("result").get("verifyAuthToken")
salt = self.vc_pre_ck_b() # 获取生成aes key和iv 的密文值
pictures = self.obtain_captcha() # 获取验证码图片
distance = round((ImgDistance(bg=pictures[0], tp=pictures[1]).main() * (272 / 320)) + (48.75 / 2), 2) # 计算距离
track_list = Track.get_track(distance=distance) # 生成轨迹
captcha_collect = self.captcha_collect(salt=salt, track_list=track_list) # 生成captcha_collect参数
result = self.verify(captcha_collect=captcha_collect, verify_code=distance) # 检验
if result:
self.cookies.update({
"msfe-pc-cookie-captcha-token": self.verify_auth_token
})
self.headers["verifyauthtoken"] = self.verify_auth_token
success_count += 1
# 如果滑块成功 success_count 计数一次 成功8次还是显示验证码则失败 返回False
if success_count < 8:
return self.login_with_params(login_params=login_params, store_id=store_id, success_count=success_count)
else:
return False
else:
# 如果滑块失败 error_count 计数一次 失败8次则返回False
error_count += 1
if error_count < 8:
return self.login_with_params(login_params=login_params, store_id=store_id, error_count=error_count)
else:
return False
elif "需要手机验证" in response.text:
# 检查是否已经包含验证码(兼容 code 和 verification_code
if verification_code:
# 已有验证码但仍需要验证,验证码可能错误、过期或者其他问题
self._log(f"带验证码登录失败,验证码: {verification_code}", "WARNING")
# 检查响应中的具体错误信息
response_data = response.json()
error_msg = response_data.get('errorMsg', '验证码验证失败')
self._log(f"服务器返回错误: {error_msg}", "WARNING")
# 不要重新发送验证码请求,直接报告验证失败
self._send_verification_error_message(store_id, error_msg) # 直接使用官方错误信息
return "verification_code_error" # 返回特殊状态,避免重复发送消息
else:
# 第一次需要验证码,调用发送验证码方法
self._log("检测到需要手机验证码,正在调用发送验证码方法", "INFO")
username = login_params.get("username")
backend_anti_content = login_params.get("anti_content")
self._log(f"为用户 {username} 发送验证码使用后端anti_content", "INFO")
2025-09-25 11:13:05 +08:00
# 🔥 从响应中提取手机号
phone_number = None
try:
if response_json and isinstance(response_json, dict):
phone_number = response_json.get("result") # 手机号在result字段中
if phone_number and isinstance(phone_number, str):
self._log(f"🔍 从登录响应中提取到手机号: {phone_number}", "SUCCESS")
else:
self._log("⚠️ 响应中的result字段不包含有效手机号", "WARNING")
self._log(f"🔍 DEBUG: response_json = {response_json}", "DEBUG")
else:
self._log("⚠️ response_json 为空或格式不正确", "WARNING")
self._log(f"🔍 DEBUG: response_json = {response_json}", "DEBUG")
except Exception as e:
self._log(f"❌ 提取手机号时出错: {e}", "DEBUG")
import traceback
self._log(f"🔍 DEBUG: 错误详情: {traceback.format_exc()}", "DEBUG")
# 传递后端下发的anti_content和手机号
self.request_verification_code(username, store_id, backend_anti_content, phone_number) # 🔥 传递手机号
return "need_verification_code"
else:
# 登录成功或其他情况
response_data = response.json()
if response_data.get("success"):
self._log("🎉 登录成功!", "SUCCESS")
# 发送登录成功通知给后端
self._send_login_success_message(store_id)
return self.cookies
else:
# 登录失败,发送失败通知给后端
error_msg = response_data.get('errorMsg', '未知错误')
self._log(f"登录失败: {error_msg}", "ERROR")
self._send_login_failure_message(store_id, error_msg)
return "login_failure" # 返回特殊状态,避免重复发送消息
# ===== 登录相关类集成结束 =====
2025-09-12 20:42:00 +08:00
# 定义持久化数据类 - 参考京东的WebsocketManager
class WebsocketManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._store = {}
cls._instance._lock = threading.RLock()
return cls._instance
def on_connect(self, shop_key, ws, **kwargs):
"""完全保持原有数据结构"""
with self._lock:
entry = self._store.setdefault(shop_key, {
'platform': None,
'customers': [],
'user_assignments': {}
})
entry['platform'] = {
'ws': ws, # 注意:这里存储的是强引用
'last_heartbeat': datetime.now(),
**kwargs
}
return entry
def get_connection(self, shop_key):
with self._lock:
return self._store.get(shop_key)
def remove_connection(self, shop_key):
with self._lock:
if shop_key in self._store:
del self._store[shop_key]
class PddBackendService:
"""拼多多后端服务调用类(新版:使用统一后端连接 BackendClient"""
def __init__(self):
self.current_store_id = None
async def connect(self, store_id):
"""连接后端服务(使用统一连接,无需独立连接)"""
self.current_store_id = store_id
return True
async def send_message_to_backend(self, platform_message):
"""改为通过单后端连接发送需携带store_id"""
try:
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if not backend:
return None
# 确保消息包含store_id
if isinstance(platform_message, dict):
if 'store_id' not in platform_message and self.current_store_id:
platform_message['store_id'] = self.current_store_id
2025-09-13 19:54:30 +08:00
2025-09-12 20:42:00 +08:00
# 通过统一后端连接发送
backend.send_message(platform_message)
return True
except Exception:
return None
class ChatPdd:
@staticmethod
def check_js_files():
"""检查JS文件是否存在"""
# 使用资源路径解析函数
current_dir = ChatPdd._get_resource_path("static/js")
2025-09-12 20:42:00 +08:00
required_files = ["dencode_message.js", "encode_message.js"]
for file in required_files:
file_path = os.path.join(current_dir, file)
if not os.path.exists(file_path):
raise FileNotFoundError(f"找不到必需的JS文件: {file_path}")
return True
@staticmethod
def _get_resource_path(relative_path):
"""获取资源文件的绝对路径兼容PyInstaller打包环境"""
try:
print(f"[DEBUG] 正在解析资源路径: {relative_path}")
# PyInstaller环境下的基础路径
if hasattr(sys, '_MEIPASS'):
# PyInstaller 临时目录
base_path = sys._MEIPASS
print(f"[DEBUG] 检测到PyInstaller环境_MEIPASS: {base_path}")
elif hasattr(sys, 'frozen') and sys.frozen:
# 其他打包环境
base_path = os.path.dirname(sys.executable)
print(f"[DEBUG] 检测到其他打包环境executable目录: {base_path}")
else:
# 开发环境
base_path = os.path.dirname(os.path.abspath(__file__))
# 向上两级目录到项目根目录
base_path = os.path.dirname(os.path.dirname(base_path))
print(f"[DEBUG] 开发环境,计算的项目根目录: {base_path}")
resource_path = os.path.join(base_path, relative_path)
print(f"[DEBUG] 拼接后的完整资源路径: {resource_path}")
# 检查路径是否存在
if os.path.exists(resource_path):
print(f"[DEBUG] ✅ 资源路径存在: {resource_path}")
else:
print(f"[DEBUG] ❌ 资源路径不存在: {resource_path}")
# 尝试列出基础路径的内容
print(f"[DEBUG] 基础路径 {base_path} 的内容:")
try:
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isdir(item_path):
print(f"[DEBUG] 📁 {item}/")
else:
print(f"[DEBUG] 📄 {item}")
except Exception as e:
print(f"[DEBUG] 无法列出目录内容: {e}")
return resource_path
except Exception as e:
print(f"[ERROR] 获取资源路径失败: {e}")
import traceback
print(f"[ERROR] 堆栈跟踪: {traceback.format_exc()}")
# 降级处理:返回相对路径
return relative_path
2025-09-12 20:42:00 +08:00
def __init__(self, cookie, chat_list_stat, csname=None, text=None, log_callback=None):
# 检查JS文件
self.check_js_files()
# 获取JS文件路径 - 使用资源路径解析
js_dir = self._get_resource_path("static/js")
dencode_js_path = os.path.join(js_dir, "dencode_message.js")
encode_js_path = os.path.join(js_dir, "encode_message.js")
2025-09-12 20:42:00 +08:00
# 读取JS文件
try:
with open(dencode_js_path, 'r', encoding='utf-8') as f:
jscode = f.read()
self.ctx = execjs.compile(jscode)
with open(encode_js_path, 'r', encoding='utf-8') as f:
ejscode = f.read()
self.encodeex = execjs.compile(ejscode)
except Exception as e:
raise RuntimeError(f"读取JS文件失败: {str(e)}")
# 首先设置log_callback然后才能使用_log方法
self.log_callback = log_callback
self._log("初始化ChatPdd实例", "INFO")
self.chat_list_stat = chat_list_stat
self.text = text
self.cookie = cookie
self.auth_token = None
self.mall_id = None
self.user_id = None
self.csname = csname
self.csid = None
self.staff_list_sent = False
self.uids = set()
self.pool = ThreadPoolExecutor(max_workers=4)
# WebSocket管理器
self.ws_manager = WebsocketManager()
# 后端服务实例
self.backend_service = PddBackendService()
self.backend_connected = False
# 重连参数
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10
self.base_reconnect_delay = 1.0
self.max_reconnect_delay = 60.0
self.reconnect_backoff = 1.5
self.headers = {
"authority": "mms.pinduoduo.com",
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"anti-content": "0asWfxUeM_VefxObk-_v-fwmt7oS3cmHsWwMkApMVigWOS3hCHfBeF-hHM1_v9LHywCtfzDKkA_F1cUE8_HKBA5D7fVkM2ZDB-KmMxhDM51HXlVrfK_LG4Dc3S0wimw8XYPyXdgJnUv8ndgjs0EynYv8n0XjXU9YXY4Pt0ZVgTgrsIUEeMzMk7s02E3oo-zMdF35CIMWVetBhs95lGG5xENwZwX0CwXH2799MeMjVKzk5DD4Kbs2dM1bD-fJ1F-RImB3SHBkVKDJZbZoUbL4UMk8DFtrISf8CMD4Obk_CM3ICIB_F90ws9ZQ0gcOpVdIIz2PLPVcYt5X0yqYiwjtuNVfdNSf0rmfmNIndXj8G8Nyn4ianRaanoucDp0Dfvd36B6eamPGwOFS0TAyo7nXH_nwxnGS28AgVnqPyPuy8jczb1Oe3xZuPHoJ97BBbaTMfV9OcQ23T-mAUel",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": "https://mms.pinduoduo.com",
"pragma": "no-cache",
"referer": "https://mms.pinduoduo.com/chat-merchant/index.html?r=0.4940608191737519",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
self.store_id = "538727ec-c84d-4458-8ade-3960c9ab802c" # 可以根据需要修改
self.user_tokens = {} # 存储用户token信息
def _log(self, message, level="INFO"):
"""内部日志方法"""
if self.log_callback:
self.log_callback(message, level)
else:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
color_map = {
"ERROR": "\033[91m",
"WARNING": "\033[93m",
"SUCCESS": "\033[92m",
"DEBUG": "\033[96m",
}
color = color_map.get(level, "")
reset = "\033[0m"
print(f"{color}[{timestamp}] [{level}] {message}{reset}")
# 重连机制方法
async def calculate_reconnect_delay(self):
"""计算指数退避的重连延迟时间"""
delay = self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts)
return min(delay, self.max_reconnect_delay)
async def should_reconnect(self):
"""判断是否应该继续重连"""
if self.reconnect_attempts >= self.max_reconnect_attempts:
self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR")
return False
return True
async def handle_reconnect(self, exception=None):
"""处理重连逻辑"""
if exception:
error_type = type(exception).__name__
error_msg = str(exception)
self._log(f"连接异常[{error_type}]: {error_msg}", "WARNING")
if not await self.should_reconnect():
return False
delay = await self.calculate_reconnect_delay()
self._log(f"{self.reconnect_attempts + 1}次重连尝试,等待{delay:.1f}秒...", "WARNING")
await asyncio.sleep(delay)
self.reconnect_attempts += 1
return True
# 后端服务调用方法
async def connect_backend_service(self, store_id):
"""连接后端AI服务"""
try:
success = await self.backend_service.connect(store_id)
if success:
self.backend_connected = True
self._log("✅ 后端AI服务连接成功", "SUCCESS")
return success
except Exception as e:
self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR")
return False
async def get_ai_reply_from_backend(self, platform_message):
"""发送消息到后端(使用统一连接,不等待回复)"""
# 首先检查后端服务连接状态
if not self.backend_connected:
# 尝试重新连接
if self.store_id:
self.backend_connected = await self.connect_backend_service(self.store_id)
if not self.backend_connected:
self._log("❌ 后端服务未连接,尝试重新连接失败", "WARNING")
return None
try:
# 发送消息到后端(使用统一连接,不等待回复)
await self.backend_service.send_message_to_backend(platform_message)
self._log("✅ 消息已发送到后端等待后端AI处理并通过GUI转发回复", "INFO")
return None
except Exception as e:
self._log(f"❌ 发送消息到后端失败: {e}", "ERROR")
return None
def get_auth_token(self):
url = "https://mms.pinduoduo.com/janus/api/subSystem/getAuthToken"
data = {"subSystemId": 17,
"anti_content": "0asAfx5E-wCElqJNXaKt_UKccG7YNycjZoPYgO1YTmZoApN7QJU5XT_JuYdPZ4uvLPQFgIcyXOKqm8xGrPjy0lxn0gaXr9ac0TynYEJnDwdj-WEJnwK3G4kc3M0TiPgtB11eBeZkB1hkBxUHFOtjfqz-eAkgc2aa4sZuIJwHOptYX14VK1NJSqIYfqmw6jpQTXs574CfWMFxT1RPTIB2MKBjC199pXpkbdtXxnn2mA4C1YdNnTUrNa_Wvn5mYj5XadnDbNT7xNuVeYXrsTsiipUr6xn2AAXKoYmv6j0PL92PtCTMfZzzfTfjutCgvBTzxFw-2LeeRIkFBATU1Npg1ScUFRv-1Ukrs3AklVAbBiEbBJhzcf2cXKfNH50X9QUFCd_Y1FhLW1BBuI-KEUFYK3lZTB3g_BlLDk8ti-JXmbalXzWAb6V2hX0fZE9n2L0GITFInNS"}
response = requests.post(url, cookies=self.cookie, headers=self.headers, json=data).json()
self.auth_token = response["result"]["authToken"]
def get_mall_id(self):
url = "https://mms.pinduoduo.com/chats/userinfo/realtime?get_response=true"
data = {"get_response": "true"}
response = requests.get(url, cookies=self.cookie, headers=self.headers, params=data).json()
self.mall_id = response["mall_id"]
self.user_id = response["id"]
def get_assign_cslist(self):
url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
data = {
"wechatCheck": True
}
data = json.dumps(data, separators=(',', ':'))
response = requests.post(url, headers=self.headers, cookies=self.cookie, data=data).json()
cslist = response["result"]["csList"]
keys = cslist.keys()
for key in keys:
username = cslist[key].get("username")
if username == self.csname:
self.csid = key
break
def tab_mall(self, uid):
"""执行转接操作"""
self._log(f"🔄 开始执行转接操作 - 用户ID: {uid}, 目标客服ID: {self.csid}", "INFO")
try:
url = "https://mms.pinduoduo.com/plateau/chat/move_conversation"
request_id = int(time.time() * 1000)
data = {
"data": {
"cmd": "move_conversation",
"request_id": request_id,
"conversation": {
"csid": self.csid,
"uid": uid,
"need_wx": False,
"remark": "无原因直接转移"
},
"anti_content": "0asAfxvYXyIgYgE2h9U0sXh6Ia3tZOAzI_rl3hixs1Z5DB-B1cswVK2UB8LfUO8H3OUKdF214-SgPVnY8SYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4R8ZVTw3a7j9BdB3Hmq7j0bh2LQrRzlt88pkQr-s3jMzWH5QPiz13wQNsJY6h9dnigBE26Ww1dKZcWbsKS4GsgY9_joGBS2FZUz94cbCvE2Ij9nO93b5kR50m9kI1lfBHvDZVIC9rLB-S4Qs4J9ebSsosuNskY9nhgXzCuNoilSQgJaMkP2Rgga2i54nWTTNfnZ4W9GvigkSnwzweN9sx-89BztNCasVkP2C2Kkipif2Dv-t7e2O9MnmtWaKm4wFlHLR7YmeyF3y5gxPRuHlO-gD7BfFle8z71lIV-7SyOLCd83FMUCsRwedwv3-TNTF_VRw_G1DHrptFYTAck8p8qIkxA_PZ14PXct1ZS2zAoZZcTMZZSpJ1jP9RlQARtnNNP0IzCdHiL4JzRQBkkKmwGGXay0i4urnXNm5qBgejFcVCkx8XYdeGBp4p-l1TjKoy603__johKFKZQksRHjOKma5DplwQwg8c3Uo1-mDAUmgAC0XfV2rkxzXj3ABARPlyEaMU8AxUZioaFRa0e5eit6r06uY9m5SyU_mdbTfLIYcKWjRA_UNQisJOUenCbMl_Bywq1KcA6TTzJ9ZHu4xz0o7ctROYLlvhEhAi3bJjFNho6l-TPZK4zSJoj"
},
"client": "WEB",
"anti_content": "0asAfxvYXNIgYgd2i9U0sXh6IabtZOAdI_LlbhNUvZZ5DB-B1cswVK2UBjxfUOjHbOUKdF214-SgPVnYjSYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4RjZVTwbJFiC0P0rR6LbH89rLVuGHiG6vm3fO6rq_rAyLQlw8CAIuESMR4L308Ctz3V0m9ZhS8tdapQr_YY2vZZk9sAJsMd2ZapG92zm2lO9FLVBjCur2KWwac9nua9g8ztOawyo9ur0Dd96xMkCXrd4J4hR4Kk9zqZBDTlRPkMdzzkJ5Osgi40EC3oVAhSPA9MawGCs33Vnd4bSJER9_b_D9g1sfdeZZKsgo25k0WRfs7dM17E2ug-pPEL5_ib53TuaxLHJq5pGG4jMUjAHRV-gbDffut1Yjy1A7K-6E3zO4tDuUq5T0LPHjf9bB7Apofgr_U_4Aq-7l8x2ks8DhhgOhblTvwrO4owZvuUXA0hfSIUPYK9fIoSUH6aaL8abMMVCvI__qF8Yw1oaMp0d0-fbwMhXQ8fBZYoajXXJv-RV3B2GQEowvMHEUtjDz50jUVduq5fKL9R-WsBRjX0aiExVKLw-PZzxsVup0ZuOYxnPofN92EqKfbvriIWFMsh80eGrvq1vsD1BrMx_mcPMmUmHHEY7wct5lePa0b2Lt5_byqRbbcpohrQrHLlH6JJaTMKQuffyT-60110e5oz_Cz0NbG88TyxOGKijW7BimL5RTjSOTPsK4zS8oi"
}
self._log(f"📤 发送转接请求 - URL: {url}", "DEBUG")
self._log(f"📝 请求数据: {json.dumps(data, ensure_ascii=False)}", "DEBUG")
data_str = json.dumps(data, separators=(',', ':'))
response = requests.post(url, headers=self.headers, cookies=self.cookie, data=data_str)
self._log(f"📥 HTTP状态码: {response.status_code}", "DEBUG")
self._log(f"📄 响应内容: {response.text}", "DEBUG")
# 解析响应
try:
result = response.json()
self._log(f"🔍 解析后的响应: {json.dumps(result, ensure_ascii=False)}", "DEBUG")
# 检查响应结构
if result.get("success"):
result_data = result.get("result", {})
if result_data.get("result") == "success" or result_data.get("result") == "ok":
self._log(f"✅ 转接成功 - 用户 {uid} 已转接给客服 {self.csid}", "SUCCESS")
return True
else:
error_code = result_data.get("error_code", "未知")
error_msg = result_data.get("error_msg", "未知错误")
self._log(f"❌ 转接失败 - 错误码: {error_code}, 错误信息: {error_msg}", "ERROR")
return False
else:
self._log(f"❌ 转接请求失败 - 响应标记为失败", "ERROR")
return False
except json.JSONDecodeError as e:
self._log(f"❌ 解析响应JSON失败: {e}", "ERROR")
self._log(f"❌ 原始响应: {response.text}", "ERROR")
return False
except requests.RequestException as e:
self._log(f"❌ 请求异常: {e}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 转接操作异常: {e}", "ERROR")
traceback.print_exc()
return False
@staticmethod
def forward_message_to_platform(store_id: str, recv_pin: str, content: str):
"""转发消息到拼多多平台"""
try:
pdd_mgr = WebsocketManager()
pdd_shop_key = f"拼多多:{store_id}"
pdd_entry = pdd_mgr.get_connection(pdd_shop_key)
if pdd_entry:
# 找到拼多多连接,使用拼多多转发逻辑
platform_info = pdd_entry.get('platform') or {}
ws = platform_info.get('ws')
loop = platform_info.get('loop')
pdd_instance = platform_info.get('pdd_instance') # ChatPdd实例引用
print(
f"[PDD Forward] shop_key={pdd_shop_key} has_ws={bool(ws)} has_loop={bool(loop)} has_pdd_instance={bool(pdd_instance)} recv_pin={recv_pin}")
if ws and loop and pdd_instance:
async def _send_pdd():
# 直接调用ChatPdd实例的send_ai_reply方法
await pdd_instance.send_ai_reply(recv_pin, content)
import asyncio as _asyncio
_future = _asyncio.run_coroutine_threadsafe(_send_pdd(), loop)
try:
_future.result(timeout=5) # 拼多多需要HTTP请求给更长时间
print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}")
except Exception as fe:
print(f"[PDD Forward] 转发提交失败: {fe}")
else:
print("[PDD Forward] 条件不足,未转发:",
{'has_ws': bool(ws), 'has_loop': bool(loop), 'has_pdd_instance': bool(pdd_instance),
'has_content': bool(content)})
else:
print(f"[PDD Forward] 未找到拼多多连接: {pdd_shop_key}")
except Exception as e:
print(f"[PDD Forward] 拼多多转发失败: {e}")
@staticmethod
def transfer_customer_service(customer_service_id: str, user_id: str, store_id: str):
"""拼多多平台转接"""
try:
pdd_mgr = WebsocketManager()
pdd_shop_key = f"拼多多:{store_id}"
pdd_entry = pdd_mgr.get_connection(pdd_shop_key)
if pdd_entry:
platform_info = pdd_entry.get('platform') or {}
pdd_instance = platform_info.get('pdd_instance')
loop = platform_info.get('loop')
print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
if pdd_instance and loop:
# 设置目标客服ID并执行转接
pdd_instance.csid = customer_service_id
def _transfer():
result = pdd_instance.tab_mall(user_id)
if result:
print(f"[PDD Transfer] ✅ 转接成功")
else:
print(f"[PDD Transfer] ❌ 转接失败")
return result
# 在线程池中执行同步转接操作
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(_transfer)
try:
future.result(timeout=10) # 转接操作可能需要更长时间
except Exception as fe:
print(f"[PDD Transfer] 转接操作异常: {fe}")
else:
print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}")
else:
print(f"[PDD Transfer] 未找到拼多多连接: {pdd_shop_key}")
except Exception as e:
print(f"[PDD Transfer] 拼多多转接失败: {e}")
async def send_ai_reply(self, uid, content):
"""发送AI回复给指定用户"""
self._log(f"开始发送AI回复给用户 {uid}", "INFO")
self._log(f"回复内容: {content[:50]}...", "DEBUG")
try:
ts = int(time.time() * 1000)
url = "https://mms.pinduoduo.com/plateau/chat/send_message"
data = {
"data": {
"cmd": "send_message",
"anti_content": "0asWfqzFdjPssgua6CCWyeEK7LAktf0_UFChvlj8kD8Mk1DVZchg4qURiKAS4iSfiaz9Fjg43ThZZ4_wNh5Onk0e5Tw2mGdwpTEPDZlPdfZwBpyESSDiz0Xalv6mvr0o0uQR6yRlG-vscbK8jYGRNcAr2UktX3bMKhjjx7YvVGbi_poiyypBuOwM_2qZyIM96tdNTfb_AMy0fe1UrN2SqUzfm2stmb4TJldGN9b1mTPr-6t4R9k3pquV_XHwBr20J4__P71fxPBmcXOmXTnQWJs2ddt4KKOIbPIIanas08VNiv9nsHbtvXKp8_CMdL8FQMPmEMDmM6JfjZ4LJp5l-x0BjpBbdWyoztNYqSbBe8hscA1hPFM_R1iR9E-AI-CyuehipODW-ADCL0xEWK875510RsV7mLFQKG0kWO--krZzFFi_6lhdfq3_JhBjDDd3YS1exxSMMS7yDJgskoumfAcu-y1YDO8OdcKWwx3Z0nOqUlnF8LekN7TqXhNtKTgfDZPThVT3JcNm_2TljXRhGWxTZT4o25TTVKo5KPeztcJU5NddFd38nmvo0k8R8HVIkmzOYyaerCDCm1ZDfYgNFx-2vM_JkcMYFdkV7C3BKeg7DxZXHfexAT3UKgruBWYrSs_dBB0X-QkJ2ZcmpStB6KJkZ1azrAAxdw_HMj5UIq4vEXxrA1-YAzJEhMoU0-MncDpoBZfiky7Sr4nDGYrcn86AAYWynOUn0US2DAkICABvSSqkh0vCRpdVeZRfOFsdFWF43KEIph-ckMxDsyq6G3rWXbvPkaAlv67cR1v0N2oCQzDVDT-InNv",
"request_id": ts,
"message": {
"to": {
"role": "user",
"uid": uid
},
"from": {
"role": "mall_cs"
},
"ts": int(ts / 1000),
"content": content,
"msg_id": None,
"type": 0,
"is_aut": 0,
"manual_reply": 1,
"status": "read",
"is_read": 1,
"hash": "1d1359dfb7c141fd95432a22cdc7f43b51434041a1a6016ef6ea2ea2b1abc05a"
},
"random": "c5fc9c61194294c478a6fb395b1c558f"
},
"client": "WEB",
"anti_content": "0asAfa5E-wCEsa-JeFwdFfTDtzhMbODIsAT-eu_-CigAHMbcWIfBEUFcI-Kwd93IYTWtfvkSeuwUKO57jwISBuZkzfCe-2VkBFSD-ack-ZKIXRockMwxBgyDql6V3xKKD-fCkBFZkzeKeBwUE-cSD-KHkBb-kzFVk-szd009EAPjDvBcaPIXxHGB_IxWWlIXMnxghvniTodnXt2scz7sxhJV_CEBcCez4H92maXI2d1qgoYswoG4Mf_oylph0nGigjnqWzXqwTYh0y4W6yYs6PxHyXv8slpHrXyseoqZhfqdrdn5E9BCt24mBfkEFg-vgc0wUw2D-vgE7KUM1FIdKb2_Mbl_6xj_8bvk-e1H6ObSksLCDiVCDAbkYU6Y_SMBfgA5CHUM0QuYhznXIwnTanGM2jugCnxPYPmYjyOvjKHEbaV3nvMuJ6V2cX0fZE99vcfY3TF1nNr"
}
data = json.dumps(data, separators=(',', ':'))
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data)
)
# 打印HTTP状态与返回体片段
self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
try:
self._log(f"PDD返回体: {response.text[:500]}", "DEBUG")
except Exception:
pass
if response.status_code == 200:
self._log(f"✅ 已发送AI回复给用户 {uid}: {content}", "SUCCESS")
return True
else:
self._log(f"❌ 发送AI回复失败HTTP状态码: {response.status_code}", "ERROR")
self._log(f"响应内容: {response.text}", "DEBUG")
return False
except Exception as e:
self._log(f"❌ 发送AI回复失败: {e}", "ERROR")
traceback.print_exc()
return False
async def send_message_external(self, uid, content):
"""外部消息发送接口,用于接收后端转发的回复"""
self._log(f"[External] 收到外部消息发送请求: uid={uid}, content_len={len(content) if content else 0}", "INFO")
try:
if not uid or not content:
self._log("❌ [External] 参数不完整", "ERROR")
return False
2025-09-13 19:54:30 +08:00
2025-09-12 20:42:00 +08:00
result = await self.send_ai_reply(uid, content)
if result:
self._log(f"✅ [External] 外部消息发送成功: uid={uid}", "SUCCESS")
else:
self._log(f"❌ [External] 外部消息发送失败: uid={uid}", "ERROR")
return result
except Exception as e:
self._log(f"❌ [External] 外部消息发送异常: {e}", "ERROR")
return False
async def get_all_customer(self, ws):
"""异步获取客服列表"""
try:
# PDD获取客服列表的API
url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
data = {
"wechatCheck": True
}
data = json.dumps(data, separators=(',', ':'))
# 使用线程池执行同步请求
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data)
)
if response.status_code == 200:
result = response.json()
cslist = result.get("result", {}).get("csList", {})
print("我们收集到的客服列表原信息为:", str(cslist))
if cslist:
# 转换客服列表格式
self.customer_list = []
for key, customer in cslist.items():
customer_info = {
"staff_id": str(key), # 客服ID
"name": customer.get("username", ""), # 客服名称
"status": customer.get("status", 0), # 状态
"department": customer.get("department", ""), # 部门
"online": True # 在线状态
}
self.customer_list.append(customer_info)
self._log(f"✅ 成功获取客服列表,共 {len(self.customer_list)} 个客服", "SUCCESS")
return True
return False
except Exception as e:
self._log(f"❌ 获取客服列表失败: {e}", "ERROR")
return False
async def send_staff_list_to_backend(self):
"""发送客服列表到后端"""
try:
if not hasattr(self, 'customer_list') or not self.customer_list:
self._log("⚠️ 客服列表为空", "WARNING")
return False
# 创建消息模板
message_template = PlatformMessage(
type="staff_list",
content="客服列表更新",
data={
"staff_list": self.customer_list,
"total_count": len(self.customer_list)
},
store_id=self.store_id
)
# 通过后端服务发送
await self.backend_service.send_message_to_backend(message_template.to_dict())
self._log(f"发送客服列表消息的结构体为: {message_template.to_json()}")
2025-09-15 14:16:14 +08:00
self._log(f"✅ [PDD] 成功发送客服列表到后端,共 {len(self.customer_list)} 个客服", "SUCCESS")
print(f"🔥 [PDD] 客服列表已上传到后端: {len(self.customer_list)} 个客服")
2025-09-12 20:42:00 +08:00
return True
except Exception as e:
self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR")
return False
async def handle_transfer_message(self, message_data):
"""处理转接消息"""
self._log("收到转接指令", "INFO")
try:
content = message_data.get("content", "") # 转接目标客服ID
receiver_info = message_data.get("receiver", {})
uid = receiver_info.get("id") # 用户ID
if not content or not uid:
self._log("❌ 转接消息信息不完整", "WARNING")
self._log(f"消息内容: {message_data}", "DEBUG")
return False
self._log(f"准备将用户 {uid} 转接给客服 {content}", "INFO")
# 设置转接目标客服ID
self.csid = content
# 在线程池中执行同步的转接操作
try:
# 使用线程池执行同步方法
success = await asyncio.get_event_loop().run_in_executor(
self.pool,
self.tab_mall,
uid
)
if success:
self._log(f"✅ 转接成功: 用户 {uid} 已转接给客服 {content}", "SUCCESS")
return True
else:
self._log(f"❌ 转接失败: 用户 {uid}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 执行转接操作失败: {e}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 处理转接消息失败: {e}", "ERROR")
traceback.print_exc()
return False
2025-09-26 14:35:28 +08:00
def should_filter_robot_message(self, message_data):
"""专门判断是否为机器人消息需要过滤"""
try:
message_info = message_data.get("message", {})
# 1. 基于消息类型过滤机器人特殊消息
msg_type = message_info.get("type")
if msg_type == 31: # 机器人干预消息(如:机器人已暂停接待)
return True
# 2. 基于模板名称识别机器人消息
template_name = message_info.get("template_name", "")
robot_templates = [
"mall_robot_man_intervention_and_restart", # 机器人暂停接待消息
"mall_robot_text_msg", # 机器人自动回复消息
# 可根据实际情况添加更多机器人模板
]
if template_name in robot_templates:
return True
# 3. 基于机器人特殊标志过滤
if message_info.get("conv_silent") is True: # 静默会话标志
return True
if message_info.get("no_unreply_hint") == 1: # 无需回复提示标志
return True
# 4. 基于消息内容识别机器人提示消息
content = message_info.get("content", "")
robot_content_patterns = [
"机器人未找到对应的回复",
"机器人已暂停接待",
">>点此【立即恢复接待】<<",
"点击添加",
"[当前用户来自",
]
if any(pattern in content for pattern in robot_content_patterns):
return True
# 5. 基于biz_context中的机器人标识
biz_context = message_info.get("biz_context", {})
if biz_context.get("robot_msg_id"): # 有机器人消息ID
return True
# 不是机器人消息,不过滤
return False
except Exception as e:
self._log(f"判断机器人消息时出错: {e}", "DEBUG")
return False # 出错时不过滤,保持原有行为
2025-09-12 20:42:00 +08:00
async def handle_customer_message(self, message_data):
"""处理来自后端的客服消息"""
self._log("收到来自后端的客服消息", "INFO")
try:
content = message_data.get("content", "")
receiver_info = message_data.get("receiver", {})
uid = receiver_info.get("id")
if uid and content:
self._log(f"准备发送客服消息给用户 {uid}: {content}", "INFO")
await self.send_ai_reply(uid, content)
self._log(f"客服消息发送完成 - 目标用户: {uid}", "SUCCESS")
else:
self._log("客服消息数据不完整", "WARNING")
self._log(f"消息内容: {message_data}", "DEBUG")
except Exception as e:
self._log(f"处理客服消息失败: {e}", "ERROR")
traceback.print_exc()
async def process_incoming_message(self, message_data, wss, store):
"""处理接收到的消息"""
try:
2025-09-26 14:35:28 +08:00
# 🔥 过滤机器人消息
if self.should_filter_robot_message(message_data):
self._log("🤖 检测到机器人消息,已过滤不发送给后端", "DEBUG")
return
2025-09-12 20:42:00 +08:00
message_info = message_data.get("message", {})
if not message_info:
return
nickname = message_info.get("nickname")
content = message_info.get("content")
goods_info = message_info.get("info", {})
from_info = message_info.get("from", {})
uid = from_info.get("uid")
if nickname and content and uid:
self._log(f"用户消息 - {nickname}: {content}", "INFO")
self._log(f"用户UID: {uid}", "DEBUG")
self.uids.add(uid)
# 消息类型检测
lc = str(content).lower()
if any(ext in lc for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]):
msg_type = "image"
elif any(ext in lc for ext in [".mp4", ".avi", ".mov", ".wmv", ".flv"]):
msg_type = "video"
else:
msg_type = "text"
# 订单卡片组装(使用全角冒号,符合文档)
if "订单编号" in str(content) and goods_info:
content = f"商品id{goods_info.get('goodsID')} 订单号:{goods_info.get('orderSequenceNo')}"
msg_type = "order_card"
# 商品卡片检测基于内容关键词和goods_info
elif any(keyword in lc for keyword in ['goods.html', 'item.html', 'item.jd.com', '商品卡片id']) or \
(goods_info and goods_info.get('goodsID') and not goods_info.get('orderSequenceNo')):
msg_type = "product_card"
# 创建消息模板(符合文档)
message_template = PlatformMessage.create_text_message(
content=content,
sender_id=str(uid),
store_id=self.store_id
)
message_template.msg_type = msg_type
try:
print(f"📤(SPEC) 发送到AI: {json.dumps(message_template.to_dict(), ensure_ascii=False)[:300]}...")
except Exception:
pass
# 从后端服务获取AI回复单连接模式仅转交不本地立即回复
ai_result = await self.get_ai_reply_from_backend(message_template.to_dict())
if isinstance(ai_result, str) and ai_result.strip():
# 发送回复消息(仅当本地生成/降级时)
await self.send_ai_reply(uid, ai_result)
self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO")
else:
# 正常链路已转交后端AI等待后端异步回传并由 GUI 转发到平台
self._log("🔄 已转交后端AI处理等待平台回复下发", "INFO")
except Exception as e:
self._log(f"❌ 消息处理失败: {e}", "ERROR")
traceback.print_exc()
@staticmethod
async def heartbeat(wss):
while True:
m = random.randint(100, 200)
h = [0, 0, 0, 0, 0, 0, 0, m, 0, 0, 0, 1, 0, 0, 0, 0]
h = bytes(h)
await wss.send(bytes(h))
await asyncio.sleep(15)
async def on_message(self, wss, store):
self._log("开始监听消息", "INFO")
while True:
try:
message = await wss.recv()
self._log(f"收到新消息 {base64.b64encode(message).decode()}", "DEBUG")
try:
text = self.ctx.call("dencode_data", base64.b64encode(message).decode())
if not isinstance(text, dict):
continue
push = text.get("push_data") or {}
data = push.get("data")
if not data:
self._log("push_data.data 为空,忽略", "DEBUG")
continue
print(f"原始数据格式打印:{data}")
for d in data:
await self.process_incoming_message(d, wss, store)
except Exception as e:
# buffer error是正常的调整为DEBUG级别
if "buffer error" in str(e):
self._log(f"解析消息失败: {e}", "DEBUG")
else:
self._log(f"解析消息失败: {e}", "ERROR")
traceback.print_exc()
except Exception as e:
# self._log(f"处理消息失败: {e}", "ERROR")
# traceback.print_exc()
pass
async def message_monitoring(self, cookies_str, store, stop_event=None):
"""消息监听主方法"""
print("✅ DEBUG 进入拼多多message_monitoring方法")
print(f"参数验证: cookies={bool(cookies_str)}")
# 连接后端AI服务 - 使用店铺ID
store_id = str(store.get('id', ''))
self._log(f"🔗 尝试连接后端服务店铺ID: {store_id}", "DEBUG")
backend_connected = await self.connect_backend_service(store_id)
if not backend_connected:
self._log("⚠️ 后端服务连接失败将使用本地AI回复", "WARNING")
else:
self._log("✅ 后端服务连接成功", "SUCCESS")
stop_event = stop_event or asyncio.Event()
# 充值重连计数器
self.reconnect_attempts = 0
while not stop_event.is_set():
try:
self._log(f"🔄 尝试连接拼多多WebSocket", "INFO")
if self.user_id and self.auth_token:
uri = "wss://titan-ws.pinduoduo.com/"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
"Upgrade": "websocket",
"Origin": "https://mms.pinduoduo.com",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh;q=0.9",
"Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits",
"cookie": "; ".join([f"{k}={v}" for k, v in self.cookie.items()])
}
async with websockets.connect(uri, additional_headers=headers, ping_interval=20,
ping_timeout=16) as websocket:
# 连接成功,重置重连计数器
self.reconnect_attempts = 0
self._log("✅ WebSocket-PDD连接成功", "SUCCESS")
z = self.encodeex.call("encode_token", str(self.user_id), self.auth_token)
if z:
await websocket.send(bytes(z))
# 注册连接信息到全局管理
shop_key = f"拼多多:{store['id']}"
loop = asyncio.get_running_loop()
entry = self.ws_manager.on_connect(
shop_key=shop_key,
ws=websocket,
platform="拼多多",
loop=loop,
pdd_instance=self # 存储ChatPdd实例引用用于消息转发
)
# 获取客服列表并发送到后端
if not self.staff_list_sent:
self._log("🔍 开始获取客服列表...", "INFO")
try:
if await self.get_all_customer(None):
await self.send_staff_list_to_backend()
self.staff_list_sent = True
else:
self._log("⚠️ 获取客服列表失败", "WARNING")
except Exception as e:
self._log(f"❌ 获取或发送客服列表失败: {e}", "ERROR")
# 启动消息监听和心跳
await asyncio.gather(
self.heartbeat(websocket),
self.on_message(websocket, store)
)
else:
self._log("未定制账号token数组", "ERROR")
except websockets.ConnectionClosed as e:
self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING")
if not await self.handle_reconnect(e):
break
except (websockets.WebSocketException, OSError) as e:
self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING")
if not await self.handle_reconnect(e):
break
except Exception as e:
self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR")
if not await self.handle_reconnect(e):
break
# 关闭后端服务连接
if self.backend_connected:
self.backend_connected = False
self._log("🛑 消息监听已停止", "INFO")
async def main(self):
self.get_auth_token()
self.get_mall_id()
self.get_assign_cslist()
# 连接AI服务
if await self.connect_backend_service(self.store_id):
print("✅ AI服务连接成功")
store = {'id': self.store_id}
await self.message_monitoring(
cookies_str=self.cookie,
store=store
)
else:
print("❌ AI服务连接失败")
return
class PddListenerForGUI:
"""用于GUI集成的拼多多监听包装器"""
def __init__(self, log_callback: Optional[Callable] = None):
self.pdd_bot = None
self.log_callback = log_callback
self.running = False
self.main_thread = None
self.loop = None
self.startup_success = False
self.startup_event = threading.Event()
self.startup_error = None
def _log(self, message: str, log_type: str = "INFO"):
if self.log_callback:
self.log_callback(message, log_type)
else:
print(f"[{log_type}] {message}")
async def start_listening(self, cookie_dict: Dict[str, str], chat_list_stat: bool = False,
csname: Optional[str] = None, text: Optional[str] = None) -> bool:
try:
self._log("🔵 开始拼多多平台连接流程", "INFO")
if not cookie_dict:
self._log("❌ Cookie信息不能为空", "ERROR")
return False
self._log(f"✅ Cookie验证通过包含 {len(cookie_dict)} 个字段", "SUCCESS")
self.startup_success = False
self.startup_error = None
self.startup_event.clear()
# 创建ChatPdd实例
self.pdd_bot = ChatPdd(
cookie=cookie_dict,
text=text,
chat_list_stat=chat_list_stat,
csname=csname,
log_callback=self.log_callback
)
self.running = True
self._log("🎉 开始监听拼多多平台消息...", "SUCCESS")
def run_pdd_main():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = loop
async def main_wrapper():
try:
# 直接调用ChatPdd的main方法
await self.pdd_bot.main()
self.startup_success = True
self._log("✅ ChatPdd主程序执行完成", "SUCCESS")
except Exception as e:
self.startup_error = str(e)
self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
finally:
self.startup_event.set()
loop.run_until_complete(main_wrapper())
except Exception as e:
self.startup_error = str(e)
self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
self.startup_event.set()
finally:
self.running = False
self.main_thread = threading.Thread(target=run_pdd_main, daemon=True)
self.main_thread.start()
self._log("🔄 等待ChatPdd初始化完成...", "INFO")
# 只等待初始化完成,不设置超时
self.startup_event.wait()
if self.startup_success:
self._log("✅ 拼多多监听启动成功", "SUCCESS")
return True
else:
error_msg = self.startup_error or "未知启动错误"
self._log(f"❌ 拼多多监听启动失败: {error_msg}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 监听启动过程中出现严重错误: {str(e)}", "ERROR")
return False
async def start_with_login_params(self, store_id: str, login_params: str):
"""使用后端下发的登录参数执行登录并启动监听"""
try:
self._log("🔵 [PDD] 收到后端登录参数开始执行登录获取cookies", "INFO")
self._log(f"🔍 [PDD] 登录参数内容: {login_params[:100]}...", "DEBUG")
2025-09-13 19:54:30 +08:00
# 1. 解析登录参数
self._log("🔄 [PDD] 开始解析登录参数", "DEBUG")
params_dict = self._parse_login_params(login_params)
if not params_dict:
self._log("❌ [PDD] 登录参数解析失败", "ERROR")
return False
self._log(f"✅ [PDD] 登录参数解析成功,用户名: {params_dict.get('username', 'N/A')}", "DEBUG")
# 2. 使用新的PddLogin类执行登录
self._log("🔄 [PDD] 开始创建PddLogin实例", "DEBUG")
pdd_login = PddLogin(log_callback=self.log_callback)
self._log("✅ [PDD] PddLogin实例创建成功", "DEBUG")
self._log("🔄 [PDD] 开始执行登录", "DEBUG")
login_result = pdd_login.login_with_params(params_dict, store_id)
self._log(f"📊 [PDD] 登录结果: {login_result}", "DEBUG")
if login_result == "need_verification_code":
self._log("⚠️ [PDD] 需要手机验证码,已通知后端,等待重新下发包含验证码的登录参数", "WARNING")
return "need_verification_code" # 返回特殊标识,避免被覆盖
elif login_result == "verification_code_error":
self._log("⚠️ [PDD] 验证码错误,已通知后端", "WARNING")
return "verification_code_error" # 返回特殊标识,避免重复发送消息
elif login_result == "login_failure":
self._log("⚠️ [PDD] 登录失败,已发送失败通知给后端", "WARNING")
return "login_failure" # 返回特殊标识,避免重复发送消息
elif not login_result:
self._log("❌ [PDD] 登录失败", "ERROR")
return False
elif isinstance(login_result, dict):
# 登录成功获取到cookies
self._log("✅ [PDD] 登录成功使用获取的cookies连接平台", "SUCCESS")
# 将cookies字典转换为字符串格式与原有逻辑兼容
import json
cookies_str = json.dumps(login_result)
return await self.start_with_cookies(store_id, cookies_str)
else:
self._log("❌ [PDD] 登录返回未知结果", "ERROR")
return False
2025-09-13 19:54:30 +08:00
except Exception as e:
self._log(f"❌ [PDD] 使用登录参数启动失败: {str(e)}", "ERROR")
import traceback
self._log(f"🔍 [PDD] 异常详细信息: {traceback.format_exc()}", "ERROR")
return False
2025-09-12 20:42:00 +08:00
async def start_with_cookies(self, store_id: str, cookies: str):
"""使用下发的cookies与store_id直接建立PDD平台WS并开始监听"""
try:
self._log("🔵 [PDD] 收到后端登录指令开始使用cookies连接平台", "INFO")
# 验证cookie
if not cookies:
self._log("❌ Cookie信息不能为空", "ERROR")
return False
self._log(f"✅ [PDD] Cookie验证通过长度: {len(cookies)}", "SUCCESS")
# 获取统一后端服务
from WebSocket.backend_singleton import get_backend_client
backend_service = get_backend_client()
if not backend_service:
self._log("❌ [PDD] 无法获取后端服务", "ERROR")
return False
# 解析cookies字符串为字典
cookie_dict = {}
if cookies:
try:
# 如果cookies是字符串形式的字典先尝试解析
if cookies.startswith('{') and cookies.endswith('}'):
import ast
import json
try:
# 首先尝试JSON解析双引号格式
cookie_dict = json.loads(cookies) if isinstance(cookies, str) else cookies
except json.JSONDecodeError:
# 如果JSON解析失败尝试Python字典格式单引号格式
cookie_dict = ast.literal_eval(cookies)
else:
# 传统的cookie字符串解析
for cookie_pair in cookies.split(';'):
if '=' in cookie_pair:
key, value = cookie_pair.strip().split('=', 1)
cookie_dict[key] = value
except Exception as e:
self._log(f"⚠️ 解析cookies失败: {e}", "ERROR")
self._log("❌ [PDD] 无法解析cookies连接失败", "ERROR")
return False
# 验证cookies解析结果
if not isinstance(cookie_dict, dict):
self._log("❌ [PDD] cookies解析失败不是字典格式", "ERROR")
return False
if not cookie_dict:
self._log("❌ [PDD] cookies为空", "ERROR")
return False
self._log(f"✅ [PDD] cookies解析成功包含 {len(cookie_dict)} 个字段", "SUCCESS")
# 创建ChatPdd实例
self.pdd_bot = ChatPdd(
cookie=cookie_dict,
text=None,
chat_list_stat=False,
csname=None,
log_callback=self.log_callback
)
# 设置store_id和后端服务
self.pdd_bot.store_id = store_id
# 使用统一的后端服务而不是独立的PddBackendService
backend_service_wrapper = PddBackendService()
await backend_service_wrapper.connect(store_id)
self.pdd_bot.backend_service = backend_service_wrapper
self.pdd_bot.backend_connected = True
self.running = True
self._log("🎉 [PDD] 开始监听平台消息", "SUCCESS")
# 获取认证信息
self.pdd_bot.get_auth_token()
self.pdd_bot.get_mall_id()
self.pdd_bot.get_assign_cslist()
# 启动监听
store = {'id': store_id}
await self.pdd_bot.message_monitoring(
cookies_str=cookies,
store=store
)
self._log("✅ [PDD] 拼多多平台连接成功,开始监听消息", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ [PDD] 监听过程中出现严重错误: {str(e)}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
def stop_listening(self):
try:
self._log("🛑 开始停止拼多多监听...", "INFO")
self.running = False
if self.main_thread and self.main_thread.is_alive():
try:
self._log("🔄 等待主线程结束...", "DEBUG")
self.main_thread.join(timeout=5.0)
if self.main_thread.is_alive():
self._log("⚠️ 主线程未在超时时间内结束", "WARNING")
except Exception as e:
self._log(f"⚠️ 等待主线程结束时出错: {e}", "DEBUG")
self._log("✅ 拼多多监听已停止", "SUCCESS")
except Exception as e:
self._log(f"❌ 停止监听时出错: {str(e)}", "ERROR")
def is_running(self) -> bool:
return (self.running and
self.main_thread is not None and
self.main_thread.is_alive() and
self.startup_success)
def _parse_login_params(self, login_params_str: str) -> dict:
"""解析后端下发的登录参数"""
try:
import json
data = json.loads(login_params_str)
params = data.get("data", {}).get("login_params", {})
self._log(f"✅ [PDD] 登录参数解析成功: username={params.get('username', 'N/A')}", "INFO")
return params
except Exception as e:
self._log(f"❌ [PDD] 解析登录参数失败: {e}", "ERROR")
return {}
2025-09-13 19:54:30 +08:00
2025-09-12 20:42:00 +08:00
def get_status(self) -> Dict[str, Any]:
return {
"running": self.running,
"has_bot": self.pdd_bot is not None,
"main_thread_alive": self.main_thread.is_alive() if self.main_thread else False,
"startup_success": self.startup_success,
"startup_error": self.startup_error
}
# 测试函数保留
async def test_gui_integration():
def custom_log_callback(message: str, log_type: str):
print(f"[GUI测试] [{log_type}] {message}")
test_cookies = {
"api_uid": "CiKdOWi361Z8egCe1uihAg==",
"_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
"rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
"_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
"mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
"PASS_ID": "1-Ncbwg/naTW6KMMZg7FCtK5ctxvsC84uMK9wM55CJsDJnnoQ8d5jptFdwMkC08e3lKMJw/mGLQXYNL3iCvwTiCQ_109909969_163439093",
"x-visit-time": "1757554980094",
"JSESSIONID": "329E5C258BD7920211D15E263490C895"
}
print("🧪 开始测试GUI集成类...")
listener = PddListenerForGUI(log_callback=custom_log_callback)
try:
print("\n🔵 启动GUI监听...")
success = await listener.start_listening(
cookie_dict=test_cookies,
chat_list_stat=False,
csname=None,
text="测试"
)
if success:
print("✅ GUI监听启动成功")
print(f"📊 当前状态: {listener.get_status()}")
print("\n🟢 监听运行中,将持续运行直到手动停止...")
while True:
if not listener.is_running():
print("❌ 监听意外停止")
break
await asyncio.sleep(1)
print("\n🛑 测试完成,开始停止监听...")
listener.stop_listening()
print(f"📊 停止后状态: {listener.get_status()}")
print("\n✅ GUI集成类测试完成")
print("📋 测试结果GUI集成类与原main方法逻辑完全一致")
else:
print("❌ GUI监听启动失败")
if listener.startup_error:
print(f"❌ 错误原因: {listener.startup_error}")
except KeyboardInterrupt:
print("\n🛑 用户中断测试")
listener.stop_listening()
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
listener.stop_listening()
# 同时保留原有的main函数供直接调用
async def main():
cookies = {
"api_uid": "CiKdOWi361Z8egCe1uihAg==",
"_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
"rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
"_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
"mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
"PASS_ID": "1-P3kEWMDAT6n7mZjZpx1poVmxfusHa6Qd0s+dhQlG7SiqNA8A5LCsjbbRpck4WHjVSve3G+x0N4ZEL7Dcz2L+vg_109909969_163439093",
"x-visit-time": "1757399096750",
"JSESSIONID": "04601A22B2F955001EFEA28A2F64ED79"
}
await ChatPdd(cookie=cookies, chat_list_stat=False).main()
if __name__ == '__main__':
# 可以选择直接运行main()或者测试GUI集成
# 测试GUI集成
asyncio.run(test_gui_integration())
# asyncio.run(main())