Files
shuidrop_gui/Utils/Pdd/PddUtils.py

3255 lines
118 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project magua_project
@File PddUtils.py
@IDE PyCharm
@Author lz
@Date 2025/7/17 16:44
"""
import os
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
import base64
import json
import random
import time
import traceback
import execjs
import websockets
import requests
import asyncio
# execjs 已移除 - 不再需要本地生成anti_content
from datetime import datetime
from typing import Dict, Any, Optional, Callable
# 新增导入,用于登录功能
import PIL, numpy as np, cv2
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5, AES
from Crypto.Util.Padding import pad
from pathlib import Path
from loguru import logger
from Utils.message_models import PlatformMessage
# ===== 登录相关类集成开始 =====
class Track:
"""滑块轨迹生成类"""
@staticmethod
def get_track(distance):
distance = int(distance) + random.randint(7, 11) + 0.8
track_list = [
[
0.8,
178.4,
1733388719067
],
[
4.8,
179.4,
1733388719169
],
[
6.8,
179.4,
1733388719184
],
[
8.8,
179.4,
1733388719250
],
[
8.8,
179.4,
1733388719266
],
[
9.8,
180.4,
1733388719762
],
[
10.8,
180.4,
1733388719914
],
[
11.8,
180.4,
1733388719930
],
[
11.8,
180.4,
1733388719954
],
[
12.8,
180.4,
1733388719982
],
[
12.8,
180.4,
1733388720030
],
[
13.8,
180.4,
1733388720058
],
[
14.8,
180.4,
1733388720090
],
[
15.8,
180.4,
1733388720110
],
[
16.8,
180.4,
1733388720170
],
[
17.8,
180.4,
1733388720202
],
[
19.8,
180.4,
1733388720218
],
[
22.8,
181.4,
1733388720258
],
[
23.8,
181.4,
1733388720319
],
[
24.8,
181.4,
1733388720428
],
[
24.8,
182.4,
1733388720458
],
[
26.8,
182.4,
1733388720478
],
[
28.8,
182.4,
1733388720495
],
[
30.8,
182.4,
1733388720511
],
[
32.8,
183.4,
1733388720527
],
[
33.8,
183.4,
1733388720558
],
[
35.8,
183.4,
1733388720574
],
[
36.8,
184.4,
1733388720591
],
[
38.8,
184.4,
1733388720619
],
[
39.8,
184.4,
1733388720662
],
[
40.8,
184.4,
1733388720686
],
[
41.8,
184.4,
1733388720710
],
[
42.8,
184.4,
1733388720742
],
[
43.8,
184.4,
1733388720758
],
[
45.8,
184.4,
1733388720783
],
[
46.8,
184.4,
1733388720798
],
[
48.8,
184.4,
1733388720831
],
[
49.8,
184.4,
1733388720850
],
[
50.8,
184.4,
1733388720874
],
[
52.8,
184.4,
1733388720922
],
[
53.8,
184.4,
1733388720958
],
[
55.8,
184.4,
1733388720974
],
[
55.8,
184.4,
1733388720990
],
[
56.8,
184.4,
1733388721010
],
[
57.8,
184.4,
1733388721051
],
[
58.8,
184.4,
1733388721074
],
[
58.8,
185.4,
1733388721090
],
[
59.8,
185.4,
1733388721106
],
[
60.8,
185.4,
1733388721214
],
[
60.8,
185.4,
1733388721270
],
[
61.8,
185.4,
1733388721361
],
[
63.8,
186.4,
1733388721374
],
[
68.8,
186.4,
1733388721390
],
[
71.8,
187.4,
1733388721406
],
[
72.8,
187.4,
1733388721422
],
[
73.8,
187.4,
1733388721442
],
[
74.8,
187.4,
1733388721602
],
[
75.8,
187.4,
1733388721650
],
[
79.8,
187.4,
1733388721674
],
[
81.8,
187.4,
1733388721690
],
[
83.8,
188.4,
1733388721726
],
[
83.8,
188.4,
1733388722055
],
[
91.8,
188.4,
1733388722241
],
[
92.8,
188.4,
1733388722346
],
[
92.8,
188.4,
1733388722494
],
[
94.8,
188.4,
1733388722511
],
[
97.8,
188.4,
1733388722527
],
[
101.8,
189.4,
1733388722543
],
[
102.8,
189.4,
1733388722566
],
[
103.8,
189.4,
1733388722666
],
[
104.8,
189.4,
1733388722722
],
[
106.8,
190.4,
1733388722742
],
[
108.8,
190.4,
1733388722759
],
[
112.8,
190.4,
1733388722775
],
[
115.8,
191.4,
1733388722791
],
[
116.8,
191.4,
1733388722807
],
[
118.8,
192.4,
1733388722911
],
[
119.8,
192.4,
1733388722931
],
[
120.8,
192.4,
1733388722962
],
[
123.8,
192.4,
1733388722987
],
[
124.8,
192.4,
1733388723006
],
[
125.8,
192.4,
1733388723023
],
[
127.8,
192.4,
1733388723098
],
[
128.8,
192.4,
1733388723186
],
[
129.8,
193.4,
1733388723258
],
[
132.8,
193.4,
1733388723278
],
[
135.8,
193.4,
1733388723294
],
[
137.8,
194.4,
1733388723318
],
[
138.8,
194.4,
1733388723562
],
[
140.8,
194.4,
1733388723582
],
[
144.8,
195.4,
1733388723599
],
[
145.8,
195.4,
1733388723622
],
[
147.8,
196.4,
1733388723662
],
[
148.8,
196.4,
1733388723678
],
[
149.8,
196.4,
1733388723703
],
[
152.8,
196.4,
1733388723719
],
[
153.8,
196.4,
1733388723774
],
[
154.8,
196.4,
1733388723846
],
[
155.8,
196.4,
1733388723886
],
[
157.8,
196.4,
1733388723903
],
[
161.8,
197.4,
1733388723918
],
[
166.8,
197.4,
1733388723934
],
[
167.8,
197.4,
1733388723998
],
[
168.8,
197.4,
1733388724026
],
[
168.8,
197.4,
1733388724123
],
[
171.8,
198.4,
1733388724139
],
[
175.8,
199.4,
1733388724154
],
[
177.8,
199.4,
1733388724170
],
[
179.8,
199.4,
1733388724186
],
[
181.8,
199.4,
1733388724202
],
[
184.8,
199.4,
1733388724218
],
[
185.8,
199.4,
1733388724234
],
[
186.8,
199.4,
1733388724255
],
[
187.8,
199.4,
1733388724278
],
[
188.8,
199.4,
1733388724294
],
[
188.8,
199.4,
1733388724326
],
[
189.8,
199.4,
1733388724374
],
[
191.8,
199.4,
1733388724399
],
[
192.8,
200.4,
1733388724444
],
[
193.8,
200.4,
1733388724486
],
[
194.8,
200.4,
1733388724834
],
[
195.8,
200.4,
1733388724850
],
[
196.8,
200.4,
1733388724866
],
[
198.8,
200.4,
1733388724882
],
[
199.8,
200.4,
1733388724898
],
[
200.8,
201.4,
1733388725242
],
[
204.8,
201.4,
1733388725276
],
[
206.8,
201.4,
1733388725301
],
[
207.8,
201.4,
1733388725332
],
[
208.8,
201.4,
1733388725522
],
[
211.8,
201.4,
1733388725547
],
[
212.8,
201.4,
1733388725586
],
[
212.8,
201.4,
1733388725838
],
[
214.8,
201.4,
1733388725854
],
[
219.8,
202.4,
1733388725878
],
[
220.8,
202.4,
1733388725894
],
[
222.8,
202.4,
1733388726427
],
[
227.8,
202.4,
1733388726446
],
[
228.8,
202.4,
1733388726462
],
[
230.8,
203.4,
1733388726642
],
[
232.8,
203.4,
1733388726658
],
[
234.8,
203.4,
1733388726675
],
[
236.8,
203.4,
1733388726690
],
[
237.8,
203.4,
1733388726742
],
[
238.8,
203.4,
1733388726934
],
[
240.8,
203.4,
1733388726950
],
[
243.8,
203.4,
1733388726966
],
[
245.8,
203.4,
1733388726982
],
[
247.8,
203.4,
1733388727118
],
[
248.8,
203.4,
1733388727174
],
[
248.8,
204.4,
1733388727190
],
[
250.8,
204.4,
1733388727214
],
[
252.8,
204.4,
1733388727286
],
[
252.8,
204.4,
1733388727302
],
[
253.8,
204.4,
1733388727322
],
[
254.8,
204.4,
1733388727358
],
[
256.8,
204.4,
1733388727398
]
]
# 检查value是否在轨迹的x值中
for trajectory in track_list:
if trajectory[0] == distance:
# 如果找到,截取从轨迹开始到该点的子数组
return [t for t in track_list if t[0] <= distance]
# 如果value不在x值中找到最接近value的x值
closest_x = None
min_diff = float('inf')
for trajectory in track_list:
if abs(trajectory[0] - distance) < min_diff:
min_diff = abs(trajectory[0] - distance)
closest_x = trajectory[0]
# 截取从轨迹开始到最接近的x值的子数组
result = [t for t in track_list if t[0] <= closest_x]
result[-1][0] = distance
return result
class ImgDistance:
"""滑块图像识别距离计算类"""
def __init__(self, bg, tp):
self.bg = bg
self.tp = tp
@staticmethod
def imshow(img, winname='test', delay=0):
"""cv2展示图片"""
cv2.imshow(winname, img)
cv2.waitKey(delay)
cv2.destroyAllWindows()
@staticmethod
def pil_to_cv2(img):
"""
pil转cv2图片
:param img: pil图像, <type 'PIL.JpegImagePlugin.JpegImageFile'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
img = cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
return img
@staticmethod
def bytes_to_cv2(img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np
def cv2_open(self, img, flag=None):
"""
统一输出图片格式为cv2图像, <type 'numpy.ndarray'>
:param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/'PIL.JpegImagePlugin.JpegImageFile'>
:param flag: 颜色空间转换类型, default: None
eg: cv2.COLOR_BGR2GRAY灰度图
:return: cv2图像, <numpy.ndarray>
"""
if isinstance(img, bytes):
img = self.bytes_to_cv2(img)
elif isinstance(img, (str, Path)):
img = cv2.imread(str(img))
elif isinstance(img, np.ndarray):
img = img
elif isinstance(img, PIL.Image.Image):
img = self.pil_to_cv2(img)
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img
def get_distance(self, bg, tp, im_show=False, save_path=None):
"""
:param bg: 背景图路径或Path对象或图片二进制
eg: 'assets/bg.jpg'
Path('assets/bg.jpg')
:param tp: 缺口图路径或Path对象或图片二进制
eg: 'assets/tp.jpg'
Path('assets/tp.jpg')
:param im_show: 是否显示结果, <type 'bool'>; default: False
:param save_path: 保存路径, <type 'str'/'Path'>; default: None
:return: 缺口位置
"""
# 读取图片
bg_img = self.cv2_open(bg)
tp_gray = self.cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 金字塔均值漂移
bg_shift = cv2.pyrMeanShiftFiltering(bg_img, 5, 50)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_shift, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
distance = max_loc[0]
if save_path or im_show:
# 需要绘制的方框高度和宽度
tp_height, tp_width = tp_gray.shape[:2]
# 矩形左上角点位置
x, y = max_loc
# 矩形右下角点位置
_x, _y = x + tp_width, y + tp_height
# 绘制矩形
bg_img = self.cv2_open(bg)
cv2.rectangle(bg_img, (x, y), (_x, _y), (0, 0, 255), 2)
# 保存缺口识别结果到背景图
if save_path:
cv2.imwrite(save_path, bg_img)
# 显示缺口识别结果
if im_show:
self.imshow(bg_img)
return max_loc[0]
@staticmethod
def decrypt_img(imgstr):
res = []
s = [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 24, 3, -1, 20, -1, 17,
8,
-1, 30, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 12, 22, 10, -1, -1, 15, 14, 6, -1, 5, -1, -1, 7,
18,
-1, 25, 9, -1, 28, -1, 2, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 21, -1, 31, 13, 16, -1, 26, -1, 27,
-1, 0,
19, -1, 11, 4, -1, -1, 23, -1, 29, -1, -1, -1, -1, -1, -1]
for r in range(0, len(imgstr), 8):
o = s[ord(imgstr[r])]
i = s[ord(imgstr[r + 1])]
a = s[ord(imgstr[r + 2])]
c = s[ord(imgstr[r + 3])]
u = s[ord(imgstr[r + 4])]
d = s[ord(imgstr[r + 5])]
f = s[ord(imgstr[r + 6])]
p = (31 & o) << 3 | (31 & i) >> 2
h = (3 & i) << 6 | (31 & a) << 1 | (31 & c) >> 4
m = (15 & c) << 4 | (31 & u) >> 1
v = (1 & u) << 7 | (31 & d) << 2 | (31 & f) >> 3
g = (7 & f) << 5 | 31 & s[ord(imgstr[r + 7])]
res.append(chr(((31 & p) << 3 | h >> 5)))
res.append(chr((31 & h) << 3 | m >> 5))
res.append(chr((31 & m) << 3 | v >> 5))
res.append(chr((31 & v) << 3 | g >> 5))
res.append(chr((31 & g) << 3 | p >> 5))
y = ''.join(res)
for c in ['#', '@?', '*&%', '<$|>']:
y = y.replace(c, '')
return y.replace('data:image/png;base64,', '')
def main(self, save_path=None):
return self.get_distance(
bg=base64.urlsafe_b64decode(self.decrypt_img(self.bg)),
tp=base64.urlsafe_b64decode(self.decrypt_img(self.tp)),
save_path=save_path
)
# AutiContent类已移除 - 后端会提供所有必要的anti_content
def gzip_compress(self, data):
"""压缩数据"""
try:
if self.ctx:
result = self.ctx.call("gzipCompress", data)
logger.info("成功压缩数据")
return result
except Exception as e:
logger.error(f"调用gzipCompress函数失败: {e}")
# fallback处理简单返回原数据
logger.warning("使用fallback处理返回原数据")
return data
class EncryptTool:
"""加密工具类"""
@staticmethod
def aes_encrypt(text, key, iv):
cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
encrypted_text = cipher.encrypt(pad(text.encode('utf-8'), AES.block_size))
return base64.b64encode(encrypted_text).decode('utf-8')
@staticmethod
def rsa_encrypt(data):
public_key = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC6zHXNom934tsG9SC73vAUv99bIuSRVaLsuTMY+OL6aS6eB7AuNoU+m9gPCrI7aFrT7CSiKTJ47DNwCNZO52AlLzvB6TjdUwuIXWpinE8VCsYAZgOCrx+mK9Sy0OuwnqNj5D2wUdGoN0nxbl1q2akeAa18A/iBpiXx0SZQexbEowIDAQAB"
rsa_key = RSA.import_key(base64.b64decode(public_key)) # 导入读取到的公钥
cipher = PKCS1_v1_5.new(rsa_key) # 生成对象
cipher_text = base64.b64encode(cipher.encrypt(data.encode(encoding="utf-8")))
return cipher_text.decode("utf-8")
@staticmethod
def get_key_iv(salt):
t = {'aes_key': r"bN3%cH2$H1@*jCo$", 'aes_iv': r"gl3-w^dN)3#h6E1%"}
if not salt or 9 != len(salt):
return t
else:
n = salt[:1]
r = salt[1:]
o = r[:4]
i = r[4:]
a = list(i)
s = t['aes_key'][:8] if n in ["a", "b"] else t['aes_iv'][:8]
c = 'aes_key' if n in ["a", "b"] else 'aes_iv'
l = ''
if n == 'c':
t[c] = s + r
elif n == 'd':
for u in range(4):
l += a[int(o[u])]
t[c] = s + l + i
return t
class PddLogin:
"""拼多多登录核心类"""
def __init__(self, log_callback=None):
super().__init__()
self.login_api = "https://mms.pinduoduo.com/janus/api/auth"
self.headers = {
"authority": "apiv2.pinduoduo.net",
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json;charset=UTF-8",
"origin": "https://mms.pinduoduo.com",
"pragma": "no-cache",
"referer": "https://mms.pinduoduo.com/",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
self.verify_auth_token = None
self.cookies = dict()
self.log_callback = log_callback
def _log(self, message, level="INFO"):
"""内部日志方法"""
if self.log_callback:
self.log_callback(message, level)
else:
print(f"[{level}] {message}")
# 滑块参数生成
def captcha_collect(self, salt, track_list):
ts = int(round(time.time() * 1000))
captcha_collect_data = json.dumps({
"v": "a",
"ts": ts + random.randint(5700, 5950),
"t1": ts,
"t2": ts + random.randint(15700, 19950),
"tp": 3,
"ua": self.headers["user-agent"],
"rf": "",
"platform": 1,
"hl": "000000000001010",
"sc": {
"w": 1536,
"h": 824
},
"ihs": 1,
"imageSize": {
"width": 272,
"height": 198
},
"del": [
[
0.8,
177.4,
1733388719033
]
],
"mel": track_list,
"uel": [track_list[-1]],
"mell": [
[
[
250.8,
187.4,
1733388711350
],
[
248.8,
187.4,
1733388711366
],
[
240.8,
187.4,
1733388711382
],
[
233.8,
186.4,
1733388711398
],
[
225.8,
185.4,
1733388711414
],
[
222.8,
184.4,
1733388711430
],
[
219.8,
184.4,
1733388711446
],
[
216.8,
184.4,
1733388711462
],
[
208.8,
183.4,
1733388711478
],
[
202.8,
182.4,
1733388711494
],
[
192.8,
182.4,
1733388711511
],
[
168.8,
180.4,
1733388711534
],
[
160.8,
180.4,
1733388711550
],
[
159.8,
180.4,
1733388711630
],
[
156.8,
180.4,
1733388711670
],
[
152.8,
180.4,
1733388711686
],
[
146.8,
180.4,
1733388711702
],
[
136.8,
180.4,
1733388711718
],
[
128.8,
180.4,
1733388711734
],
[
124.8,
180.4,
1733388711750
],
[
118.8,
180.4,
1733388711766
],
[
110.8,
180.4,
1733388711782
],
[
100.8,
180.4,
1733388711798
],
[
91.8,
180.4,
1733388711814
],
[
83.8,
180.4,
1733388711830
],
[
75.8,
180.4,
1733388711846
],
[
70.8,
180.4,
1733388711862
],
[
68.8,
180.4,
1733388711878
],
[
66.8,
180.4,
1733388711910
],
[
64.8,
180.4,
1733388711926
],
[
58.8,
180.4,
1733388711942
],
[
51.8,
180.4,
1733388711959
],
[
42.8,
180.4,
1733388711975
],
[
35.8,
180.4,
1733388711994
],
[
33.8,
180.4,
1733388712054
],
[
32.8,
180.4,
1733388712278
],
[
31.8,
182.4,
1733388712295
],
[
30.8,
182.4,
1733388712310
],
[
28.8,
182.4,
1733388712479
],
[
27.8,
182.4,
1733388712502
],
[
26.8,
182.4,
1733388712518
],
[
24.8,
182.4,
1733388712550
],
[
26.8,
182.4,
1733388712886
],
[
31.8,
181.4,
1733388712902
],
[
35.8,
180.4,
1733388712918
],
[
37.8,
179.4,
1733388712934
],
[
38.8,
179.4,
1733388712950
],
[
44.8,
179.4,
1733388712966
],
[
55.8,
179.4,
1733388712986
],
[
88.8,
178.4,
1733388712999
],
[
164.8,
172.4,
1733388713014
],
[
255.8,
165.4,
1733388713030
],
[
346.8,
157.4,
1733388713046
],
[
360.8,
161.4,
1733388715672
],
[
340.8,
161.4,
1733388715687
],
[
320.8,
164.4,
1733388715702
],
[
287.8,
168.4,
1733388715718
],
[
240.8,
176.4,
1733388715735
],
[
184.8,
182.4,
1733388715750
],
[
144.8,
184.4,
1733388715766
],
[
112.8,
186.4,
1733388715782
],
[
98.8,
186.4,
1733388715798
],
[
95.8,
186.4,
1733388715926
],
[
93.8,
186.4,
1733388715942
],
[
89.8,
187.4,
1733388715958
],
[
79.8,
188.4,
1733388715983
],
[
57.8,
192.4,
1733388715999
],
[
44.8,
192.4,
1733388716015
],
[
36.8,
192.4,
1733388716030
],
[
11.8,
188.4,
1733388716073
],
[
-0.2,
184.4,
1733388716134
],
[
-1.2,
184.4,
1733388716191
],
[
-2.2,
184.4,
1733388716206
],
[
-2.2,
182.4,
1733388716318
],
[
-0.2,
181.4,
1733388716342
],
[
0.8,
179.4,
1733388716878
],
[
2.8,
177.4,
1733388716894
],
[
3.8,
177.4,
1733388716910
],
[
4.8,
176.4,
1733388716926
],
[
3.8,
176.4,
1733388717299
],
[
2.8,
176.4,
1733388717646
],
[
1.8,
176.4,
1733388717662
],
[
0.8,
176.4,
1733388717699
],
[
-0.2,
176.4,
1733388717838
],
[
-0.2,
177.4,
1733388718115
],
[
0.8,
177.4,
1733388718490
]
],
track_list
]
}, separators=(',', ':'))
captcha_collect_gzip = self.gzip_compress(captcha_collect_data)
aes_key_iv = EncryptTool.get_key_iv(salt=salt)
captcha_collect = EncryptTool.aes_encrypt(captcha_collect_gzip, aes_key_iv['aes_key'], aes_key_iv['aes_iv'])
return captcha_collect
# 获取滑块加密参数key和iv
def vc_pre_ck_b(self, backend_anti_content):
self.headers.pop("anti-content", None)
url = "https://apiv2.pinduoduo.net/api/phantom/vc_pre_ck_b"
payload = {
"anti_content": backend_anti_content,
"verify_auth_token": self.verify_auth_token,
"sdk_type": 3,
"client_time": int(round(time.time() * 1000))
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
return response.json().get("salt")
# 获取滑块图片
def obtain_captcha(self, backend_anti_content):
url = "https://apiv2.pinduoduo.net/api/phantom/obtain_captcha"
payload = {
"anti_content": backend_anti_content,
"verify_auth_token": self.verify_auth_token,
"captcha_collect": ""
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
return response.json().get("pictures")
# 滑块验证
def verify(self, captcha_collect, verify_code, backend_anti_content):
url = "https://apiv2.pinduoduo.net/api/phantom/user_verify"
payload = {
"verify_code": verify_code,
"captcha_collect": captcha_collect,
"verify_auth_token": self.verify_auth_token,
"anti_content": backend_anti_content
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
result = response.json().get("result")
self._log(f"滑块验证结果: {response.text}")
return result
# 发送验证码通知给后端,并获取验证码
def request_verification_code(self, username, store_id, backend_anti_content, phone_number=None):
"""向后端请求获取手机验证码"""
self._log(f"开始请求验证码,用户名: {username}, 店铺ID: {store_id}, 手机号: {phone_number}", "INFO")
# 使用后端下发的anti_content
anti_content = backend_anti_content
self._log(f"使用后端下发的anti_content", "DEBUG")
self.headers["anti-content"] = anti_content
url = "https://mms.pinduoduo.com/janus/api/user/getLoginVerificationCode"
payload = {
"username": username,
"crawlerInfo": anti_content
}
response = requests.post(url, headers=self.headers, json=payload, cookies=self.cookies)
self._log(f"发送验证码请求结果: {response.text}")
# 发送消息给后端,告知需要验证码(包含手机号)
self._log("准备向后端发送验证码需求通知", "INFO")
self._send_verification_needed_message(store_id, phone_number)
# 这里需要等待后端重新下发包含验证码的登录参数
# 实际实现中这个方法会在接收到新的登录参数后被重新调用
return None
def _send_verification_needed_message(self, store_id, phone_number=None):
"""向后端发送需要验证码的通知"""
try:
self._log(f"开始发送验证码需求通知店铺ID: {store_id}, 手机号: {phone_number}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
self._log(f"获取到后端客户端: {backend is not None}", "DEBUG")
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": False,
"content": "需要验证码",
"phone_number": phone_number
}
self._log(f"准备发送验证码通知消息: {message}", "DEBUG")
backend.send_message(message)
self._log("✅ 成功向后端发送验证码需求通知(含手机号)", "SUCCESS")
else:
self._log("❌ 后端客户端为空,无法发送验证码需求通知", "ERROR")
except Exception as e:
self._log(f"❌ 发送验证码需求通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _send_verification_error_message(self, store_id, error_msg):
"""向后端发送验证码错误的通知"""
try:
self._log(f"开始发送验证码错误通知店铺ID: {store_id}, 错误: {error_msg}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": False,
"content": error_msg
}
self._log(f"准备发送验证码错误消息: {message}", "DEBUG")
backend.send_message(message)
self._log("✅ 成功向后端发送验证码错误通知", "SUCCESS")
else:
self._log("❌ 后端客户端为空,无法发送验证码错误通知", "ERROR")
except Exception as e:
self._log(f"❌ 发送验证码错误通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _send_login_success_message(self, store_id):
"""向后端发送登录成功的通知"""
try:
self._log(f"开始发送登录成功通知店铺ID: {store_id}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": True, # 登录成功
"cookies": self.cookies # 🔥 新增添加登录生成的cookie信息
}
self._log(f"准备发送登录成功消息: {message}", "DEBUG")
backend.send_message(message)
self._log("✅ 成功向后端发送登录成功通知", "SUCCESS")
else:
self._log("❌ 获取后端客户端失败", "ERROR")
except Exception as e:
self._log(f"❌ 发送登录成功通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def _send_login_failure_message(self, store_id, error_msg):
"""向后端发送登录失败的通知"""
try:
self._log(f"开始发送登录失败通知店铺ID: {store_id}, 错误: {error_msg}", "INFO")
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if backend:
message = {
"type": "connect_message",
"store_id": store_id,
"status": False, # 登录失败
"content": error_msg # 官方返回的失败原因
}
self._log(f"准备发送登录失败消息: {message}", "DEBUG")
backend.send_message(message)
self._log("✅ 成功向后端发送登录失败通知", "SUCCESS")
else:
self._log("❌ 获取后端客户端失败", "ERROR")
except Exception as e:
self._log(f"❌ 发送登录失败通知失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def login_with_params(self, login_params, store_id=None, error_count=0, success_count=0):
"""使用后端下发的登录参数进行登录"""
self._log("🚀 [PddLogin] 开始使用参数登录", "INFO")
# 检查验证码字段(兼容 code 和 verification_code
verification_code = login_params.get("verification_code") or login_params.get("code", "")
self._log(
f"📋 [PddLogin] 登录参数: username={login_params.get('username', 'N/A')}, 包含验证码={bool(verification_code)}",
"DEBUG")
self.headers["anti-content"] = login_params.get("anti_content", "")
# 直接使用后端提供的参数构建登录请求
ts = login_params.get("timestamp", int(round(time.time() * 1000)))
payload = {
"username": login_params.get("username", ""),
"password": login_params.get("password", ""), # 后端已加密
"passwordEncrypt": True,
"verificationCode": verification_code,
"mobileVerifyCode": "", # 使用兼容的验证码字段
"sign": "",
"touchevent": {
"mobileInputEditStartTime": ts - random.randint(10000, 20000),
"mobileInputEditFinishTime": ts - random.randint(8000, 15000),
"mobileInputKeyboardEvent": "0|1|1|-6366-6942-142",
"passwordInputEditStartTime": ts - random.randint(5000, 10000),
"passwordInputEditFinishTime": ts - random.randint(3000, 8000),
"passwordInputKeyboardEvent": "0|1|1|195-154-755-477",
"captureInputEditStartTime": "",
"captureInputEditFinishTime": "",
"captureInputKeyboardEvent": "",
"loginButtonTouchPoint": "1263,586",
"loginButtonClickTime": ts - random.randint(100, 1000)
},
"fingerprint": {
"innerHeight": 906,
"innerWidth": 1707,
"devicePixelRatio": 1.5,
"availHeight": 1027,
"availWidth": 1707,
"height": 1067,
"width": 1707,
"colorDepth": 24,
"locationHref": "https://mms.pinduoduo.com/login/sso?platform=wholesale&redirectUrl=htt",
"clientWidth": 1707,
"clientHeight": 906,
"offsetWidth": 1707,
"offsetHeight": 906,
"scrollWidth": 2882,
"scrollHeight": 906,
"navigator": {
"appCodeName": "Mozilla",
"appName": "Netscape",
"hardwareConcurrency": 16,
"language": "zh-CN",
"cookieEnabled": True,
"platform": "Win32",
"doNotTrack": True,
"ua": self.headers.get("user-agent"),
"vendor": "Google Inc.",
"product": "Gecko",
"productSub": "20030107",
"mimeTypes": "f5a1111231f589322da33fb59b56946b4043e092",
"plugins": "387b918f593d4d8d6bfa647c07e108afbd7a6223"
},
"referer": "",
"timezoneOffset": -480
},
"riskSign": login_params.get("risk_sign", ""), # 后端已加密
"timestamp": ts,
"crawlerInfo": login_params.get("anti_content", "")
}
# 添加详细的请求日志
self._log(f"🔍 [Debug] 登录请求URL: {self.login_api}", "DEBUG")
self._log(f"🔍 [Debug] 登录请求payload: {payload}", "DEBUG")
self._log(f"🔍 [Debug] 登录请求headers: {dict(self.headers)}", "DEBUG")
response = requests.post(self.login_api, headers=self.headers, json=payload, cookies=self.cookies)
self.cookies.update(response.cookies.get_dict())
self._log(f"登录响应状态码: {response.status_code}")
self._log(f"登录响应内容: {response.text}")
# 检查响应内容
if "需要手机验证" in response.text:
self._log("✅ 检测到需要手机验证的响应", "INFO")
elif "需要验证图形验证码" in response.text:
self._log("✅ 检测到需要图形验证码的响应", "INFO")
else:
self._log("⚠️ 未检测到验证码相关响应", "WARNING")
if "需要验证图形验证码" in response.text:
self.verify_auth_token = response.json().get("result").get("verifyAuthToken")
salt = self.vc_pre_ck_b() # 获取生成aes key和iv 的密文值
pictures = self.obtain_captcha() # 获取验证码图片
distance = round((ImgDistance(bg=pictures[0], tp=pictures[1]).main() * (272 / 320)) + (48.75 / 2),
2) # 计算距离
track_list = Track.get_track(distance=distance) # 生成轨迹
captcha_collect = self.captcha_collect(salt=salt, track_list=track_list) # 生成captcha_collect参数
result = self.verify(captcha_collect=captcha_collect, verify_code=distance) # 检验
if result:
self.cookies.update({
"msfe-pc-cookie-captcha-token": self.verify_auth_token
})
self.headers["verifyauthtoken"] = self.verify_auth_token
success_count += 1
# 如果滑块成功 success_count 计数一次 成功8次还是显示验证码则失败 返回False
if success_count < 8:
return self.login_with_params(login_params=login_params, store_id=store_id,
success_count=success_count)
else:
return False
else:
# 如果滑块失败 error_count 计数一次 失败8次则返回False
error_count += 1
if error_count < 8:
return self.login_with_params(login_params=login_params, store_id=store_id, error_count=error_count)
else:
return False
elif "需要手机验证" in response.text:
# 检查是否已经包含验证码(兼容 code 和 verification_code
if verification_code:
# 已有验证码但仍需要验证,验证码可能错误、过期或者其他问题
self._log(f"带验证码登录失败,验证码: {verification_code}", "WARNING")
# 检查响应中的具体错误信息
response_data = response.json()
error_msg = response_data.get('errorMsg', '验证码验证失败')
self._log(f"服务器返回错误: {error_msg}", "WARNING")
# 不要重新发送验证码请求,直接报告验证失败
self._send_verification_error_message(store_id, error_msg) # 直接使用官方错误信息
return "verification_code_error" # 返回特殊状态,避免重复发送消息
else:
# 第一次需要验证码,调用发送验证码方法
self._log("检测到需要手机验证码,正在调用发送验证码方法", "INFO")
username = login_params.get("username")
backend_anti_content = login_params.get("anti_content")
self._log(f"为用户 {username} 发送验证码使用后端anti_content", "INFO")
# 🔥 从响应中提取手机号
phone_number = None
try:
response_json = response.json()
phone_number = response_json.get("result") # 手机号在result字段中
if phone_number and isinstance(phone_number, str):
self._log(f"🔍 从登录响应中提取到手机号: {phone_number}", "SUCCESS")
else:
self._log("⚠️ 响应中的result字段不包含有效手机号", "WARNING")
except Exception as e:
self._log(f"❌ 提取手机号时出错: {e}", "DEBUG")
# 传递后端下发的anti_content和手机号
self.request_verification_code(username, store_id, backend_anti_content, phone_number)
return "need_verification_code"
else:
# 登录成功或其他情况
response_data = response.json()
if response_data.get("success"):
self._log("🎉 登录成功!", "SUCCESS")
# 发送登录成功通知给后端
self._send_login_success_message(store_id)
return self.cookies
else:
# 登录失败,发送失败通知给后端
error_msg = response_data.get('errorMsg', '未知错误')
self._log(f"登录失败: {error_msg}", "ERROR")
self._send_login_failure_message(store_id, error_msg)
return "login_failure" # 返回特殊状态,避免重复发送消息
# ===== 登录相关类集成结束 =====
# 定义持久化数据类 - 参考京东的WebsocketManager
class WebsocketManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._store = {}
cls._instance._lock = threading.RLock()
return cls._instance
def on_connect(self, shop_key, ws, **kwargs):
"""完全保持原有数据结构"""
with self._lock:
entry = self._store.setdefault(shop_key, {
'platform': None,
'customers': [],
'user_assignments': {}
})
entry['platform'] = {
'ws': ws, # 注意:这里存储的是强引用
'last_heartbeat': datetime.now(),
**kwargs
}
return entry
def get_connection(self, shop_key):
with self._lock:
return self._store.get(shop_key)
def remove_connection(self, shop_key):
with self._lock:
if shop_key in self._store:
del self._store[shop_key]
class PddBackendService:
"""拼多多后端服务调用类(新版:使用统一后端连接 BackendClient"""
def __init__(self):
self.current_store_id = None
async def connect(self, store_id):
"""连接后端服务(使用统一连接,无需独立连接)"""
self.current_store_id = store_id
return True
async def send_message_to_backend(self, platform_message):
"""改为通过单后端连接发送需携带store_id"""
try:
from WebSocket.backend_singleton import get_backend_client
backend = get_backend_client()
if not backend:
return None
# 确保消息包含store_id
if isinstance(platform_message, dict):
if 'store_id' not in platform_message and self.current_store_id:
platform_message['store_id'] = self.current_store_id
# 通过统一后端连接发送
backend.send_message(platform_message)
return True
except Exception:
return None
class ChatPdd:
@staticmethod
def check_js_files():
"""检查JS文件是否存在"""
# 使用资源路径解析函数
current_dir = ChatPdd._get_resource_path("static/js")
required_files = ["dencode_message.js", "encode_message.js"]
for file in required_files:
file_path = os.path.join(current_dir, file)
if not os.path.exists(file_path):
raise FileNotFoundError(f"找不到必需的JS文件: {file_path}")
return True
@staticmethod
def _get_resource_path(relative_path):
"""获取资源文件的绝对路径兼容PyInstaller打包环境"""
try:
print(f"[DEBUG] 正在解析资源路径: {relative_path}")
# PyInstaller环境下的基础路径
if hasattr(sys, '_MEIPASS'):
# PyInstaller 临时目录
base_path = sys._MEIPASS
print(f"[DEBUG] 检测到PyInstaller环境_MEIPASS: {base_path}")
elif hasattr(sys, 'frozen') and sys.frozen:
# 其他打包环境
base_path = os.path.dirname(sys.executable)
print(f"[DEBUG] 检测到其他打包环境executable目录: {base_path}")
else:
# 开发环境
base_path = os.path.dirname(os.path.abspath(__file__))
# 向上两级目录到项目根目录
base_path = os.path.dirname(os.path.dirname(base_path))
print(f"[DEBUG] 开发环境,计算的项目根目录: {base_path}")
resource_path = os.path.join(base_path, relative_path)
print(f"[DEBUG] 拼接后的完整资源路径: {resource_path}")
# 检查路径是否存在
if os.path.exists(resource_path):
print(f"[DEBUG] ✅ 资源路径存在: {resource_path}")
else:
print(f"[DEBUG] ❌ 资源路径不存在: {resource_path}")
# 尝试列出基础路径的内容
print(f"[DEBUG] 基础路径 {base_path} 的内容:")
try:
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isdir(item_path):
print(f"[DEBUG] 📁 {item}/")
else:
print(f"[DEBUG] 📄 {item}")
except Exception as e:
print(f"[DEBUG] 无法列出目录内容: {e}")
return resource_path
except Exception as e:
print(f"[ERROR] 获取资源路径失败: {e}")
import traceback
print(f"[ERROR] 堆栈跟踪: {traceback.format_exc()}")
# 降级处理:返回相对路径
return relative_path
def __init__(self, cookie, chat_list_stat, csname=None, text=None, log_callback=None):
# 检查JS文件
self.check_js_files()
# 获取JS文件路径 - 使用资源路径解析
js_dir = self._get_resource_path("static/js")
dencode_js_path = os.path.join(js_dir, "dencode_message.js")
encode_js_path = os.path.join(js_dir, "encode_message.js")
# 读取JS文件
try:
with open(dencode_js_path, 'r', encoding='utf-8') as f:
jscode = f.read()
self.ctx = execjs.compile(jscode)
with open(encode_js_path, 'r', encoding='utf-8') as f:
ejscode = f.read()
self.encodeex = execjs.compile(ejscode)
except Exception as e:
raise RuntimeError(f"读取JS文件失败: {str(e)}")
# 首先设置log_callback然后才能使用_log方法
self.log_callback = log_callback
self._log("初始化ChatPdd实例", "INFO")
self.chat_list_stat = chat_list_stat
self.text = text
self.cookie = cookie
self.auth_token = None
self.mall_id = None
self.user_id = None
self.csname = csname
self.csid = None
self.staff_list_sent = False
self.uids = set()
self.pool = ThreadPoolExecutor(max_workers=4)
# WebSocket管理器
self.ws_manager = WebsocketManager()
# 后端服务实例
self.backend_service = PddBackendService()
self.backend_connected = False
# 重连参数
self.reconnect_attempts = 0
self.max_reconnect_attempts = 10
self.base_reconnect_delay = 1.0
self.max_reconnect_delay = 60.0
self.reconnect_backoff = 1.5
self.headers = {
"authority": "mms.pinduoduo.com",
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"anti-content": "0asWfxUeM_VefxObk-_v-fwmt7oS3cmHsWwMkApMVigWOS3hCHfBeF-hHM1_v9LHywCtfzDKkA_F1cUE8_HKBA5D7fVkM2ZDB-KmMxhDM51HXlVrfK_LG4Dc3S0wimw8XYPyXdgJnUv8ndgjs0EynYv8n0XjXU9YXY4Pt0ZVgTgrsIUEeMzMk7s02E3oo-zMdF35CIMWVetBhs95lGG5xENwZwX0CwXH2799MeMjVKzk5DD4Kbs2dM1bD-fJ1F-RImB3SHBkVKDJZbZoUbL4UMk8DFtrISf8CMD4Obk_CM3ICIB_F90ws9ZQ0gcOpVdIIz2PLPVcYt5X0yqYiwjtuNVfdNSf0rmfmNIndXj8G8Nyn4ianRaanoucDp0Dfvd36B6eamPGwOFS0TAyo7nXH_nwxnGS28AgVnqPyPuy8jczb1Oe3xZuPHoJ97BBbaTMfV9OcQ23T-mAUel",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": "https://mms.pinduoduo.com",
"pragma": "no-cache",
"referer": "https://mms.pinduoduo.com/chat-merchant/index.html?r=0.4940608191737519",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
self.store_id = "538727ec-c84d-4458-8ade-3960c9ab802c" # 可以根据需要修改
self.user_tokens = {} # 存储用户token信息
def _log(self, message, level="INFO"):
"""内部日志方法"""
if self.log_callback:
self.log_callback(message, level)
else:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
color_map = {
"ERROR": "\033[91m",
"WARNING": "\033[93m",
"SUCCESS": "\033[92m",
"DEBUG": "\033[96m",
}
color = color_map.get(level, "")
reset = "\033[0m"
print(f"{color}[{timestamp}] [{level}] {message}{reset}")
# 重连机制方法
async def calculate_reconnect_delay(self):
"""计算指数退避的重连延迟时间"""
delay = self.base_reconnect_delay * (self.reconnect_backoff ** self.reconnect_attempts)
return min(delay, self.max_reconnect_delay)
async def should_reconnect(self):
"""判断是否应该继续重连"""
if self.reconnect_attempts >= self.max_reconnect_attempts:
self._log(f"已达到最大重连次数({self.max_reconnect_attempts}),停止重连", "ERROR")
return False
return True
async def handle_reconnect(self, exception=None):
"""处理重连逻辑"""
if exception:
error_type = type(exception).__name__
error_msg = str(exception)
self._log(f"连接异常[{error_type}]: {error_msg}", "WARNING")
if not await self.should_reconnect():
return False
delay = await self.calculate_reconnect_delay()
self._log(f"{self.reconnect_attempts + 1}次重连尝试,等待{delay:.1f}秒...", "WARNING")
await asyncio.sleep(delay)
self.reconnect_attempts += 1
return True
# 后端服务调用方法
async def connect_backend_service(self, store_id):
"""连接后端AI服务"""
try:
success = await self.backend_service.connect(store_id)
if success:
self.backend_connected = True
self._log("✅ 后端AI服务连接成功", "SUCCESS")
return success
except Exception as e:
self._log(f"❌ 连接后端AI服务失败: {e}", "ERROR")
return False
async def get_ai_reply_from_backend(self, platform_message):
"""发送消息到后端(使用统一连接,不等待回复)"""
# 首先检查后端服务连接状态
if not self.backend_connected:
# 尝试重新连接
if self.store_id:
self.backend_connected = await self.connect_backend_service(self.store_id)
if not self.backend_connected:
self._log("❌ 后端服务未连接,尝试重新连接失败", "WARNING")
return None
try:
# 发送消息到后端(使用统一连接,不等待回复)
await self.backend_service.send_message_to_backend(platform_message)
self._log("✅ 消息已发送到后端等待后端AI处理并通过GUI转发回复", "INFO")
return None
except Exception as e:
self._log(f"❌ 发送消息到后端失败: {e}", "ERROR")
return None
def get_auth_token(self):
url = "https://mms.pinduoduo.com/janus/api/subSystem/getAuthToken"
data = {"subSystemId": 17,
"anti_content": "0asAfx5E-wCElqJNXaKt_UKccG7YNycjZoPYgO1YTmZoApN7QJU5XT_JuYdPZ4uvLPQFgIcyXOKqm8xGrPjy0lxn0gaXr9ac0TynYEJnDwdj-WEJnwK3G4kc3M0TiPgtB11eBeZkB1hkBxUHFOtjfqz-eAkgc2aa4sZuIJwHOptYX14VK1NJSqIYfqmw6jpQTXs574CfWMFxT1RPTIB2MKBjC199pXpkbdtXxnn2mA4C1YdNnTUrNa_Wvn5mYj5XadnDbNT7xNuVeYXrsTsiipUr6xn2AAXKoYmv6j0PL92PtCTMfZzzfTfjutCgvBTzxFw-2LeeRIkFBATU1Npg1ScUFRv-1Ukrs3AklVAbBiEbBJhzcf2cXKfNH50X9QUFCd_Y1FhLW1BBuI-KEUFYK3lZTB3g_BlLDk8ti-JXmbalXzWAb6V2hX0fZE9n2L0GITFInNS"}
response = requests.post(url, cookies=self.cookie, headers=self.headers, json=data).json()
self.auth_token = response["result"]["authToken"]
def get_mall_id(self):
url = "https://mms.pinduoduo.com/chats/userinfo/realtime?get_response=true"
data = {"get_response": "true"}
response = requests.get(url, cookies=self.cookie, headers=self.headers, params=data).json()
self.mall_id = response["mall_id"]
self.user_id = response["id"]
def get_assign_cslist(self):
url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
data = {
"wechatCheck": True
}
data = json.dumps(data, separators=(',', ':'))
response = requests.post(url, headers=self.headers, cookies=self.cookie, data=data).json()
cslist = response["result"]["csList"]
keys = cslist.keys()
for key in keys:
username = cslist[key].get("username")
if username == self.csname:
self.csid = key
break
def tab_mall(self, uid):
"""执行转接操作"""
self._log(f"🔄 开始执行转接操作 - 用户ID: {uid}, 目标客服ID: {self.csid}", "INFO")
try:
url = "https://mms.pinduoduo.com/plateau/chat/move_conversation"
request_id = int(time.time() * 1000)
data = {
"data": {
"cmd": "move_conversation",
"request_id": request_id,
"conversation": {
"csid": self.csid,
"uid": uid,
"need_wx": False,
"remark": "无原因直接转移"
},
"anti_content": "0asAfxvYXyIgYgE2h9U0sXh6Ia3tZOAzI_rl3hixs1Z5DB-B1cswVK2UB8LfUO8H3OUKdF214-SgPVnY8SYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4R8ZVTw3a7j9BdB3Hmq7j0bh2LQrRzlt88pkQr-s3jMzWH5QPiz13wQNsJY6h9dnigBE26Ww1dKZcWbsKS4GsgY9_joGBS2FZUz94cbCvE2Ij9nO93b5kR50m9kI1lfBHvDZVIC9rLB-S4Qs4J9ebSsosuNskY9nhgXzCuNoilSQgJaMkP2Rgga2i54nWTTNfnZ4W9GvigkSnwzweN9sx-89BztNCasVkP2C2Kkipif2Dv-t7e2O9MnmtWaKm4wFlHLR7YmeyF3y5gxPRuHlO-gD7BfFle8z71lIV-7SyOLCd83FMUCsRwedwv3-TNTF_VRw_G1DHrptFYTAck8p8qIkxA_PZ14PXct1ZS2zAoZZcTMZZSpJ1jP9RlQARtnNNP0IzCdHiL4JzRQBkkKmwGGXay0i4urnXNm5qBgejFcVCkx8XYdeGBp4p-l1TjKoy603__johKFKZQksRHjOKma5DplwQwg8c3Uo1-mDAUmgAC0XfV2rkxzXj3ABARPlyEaMU8AxUZioaFRa0e5eit6r06uY9m5SyU_mdbTfLIYcKWjRA_UNQisJOUenCbMl_Bywq1KcA6TTzJ9ZHu4xz0o7ctROYLlvhEhAi3bJjFNho6l-TPZK4zSJoj"
},
"client": "WEB",
"anti_content": "0asAfxvYXNIgYgd2i9U0sXh6IabtZOAdI_LlbhNUvZZ5DB-B1cswVK2UBjxfUOjHbOUKdF214-SgPVnYjSYvSdmwnKDoPXAs4OdtzfKwkT6gAO2d4RjZVTwbJFiC0P0rR6LbH89rLVuGHiG6vm3fO6rq_rAyLQlw8CAIuESMR4L308Ctz3V0m9ZhS8tdapQr_YY2vZZk9sAJsMd2ZapG92zm2lO9FLVBjCur2KWwac9nua9g8ztOawyo9ur0Dd96xMkCXrd4J4hR4Kk9zqZBDTlRPkMdzzkJ5Osgi40EC3oVAhSPA9MawGCs33Vnd4bSJER9_b_D9g1sfdeZZKsgo25k0WRfs7dM17E2ug-pPEL5_ib53TuaxLHJq5pGG4jMUjAHRV-gbDffut1Yjy1A7K-6E3zO4tDuUq5T0LPHjf9bB7Apofgr_U_4Aq-7l8x2ks8DhhgOhblTvwrO4owZvuUXA0hfSIUPYK9fIoSUH6aaL8abMMVCvI__qF8Yw1oaMp0d0-fbwMhXQ8fBZYoajXXJv-RV3B2GQEowvMHEUtjDz50jUVduq5fKL9R-WsBRjX0aiExVKLw-PZzxsVup0ZuOYxnPofN92EqKfbvriIWFMsh80eGrvq1vsD1BrMx_mcPMmUmHHEY7wct5lePa0b2Lt5_byqRbbcpohrQrHLlH6JJaTMKQuffyT-60110e5oz_Cz0NbG88TyxOGKijW7BimL5RTjSOTPsK4zS8oi"
}
self._log(f"📤 发送转接请求 - URL: {url}", "DEBUG")
self._log(f"📝 请求数据: {json.dumps(data, ensure_ascii=False)}", "DEBUG")
data_str = json.dumps(data, separators=(',', ':'))
response = requests.post(url, headers=self.headers, cookies=self.cookie, data=data_str)
self._log(f"📥 HTTP状态码: {response.status_code}", "DEBUG")
self._log(f"📄 响应内容: {response.text}", "DEBUG")
# 解析响应
try:
result = response.json()
self._log(f"🔍 解析后的响应: {json.dumps(result, ensure_ascii=False)}", "DEBUG")
# 检查响应结构
if result.get("success"):
result_data = result.get("result", {})
if result_data.get("result") == "success" or result_data.get("result") == "ok":
self._log(f"✅ 转接成功 - 用户 {uid} 已转接给客服 {self.csid}", "SUCCESS")
return True
else:
error_code = result_data.get("error_code", "未知")
error_msg = result_data.get("error_msg", "未知错误")
self._log(f"❌ 转接失败 - 错误码: {error_code}, 错误信息: {error_msg}", "ERROR")
return False
else:
self._log(f"❌ 转接请求失败 - 响应标记为失败", "ERROR")
return False
except json.JSONDecodeError as e:
self._log(f"❌ 解析响应JSON失败: {e}", "ERROR")
self._log(f"❌ 原始响应: {response.text}", "ERROR")
return False
except requests.RequestException as e:
self._log(f"❌ 请求异常: {e}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 转接操作异常: {e}", "ERROR")
traceback.print_exc()
return False
@staticmethod
def forward_message_to_platform(store_id: str, recv_pin: str, content: str):
"""转发消息到拼多多平台"""
try:
pdd_mgr = WebsocketManager()
pdd_shop_key = f"拼多多:{store_id}"
pdd_entry = pdd_mgr.get_connection(pdd_shop_key)
if pdd_entry:
# 找到拼多多连接,使用拼多多转发逻辑
platform_info = pdd_entry.get('platform') or {}
ws = platform_info.get('ws')
loop = platform_info.get('loop')
pdd_instance = platform_info.get('pdd_instance') # ChatPdd实例引用
print(
f"[PDD Forward] shop_key={pdd_shop_key} has_ws={bool(ws)} has_loop={bool(loop)} has_pdd_instance={bool(pdd_instance)} recv_pin={recv_pin}")
if ws and loop and pdd_instance:
async def _send_pdd():
# 直接调用ChatPdd实例的send_ai_reply方法
await pdd_instance.send_ai_reply(recv_pin, content)
import asyncio as _asyncio
_future = _asyncio.run_coroutine_threadsafe(_send_pdd(), loop)
try:
_future.result(timeout=5) # 拼多多需要HTTP请求给更长时间
print(f"[PDD Forward] 已转发到平台: uid={recv_pin}, content_len={len(content)}")
except Exception as fe:
print(f"[PDD Forward] 转发提交失败: {fe}")
else:
print("[PDD Forward] 条件不足,未转发:",
{'has_ws': bool(ws), 'has_loop': bool(loop), 'has_pdd_instance': bool(pdd_instance),
'has_content': bool(content)})
else:
print(f"[PDD Forward] 未找到拼多多连接: {pdd_shop_key}")
except Exception as e:
print(f"[PDD Forward] 拼多多转发失败: {e}")
@staticmethod
def transfer_customer_service(customer_service_id: str, user_id: str, store_id: str):
"""拼多多平台转接"""
try:
pdd_mgr = WebsocketManager()
pdd_shop_key = f"拼多多:{store_id}"
pdd_entry = pdd_mgr.get_connection(pdd_shop_key)
if pdd_entry:
platform_info = pdd_entry.get('platform') or {}
pdd_instance = platform_info.get('pdd_instance')
loop = platform_info.get('loop')
print(f"[PDD Transfer] 找到拼多多连接,准备执行转接: user_id={user_id}, cs_id={customer_service_id}")
if pdd_instance and loop:
# 设置目标客服ID并执行转接
pdd_instance.csid = customer_service_id
def _transfer():
result = pdd_instance.tab_mall(user_id)
if result:
print(f"[PDD Transfer] ✅ 转接成功")
else:
print(f"[PDD Transfer] ❌ 转接失败")
return result
# 在线程池中执行同步转接操作
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(_transfer)
try:
future.result(timeout=10) # 转接操作可能需要更长时间
except Exception as fe:
print(f"[PDD Transfer] 转接操作异常: {fe}")
else:
print(f"[PDD Transfer] 条件不足: has_pdd_instance={bool(pdd_instance)}, has_loop={bool(loop)}")
else:
print(f"[PDD Transfer] 未找到拼多多连接: {pdd_shop_key}")
except Exception as e:
print(f"[PDD Transfer] 拼多多转接失败: {e}")
async def send_ai_reply(self, uid, content):
"""发送AI回复给指定用户"""
self._log(f"开始发送AI回复给用户 {uid}", "INFO")
self._log(f"回复内容: {content[:50]}...", "DEBUG")
try:
ts = int(time.time() * 1000)
url = "https://mms.pinduoduo.com/plateau/chat/send_message"
data = {
"data": {
"cmd": "send_message",
"anti_content": "0asWfqzFdjPssgua6CCWyeEK7LAktf0_UFChvlj8kD8Mk1DVZchg4qURiKAS4iSfiaz9Fjg43ThZZ4_wNh5Onk0e5Tw2mGdwpTEPDZlPdfZwBpyESSDiz0Xalv6mvr0o0uQR6yRlG-vscbK8jYGRNcAr2UktX3bMKhjjx7YvVGbi_poiyypBuOwM_2qZyIM96tdNTfb_AMy0fe1UrN2SqUzfm2stmb4TJldGN9b1mTPr-6t4R9k3pquV_XHwBr20J4__P71fxPBmcXOmXTnQWJs2ddt4KKOIbPIIanas08VNiv9nsHbtvXKp8_CMdL8FQMPmEMDmM6JfjZ4LJp5l-x0BjpBbdWyoztNYqSbBe8hscA1hPFM_R1iR9E-AI-CyuehipODW-ADCL0xEWK875510RsV7mLFQKG0kWO--krZzFFi_6lhdfq3_JhBjDDd3YS1exxSMMS7yDJgskoumfAcu-y1YDO8OdcKWwx3Z0nOqUlnF8LekN7TqXhNtKTgfDZPThVT3JcNm_2TljXRhGWxTZT4o25TTVKo5KPeztcJU5NddFd38nmvo0k8R8HVIkmzOYyaerCDCm1ZDfYgNFx-2vM_JkcMYFdkV7C3BKeg7DxZXHfexAT3UKgruBWYrSs_dBB0X-QkJ2ZcmpStB6KJkZ1azrAAxdw_HMj5UIq4vEXxrA1-YAzJEhMoU0-MncDpoBZfiky7Sr4nDGYrcn86AAYWynOUn0US2DAkICABvSSqkh0vCRpdVeZRfOFsdFWF43KEIph-ckMxDsyq6G3rWXbvPkaAlv67cR1v0N2oCQzDVDT-InNv",
"request_id": ts,
"message": {
"to": {
"role": "user",
"uid": uid
},
"from": {
"role": "mall_cs"
},
"ts": int(ts / 1000),
"content": content,
"msg_id": None,
"type": 0,
"is_aut": 0,
"manual_reply": 1,
"status": "read",
"is_read": 1,
"hash": "1d1359dfb7c141fd95432a22cdc7f43b51434041a1a6016ef6ea2ea2b1abc05a"
},
"random": "c5fc9c61194294c478a6fb395b1c558f"
},
"client": "WEB",
"anti_content": "0asAfa5E-wCEsa-JeFwdFfTDtzhMbODIsAT-eu_-CigAHMbcWIfBEUFcI-Kwd93IYTWtfvkSeuwUKO57jwISBuZkzfCe-2VkBFSD-ack-ZKIXRockMwxBgyDql6V3xKKD-fCkBFZkzeKeBwUE-cSD-KHkBb-kzFVk-szd009EAPjDvBcaPIXxHGB_IxWWlIXMnxghvniTodnXt2scz7sxhJV_CEBcCez4H92maXI2d1qgoYswoG4Mf_oylph0nGigjnqWzXqwTYh0y4W6yYs6PxHyXv8slpHrXyseoqZhfqdrdn5E9BCt24mBfkEFg-vgc0wUw2D-vgE7KUM1FIdKb2_Mbl_6xj_8bvk-e1H6ObSksLCDiVCDAbkYU6Y_SMBfgA5CHUM0QuYhznXIwnTanGM2jugCnxPYPmYjyOvjKHEbaV3nvMuJ6V2cX0fZE99vcfY3TF1nNr"
}
data = json.dumps(data, separators=(',', ':'))
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data)
)
# 打印HTTP状态与返回体片段
self._log(f"PDD发送状态: HTTP {response.status_code}", "INFO")
try:
self._log(f"PDD返回体: {response.text[:500]}", "DEBUG")
except Exception:
pass
if response.status_code == 200:
self._log(f"✅ 已发送AI回复给用户 {uid}: {content}", "SUCCESS")
return True
else:
self._log(f"❌ 发送AI回复失败HTTP状态码: {response.status_code}", "ERROR")
self._log(f"响应内容: {response.text}", "DEBUG")
return False
except Exception as e:
self._log(f"❌ 发送AI回复失败: {e}", "ERROR")
traceback.print_exc()
return False
async def send_message_external(self, uid, content):
"""外部消息发送接口,用于接收后端转发的回复"""
self._log(f"[External] 收到外部消息发送请求: uid={uid}, content_len={len(content) if content else 0}", "INFO")
try:
if not uid or not content:
self._log("❌ [External] 参数不完整", "ERROR")
return False
result = await self.send_ai_reply(uid, content)
if result:
self._log(f"✅ [External] 外部消息发送成功: uid={uid}", "SUCCESS")
else:
self._log(f"❌ [External] 外部消息发送失败: uid={uid}", "ERROR")
return result
except Exception as e:
self._log(f"❌ [External] 外部消息发送异常: {e}", "ERROR")
return False
async def get_all_customer(self, ws):
"""异步获取客服列表"""
try:
# PDD获取客服列表的API
url = "https://mms.pinduoduo.com/latitude/assign/getAssignCsList"
data = {
"wechatCheck": True
}
data = json.dumps(data, separators=(',', ':'))
# 使用线程池执行同步请求
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=self.headers, cookies=self.cookie, data=data)
)
if response.status_code == 200:
result = response.json()
cslist = result.get("result", {}).get("csList", {})
print("我们收集到的客服列表原信息为:", str(cslist))
if cslist:
# 转换客服列表格式
self.customer_list = []
for key, customer in cslist.items():
customer_info = {
"staff_id": str(key), # 客服ID
"name": customer.get("username", ""), # 客服名称
"status": customer.get("status", 0), # 状态
"department": customer.get("department", ""), # 部门
"online": True # 在线状态
}
self.customer_list.append(customer_info)
self._log(f"✅ 成功获取客服列表,共 {len(self.customer_list)} 个客服", "SUCCESS")
return True
return False
except Exception as e:
self._log(f"❌ 获取客服列表失败: {e}", "ERROR")
return False
async def send_staff_list_to_backend(self):
"""发送客服列表到后端"""
try:
if not hasattr(self, 'customer_list') or not self.customer_list:
self._log("⚠️ 客服列表为空", "WARNING")
return False
# 创建消息模板
message_template = PlatformMessage(
type="staff_list",
content="客服列表更新",
data={
"staff_list": self.customer_list,
"total_count": len(self.customer_list)
},
store_id=self.store_id
)
# 通过后端服务发送
await self.backend_service.send_message_to_backend(message_template.to_dict())
self._log(f"发送客服列表消息的结构体为: {message_template.to_json()}")
self._log(f"✅ [PDD] 成功发送客服列表到后端,共 {len(self.customer_list)} 个客服", "SUCCESS")
print(f"🔥 [PDD] 客服列表已上传到后端: {len(self.customer_list)} 个客服")
return True
except Exception as e:
self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR")
return False
async def handle_transfer_message(self, message_data):
"""处理转接消息"""
self._log("收到转接指令", "INFO")
try:
content = message_data.get("content", "") # 转接目标客服ID
receiver_info = message_data.get("receiver", {})
uid = receiver_info.get("id") # 用户ID
if not content or not uid:
self._log("❌ 转接消息信息不完整", "WARNING")
self._log(f"消息内容: {message_data}", "DEBUG")
return False
self._log(f"准备将用户 {uid} 转接给客服 {content}", "INFO")
# 设置转接目标客服ID
self.csid = content
# 在线程池中执行同步的转接操作
try:
# 使用线程池执行同步方法
success = await asyncio.get_event_loop().run_in_executor(
self.pool,
self.tab_mall,
uid
)
if success:
self._log(f"✅ 转接成功: 用户 {uid} 已转接给客服 {content}", "SUCCESS")
return True
else:
self._log(f"❌ 转接失败: 用户 {uid}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 执行转接操作失败: {e}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 处理转接消息失败: {e}", "ERROR")
traceback.print_exc()
return False
def should_filter_robot_message(self, message_data):
"""专门判断是否为机器人消息需要过滤"""
try:
message_info = message_data.get("message", {})
# 1. 基于消息类型过滤机器人特殊消息
msg_type = message_info.get("type")
if msg_type == 31: # 机器人干预消息(如:机器人已暂停接待)
return True
# 2. 基于模板名称识别机器人消息
template_name = message_info.get("template_name", "")
robot_templates = [
"mall_robot_man_intervention_and_restart", # 机器人暂停接待消息
"mall_robot_text_msg", # 机器人自动回复消息
# 可根据实际情况添加更多机器人模板
]
if template_name in robot_templates:
return True
# 3. 基于机器人特殊标志过滤
if message_info.get("conv_silent") is True: # 静默会话标志
return True
if message_info.get("no_unreply_hint") == 1: # 无需回复提示标志
return True
# 4. 基于消息内容识别机器人提示消息
content = message_info.get("content", "")
robot_content_patterns = [
"机器人未找到对应的回复",
"机器人已暂停接待",
">>点此【立即恢复接待】<<",
"点击添加",
"[当前用户来自",
]
if any(pattern in content for pattern in robot_content_patterns):
return True
# 5. 基于biz_context中的机器人标识
biz_context = message_info.get("biz_context", {})
if biz_context.get("robot_msg_id"): # 有机器人消息ID
return True
# 不是机器人消息,不过滤
return False
except Exception as e:
self._log(f"判断机器人消息时出错: {e}", "DEBUG")
return False # 出错时不过滤,保持原有行为
async def handle_customer_message(self, message_data):
"""处理来自后端的客服消息"""
self._log("收到来自后端的客服消息", "INFO")
try:
content = message_data.get("content", "")
receiver_info = message_data.get("receiver", {})
uid = receiver_info.get("id")
if uid and content:
self._log(f"准备发送客服消息给用户 {uid}: {content}", "INFO")
await self.send_ai_reply(uid, content)
self._log(f"客服消息发送完成 - 目标用户: {uid}", "SUCCESS")
else:
self._log("客服消息数据不完整", "WARNING")
self._log(f"消息内容: {message_data}", "DEBUG")
except Exception as e:
self._log(f"处理客服消息失败: {e}", "ERROR")
traceback.print_exc()
async def process_incoming_message(self, message_data, wss, store):
"""处理接收到的消息"""
try:
# 🔥 过滤机器人消息
if self.should_filter_robot_message(message_data):
self._log("🤖 检测到机器人消息,已过滤不发送给后端", "DEBUG")
return
message_info = message_data.get("message", {})
if not message_info:
return
nickname = message_info.get("nickname")
content = message_info.get("content")
goods_info = message_info.get("info", {})
from_info = message_info.get("from", {})
uid = from_info.get("uid")
if nickname and content and uid:
self._log(f"用户消息 - {nickname}: {content}", "INFO")
self._log(f"用户UID: {uid}", "DEBUG")
self.uids.add(uid)
# 消息类型检测
lc = str(content).lower()
if any(ext in lc for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]):
msg_type = "image"
elif any(ext in lc for ext in [".mp4", ".avi", ".mov", ".wmv", ".flv"]):
msg_type = "video"
else:
msg_type = "text"
# 订单卡片组装(使用全角冒号,符合文档)
if "订单编号" in str(content) and goods_info:
content = f"商品id{goods_info.get('goodsID')} 订单号:{goods_info.get('orderSequenceNo')}"
msg_type = "order_card"
# 商品卡片检测基于内容关键词和goods_info
elif any(keyword in lc for keyword in ['goods.html', 'item.html', 'item.jd.com', '商品卡片id']) or \
(goods_info and goods_info.get('goodsID') and not goods_info.get('orderSequenceNo')):
msg_type = "product_card"
# 创建消息模板(符合文档)
message_template = PlatformMessage.create_text_message(
content=content,
sender_id=str(uid),
store_id=self.store_id
)
message_template.msg_type = msg_type
try:
print(f"📤(SPEC) 发送到AI: {json.dumps(message_template.to_dict(), ensure_ascii=False)[:300]}...")
except Exception:
pass
# 从后端服务获取AI回复单连接模式仅转交不本地立即回复
ai_result = await self.get_ai_reply_from_backend(message_template.to_dict())
if isinstance(ai_result, str) and ai_result.strip():
# 发送回复消息(仅当本地生成/降级时)
await self.send_ai_reply(uid, ai_result)
self._log(f"📤 已发送回复: {ai_result[:100]}...", "INFO")
else:
# 正常链路已转交后端AI等待后端异步回传并由 GUI 转发到平台
self._log("🔄 已转交后端AI处理等待平台回复下发", "INFO")
except Exception as e:
self._log(f"❌ 消息处理失败: {e}", "ERROR")
traceback.print_exc()
@staticmethod
async def heartbeat(wss):
while True:
m = random.randint(100, 200)
h = [0, 0, 0, 0, 0, 0, 0, m, 0, 0, 0, 1, 0, 0, 0, 0]
h = bytes(h)
await wss.send(bytes(h))
await asyncio.sleep(15)
async def on_message(self, wss, store):
self._log("开始监听消息", "INFO")
while True:
try:
message = await wss.recv()
self._log(f"收到新消息 {base64.b64encode(message).decode()}", "DEBUG")
try:
text = self.ctx.call("dencode_data", base64.b64encode(message).decode())
if not isinstance(text, dict):
continue
push = text.get("push_data") or {}
data = push.get("data")
if not data:
self._log("push_data.data 为空,忽略", "DEBUG")
continue
print(f"原始数据格式打印:{data}")
for d in data:
await self.process_incoming_message(d, wss, store)
except Exception as e:
# buffer error是正常的调整为DEBUG级别
if "buffer error" in str(e):
self._log(f"解析消息失败: {e}", "DEBUG")
else:
self._log(f"解析消息失败: {e}", "ERROR")
traceback.print_exc()
except Exception as e:
# self._log(f"处理消息失败: {e}", "ERROR")
# traceback.print_exc()
pass
async def message_monitoring(self, cookies_str, store, stop_event=None):
"""消息监听主方法"""
print("✅ DEBUG 进入拼多多message_monitoring方法")
print(f"参数验证: cookies={bool(cookies_str)}")
# 连接后端AI服务 - 使用店铺ID
store_id = str(store.get('id', ''))
self._log(f"🔗 尝试连接后端服务店铺ID: {store_id}", "DEBUG")
backend_connected = await self.connect_backend_service(store_id)
if not backend_connected:
self._log("⚠️ 后端服务连接失败将使用本地AI回复", "WARNING")
else:
self._log("✅ 后端服务连接成功", "SUCCESS")
stop_event = stop_event or asyncio.Event()
# 充值重连计数器
self.reconnect_attempts = 0
while not stop_event.is_set():
try:
self._log(f"🔄 尝试连接拼多多WebSocket", "INFO")
if self.user_id and self.auth_token:
uri = "wss://titan-ws.pinduoduo.com/"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
"Upgrade": "websocket",
"Origin": "https://mms.pinduoduo.com",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh;q=0.9",
"Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits",
"cookie": "; ".join([f"{k}={v}" for k, v in self.cookie.items()])
}
async with websockets.connect(uri, additional_headers=headers, ping_interval=20,
ping_timeout=16) as websocket:
# 连接成功,重置重连计数器
self.reconnect_attempts = 0
self._log("✅ WebSocket-PDD连接成功", "SUCCESS")
z = self.encodeex.call("encode_token", str(self.user_id), self.auth_token)
if z:
await websocket.send(bytes(z))
# 注册连接信息到全局管理
shop_key = f"拼多多:{store['id']}"
loop = asyncio.get_running_loop()
entry = self.ws_manager.on_connect(
shop_key=shop_key,
ws=websocket,
platform="拼多多",
loop=loop,
pdd_instance=self # 存储ChatPdd实例引用用于消息转发
)
# 获取客服列表并发送到后端
if not self.staff_list_sent:
self._log("🔍 开始获取客服列表...", "INFO")
try:
if await self.get_all_customer(None):
await self.send_staff_list_to_backend()
self.staff_list_sent = True
else:
self._log("⚠️ 获取客服列表失败", "WARNING")
except Exception as e:
self._log(f"❌ 获取或发送客服列表失败: {e}", "ERROR")
# 🔥 新增Cookie登录成功后发送登录成功报告与登录参数模式保持一致
try:
if self.backend_service and hasattr(self, 'store_id') and self.store_id:
# 构建cookie字典从cookies_str解析
cookie_dict = {}
if hasattr(self, 'cookie') and self.cookie:
cookie_dict = self.cookie
message = {
"type": "connect_message",
"store_id": self.store_id,
"status": True, # 登录成功
"cookies": cookie_dict # 添加cookie信息
}
# 🔥 修复:使用正确的方法名 send_message_to_backend
await self.backend_service.send_message_to_backend(message)
self._log("✅ [PDD] 已向后端发送Cookie登录成功报告", "SUCCESS")
else:
self._log("⚠️ [PDD] 无法发送登录成功报告backend_service或store_id缺失", "WARNING")
except Exception as e:
self._log(f"⚠️ [PDD] 发送登录成功报告失败: {e}", "WARNING")
# 启动消息监听和心跳
await asyncio.gather(
self.heartbeat(websocket),
self.on_message(websocket, store)
)
else:
self._log("未定制账号token数组", "ERROR")
except websockets.ConnectionClosed as e:
self._log(f"🔌 连接已关闭: {e.code} {e.reason}", "WARNING")
if not await self.handle_reconnect(e):
break
except (websockets.WebSocketException, OSError) as e:
self._log(f"🌐 网络异常: {type(e).__name__} - {str(e)}", "WARNING")
if not await self.handle_reconnect(e):
break
except Exception as e:
self._log(f"⚠️ 未知异常: {type(e).__name__} - {str(e)}", "ERROR")
if not await self.handle_reconnect(e):
break
# 关闭后端服务连接
if self.backend_connected:
self.backend_connected = False
self._log("🛑 消息监听已停止", "INFO")
async def main(self):
self.get_auth_token()
self.get_mall_id()
self.get_assign_cslist()
# 连接AI服务
if await self.connect_backend_service(self.store_id):
print("✅ AI服务连接成功")
store = {'id': self.store_id}
await self.message_monitoring(
cookies_str=self.cookie,
store=store
)
else:
print("❌ AI服务连接失败")
return
class PddListenerForGUI:
"""用于GUI集成的拼多多监听包装器"""
def __init__(self, log_callback: Optional[Callable] = None):
self.pdd_bot = None
self.log_callback = log_callback
self.running = False
self.main_thread = None
self.loop = None
self.startup_success = False
self.startup_event = threading.Event()
self.startup_error = None
def _log(self, message: str, log_type: str = "INFO"):
if self.log_callback:
self.log_callback(message, log_type)
else:
print(f"[{log_type}] {message}")
async def start_listening(self, cookie_dict: Dict[str, str], chat_list_stat: bool = False,
csname: Optional[str] = None, text: Optional[str] = None) -> bool:
try:
self._log("🔵 开始拼多多平台连接流程", "INFO")
if not cookie_dict:
self._log("❌ Cookie信息不能为空", "ERROR")
return False
self._log(f"✅ Cookie验证通过包含 {len(cookie_dict)} 个字段", "SUCCESS")
self.startup_success = False
self.startup_error = None
self.startup_event.clear()
# 创建ChatPdd实例
self.pdd_bot = ChatPdd(
cookie=cookie_dict,
text=text,
chat_list_stat=chat_list_stat,
csname=csname,
log_callback=self.log_callback
)
self.running = True
self._log("🎉 开始监听拼多多平台消息...", "SUCCESS")
def run_pdd_main():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = loop
async def main_wrapper():
try:
# 直接调用ChatPdd的main方法
await self.pdd_bot.main()
self.startup_success = True
self._log("✅ ChatPdd主程序执行完成", "SUCCESS")
except Exception as e:
self.startup_error = str(e)
self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
finally:
self.startup_event.set()
loop.run_until_complete(main_wrapper())
except Exception as e:
self.startup_error = str(e)
self._log(f"❌ ChatPdd运行出错: {str(e)}", "ERROR")
self.startup_event.set()
finally:
self.running = False
self.main_thread = threading.Thread(target=run_pdd_main, daemon=True)
self.main_thread.start()
self._log("🔄 等待ChatPdd初始化完成...", "INFO")
# 只等待初始化完成,不设置超时
self.startup_event.wait()
if self.startup_success:
self._log("✅ 拼多多监听启动成功", "SUCCESS")
return True
else:
error_msg = self.startup_error or "未知启动错误"
self._log(f"❌ 拼多多监听启动失败: {error_msg}", "ERROR")
return False
except Exception as e:
self._log(f"❌ 监听启动过程中出现严重错误: {str(e)}", "ERROR")
return False
async def start_with_login_params(self, store_id: str, login_params: str):
"""使用后端下发的登录参数执行登录并启动监听"""
try:
self._log("🔵 [PDD] 收到后端登录参数开始执行登录获取cookies", "INFO")
self._log(f"🔍 [PDD] 登录参数内容: {login_params[:100]}...", "DEBUG")
# 1. 解析登录参数
self._log("🔄 [PDD] 开始解析登录参数", "DEBUG")
params_dict = self._parse_login_params(login_params)
if not params_dict:
self._log("❌ [PDD] 登录参数解析失败", "ERROR")
return False
self._log(f"✅ [PDD] 登录参数解析成功,用户名: {params_dict.get('username', 'N/A')}", "DEBUG")
# 2. 使用新的PddLogin类执行登录
self._log("🔄 [PDD] 开始创建PddLogin实例", "DEBUG")
pdd_login = PddLogin(log_callback=self.log_callback)
self._log("✅ [PDD] PddLogin实例创建成功", "DEBUG")
self._log("🔄 [PDD] 开始执行登录", "DEBUG")
login_result = pdd_login.login_with_params(params_dict, store_id)
self._log(f"📊 [PDD] 登录结果: {login_result}", "DEBUG")
if login_result == "need_verification_code":
self._log("⚠️ [PDD] 需要手机验证码,已通知后端,等待重新下发包含验证码的登录参数", "WARNING")
return "need_verification_code" # 返回特殊标识,避免被覆盖
elif login_result == "verification_code_error":
self._log("⚠️ [PDD] 验证码错误,已通知后端", "WARNING")
return "verification_code_error" # 返回特殊标识,避免重复发送消息
elif login_result == "login_failure":
self._log("⚠️ [PDD] 登录失败,已发送失败通知给后端", "WARNING")
return "login_failure" # 返回特殊标识,避免重复发送消息
elif not login_result:
self._log("❌ [PDD] 登录失败", "ERROR")
return False
elif isinstance(login_result, dict):
# 登录成功获取到cookies
self._log("✅ [PDD] 登录成功使用获取的cookies连接平台", "SUCCESS")
# 将cookies字典转换为字符串格式与原有逻辑兼容
import json
cookies_str = json.dumps(login_result)
return await self.start_with_cookies(store_id, cookies_str)
else:
self._log("❌ [PDD] 登录返回未知结果", "ERROR")
return False
except Exception as e:
self._log(f"❌ [PDD] 使用登录参数启动失败: {str(e)}", "ERROR")
import traceback
self._log(f"🔍 [PDD] 异常详细信息: {traceback.format_exc()}", "ERROR")
return False
async def start_with_cookies(self, store_id: str, cookies: str):
"""使用下发的cookies与store_id直接建立PDD平台WS并开始监听"""
try:
self._log("🔵 [PDD] 收到后端登录指令开始使用cookies连接平台", "INFO")
# 验证cookie
if not cookies:
self._log("❌ Cookie信息不能为空", "ERROR")
return False
self._log(f"✅ [PDD] Cookie验证通过长度: {len(cookies)}", "SUCCESS")
# 获取统一后端服务
from WebSocket.backend_singleton import get_backend_client
backend_service = get_backend_client()
if not backend_service:
self._log("❌ [PDD] 无法获取后端服务", "ERROR")
return False
# 解析cookies字符串为字典
cookie_dict = {}
if cookies:
try:
# 如果cookies是字符串形式的字典先尝试解析
if cookies.startswith('{') and cookies.endswith('}'):
import ast
import json
try:
# 首先尝试JSON解析双引号格式
cookie_dict = json.loads(cookies) if isinstance(cookies, str) else cookies
except json.JSONDecodeError:
# 如果JSON解析失败尝试Python字典格式单引号格式
cookie_dict = ast.literal_eval(cookies)
else:
# 传统的cookie字符串解析
for cookie_pair in cookies.split(';'):
if '=' in cookie_pair:
key, value = cookie_pair.strip().split('=', 1)
cookie_dict[key] = value
except Exception as e:
self._log(f"⚠️ 解析cookies失败: {e}", "ERROR")
self._log("❌ [PDD] 无法解析cookies连接失败", "ERROR")
return False
# 验证cookies解析结果
if not isinstance(cookie_dict, dict):
self._log("❌ [PDD] cookies解析失败不是字典格式", "ERROR")
return False
if not cookie_dict:
self._log("❌ [PDD] cookies为空", "ERROR")
return False
self._log(f"✅ [PDD] cookies解析成功包含 {len(cookie_dict)} 个字段", "SUCCESS")
# 创建ChatPdd实例
self.pdd_bot = ChatPdd(
cookie=cookie_dict,
text=None,
chat_list_stat=False,
csname=None,
log_callback=self.log_callback
)
# 设置store_id和后端服务
self.pdd_bot.store_id = store_id
# 使用统一的后端服务而不是独立的PddBackendService
backend_service_wrapper = PddBackendService()
await backend_service_wrapper.connect(store_id)
self.pdd_bot.backend_service = backend_service_wrapper
self.pdd_bot.backend_connected = True
self.running = True
self._log("🎉 [PDD] 开始监听平台消息", "SUCCESS")
# 获取认证信息
self.pdd_bot.get_auth_token()
self.pdd_bot.get_mall_id()
self.pdd_bot.get_assign_cslist()
# 启动监听
store = {'id': store_id}
await self.pdd_bot.message_monitoring(
cookies_str=cookies,
store=store
)
self._log("✅ [PDD] 拼多多平台连接成功,开始监听消息", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ [PDD] 监听过程中出现严重错误: {str(e)}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
def stop_listening(self):
try:
self._log("🛑 开始停止拼多多监听...", "INFO")
self.running = False
if self.main_thread and self.main_thread.is_alive():
try:
self._log("🔄 等待主线程结束...", "DEBUG")
self.main_thread.join(timeout=5.0)
if self.main_thread.is_alive():
self._log("⚠️ 主线程未在超时时间内结束", "WARNING")
except Exception as e:
self._log(f"⚠️ 等待主线程结束时出错: {e}", "DEBUG")
self._log("✅ 拼多多监听已停止", "SUCCESS")
except Exception as e:
self._log(f"❌ 停止监听时出错: {str(e)}", "ERROR")
def is_running(self) -> bool:
return (self.running and
self.main_thread is not None and
self.main_thread.is_alive() and
self.startup_success)
def _parse_login_params(self, login_params_str: str) -> dict:
"""解析后端下发的登录参数"""
try:
import json
data = json.loads(login_params_str)
params = data.get("data", {}).get("login_params", {})
self._log(f"✅ [PDD] 登录参数解析成功: username={params.get('username', 'N/A')}", "INFO")
return params
except Exception as e:
self._log(f"❌ [PDD] 解析登录参数失败: {e}", "ERROR")
return {}
def get_status(self) -> Dict[str, Any]:
return {
"running": self.running,
"has_bot": self.pdd_bot is not None,
"main_thread_alive": self.main_thread.is_alive() if self.main_thread else False,
"startup_success": self.startup_success,
"startup_error": self.startup_error
}
# 测试函数保留
async def test_gui_integration():
def custom_log_callback(message: str, log_type: str):
print(f"[GUI测试] [{log_type}] {message}")
test_cookies = {
"api_uid": "CiKdOWi361Z8egCe1uihAg==",
"_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
"rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
"_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
"mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
"PASS_ID": "1-Ncbwg/naTW6KMMZg7FCtK5ctxvsC84uMK9wM55CJsDJnnoQ8d5jptFdwMkC08e3lKMJw/mGLQXYNL3iCvwTiCQ_109909969_163439093",
"x-visit-time": "1757554980094",
"JSESSIONID": "329E5C258BD7920211D15E263490C895"
}
print("🧪 开始测试GUI集成类...")
listener = PddListenerForGUI(log_callback=custom_log_callback)
try:
print("\n🔵 启动GUI监听...")
success = await listener.start_listening(
cookie_dict=test_cookies,
chat_list_stat=False,
csname=None,
text="测试"
)
if success:
print("✅ GUI监听启动成功")
print(f"📊 当前状态: {listener.get_status()}")
print("\n🟢 监听运行中,将持续运行直到手动停止...")
while True:
if not listener.is_running():
print("❌ 监听意外停止")
break
await asyncio.sleep(1)
print("\n🛑 测试完成,开始停止监听...")
listener.stop_listening()
print(f"📊 停止后状态: {listener.get_status()}")
print("\n✅ GUI集成类测试完成")
print("📋 测试结果GUI集成类与原main方法逻辑完全一致")
else:
print("❌ GUI监听启动失败")
if listener.startup_error:
print(f"❌ 错误原因: {listener.startup_error}")
except KeyboardInterrupt:
print("\n🛑 用户中断测试")
listener.stop_listening()
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
listener.stop_listening()
# 同时保留原有的main函数供直接调用
async def main():
cookies = {
"api_uid": "CiKdOWi361Z8egCe1uihAg==",
"_nano_fp": "Xpmyn5CJXqmbnqPyl9_MofCSZEeib112RR6N1Ah9",
"rckk": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"_bee": "v8QcMiTwfCX72hCnHFVtXSg4oHNvs6Qs",
"ru1k": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"_f77": "cb1cec9f-8895-4301-8e7e-a56c2c830cc5",
"ru2k": "f24003c6-339b-4498-b995-c4200179d4c6",
"_a42": "f24003c6-339b-4498-b995-c4200179d4c6",
"mms_b84d1838": "3616,3523,3660,3614,3599,3621,3588,3254,3532,3642,3474,3475,3477,3479,3482,1202,1203,1204,1205,3417",
"PASS_ID": "1-P3kEWMDAT6n7mZjZpx1poVmxfusHa6Qd0s+dhQlG7SiqNA8A5LCsjbbRpck4WHjVSve3G+x0N4ZEL7Dcz2L+vg_109909969_163439093",
"x-visit-time": "1757399096750",
"JSESSIONID": "04601A22B2F955001EFEA28A2F64ED79"
}
await ChatPdd(cookie=cookies, chat_list_stat=False).main()
if __name__ == '__main__':
# 可以选择直接运行main()或者测试GUI集成
# 测试GUI集成
asyncio.run(test_gui_integration())
# asyncio.run(main())