[patch] 新增DY 图片 视频上传 发送等方法逻辑集成 优化抖音心跳维护与弹性发送心跳包 DY集成内置js环境 PDD取消过滤系统机器消息

This commit is contained in:
2025-10-27 16:32:07 +08:00
parent 787c52a8dc
commit 1f1deb5f7f
5 changed files with 1556 additions and 64 deletions

View File

@@ -14,12 +14,26 @@ from dataclasses import dataclass, asdict
from urllib.parse import urlencode
import re
import random
import base64
import os
import zlib
import hashlib
import string
from concurrent.futures import ThreadPoolExecutor
import config
# 导入 message_arg 中的方法
from Utils.Dy.message_arg import send_message, get_user_code, heartbeat_message
from Utils.Dy.message_arg import send_message, get_user_code, heartbeat_message, send_img, send_video
from Utils.message_models import PlatformMessage
# 🔧 尝试导入PyMiniRacer内置V8引擎无需外部JavaScript环境
try:
from py_mini_racer import MiniRacer
PYMINIRACER_AVAILABLE = True
except ImportError:
PYMINIRACER_AVAILABLE = False
MiniRacer = None
# ===== 抖音登录相关类集成开始 =====
@@ -663,6 +677,14 @@ class DouYinMessageHandler:
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"x-secsdk-csrf-token": "0001000000017e890e18651b2ef5f6d36d0485a64cae6b0bfc36d69e27fdc20fe7d423670eba1861a5bcb5baaf40,a25cfb6098b498c33ee5f0a5dcafe47b"
}
# 🔥 新增:线程池用于异步操作(下载、上传等)
self.pool = ThreadPoolExecutor(max_workers=3)
# 🔥 新增:签名引擎相关
self.sign_engine_initialized = False
self.sign_ctx = None
self.js_engine = None
# 打印实例创建信息
print(f"[DY Handler] 创建实例 {self.instance_id} for store {store_id}")
@@ -959,28 +981,52 @@ class DouYinMessageHandler:
def on_error(self, ws, error):
"""WebSocket错误处理"""
self._log(f"❌ WebSocket错误: {error}", "ERROR")
self.is_running = False
# 🔥 不立即设置 is_running = False让重连机制处理
# self.is_running = False
def on_close(self, ws, close_status_code, close_msg):
"""WebSocket关闭处理"""
self._log(f"🔌 连接关闭: {close_status_code}, {close_msg}", "WARNING")
self.is_running = False
# 🔥 添加详细的关闭原因分析
if close_status_code:
if close_status_code == 1006:
self._log("⚠️ 异常关闭(1006),可能是网络问题或心跳超时", "WARNING")
elif close_status_code == 1000:
self._log("✅ 正常关闭(1000),服务器主动断开", "INFO")
elif close_status_code == 1001:
self._log("⚠️ 端点离开(1001)", "WARNING")
else:
self._log(f"⚠️ 关闭代码: {close_status_code}", "WARNING")
# 🔥 不立即设置 is_running = False让重连机制处理
# self.is_running = False
def heartbeat_wss(self):
"""心跳线程"""
heartbeat_count = 0
while self.is_running:
try:
if self.ws and hasattr(self.ws, 'sock') and self.ws.sock and self.ws.sock.connected:
self.send_heartbeat()
time.sleep(3)
heartbeat_count += 1
# 🔍 每10次心跳输出一次统计
if heartbeat_count % 10 == 0:
self._log(f"📊 [心跳统计] 已发送 {heartbeat_count} 次心跳", "INFO")
# 🔥 优化增加心跳间隔到5秒避免被抖音风控
# 并添加随机抖动4.5-5.5秒)
import random
sleep_time = 5 + random.uniform(-0.5, 0.5)
time.sleep(sleep_time)
except Exception as e:
self._log(f"❌ 心跳发送失败: {e}", "ERROR")
import traceback
self._log(f"心跳异常详情: {traceback.format_exc()}", "DEBUG")
time.sleep(5)
def send_heartbeat(self):
"""发送心跳包 - 使用 message_arg 中的方法"""
try:
# 使用 message_arg 中的 heartbeat_message 方法
value, message_type = heartbeat_message(
pigeon_sign=self.config["data"]["pigeon_sign"],
token=self.config["data"]["token"],
@@ -1880,6 +1926,719 @@ class DouYinMessageHandler:
self._log(f"📤 已请求用户 {receiver_id} 的token", "INFO")
except Exception as e:
self._log(f"❌ 请求token失败: {e}", "ERROR")
# ==================== 🔥 新增:图片/视频处理方法 ====================
def _initialize_sign_engine(self):
"""初始化JavaScript签名引擎用于图片/视频上传签名)"""
if self.sign_engine_initialized:
return True
try:
self._log("🔧 [DY上传] 初始化JavaScript签名引擎...", "INFO")
# 获取 sign.js 文件路径
from windows_taskbar_fix import get_resource_path
sign_js_path = get_resource_path("static/js/sign.js")
if not os.path.exists(sign_js_path):
self._log(f"❌ [DY上传] 签名文件不存在: {sign_js_path}", "ERROR")
return False
# 读取 sign.js 文件
with open(sign_js_path, 'r', encoding='utf-8') as f:
jscode = f.read()
# 🔧 优先使用PyMiniRacer内置V8无需外部JavaScript环境
if PYMINIRACER_AVAILABLE:
self._log("✅ [DY上传] 使用PyMiniRacer内置JavaScript引擎", "INFO")
self.js_engine = "PyMiniRacer"
self.sign_ctx = MiniRacer()
self.sign_ctx.eval(jscode)
self._log("✅ [DY上传] PyMiniRacer引擎初始化成功", "SUCCESS")
else:
# 回退到execjs需要Node.js环境
self._log("⚠️ [DY上传] PyMiniRacer不可用回退到execjs", "WARNING")
try:
import execjs
self.js_engine = "execjs"
self.sign_ctx = execjs.compile(jscode)
self._log("✅ [DY上传] execjs引擎初始化成功", "SUCCESS")
except ImportError:
self._log("❌ [DY上传] execjs未安装无法使用签名功能", "ERROR")
return False
self.sign_engine_initialized = True
return True
except Exception as e:
self._log(f"❌ [DY上传] 初始化签名引擎失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
def _call_sign_function(self, function_name, *args):
"""调用JavaScript签名函数统一接口"""
try:
if not self.sign_engine_initialized:
if not self._initialize_sign_engine():
raise Exception("签名引擎未初始化")
if self.js_engine == "PyMiniRacer":
# PyMiniRacer调用方式
js_args = ", ".join([json.dumps(arg) if not isinstance(arg, (int, float, bool)) else str(arg) for arg in args])
js_call = f"{function_name}({js_args})"
return self.sign_ctx.eval(js_call)
else:
# execjs调用方式
return self.sign_ctx.call(function_name, *args)
except Exception as e:
self._log(f"❌ [DY上传] 调用签名函数失败: {e}", "ERROR")
raise
def _get_file_extension(self, url, default_ext):
"""智能提取文件扩展名
Args:
url: 文件URL
default_ext: 默认扩展名jpg/mp4
Returns:
str: 文件扩展名
"""
try:
# 移除查询参数
url_without_params = url.split('?')[0]
# 检查是否有有效的文件扩展名
if '.' in url_without_params:
parts = url_without_params.split('.')
ext = parts[-1].lower()
# 验证扩展名是否合法(只包含字母数字)
if ext and len(ext) <= 5 and ext.isalnum():
# 常见图片/视频扩展名
valid_exts = ['jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv']
if ext in valid_exts:
return ext
# 如果无法提取有效扩展名,使用默认值
return default_ext
except Exception:
return default_ext
async def _download_media(self, media_url, media_type="image", max_retries=3):
"""下载图片或视频文件(带重试机制 + 智能扩展名识别)
Args:
media_url: 媒体文件URL
media_type: 媒体类型image/video
max_retries: 最大重试次数
Returns:
tuple: (local_file_path, file_size_kb) 或 (None, None)
"""
for attempt in range(max_retries):
try:
if attempt > 0:
self._log(f"🔄 [DY{media_type}] 第{attempt + 1}次重试下载...", "INFO")
else:
self._log(f"🔽 [DY{media_type}] 开始下载: {media_url[:100]}...", "INFO")
# 使用线程池下载文件(避免阻塞)
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.get(media_url, timeout=30, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
)
if response.status_code != 200:
self._log(f"❌ [DY{media_type}] 下载失败HTTP状态码: {response.status_code}", "ERROR")
if attempt < max_retries - 1:
await asyncio.sleep(1)
continue
return None, None
# 获取文件数据
file_data = response.content
file_size_kb = len(file_data) // 1024
# 检查文件大小限制50MB
if file_size_kb > 51200:
self._log(f"❌ [DY{media_type}] 文件过大: {file_size_kb}KB超过50MB限制", "ERROR")
return None, None
# 检查文件是否为空
if file_size_kb == 0:
self._log(f"❌ [DY{media_type}] 下载的文件为空", "ERROR")
if attempt < max_retries - 1:
await asyncio.sleep(1)
continue
return None, None
# 保存到临时文件
temp_dir = os.path.join(os.path.dirname(__file__), "temp_uploads")
os.makedirs(temp_dir, exist_ok=True)
# 🔥 修复:智能提取文件扩展名
default_ext = 'mp4' if media_type == 'video' else 'jpg'
ext = self._get_file_extension(media_url, default_ext)
temp_filename = f"{media_type}_{uuid.uuid4().hex[:12]}.{ext}"
temp_filepath = os.path.join(temp_dir, temp_filename)
# 写入临时文件
with open(temp_filepath, 'wb') as f:
f.write(file_data)
self._log(f"✅ [DY{media_type}] 下载成功,大小: {file_size_kb}KB文件: {temp_filename}", "SUCCESS")
return temp_filepath, file_size_kb
except requests.exceptions.RequestException as e:
self._log(f"❌ [DY{media_type}] 网络请求失败: {e}", "ERROR")
if attempt < max_retries - 1:
await asyncio.sleep(2)
continue
return None, None
except Exception as e:
self._log(f"❌ [DY{media_type}] 下载失败: {e}", "ERROR")
if attempt < max_retries - 1:
await asyncio.sleep(1)
continue
return None, None
self._log(f"❌ [DY{media_type}] 下载失败,已重试{max_retries}", "ERROR")
return None, None
def _cleanup_temp_file(self, file_path):
"""清理临时文件"""
try:
if file_path and os.path.exists(file_path):
os.remove(file_path)
self._log(f"🗑️ [DY上传] 临时文件已删除: {os.path.basename(file_path)}", "DEBUG")
except Exception as e:
self._log(f"⚠️ [DY上传] 删除临时文件失败: {e}", "WARNING")
async def _get_upload_token(self, upload_type="image"):
"""获取上传Token图片或视频"""
try:
self._log(f"📝 [DY上传] 请求{upload_type}上传Token...", "DEBUG")
headers = {
"authority": "pigeon.jinritemai.com",
"accept": "application/json, text/plain, */*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"origin": "https://im.jinritemai.com",
"pragma": "no-cache",
"referer": "https://im.jinritemai.com/",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"x-secsdk-csrf-token": "000100000001d87d9750fd37e74337d4d45e4b1d099ae24c4a56636764617b7be09bbe989ab618710dd6176c296d,796bb03b67483620a4e078de481a1100"
}
if upload_type == "video":
url = "https://pigeon.jinritemai.com/backstage/video/getUploadToken"
else:
url = "https://pigeon.jinritemai.com/backstage/getSTS2Token"
params = {
"biz_type": "4",
"PIGEON_BIZ_TYPE": "2",
"_ts": int(time.time() * 1000),
"_pms": "1",
"FUSION": "true",
"verifyFp": "verify_mh2rv3to_ZqgJkknT_7C5g_4Ov9_9Rz5_zIPjTzd4Ee6L",
"_v": "1.0.1.4380"
}
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.get(url, headers=headers, cookies=self.cookie, params=params)
)
if response.status_code != 200:
self._log(f"❌ [DY上传] 获取上传Token失败: HTTP {response.status_code}", "ERROR")
return None
result = response.json()
data = result.get("data")
if upload_type == "video":
token_data = data.get("token", {})
return {
"access_key_id": token_data.get("access_key_id"),
"secret_access_key": token_data.get("secret_access_key"),
"session_token": token_data.get("session_token")
}
else:
return {
"access_key_id": data.get("AccessKeyID"),
"secret_access_key": data.get("SecretAccessKey"),
"service_id": data.get("ServiceId"),
"session_token": data.get("SessionToken")
}
except Exception as e:
self._log(f"❌ [DY上传] 获取上传Token失败: {e}", "ERROR")
return None
async def _upload_image_to_douyin(self, image_path, token_info):
"""上传图片到抖音服务器(完整流程)"""
try:
self._log("📤 [DY图片] 开始上传图片到抖音服务器...", "INFO")
current_time = datetime.utcnow()
formatted_time = current_time.strftime("%Y%m%dT%H%M%SZ")
random_string = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
# 步骤1: 申请上传
upload_node = await self._apply_image_upload(
formatted_time, random_string, token_info, action="ApplyImageUpload"
)
if not upload_node:
return None
session_key = upload_node.get("SessionKey")
store_uri = upload_node.get("StoreInfos", [{}])[0].get("StoreUri")
authorization = upload_node.get("StoreInfos", [{}])[0].get("Auth")
# 步骤2-4: 分片上传
upload_id = await self._upload_file_part(store_uri, authorization, image_path, step=1)
if not upload_id:
return None
success = await self._upload_file_part(store_uri, authorization, image_path, step=2, upload_id=upload_id)
if not success:
return None
with open(image_path, 'rb') as f:
content = f.read()
crc32_hex = format(zlib.crc32(content), '08x')
success = await self._upload_file_part(store_uri, authorization, image_path, step=3, upload_id=upload_id, crc=crc32_hex)
if not success:
return None
# 步骤5: 提交上传获取图片URI
result = await self._apply_image_upload(
formatted_time, random_string, token_info,
action="CommitImageUpload", session_key=session_key
)
if result:
image_uri = result.get("ImageUri")
self._log(f"✅ [DY图片] 上传成功URI: {image_uri}", "SUCCESS")
return image_uri
return None
except Exception as e:
self._log(f"❌ [DY图片] 上传失败: {e}", "ERROR")
return None
async def _apply_image_upload(self, formatted_time, random_string, token_info, action, session_key=None):
"""申请或提交图片上传"""
try:
url = "https://imagex.bytedanceapi.com/"
if action == "ApplyImageUpload":
params = {
"Action": "ApplyImageUpload",
"Version": "2018-08-01",
"ServiceId": token_info["service_id"],
"s": random_string
}
sign = self._call_sign_function(
"signature", "GET", "imagex",
"AWS4" + token_info["secret_access_key"],
formatted_time, token_info["session_token"], params
)
else:
params = {
"Action": "CommitImageUpload",
"Version": "2018-08-01",
"SessionKey": session_key,
"ServiceId": token_info["service_id"]
}
sign = self._call_sign_function(
"signature", "POST", "imagex",
"AWS4" + token_info["secret_access_key"],
formatted_time, token_info["session_token"], params
)
authorization = f"AWS4-HMAC-SHA256 Credential={token_info['access_key_id']}/{formatted_time.split('T')[0]}/cn-north-1/imagex/aws4_request, SignedHeaders=x-amz-date;x-amz-security-token, Signature={sign}"
headers = {
"authority": "imagex.bytedanceapi.com",
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"authorization": authorization,
"cache-control": "no-cache",
"origin": "https://im.jinritemai.com",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"x-amz-date": formatted_time,
"x-amz-security-token": token_info["session_token"]
}
if action == "ApplyImageUpload":
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.get(url, headers=headers, params=params)
)
result = response.json()
return result.get("Result", {}).get("InnerUploadAddress", {}).get("UploadNodes", [{}])[0]
else:
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=headers, params=params)
)
result = response.json()
return result.get("Result", {}).get("PluginResult", [{}])[0]
except Exception as e:
self._log(f"❌ [DY图片] {action}失败: {e}", "ERROR")
return None
async def _upload_file_part(self, uri, authorization, file_path, step, crc=None, upload_id=None):
"""上传图片文件分片(带超时和重试)"""
try:
headers = {
"Authorization": authorization,
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Host": "tos-d-x-lf.douyin.com",
"Origin": "https://im.jinritemai.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"X-Storage-U": self.cookie["PIGEON_CID"],
}
url = f"https://tos-d-x-lf.douyin.com/{uri}"
if step == 1:
headers["Content-Type"] = "multipart/form-data"
params = {"uploads": ""}
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=headers, params=params, timeout=30)
)
return response.json().get("payload", {}).get("uploadID")
elif step == 2:
with open(file_path, 'rb') as f:
content = f.read()
headers["Content-Crc32"] = format(zlib.crc32(content), '08x')
headers["Content-Length"] = str(len(content))
headers["Content-Type"] = 'application/octet-stream'
params = {"partNumber": "1", "uploadID": upload_id}
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.put(url, headers=headers, params=params, data=content, timeout=120)
)
return response.json().get("error", {}).get("code") == 200
elif step == 3:
headers["Content-Type"] = 'text/plain;charset=UTF-8'
params = {"uploadID": upload_id}
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=headers, params=params, data=f"1:{crc}", timeout=30)
)
return response.json().get("error", {}).get("code") == 200
except Exception as e:
self._log(f"❌ [DY图片] 分片上传失败(步骤{step}): {e}", "ERROR")
return None if step == 1 else False
async def _send_image_message(self, receiver_id, talk_id, p_id, user_token, image_uri):
"""发送图片消息给用户"""
try:
self._log(f"📤 [DY图片] 发送图片消息给用户 {receiver_id}", "INFO")
value, message_type = send_img(
pigeon_sign=self.config["data"]["pigeon_sign"],
token=self.config["data"]["token"],
receiver_id=receiver_id,
shop_id=self.cookie["SHOP_ID"],
talk_id=talk_id,
session_did=self.cookie["PIGEON_CID"],
p_id=p_id,
user_code=user_token,
img=image_uri
)
form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type)
self.ws.send_bytes(form_data)
self._log(f"✅ [DY图片] 图片消息已发送", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ [DY图片] 发送失败: {e}", "ERROR")
return False
async def _upload_video_to_douyin(self, video_path, token_info):
"""上传视频到抖音服务器(完整流程)"""
try:
self._log("📤 [DY视频] 开始上传视频...", "INFO")
# 🔥 获取实际文件大小
file_size = os.path.getsize(video_path)
self._log(f"🔍 [DY视频] 文件大小: {file_size} 字节", "DEBUG")
current_time = datetime.utcnow()
formatted_time = current_time.strftime("%Y%m%dT%H%M%SZ")
random_string = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
upload_node = await self._apply_video_upload(formatted_time, random_string, token_info, file_size, action="ApplyUploadInner")
if not upload_node:
return None
session_key = upload_node.get("SessionKey")
store_uri = upload_node.get("StoreInfos", [{}])[0].get("StoreUri")
authorization = upload_node.get("StoreInfos", [{}])[0].get("Auth")
upload_id = await self._upload_video_part(store_uri, authorization, video_path, step=1)
if not upload_id:
return None
success = await self._upload_video_part(store_uri, authorization, video_path, step=2, upload_id=upload_id)
if not success:
return None
with open(video_path, 'rb') as f:
content = f.read()
crc32_hex = format(zlib.crc32(content), '08x')
success = await self._upload_video_part(store_uri, authorization, video_path, step=3, upload_id=upload_id, crc=crc32_hex)
if not success:
return None
result = await self._apply_video_upload(formatted_time, random_string, token_info, file_size, action="CommitUploadInner", session_key=session_key)
if result:
# 🔥 添加调试日志,查看返回结构
self._log(f"🔍 [DY视频] CommitUpload返回结构: {json.dumps(result, ensure_ascii=False)[:500]}", "DEBUG")
poster_uri = result.get("PosterUri")
cover_url = await self._get_url_for_uri(poster_uri) if poster_uri else ""
# 🔥 提取视频元数据
video_meta = result.get("VideoMeta", {})
if not video_meta:
self._log(f"⚠️ [DY视频] VideoMeta为空使用默认值", "WARNING")
vid = result.get("Vid")
width = video_meta.get("Width") if video_meta else 1920
height = video_meta.get("Height") if video_meta else 1080
duration = video_meta.get("Duration") if video_meta else 0
self._log(f"🔍 [DY视频] 提取的元数据: vid={vid}, width={width}, height={height}, duration={duration}", "DEBUG")
return {
"vid": vid,
"width": str(width),
"height": str(height),
"duration": str(duration),
"cover_url": cover_url
}
return None
except Exception as e:
self._log(f"❌ [DY视频] 上传失败: {e}", "ERROR")
return None
async def _apply_video_upload(self, formatted_time, random_string, token_info, file_size, action, session_key=None):
"""申请或提交视频上传"""
try:
url = "https://open.bytedanceapi.com/"
if action == "ApplyUploadInner":
# 🔥 修复:使用实际文件大小,不是固定值
params = {"Action": "ApplyUploadInner", "Version": "2020-11-19", "SpaceName": "pigeon-video", "FileType": "video", "IsInner": "1", "FileSize": str(file_size), "s": random_string}
sign = self._call_sign_function("signature", "GET", "vod", "AWS4" + token_info["secret_access_key"], formatted_time, token_info["session_token"], params)
else:
params = {"Action": "CommitUploadInner", "Version": "2020-11-19", "SpaceName": "pigeon-video"}
payload = {"SessionKey": session_key, "Functions": [{"name": "GetMeta"}, {"name": "Snapshot", "input": {"SnapshotTime": 0}}]}
body_sha256 = hashlib.sha256(json.dumps(payload).replace(" ", "").encode()).hexdigest()
sign = self._call_sign_function("signature", "POST", "vod", "AWS4" + token_info["secret_access_key"], formatted_time, token_info["session_token"], params, payload)
authorization = f"AWS4-HMAC-SHA256 Credential={token_info['access_key_id']}/{formatted_time.split('T')[0]}/cn-north-1/vod/aws4_request, SignedHeaders="
authorization += "x-amz-content-sha256;x-amz-date;x-amz-security-token, Signature=" + sign if action == "CommitUploadInner" else "x-amz-date;x-amz-security-token, Signature=" + sign
headers = {
"authorization": authorization,
"x-amz-date": formatted_time,
"x-amz-security-token": token_info["session_token"],
"User-Agent": "Mozilla/5.0"
}
if action == "ApplyUploadInner":
response = await asyncio.get_event_loop().run_in_executor(self.pool, lambda: requests.get(url, headers=headers, params=params))
result = response.json()
self._log(f"🔍 [DY视频] ApplyUploadInner响应: {json.dumps(result, ensure_ascii=False)[:300]}", "DEBUG")
return result.get("Result", {}).get("InnerUploadAddress", {}).get("UploadNodes", [{}])[0]
else:
headers["X-Amz-Content-Sha256"] = body_sha256
response = await asyncio.get_event_loop().run_in_executor(self.pool, lambda: requests.post(url, headers=headers, params=params, data=json.dumps(payload).replace(" ", "")))
full_result = response.json()
self._log(f"🔍 [DY视频] CommitUploadInner完整响应: {json.dumps(full_result, ensure_ascii=False)[:800]}", "DEBUG")
# 🔥 修复:直接返回 Result 对象,而不是 Results 数组的第一个元素
# 因为视频元数据在 Result 中,不在 Results[0] 中
result_data = full_result.get("Result", {})
# 检查是否有 Results 数组(某些情况下返回格式不同)
if "Results" in result_data and result_data["Results"]:
self._log(f"🔍 [DY视频] 使用Results数组: {result_data['Results'][0]}", "DEBUG")
return result_data["Results"][0]
else:
# 直接返回 Result 对象(包含 Vid, VideoMeta, PosterUri 等)
self._log(f"🔍 [DY视频] 使用Result对象", "DEBUG")
return result_data
except Exception as e:
self._log(f"❌ [DY视频] {action}失败: {e}", "ERROR")
return None
async def _upload_video_part(self, uri, authorization, file_path, step, crc=None, upload_id=None):
"""上传视频文件分片"""
try:
headers = {
"Authorization": authorization,
"Host": "tos-hl-x.snssdk.com",
"Origin": "https://im.jinritemai.com",
"X-Storage-U": self.cookie["PIGEON_CID"]
}
url = f"https://tos-hl-x.snssdk.com/{uri}"
if step == 1:
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=headers, params={"uploads": ""}, timeout=30)
)
return response.json().get("payload", {}).get("uploadID")
elif step == 2:
with open(file_path, 'rb') as f:
content = f.read()
headers["Content-Crc32"] = format(zlib.crc32(content), '08x')
headers["Content-Type"] = 'application/octet-stream'
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.put(url, headers=headers, params={"partNumber": "1", "uploadID": upload_id}, data=content, timeout=120)
)
return response.json().get("error", {}).get("code") == 200
elif step == 3:
headers["Content-Type"] = 'text/plain;charset=UTF-8'
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=headers, params={"uploadID": upload_id}, data=f"1:{crc}", timeout=30)
)
return response.json().get("error", {}).get("code") == 200
except Exception as e:
self._log(f"❌ [DY视频] 分片上传失败(步骤{step}): {e}", "ERROR")
return None if step == 1 else False
async def _get_url_for_uri(self, uri):
"""根据URI获取实际URL"""
try:
params = {"biz_type": "4", "PIGEON_BIZ_TYPE": "2", "_ts": int(time.time() * 1000), "_pms": "1", "FUSION": "true", "uri": uri, "file_type": "image"}
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.get("https://pigeon.jinritemai.com/backstage/getURLForURI", headers=self.video_headers, params=params, cookies=self.cookie)
)
return response.json().get("data", {}).get("k3s_url")
except Exception as e:
self._log(f"❌ [DY视频] 获取封面URL失败: {e}", "ERROR")
return None
async def _put_video(self, vid, receiver_id):
"""激活视频(关键步骤:告诉抖音这个视频可以使用了)
Args:
vid: 视频ID
receiver_id: 接收者ID
Returns:
bool: 是否成功
"""
try:
self._log(f"🔥 [DY视频] 激活视频: vid={vid}, receiver_id={receiver_id}", "INFO")
headers = {
"authority": "pigeon.jinritemai.com",
"accept": "application/json, text/plain, */*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json;charset=UTF-8",
"origin": "https://im.jinritemai.com",
"pragma": "no-cache",
"referer": "https://im.jinritemai.com/",
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"x-secsdk-csrf-token": "0001000000012c9d3ac3a79ed92ea62c56059c02dea78e3b9f2c856b2095ac3596036ea5d508187236fe1d77f3fd,796bb03b67483620a4e078de481a1100"
}
url = "https://pigeon.jinritemai.com/backstage/video/putVideo"
params = {
"vid": vid,
"receiver_id": str(receiver_id),
"biz_type": "4",
"PIGEON_BIZ_TYPE": "2",
"_ts": int(time.time() * 1000),
"_pms": "1",
"FUSION": "true",
"verifyFp": "verify_mh8hyodv_XxOgxVPT_WN66_4mBk_9Ht4_O0qGQxZuhqqo",
"_v": "1.0.1.4423"
}
data = {}
data_str = json.dumps(data, separators=(',', ':'))
response = await asyncio.get_event_loop().run_in_executor(
self.pool,
lambda: requests.post(url, headers=headers, cookies=self.cookie, params=params, data=data_str)
)
self._log(f"✅ [DY视频] 视频激活成功: {response.text[:200]}", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ [DY视频] 视频激活失败: {e}", "ERROR")
return False
async def _send_video_message(self, receiver_id, talk_id, p_id, user_token, video_info):
"""发送视频消息给用户"""
try:
self._log(f"📤 [DY视频] 发送视频消息给用户 {receiver_id}", "INFO")
value, message_type = send_video(
pigeon_sign=self.config["data"]["pigeon_sign"],
token=self.config["data"]["token"],
receiver_id=receiver_id,
shop_id=self.cookie["SHOP_ID"],
talk_id=talk_id,
session_did=self.cookie["PIGEON_CID"],
p_id=p_id,
user_code=user_token,
vid=video_info["vid"],
cover_url=video_info["cover_url"],
height=video_info["height"],
width=video_info["width"],
duration=video_info["duration"]
)
form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type)
self.ws.send_bytes(form_data)
self._log(f"✅ [DY视频] 视频消息已发送", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ [DY视频] 发送失败: {e}", "ERROR")
return False
def _log_user_tokens_state(self):
"""记录当前 user_tokens 状态(用于调试)"""
@@ -2102,44 +2861,23 @@ class DouYinMessageHandler:
self._log("🛑 消息处理器已停止", "INFO")
async def send_message_external(self, receiver_id: str, content: str) -> bool:
"""外部调用的发送消息方法 - 用于后端消息转发"""
async def send_message_external(self, receiver_id: str, content: str, msg_type: str = "text") -> bool:
"""外部调用的发送消息方法 - 用于后端消息转发(支持图片/视频)"""
temp_file = None
try:
self._log(f"🔄 [External-{self.instance_id}] 收到转发请求: receiver_id={receiver_id}, content={content}",
self._log(f"🔄 [External-{self.instance_id}] 收到转发请求: receiver_id={receiver_id}, msg_type={msg_type}, content={content[:100] if content else ''}",
"INFO")
# 修复数据类型不匹配问题:将字符串转换为整数
try:
receiver_id_int = int(receiver_id)
self._log(f"🔧 [External-{self.instance_id}] 转换 receiver_id: '{receiver_id}' -> {receiver_id_int}",
"DEBUG")
except ValueError:
self._log(f"❌ [External-{self.instance_id}] receiver_id 无法转换为整数: {receiver_id}", "ERROR")
return False
# 调试信息:显示当前活跃用户
active_users = list(self.user_tokens.keys())
self._log(f"🔍 [External-{self.instance_id}] 当前活跃用户列表: {active_users}", "DEBUG")
self._log(f"🔍 [External-{self.instance_id}] 活跃用户数量: {len(active_users)}", "DEBUG")
# 检查用户是否存在于user_tokens中使用整数类型
self._log(f"🔍 [External-{self.instance_id}] 调试信息:", "DEBUG")
self._log(
f"🔍 [External-{self.instance_id}] receiver_id_int: {receiver_id_int} (类型: {type(receiver_id_int)})",
"DEBUG")
self._log(f"🔍 [External-{self.instance_id}] user_tokens keys: {list(self.user_tokens.keys())}", "DEBUG")
if self.user_tokens:
first_key = list(self.user_tokens.keys())[0]
self._log(f"🔍 [External-{self.instance_id}] 第一个key: {first_key} (类型: {type(first_key)})", "DEBUG")
self._log(f"🔍 [External-{self.instance_id}] 直接比较: {receiver_id_int == first_key}", "DEBUG")
if receiver_id_int not in self.user_tokens:
self._log(f"❌ [External-{self.instance_id}] 用户 {receiver_id_int} 不在活跃会话中", "WARNING")
self._log(f"💡 [External-{self.instance_id}] 提示:用户需要先在抖音平台发送消息建立会话", "INFO")
# 显示当前活跃用户的调试信息
self.print_active_users_debug()
return False
user_info = self.user_tokens[receiver_id_int]
@@ -2147,45 +2885,91 @@ class DouYinMessageHandler:
p_id = user_info.get("p_id")
user_token = user_info.get("token")
self._log(
f"🔍 [External-{self.instance_id}] 用户会话信息: talk_id={talk_id}, p_id={p_id}, has_token={bool(user_token)}",
"DEBUG")
# 检查必要参数
if not talk_id or not p_id:
self._log(
f"❌ [External-{self.instance_id}] 用户 {receiver_id_int} 缺少必要的会话信息 (talk_id: {talk_id}, p_id: {p_id})",
"ERROR")
self._log(f"❌ [External-{self.instance_id}] 缺少会话信息", "ERROR")
return False
if not user_token:
self._log(f"⚠️ [External-{self.instance_id}] 用户 {receiver_id_int} token为空尝试请求token", "WARNING")
# 请求用户token
self._log(f"⚠️ [External-{self.instance_id}] token为空加入待发送队列", "WARNING")
self._request_user_token(receiver_id_int, p_id)
# 将消息加入待发送队列
if "pending_messages" not in user_info:
user_info["pending_messages"] = []
user_info["pending_messages"].append(content)
self._log(
f"📝 [External-{self.instance_id}] 消息已加入待发送队列,队列长度: {len(user_info['pending_messages'])}",
"INFO")
return True
# 发送消息 (注意_send_message_to_user 可能期望字符串类型的receiver_id)
success = await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, content)
# 🔥 根据消息类型分发处理
success = False
if msg_type == "image":
# 图片处理流程
self._log(f"🖼️ [External-{self.instance_id}] 开始图片发送流程", "INFO")
# 1. 下载图片
temp_file, file_size = await self._download_media(content, "image")
if not temp_file:
self._log(f"❌ [External-{self.instance_id}] 图片下载失败,回退发送提示", "WARNING")
return await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, "[图片发送失败:下载失败]")
# 2. 获取上传Token
token_info = await self._get_upload_token("image")
if not token_info:
self._log(f"❌ [External-{self.instance_id}] 获取Token失败", "WARNING")
return await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, "[图片发送失败:获取凭证失败]")
# 3. 上传图片
image_uri = await self._upload_image_to_douyin(temp_file, token_info)
if not image_uri:
self._log(f"❌ [External-{self.instance_id}] 图片上传失败", "WARNING")
return await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, "[图片发送失败:上传失败]")
# 4. 发送图片消息
success = await self._send_image_message(receiver_id_int, talk_id, p_id, user_token, image_uri)
elif msg_type == "video":
# 视频处理流程
self._log(f"🎥 [External-{self.instance_id}] 开始视频发送流程", "INFO")
temp_file, file_size = await self._download_media(content, "video")
if not temp_file:
return await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, "[视频发送失败:下载失败]")
token_info = await self._get_upload_token("video")
if not token_info:
return await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, "[视频发送失败:获取凭证失败]")
video_info = await self._upload_video_to_douyin(temp_file, token_info)
if not video_info:
return await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, "[视频发送失败:上传失败]")
# 🔥 关键新增:激活视频(告诉抖音这个视频可以使用)
vid = video_info.get("vid")
if vid:
put_success = await self._put_video(vid, receiver_id_int)
if not put_success:
self._log(f"⚠️ [External-{self.instance_id}] 视频激活失败,但继续发送", "WARNING")
else:
self._log(f"⚠️ [External-{self.instance_id}] 未获取到VID跳过激活步骤", "WARNING")
success = await self._send_video_message(receiver_id_int, talk_id, p_id, user_token, video_info)
else:
# 文本消息(默认)
success = await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, content)
if success:
# 更新最后发送时间
user_info["last_sent"] = int(time.time() * 1000)
self._log(f"✅ [External-{self.instance_id}] 消息转发成功", "SUCCESS")
else:
self._log(f"❌ [External-{self.instance_id}] 消息转发失败", "ERROR")
return success
except Exception as e:
self._log(f"❌ [External-{self.instance_id}] 外部消息发送异常: {e}", "ERROR")
self._log(f"❌ [External-{self.instance_id}] 错误详情: {traceback.format_exc()}", "DEBUG")
self._log(f"❌ [External-{self.instance_id}] 异常: {e}", "ERROR")
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
finally:
# 清理临时文件
if temp_file:
self._cleanup_temp_file(temp_file)
def get_active_users_info(self) -> dict:
"""获取当前活跃用户的详细信息"""

View File

@@ -8,7 +8,7 @@ import uuid
import json
# 发送消息
# 发送文本消息
def send_message(pigeon_sign: str, token: str, receiver_id: str, shop_id: str, talk_id: int, session_did: str, p_id: int, user_code: str, text: str):
"""
构造发送消息消息体
@@ -78,7 +78,7 @@ def send_message(pigeon_sign: str, token: str, receiver_id: str, shop_id: str, t
'15': [
{'1': b'pigeon_source', '2': b'web'},
{'1': b'PIGEON_BIZ_TYPE', '2': b'2'},
{'1': b'pigeon_sign', '2': b'MIG6BAz2BNUON43WdlOBuGYEgZcsIho9ZjVP4yyExLShzXgAZtsvUMj2e3jZWeMZv+6+TNVQQMq3xSLrqiwcs2cCaOVBDuS6zGsWm5gBlGtlvOOLM5td2/9OS8P37t1sdkjN4BSH2mB7FlGItioZIsTh1sodn6pYCGj+45mtId3Itenufgai3Mnkpt573uoWJmagF8J3jVPHMFtdwd25Qf5vsWC2kB30glpQBBCbk2VO2ubMqctqQSzhI6uD'},
{'1': b'pigeon_sign', '2': pigeon_sign.encode()}, # 🔥 修复:使用动态参数,不是硬编码
{'1': b'session_aid', '2': b'1383'},
{'1': b'session_did', '2': session_did.encode()},
{'1': b'app_name', '2': b'im'},
@@ -373,3 +373,229 @@ def heartbeat_message(pigeon_sign: str, token: str, session_did: str):
message_type = {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'int', 'name': ''}, '4': {'type': 'int', 'name': ''}, '5': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '7': {'type': 'message', 'message_typedef': {'14': {'type': 'int', 'name': ''}}, 'name': ''}, '8': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'bytes', 'name': ''}, '4': {'type': 'bytes', 'name': ''}, '5': {'type': 'int', 'name': ''}, '6': {'type': 'int', 'name': ''}, '7': {'type': 'bytes', 'name': ''}, '8': {'type': 'message', 'message_typedef': {'200': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}}, 'name': ''}}, 'name': ''}, '9': {'type': 'bytes', 'name': ''}, '11': {'type': 'bytes', 'name': ''}, '15': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '18': {'type': 'int', 'name': ''}}, 'name': ''}}
return value, message_type
# 🔥 新增:发送图片消息
def send_img(pigeon_sign: str, token: str, receiver_id: str, shop_id: str, talk_id: int, session_did: str, p_id: int, user_code: str, img: str, image_width: str = "2000", image_height: str = "1125", image_format: str = "png", image_size: str = "3157512"):
"""
构造发送图片消息体
:param image_width: 图片宽度
:param image_size: 图片大小
:param image_height: 图片高度
:param image_format: 图片格式
:param pigeon_sign: 接口返回
:param token: 接口返回
:param receiver_id: wss消息返回 对方用户id
:param shop_id: cookie自带
:param talk_id: wss消息返回 激活窗口id
:param session_did: cookie自带
:param p_id: wss消息返回
:param user_code: 用户token
:param img: 图片URI或URL
:return: (value, message_type)
"""
value = {
'1': 11778,
'2': int(time.time() * 1000),
'3': 10001,
'4': 1,
'5': [
{'1': b'pigeon_source', '2': b'web'},
{'1': b'PIGEON_BIZ_TYPE', '2': b'2'},
{'1': b'pigeon_sign', '2': pigeon_sign.encode()},
],
'7': {'14': 98},
'8': {
'1': 100,
'2': 11778,
'3': b'1.0.4-beta.2',
'4': token.encode(),
'5': 3,
'6': 3,
'7': b'2d97ea6:feat/add_init_callback',
'8': {
'100': {
'1': f"{receiver_id}:{shop_id}::2:1:pigeon".encode(),
'2': 11,
'3': p_id,
'4': "[图片]".encode(),
'5': [
{'1': b'type', '2': b'file_image'},
{'1': b'shop_id', '2': shop_id.encode()},
{'1': b'sender_role', '2': b'2'},
{'1': b'PIGEON_BIZ_TYPE', '2': b'2'},
{'1': b'src', '2': b'pc'},
{'1': b'srcType', '2': b'1'},
{'1': b'source', '2': b'pc-web'},
{'1': b'receiver_id', '2': str(receiver_id).encode()},
{'1': b'hierarchical_dimension', '2': b'{"dynamic_dimension":"4541_1131_9042_6599_9420_6832_4050_3823_3994_8564_1528_0388_8667_2179_7948_1870_1949_0989_8012_6240_7898_7548_8852_6245_9393_3650_8570_4026_4034_4057_6537_8632_2068_8958_0363_2387_9033_3425_2238_0982_1935_8188_3817_8557_7931_3278_4065_1893_6049_6961_3814_4883_4401_6637_7282_3652_9354_0437_4769_4815_9572_7230_5054_3951_4852_2188_3505_6813_2570_5394_0729","goofy_id":"1.0.1.1508","desk_version":"0.0.0","open_stores":"0","memL":"","cpuL":"","session_throughput":0,"message_throughput_send":0,"message_throughput_revice":0}'},
{'1': b'tag_valid', '2': b'1'},
{'1': b'imageUrl', '2': img.encode()},
{'1': b'imageWidth', '2': image_width.encode()},
{'1': b'imageHeight', '2': image_height.encode()},
{'1': b'imageFormat', '2': image_format.encode()},
{'1': b'imageSize', '2': image_size.encode()},
{'1': b'uuid', '2': str(uuid.uuid4()).encode()},
{'1': b'track_info','2': json.dumps({"send_time": int(time.time() * 1000), "_send_delta": "77","_send_delta_2": "216"}).encode()},
{'1': b'user_agent', '2': b'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36'},
{'1': b'sender_id', '2': b''},
{'1': b'biz_ext', '2': b'{}'},
{'1': b'p:from_source', '2': b'web'},
{'1': b's:mentioned_users', '2': b''},
{'1': b's:client_message_id', '2': str(uuid.uuid4()).encode()}
],
'6': 1000,
'7': user_code.encode(),
'8': str(uuid.uuid4()).encode(),
'14': talk_id
}
},
'9': session_did.encode(),
'11': b'web',
'15': [
{'1': b'pigeon_source', '2': b'web'},
{'1': b'PIGEON_BIZ_TYPE', '2': b'2'},
{'1': b'pigeon_sign', '2': b'MIG6BAz2BNUON43WdlOBuGYEgZcsIho9ZjVP4yyExLShzXgAZtsvUMj2e3jZWeMZv+6+TNVQQMq3xSLrqiwcs2cCaOVBDuS6zGsWm5gBlGtlvOOLM5td2/9OS8P37t1sdkjN4BSH2mB7FlGItioZIsTh1sodn6pYCGj+45mtId3Itenufgai3Mnkpt573uoWJmagF8J3jVPHMFtdwd25Qf5vsWC2kB30glpQBBCbk2VO2ubMqctqQSzhI6uD'},
{'1': b'session_aid', '2': b'1383'},
{'1': b'session_did', '2': session_did.encode()},
{'1': b'app_name', '2': b'im'},
{'1': b'priority_region', '2': b'cn'},
{'1': b'user_agent','2': b'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36'},
{'1': b'cookie_enabled', '2': b'true'},
{'1': b'browser_language', '2': b'zh-CN'},
{'1': b'browser_platform', '2': b'Win32'},
{'1': b'browser_name', '2': b'Mozilla'},
{'1': b'browser_version', '2': b'5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36'},
{'1': b'browser_online', '2': b'true'},
{'1': b'screen_width', '2': b'1707'},
{'1': b'screen_height', '2': b'1067'},
{'1': b'referer', '2': b''},
{'1': b'timezone_name', '2': b'Asia/Shanghai'}
],
'18': 2
}
}
message_type = {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'int', 'name': ''}, '4': {'type': 'int', 'name': ''}, '5': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '7': {'type': 'message', 'message_typedef': {'14': {'type': 'int', 'name': ''}}, 'name': ''}, '8': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'bytes', 'name': ''}, '4': {'type': 'bytes', 'name': ''}, '5': {'type': 'int', 'name': ''}, '6': {'type': 'int', 'name': ''}, '7': {'type': 'bytes', 'name': ''}, '8': {'type': 'message', 'message_typedef': {'100': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'int', 'name': ''}, '4': {'type': 'bytes', 'name': ''}, '5': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '6': {'type': 'int', 'name': ''}, '7': {'type': 'bytes', 'name': ''}, '8': {'type': 'bytes', 'name': ''}, '14': {'type': 'int', 'name': ''}}, 'name': ''}}, 'name': ''}, '9': {'type': 'bytes', 'name': ''}, '11': {'type': 'bytes', 'name': ''}, '15': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '18': {'type': 'int', 'name': ''}}, 'name': ''}}
return value, message_type
# 🔥 新增:发送视频消息
def send_video(pigeon_sign: str, token: str, receiver_id: str, shop_id: str, talk_id: int, session_did: str, p_id: int, user_code: str, vid: str, cover_url: str, height: str, width: str, duration: str):
"""
构造发送视频消息体
:param duration: 视频时长
:param width: 视频宽度
:param height: 视频高度
:param cover_url: 封面URL
:param pigeon_sign: 接口返回
:param token: 接口返回
:param receiver_id: wss消息返回 对方用户id
:param shop_id: cookie自带
:param talk_id: wss消息返回 激活窗口id
:param session_did: cookie自带
:param p_id: wss消息返回
:param user_code: 用户token
:param vid: 视频id
:return: (value, message_type)
"""
# 🔥 修复确保数值类型正确height/width为intduration为float
try:
height_int = int(height) if isinstance(height, str) else height
width_int = int(width) if isinstance(width, str) else width
duration_float = float(duration) if isinstance(duration, str) else duration
except (ValueError, TypeError):
# 如果转换失败,使用默认值
height_int = 1080
width_int = 1920
duration_float = 0.0
msg_render_model = json.dumps({
"msg_render_type": "video",
"render_body": {
"vid": vid,
"coverURL": cover_url,
"height": height_int,
"width": width_int,
"duration": duration_float
}
}).encode()
value = {
'1': 10015,
'2': int(time.time() * 1000),
'3': 10001,
'4': 1,
'5': [
{'1': b'pigeon_source', '2': b'web'},
{'1': b'PIGEON_BIZ_TYPE', '2': b'2'},
{'1': b'pigeon_sign', '2': pigeon_sign.encode()},
],
'7': {'14': 98},
'8': {
'1': 100,
'2': 10015,
'3': b'1.0.4-beta.2',
'4': token.encode(),
'5': 3,
'6': 3,
'7': b'2d97ea6:feat/add_init_callback',
'8': {
'100': {
'1': f"{receiver_id}:{shop_id}::2:1:pigeon".encode(),
'2': 11,
'3': p_id,
'4': "[视频]".encode(),
'5': [
{'1': b'type', '2': b'video'},
{'1': b'shop_id', '2': shop_id.encode()},
{'1': b'sender_role', '2': b'2'},
{'1': b'PIGEON_BIZ_TYPE', '2': b'2'},
{'1': b'src', '2': b'pc'},
{'1': b'srcType', '2': b'1'},
{'1': b'source', '2': b'pc-web'},
{'1': b'receiver_id', '2': str(receiver_id).encode()},
{'1': b'hierarchical_dimension', '2': b'{"dynamic_dimension":"4541_1131_9042_6599_9420_6832_4050_3823_3994_8564_1528_0388_8667_2179_7948_1870_1949_0989_8012_6240_7898_7548_8852_6245_9393_3650_8570_4026_4034_4057_6537_8632_2068_8958_0363_2387_9033_3425_2238_0982_1935_8188_3817_8557_7931_3278_4065_1893_6049_6961_3814_4883_4401_6637_7282_3652_9354_0437_4769_4815_9572_7230_5054_3951_4852_2188_3505_6813_2570_5394_0729","goofy_id":"1.0.1.1508","desk_version":"0.0.0","open_stores":"0","memL":"","cpuL":"","session_throughput":0,"message_throughput_send":0,"message_throughput_revice":0}'},
{'1': b'msg_render_model', '2': msg_render_model},
{'1': b'uuid', '2': str(uuid.uuid4()).encode()},
{'1': b'start_scene', '2': b'1'},
{'1': b'track_info','2': json.dumps({"send_time": int(time.time() * 1000), "_send_delta": "77","_send_delta_2": "216"}).encode()},
{'1': b'user_agent', '2': b'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36'},
{'1': b'sender_id', '2': b''},
{'1': b'biz_ext', '2': b'{}'},
{'1': b'p:from_source', '2': b'web'},
{'1': b's:mentioned_users', '2': b''},
{'1': b's:client_message_id', '2': str(uuid.uuid4()).encode()}
],
'6': 1000,
'7': user_code.encode(),
'8': str(uuid.uuid4()).encode(),
'14': talk_id
}
},
'9': session_did.encode(),
'11': b'web',
'15': [
{'1': b'pigeon_source', '2': b'web'},
{'1': b'PIGEON_BIZ_TYPE', '2': b'2'},
{'1': b'pigeon_sign', '2': b'MIG6BAz2BNUON43WdlOBuGYEgZcsIho9ZjVP4yyExLShzXgAZtsvUMj2e3jZWeMZv+6+TNVQQMq3xSLrqiwcs2cCaOVBDuS6zGsWm5gBlGtlvOOLM5td2/9OS8P37t1sdkjN4BSH2mB7FlGItioZIsTh1sodn6pYCGj+45mtId3Itenufgai3Mnkpt573uoWJmagF8J3jVPHMFtdwd25Qf5vsWC2kB30glpQBBCbk2VO2ubMqctqQSzhI6uD'},
{'1': b'session_aid', '2': b'1383'},
{'1': b'session_did', '2': session_did.encode()},
{'1': b'app_name', '2': b'im'},
{'1': b'priority_region', '2': b'cn'},
{'1': b'user_agent','2': b'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36'},
{'1': b'cookie_enabled', '2': b'true'},
{'1': b'browser_language', '2': b'zh-CN'},
{'1': b'browser_platform', '2': b'Win32'},
{'1': b'browser_name', '2': b'Mozilla'},
{'1': b'browser_version', '2': b'5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36'},
{'1': b'browser_online', '2': b'true'},
{'1': b'screen_width', '2': b'1707'},
{'1': b'screen_height', '2': b'1067'},
{'1': b'referer', '2': b''},
{'1': b'timezone_name', '2': b'Asia/Shanghai'}
],
'18': 2
}
}
message_type = {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'int', 'name': ''}, '4': {'type': 'int', 'name': ''}, '5': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '7': {'type': 'message', 'message_typedef': {'14': {'type': 'int', 'name': ''}}, 'name': ''}, '8': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'bytes', 'name': ''}, '4': {'type': 'bytes', 'name': ''}, '5': {'type': 'int', 'name': ''}, '6': {'type': 'int', 'name': ''}, '7': {'type': 'bytes', 'name': ''}, '8': {'type': 'message', 'message_typedef': {'100': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'int', 'name': ''}, '3': {'type': 'int', 'name': ''}, '4': {'type': 'bytes', 'name': ''}, '5': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '6': {'type': 'int', 'name': ''}, '7': {'type': 'bytes', 'name': ''}, '8': {'type': 'bytes', 'name': ''}, '14': {'type': 'int', 'name': ''}}, 'name': ''}}, 'name': ''}, '9': {'type': 'bytes', 'name': ''}, '11': {'type': 'bytes', 'name': ''}, '15': {'type': 'message', 'message_typedef': {'1': {'type': 'bytes', 'name': ''}, '2': {'type': 'bytes', 'name': ''}}, 'name': ''}, '18': {'type': 'int', 'name': ''}}, 'name': ''}}
return value, message_type

View File

@@ -3450,9 +3450,9 @@ class ChatPdd:
"""处理接收到的消息"""
try:
# 🔥 过滤机器人消息
if self.should_filter_robot_message(message_data):
self._log("🤖 检测到机器人消息,已过滤不发送给后端", "DEBUG")
return
# if self.should_filter_robot_message(message_data):
# self._log("🤖 检测到机器人消息,已过滤不发送给后端", "DEBUG")
# return
message_info = message_data.get("message", {})
if not message_info: