[patch] 新增JD图片视频类型数据发送方法编写(强化send_message方法) 新增下载视频临时处理存储逻辑
This commit is contained in:
@@ -13,6 +13,8 @@ import requests
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
import uuid
|
||||
import os
|
||||
|
||||
|
||||
# 定义持久化数据类
|
||||
@@ -154,6 +156,14 @@ class FixJdCookie:
|
||||
self.base_reconnect_delay = 1.0 # 基础重连延迟
|
||||
self.max_reconnect_delay = 60.0 # 最大重连延迟
|
||||
self.reconnect_backoff = 1.5 # 退避系数
|
||||
|
||||
# 🔥 存储认证信息,用于文件上传
|
||||
self.cookies_str = None
|
||||
self.current_aid = None
|
||||
self.current_pin_zj = None
|
||||
|
||||
# 🔥 启动时清理过期临时文件
|
||||
self._cleanup_old_temp_files(max_age_hours=24)
|
||||
|
||||
def _log(self, message, log_type="INFO"):
|
||||
"""内部日志方法"""
|
||||
@@ -313,10 +323,476 @@ class FixJdCookie:
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
async def send_message(self, ws, pin, aid, pin_zj, vender_id, content):
|
||||
"""异步发送单条消息"""
|
||||
async def send_message(self, ws, pin, aid, pin_zj, vender_id, content, msg_type="text"):
|
||||
"""异步发送消息 - 支持文本/图片/视频
|
||||
|
||||
Args:
|
||||
ws: WebSocket连接
|
||||
pin: 客户pin
|
||||
aid: 账号aid
|
||||
pin_zj: 客服pin
|
||||
vender_id: 商家ID
|
||||
content: 消息内容(文本内容或URL)
|
||||
msg_type: 消息类型 (text/image/video)
|
||||
"""
|
||||
try:
|
||||
print('本地发送消息')
|
||||
# 根据消息类型调用不同的发送方法
|
||||
if msg_type == "image":
|
||||
return await self.send_image_message(ws, pin, aid, pin_zj, vender_id, content)
|
||||
elif msg_type == "video":
|
||||
return await self.send_video_message(ws, pin, aid, pin_zj, vender_id, content)
|
||||
else:
|
||||
# 文本消息(默认)
|
||||
print('本地发送文本消息')
|
||||
message = {
|
||||
"ver": "4.3",
|
||||
"type": "chat_message",
|
||||
"from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"},
|
||||
"to": {"app": "im.customer", "pin": pin},
|
||||
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
|
||||
"lang": "zh_CN",
|
||||
"aid": aid,
|
||||
"timestamp": int(time.time() * 1000),
|
||||
"readFlag": 0,
|
||||
"body": {
|
||||
"content": content,
|
||||
"translated": False,
|
||||
"param": {"cusVenderId": vender_id},
|
||||
"type": "text"
|
||||
}
|
||||
}
|
||||
await ws.send(json.dumps(message))
|
||||
logger.info(f"消息已经发送到客户端[info] {pin}: {content[:20]} ...")
|
||||
except websockets.ConnectionClosed:
|
||||
logger.error('本地发送消息失败 连接关闭')
|
||||
raise
|
||||
except Exception as e:
|
||||
# 同时这里也要及时进行raise抛出 这样比较好让系统可以看出 异常了可以抛出信息不至于后续被认为
|
||||
logger.error(f"消息发送过程中出现特殊异常异常信息为: {e}")
|
||||
raise
|
||||
|
||||
def _get_temp_directory(self) -> str:
|
||||
"""智能选择临时文件目录(优先级降级策略)
|
||||
|
||||
优先级:
|
||||
1. 环境变量 SHUIDROP_TEMP_DIR(用户自定义)
|
||||
2. 应用程序所在目录/temp(如果有写权限且不在Program Files)
|
||||
3. 系统临时目录(兜底方案)
|
||||
|
||||
Returns:
|
||||
str: 临时目录路径
|
||||
"""
|
||||
import tempfile
|
||||
import sys
|
||||
|
||||
try:
|
||||
# 优先级1:用户自定义环境变量
|
||||
custom_temp = os.getenv('SHUIDROP_TEMP_DIR')
|
||||
if custom_temp:
|
||||
custom_temp = os.path.join(custom_temp, "shuidrop_jd_temp_uploads")
|
||||
try:
|
||||
os.makedirs(custom_temp, exist_ok=True)
|
||||
# 测试写权限
|
||||
test_file = os.path.join(custom_temp, ".write_test")
|
||||
with open(test_file, 'w') as f:
|
||||
f.write("test")
|
||||
os.remove(test_file)
|
||||
self._log(f"✅ [JD路径] 使用自定义临时目录: {custom_temp}", "INFO")
|
||||
return custom_temp
|
||||
except Exception as e:
|
||||
self._log(f"⚠️ [JD路径] 自定义目录不可用: {e}", "WARNING")
|
||||
|
||||
# 优先级2:应用程序所在目录(避免占用C盘)
|
||||
try:
|
||||
# 获取可执行文件路径
|
||||
if getattr(sys, 'frozen', False):
|
||||
# 打包后
|
||||
app_dir = os.path.dirname(sys.executable)
|
||||
else:
|
||||
# 开发环境
|
||||
app_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
# 检查是否在 Program Files(需要管理员权限)
|
||||
if 'Program Files' not in app_dir and 'Program Files (x86)' not in app_dir:
|
||||
app_temp = os.path.join(app_dir, "temp_uploads", "jd")
|
||||
os.makedirs(app_temp, exist_ok=True)
|
||||
|
||||
# 测试写权限
|
||||
test_file = os.path.join(app_temp, ".write_test")
|
||||
with open(test_file, 'w') as f:
|
||||
f.write("test")
|
||||
os.remove(test_file)
|
||||
|
||||
self._log(f"✅ [JD路径] 使用应用程序目录: {app_temp}", "INFO")
|
||||
return app_temp
|
||||
else:
|
||||
self._log(f"ℹ️ [JD路径] 应用在Program Files,跳过应用目录", "DEBUG")
|
||||
except Exception as e:
|
||||
self._log(f"⚠️ [JD路径] 应用目录不可用: {e}", "DEBUG")
|
||||
|
||||
# 优先级3:系统临时目录(兜底方案)
|
||||
system_temp = os.path.join(tempfile.gettempdir(), "shuidrop_jd_temp_uploads")
|
||||
os.makedirs(system_temp, exist_ok=True)
|
||||
self._log(f"✅ [JD路径] 使用系统临时目录: {system_temp}", "INFO")
|
||||
return system_temp
|
||||
|
||||
except Exception as e:
|
||||
# 最终兜底
|
||||
self._log(f"❌ [JD路径] 所有路径策略失败,使用默认: {e}", "ERROR")
|
||||
import tempfile
|
||||
return os.path.join(tempfile.gettempdir(), "shuidrop_jd_temp_uploads")
|
||||
|
||||
def _get_file_extension(self, url: str, default_ext: str) -> str:
|
||||
"""智能提取文件扩展名
|
||||
|
||||
Args:
|
||||
url: 文件URL
|
||||
default_ext: 默认扩展名(jpg/mp4)
|
||||
|
||||
Returns:
|
||||
str: 文件扩展名
|
||||
"""
|
||||
try:
|
||||
# 移除查询参数
|
||||
url_without_params = url.split('?')[0]
|
||||
|
||||
# 检查是否有有效的文件扩展名
|
||||
if '.' in url_without_params:
|
||||
parts = url_without_params.split('.')
|
||||
ext = parts[-1].lower()
|
||||
|
||||
# 验证扩展名是否合法(只包含字母数字)
|
||||
if ext and len(ext) <= 5 and ext.isalnum():
|
||||
# 常见图片/视频扩展名
|
||||
valid_exts = ['jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv']
|
||||
if ext in valid_exts:
|
||||
return ext
|
||||
|
||||
# 如果无法提取有效扩展名,使用默认值
|
||||
return default_ext
|
||||
|
||||
except Exception:
|
||||
return default_ext
|
||||
|
||||
def _cleanup_old_temp_files(self, max_age_hours=24):
|
||||
"""清理过期的临时文件(24小时以上)
|
||||
|
||||
Args:
|
||||
max_age_hours: 文件最大保留时间(小时)
|
||||
"""
|
||||
try:
|
||||
# 使用智能路径选择
|
||||
temp_dir = self._get_temp_directory()
|
||||
if not os.path.exists(temp_dir):
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
max_age_seconds = max_age_hours * 3600
|
||||
cleaned_count = 0
|
||||
|
||||
# 遍历临时目录中的文件
|
||||
for filename in os.listdir(temp_dir):
|
||||
file_path = os.path.join(temp_dir, filename)
|
||||
|
||||
# 只处理文件,跳过目录
|
||||
if not os.path.isfile(file_path):
|
||||
continue
|
||||
|
||||
try:
|
||||
# 检查文件年龄
|
||||
file_mtime = os.path.getmtime(file_path)
|
||||
file_age = now - file_mtime
|
||||
|
||||
if file_age > max_age_seconds:
|
||||
file_size = os.path.getsize(file_path)
|
||||
os.remove(file_path)
|
||||
cleaned_count += 1
|
||||
self._log(f"🗑️ [JD清理] 删除过期文件: {filename} ({file_size} bytes, {file_age/3600:.1f}小时前)", "DEBUG")
|
||||
except Exception as e:
|
||||
self._log(f"⚠️ [JD清理] 删除文件失败 {filename}: {e}", "DEBUG")
|
||||
|
||||
if cleaned_count > 0:
|
||||
self._log(f"✅ [JD清理] 已清理 {cleaned_count} 个过期临时文件", "INFO")
|
||||
except Exception as e:
|
||||
self._log(f"⚠️ [JD清理] 临时文件清理失败: {e}", "DEBUG")
|
||||
|
||||
async def download_file(self, url: str, save_dir: str = None, max_retries: int = 3) -> str:
|
||||
"""下载外部文件到本地(带重试机制 + 智能路径选择)
|
||||
|
||||
Args:
|
||||
url: 文件URL
|
||||
save_dir: 保存目录(None时自动选择最佳路径)
|
||||
max_retries: 最大重试次数
|
||||
|
||||
Returns:
|
||||
str: 本地文件路径
|
||||
"""
|
||||
# 使用智能路径选择策略
|
||||
if save_dir is None:
|
||||
save_dir = self._get_temp_directory()
|
||||
|
||||
# 确保目录存在
|
||||
os.makedirs(save_dir, exist_ok=True)
|
||||
|
||||
# 智能提取文件扩展名
|
||||
default_ext = 'jpg'
|
||||
ext = self._get_file_extension(url, default_ext)
|
||||
|
||||
# 生成唯一文件名
|
||||
file_name = f"jd_download_{uuid.uuid4().hex[:12]}.{ext}"
|
||||
save_path = os.path.join(save_dir, file_name)
|
||||
|
||||
# 重试机制
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
if attempt > 0:
|
||||
self._log(f"🔄 [JD下载] 第{attempt + 1}次重试下载...", "INFO")
|
||||
else:
|
||||
self._log(f"📥 [JD下载] 开始下载: {url[:100]}...", "INFO")
|
||||
|
||||
# 使用线程池下载
|
||||
loop = asyncio.get_event_loop()
|
||||
response = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: requests.get(url, timeout=60, headers={
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
|
||||
})
|
||||
)
|
||||
|
||||
# 检查HTTP状态
|
||||
if response.status_code != 200:
|
||||
self._log(f"❌ [JD下载] HTTP状态码: {response.status_code}", "ERROR")
|
||||
if attempt < max_retries - 1:
|
||||
await asyncio.sleep(2)
|
||||
continue
|
||||
raise Exception(f"下载失败,HTTP状态码: {response.status_code}")
|
||||
|
||||
# 获取文件数据
|
||||
file_data = response.content
|
||||
file_size_kb = len(file_data) // 1024
|
||||
|
||||
# 检查文件大小(限制50MB)
|
||||
if file_size_kb > 51200:
|
||||
self._log(f"❌ [JD下载] 文件过大: {file_size_kb}KB,超过50MB限制", "ERROR")
|
||||
raise Exception(f"文件过大: {file_size_kb}KB")
|
||||
|
||||
# 检查文件是否为空
|
||||
if file_size_kb == 0:
|
||||
self._log(f"❌ [JD下载] 下载的文件为空", "ERROR")
|
||||
if attempt < max_retries - 1:
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
raise Exception("下载的文件为空")
|
||||
|
||||
# 写入临时文件
|
||||
with open(save_path, 'wb') as f:
|
||||
f.write(file_data)
|
||||
|
||||
self._log(f"✅ [JD下载] 下载成功,大小: {file_size_kb}KB,文件: {file_name}", "SUCCESS")
|
||||
return save_path
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self._log(f"❌ [JD下载] 网络请求失败: {e}", "ERROR")
|
||||
if attempt < max_retries - 1:
|
||||
await asyncio.sleep(2)
|
||||
continue
|
||||
raise
|
||||
except Exception as e:
|
||||
self._log(f"❌ [JD下载] 下载失败: {e}", "ERROR")
|
||||
if attempt < max_retries - 1:
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
raise
|
||||
|
||||
# 所有重试都失败
|
||||
raise Exception(f"下载失败:已重试{max_retries}次")
|
||||
|
||||
async def upload_file_to_jd(self, file_path: str, file_type: str) -> dict:
|
||||
"""上传文件到京东服务器
|
||||
|
||||
Args:
|
||||
file_path: 本地文件路径
|
||||
file_type: 文件类型 (image/video)
|
||||
|
||||
Returns:
|
||||
dict: {path: 京东URL, width: 宽度, height: 高度}
|
||||
"""
|
||||
try:
|
||||
if not self.cookies_str or not self.current_aid or not self.current_pin_zj:
|
||||
raise Exception("缺少必要的认证信息(cookies/aid/pin)")
|
||||
|
||||
self._log(f"📤 开始上传文件到京东: {file_path}", "INFO")
|
||||
|
||||
# 读取文件内容
|
||||
with open(file_path, 'rb') as f:
|
||||
file_content = f.read()
|
||||
|
||||
# 根据文件类型选择API和MIME类型
|
||||
if file_type == "image" or any(ext in file_path.lower() for ext in ['.png', '.jpg', '.jpeg', '.gif']):
|
||||
url = "https://imio.jd.com/uploadfile/file/uploadImg.action"
|
||||
mime_type = 'image/jpeg'
|
||||
elif file_type == "video" or '.mp4' in file_path.lower():
|
||||
url = "https://imio.jd.com/uploadfile/file/uploadFile.action"
|
||||
mime_type = 'video/mp4'
|
||||
else:
|
||||
raise Exception(f"不支持的文件类型: {file_path}")
|
||||
|
||||
# 准备请求
|
||||
headers = {
|
||||
"authority": "imio.jd.com",
|
||||
"accept": "*/*",
|
||||
"accept-language": "zh-CN,zh;q=0.9",
|
||||
"cache-control": "no-cache",
|
||||
"origin": "https://dongdong.jd.com",
|
||||
"pragma": "no-cache",
|
||||
"referer": "https://dongdong.jd.com/",
|
||||
"sec-ch-ua": '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
|
||||
"sec-ch-ua-mobile": "?0",
|
||||
"sec-ch-ua-platform": '"Windows"',
|
||||
"sec-fetch-dest": "empty",
|
||||
"sec-fetch-mode": "cors",
|
||||
"sec-fetch-site": "same-site",
|
||||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
|
||||
"cookie": self.cookies_str
|
||||
}
|
||||
|
||||
files = {
|
||||
'upload': (os.path.basename(file_path), file_content, mime_type)
|
||||
}
|
||||
|
||||
data = {
|
||||
'httpsEnable': 'true',
|
||||
'clientType': 'comet',
|
||||
'appId': 'im.waiter',
|
||||
'pin': self.current_pin_zj,
|
||||
'aid': self.current_aid
|
||||
}
|
||||
|
||||
# 同步请求(在异步上下文中使用 run_in_executor)
|
||||
loop = asyncio.get_event_loop()
|
||||
response = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: requests.post(url, headers=headers, files=files, data=data, timeout=30)
|
||||
)
|
||||
|
||||
result = response.json()
|
||||
|
||||
if result.get('path'):
|
||||
self._log(f"✅ 文件上传成功: {result.get('path')}", "SUCCESS")
|
||||
return result
|
||||
else:
|
||||
raise Exception(f"上传失败: {result}")
|
||||
|
||||
except Exception as e:
|
||||
self._log(f"❌ 文件上传失败: {e}", "ERROR")
|
||||
raise
|
||||
|
||||
async def get_video_thumbnail(self, video_path: str) -> dict:
|
||||
"""提取视频封面并上传
|
||||
|
||||
Args:
|
||||
video_path: 视频文件路径
|
||||
|
||||
Returns:
|
||||
dict: {path: 封面URL, width: 宽度, height: 高度}
|
||||
"""
|
||||
import cv2
|
||||
|
||||
try:
|
||||
self._log(f"🎬 开始提取视频封面: {video_path}", "INFO")
|
||||
|
||||
# 提取视频第一帧
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
frame_position = int(0 * fps) # 第0秒
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_position)
|
||||
|
||||
ret, frame = cap.read()
|
||||
cap.release()
|
||||
|
||||
if not ret:
|
||||
raise Exception("无法读取视频帧")
|
||||
|
||||
# 编码为JPG
|
||||
success, encoded_image = cv2.imencode('.jpg', frame)
|
||||
if not success:
|
||||
raise Exception("无法编码图像")
|
||||
|
||||
thumbnail_content = encoded_image.tobytes()
|
||||
|
||||
# 上传封面(使用图片上传接口)
|
||||
headers = {
|
||||
"authority": "imio.jd.com",
|
||||
"accept": "*/*",
|
||||
"accept-language": "zh-CN,zh;q=0.9",
|
||||
"cache-control": "no-cache",
|
||||
"origin": "https://dongdong.jd.com",
|
||||
"pragma": "no-cache",
|
||||
"referer": "https://dongdong.jd.com/",
|
||||
"sec-ch-ua": '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
|
||||
"sec-ch-ua-mobile": "?0",
|
||||
"sec-ch-ua-platform": '"Windows"',
|
||||
"sec-fetch-dest": "empty",
|
||||
"sec-fetch-mode": "cors",
|
||||
"sec-fetch-site": "same-site",
|
||||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
|
||||
"cookie": self.cookies_str
|
||||
}
|
||||
|
||||
url = "https://imio.jd.com/uploadfile/file/uploadImg.action"
|
||||
files = {
|
||||
'upload': ("thumbnail.jpg", thumbnail_content, 'image/jpeg')
|
||||
}
|
||||
|
||||
data = {
|
||||
'httpsEnable': 'true',
|
||||
'clientType': 'comet',
|
||||
'appId': 'im.waiter',
|
||||
'pin': self.current_pin_zj,
|
||||
'aid': self.current_aid
|
||||
}
|
||||
|
||||
# 同步请求
|
||||
loop = asyncio.get_event_loop()
|
||||
response = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: requests.post(url, headers=headers, files=files, data=data, timeout=30)
|
||||
)
|
||||
|
||||
result = response.json()
|
||||
|
||||
if result.get('path'):
|
||||
self._log(f"✅ 视频封面上传成功: {result.get('path')}", "SUCCESS")
|
||||
return result
|
||||
else:
|
||||
raise Exception(f"封面上传失败: {result}")
|
||||
|
||||
except Exception as e:
|
||||
self._log(f"❌ 视频封面提取失败: {e}", "ERROR")
|
||||
raise
|
||||
|
||||
async def send_image_message(self, ws, pin: str, aid: str, pin_zj: str, vender_id: str, image_url: str):
|
||||
"""发送图片消息
|
||||
|
||||
Args:
|
||||
ws: WebSocket连接
|
||||
pin: 客户pin
|
||||
aid: 账号aid
|
||||
pin_zj: 客服pin
|
||||
vender_id: 商家ID
|
||||
image_url: 图片URL(外部URL,需要下载后上传)
|
||||
"""
|
||||
temp_file = None
|
||||
try:
|
||||
self._log(f"📷 开始发送图片消息: {image_url}", "INFO")
|
||||
|
||||
# 1. 下载图片
|
||||
temp_file = await self.download_file(image_url)
|
||||
|
||||
# 2. 上传到京东服务器
|
||||
upload_result = await self.upload_file_to_jd(temp_file, "image")
|
||||
|
||||
# 3. 发送图片消息
|
||||
message = {
|
||||
"ver": "4.3",
|
||||
"type": "chat_message",
|
||||
@@ -328,21 +804,94 @@ class FixJdCookie:
|
||||
"timestamp": int(time.time() * 1000),
|
||||
"readFlag": 0,
|
||||
"body": {
|
||||
"content": content,
|
||||
"translated": False,
|
||||
"height": upload_result.get("height", 0),
|
||||
"width": upload_result.get("width", 0),
|
||||
"url": upload_result.get("path"),
|
||||
"translated": "",
|
||||
"param": {"cusVenderId": vender_id},
|
||||
"type": "text"
|
||||
"type": "image"
|
||||
}
|
||||
}
|
||||
|
||||
await ws.send(json.dumps(message))
|
||||
logger.info(f"消息已经发送到客户端[info] {pin}: {content[:20]} ...")
|
||||
except websockets.ConnectionClosed:
|
||||
logger.error('本地发送消息失败 连接关闭')
|
||||
raise
|
||||
self._log(f"✅ 图片消息发送成功: {pin}", "SUCCESS")
|
||||
|
||||
except Exception as e:
|
||||
# 同时这里也要及时进行raise抛出 这样比较好让系统可以看出 异常了可以抛出信息不至于后续被认为
|
||||
logger.error(f"消息发送过程中出现特殊异常异常信息为: {e}")
|
||||
self._log(f"❌ 图片消息发送失败: {e}", "ERROR")
|
||||
raise
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if temp_file and os.path.exists(temp_file):
|
||||
try:
|
||||
os.remove(temp_file)
|
||||
self._log(f"🗑️ 已清理临时文件: {temp_file}", "DEBUG")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def send_video_message(self, ws, pin: str, aid: str, pin_zj: str, vender_id: str, video_url: str):
|
||||
"""发送视频消息
|
||||
|
||||
Args:
|
||||
ws: WebSocket连接
|
||||
pin: 客户pin
|
||||
aid: 账号aid
|
||||
pin_zj: 客服pin
|
||||
vender_id: 商家ID
|
||||
video_url: 视频URL(外部URL,需要下载后上传)
|
||||
"""
|
||||
temp_video = None
|
||||
try:
|
||||
self._log(f"🎥 开始发送视频消息: {video_url}", "INFO")
|
||||
|
||||
# 1. 下载视频
|
||||
temp_video = await self.download_file(video_url)
|
||||
|
||||
# 2. 提取并上传封面
|
||||
thumbnail_result = await self.get_video_thumbnail(temp_video)
|
||||
|
||||
# 3. 上传视频
|
||||
await asyncio.sleep(2) # 等待2秒,避免请求过快
|
||||
video_result = await self.upload_file_to_jd(temp_video, "video")
|
||||
|
||||
# 4. 发送视频消息
|
||||
message = {
|
||||
"ver": "4.3",
|
||||
"type": "chat_message",
|
||||
"from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"},
|
||||
"to": {"app": "im.customer", "pin": pin},
|
||||
"id": hashlib.md5(str(int(time.time() * 1000)).encode()).hexdigest(),
|
||||
"lang": "zh_CN",
|
||||
"aid": aid,
|
||||
"timestamp": int(time.time() * 1000),
|
||||
"readFlag": 0,
|
||||
"body": {
|
||||
"desc": "",
|
||||
"duration": 6,
|
||||
"param": {},
|
||||
"reupload": "false",
|
||||
"size": os.path.getsize(temp_video),
|
||||
"thumbHeight": thumbnail_result.get("height", 0),
|
||||
"thumbWidth": thumbnail_result.get("width", 0),
|
||||
"thumbnail": thumbnail_result.get("path"),
|
||||
"url": video_result.get("path"),
|
||||
"type": "video"
|
||||
}
|
||||
}
|
||||
|
||||
await ws.send(json.dumps(message))
|
||||
self._log(f"✅ 视频消息发送成功: {pin}", "SUCCESS")
|
||||
|
||||
except Exception as e:
|
||||
self._log(f"❌ 视频消息发送失败: {e}", "ERROR")
|
||||
raise
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if temp_video and os.path.exists(temp_video):
|
||||
try:
|
||||
os.remove(temp_video)
|
||||
self._log(f"🗑️ 已清理临时视频: {temp_video}", "DEBUG")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def get_userinfo(self, response_text):
|
||||
"""获取用户 pin 并存入 pins 列表"""
|
||||
@@ -555,6 +1104,11 @@ class FixJdCookie:
|
||||
print("✅ DEBUG 进入message_monitoring方法")
|
||||
print(f"参数验证: cookies={bool(cookies_str)}, aid={aid}, pin_zj={pin_zj}")
|
||||
|
||||
# 🔥 保存认证信息,用于文件上传
|
||||
self.cookies_str = cookies_str
|
||||
self.current_aid = aid
|
||||
self.current_pin_zj = pin_zj
|
||||
|
||||
# 连接后端AI服务 - 使用店铺ID或venderId
|
||||
store_id = str(store.get('id', '')) or str(vender_id)
|
||||
self._log(f"🔗 尝试连接后端服务,店铺ID: {store_id}", "DEBUG")
|
||||
@@ -603,7 +1157,8 @@ class FixJdCookie:
|
||||
aid=aid,
|
||||
pin_zj=pin_zj,
|
||||
platform="京东",
|
||||
loop=loop
|
||||
loop=loop,
|
||||
cookies_str=cookies_str # 🔥 传递cookies用于文件上传
|
||||
)
|
||||
|
||||
await self.waiter_status_switch(ws=ws, aid=aid, pin_zj=pin_zj)
|
||||
@@ -614,9 +1169,9 @@ class FixJdCookie:
|
||||
try:
|
||||
while not stop_event.is_set(): # 检查是否收到中止信号
|
||||
try:
|
||||
print(f"[JD]等待监听消息-{datetime.now()}")
|
||||
print(f"等待监听消息-{datetime.now()}")
|
||||
response = await asyncio.wait_for(ws.recv(), timeout=1)
|
||||
print(f"[JD]原始消息类型:{type(response)}, 消息体为: {response}")
|
||||
print(f"原始消息类型:{type(response)}, 消息体为: {response}")
|
||||
|
||||
# 🔧 修复:检测被踢下线消息
|
||||
json_resp = json.loads(response) if isinstance(response, (str, bytes)) else response
|
||||
@@ -654,10 +1209,10 @@ class FixJdCookie:
|
||||
|
||||
await self.process_incoming_message(response, ws, aid, pin_zj, vender_id, store)
|
||||
|
||||
# print(json_resp) 暂去解析后消息结构
|
||||
# print(json_resp)
|
||||
|
||||
ver = json_resp.get("ver")
|
||||
print(f"[jd]信息编码版本号:{ver}")
|
||||
print(f"版本{ver}")
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except websockets.ConnectionClosed:
|
||||
|
||||
@@ -729,7 +729,8 @@ class BackendClient:
|
||||
platform_type = self._get_platform_by_store_id(store_id)
|
||||
|
||||
if platform_type == "京东":
|
||||
self._forward_to_jd(store_id, recv_pin, content)
|
||||
# 🔥 传递msg_type参数,支持图片/视频等类型
|
||||
self._forward_to_jd(store_id, recv_pin, content, msg_type)
|
||||
elif platform_type == "抖音":
|
||||
# 传递msg_type参数,支持图片/视频等类型
|
||||
self._forward_to_douyin(store_id, recv_pin, content, msg_type)
|
||||
@@ -761,8 +762,15 @@ class BackendClient:
|
||||
print(f"获取平台类型失败: {e}")
|
||||
return ""
|
||||
|
||||
def _forward_to_jd(self, store_id: str, recv_pin: str, content: str):
|
||||
"""转发消息到京东平台"""
|
||||
def _forward_to_jd(self, store_id: str, recv_pin: str, content: str, msg_type: str = "text"):
|
||||
"""转发消息到京东平台(支持文本/图片/视频)
|
||||
|
||||
Args:
|
||||
store_id: 店铺ID
|
||||
recv_pin: 接收者pin
|
||||
content: 消息内容(文本内容或URL)
|
||||
msg_type: 消息类型(text/image/video)
|
||||
"""
|
||||
try:
|
||||
from Utils.JD.JdUtils import WebsocketManager as JDWSManager
|
||||
jd_mgr = JDWSManager()
|
||||
@@ -778,49 +786,52 @@ class BackendClient:
|
||||
pin_zj = platform_info.get('pin_zj')
|
||||
vender_id = platform_info.get('vender_id')
|
||||
loop = platform_info.get('loop')
|
||||
cookies_str = platform_info.get('cookies_str') # 🔥 获取cookies用于文件上传
|
||||
|
||||
print(
|
||||
f"[JD Forward] shop_key={shop_key} has_ws={bool(ws)} aid={aid} pin_zj={pin_zj} vender_id={vender_id} has_loop={bool(loop)} recv_pin={recv_pin}")
|
||||
f"[JD Forward] shop_key={shop_key} has_ws={bool(ws)} aid={aid} pin_zj={pin_zj} vender_id={vender_id} has_loop={bool(loop)} has_cookies={bool(cookies_str)} recv_pin={recv_pin} msg_type={msg_type}")
|
||||
|
||||
if ws and aid and pin_zj and vender_id and loop and content:
|
||||
# 🔥 获取 FixJdCookie 实例,使用其 send_message 方法(支持多媒体)
|
||||
async def _send():
|
||||
import hashlib as _hashlib
|
||||
import time as _time
|
||||
import json as _json
|
||||
msg = {
|
||||
"ver": "4.3",
|
||||
"type": "chat_message",
|
||||
"from": {"pin": pin_zj, "app": "im.waiter", "clientType": "comet"},
|
||||
"to": {"app": "im.customer", "pin": recv_pin},
|
||||
"id": _hashlib.md5(str(int(_time.time() * 1000)).encode()).hexdigest(),
|
||||
"lang": "zh_CN",
|
||||
"aid": aid,
|
||||
"timestamp": int(_time.time() * 1000),
|
||||
"readFlag": 0,
|
||||
"body": {
|
||||
"content": content,
|
||||
"translated": False,
|
||||
"param": {"cusVenderId": vender_id},
|
||||
"type": "text"
|
||||
}
|
||||
}
|
||||
await ws.send(_json.dumps(msg))
|
||||
from Utils.JD.JdUtils import FixJdCookie
|
||||
# 创建临时实例用于发送
|
||||
jd_instance = FixJdCookie()
|
||||
# 🔥 设置认证信息(用于图片/视频上传)
|
||||
jd_instance.cookies_str = cookies_str
|
||||
jd_instance.current_aid = aid
|
||||
jd_instance.current_pin_zj = pin_zj
|
||||
# 调用支持多媒体的 send_message 方法
|
||||
await jd_instance.send_message(
|
||||
ws=ws,
|
||||
pin=recv_pin,
|
||||
aid=aid,
|
||||
pin_zj=pin_zj,
|
||||
vender_id=vender_id,
|
||||
content=content,
|
||||
msg_type=msg_type
|
||||
)
|
||||
|
||||
import asyncio as _asyncio
|
||||
_future = _asyncio.run_coroutine_threadsafe(_send(), loop)
|
||||
try:
|
||||
_future.result(timeout=2)
|
||||
print(f"[JD Forward] 已转发到平台: pin={recv_pin}, content_len={len(content)}")
|
||||
_future.result(timeout=60) # 图片/视频需要更长时间
|
||||
print(f"[JD Forward] 已转发到平台: pin={recv_pin}, type={msg_type}, content_len={len(content)}")
|
||||
except Exception as fe:
|
||||
print(f"[JD Forward] 转发提交失败: {fe}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
else:
|
||||
print("[JD Forward] 条件不足,未转发:",
|
||||
{
|
||||
'has_ws': bool(ws), 'has_aid': bool(aid), 'has_pin_zj': bool(pin_zj),
|
||||
'has_vender_id': bool(vender_id), 'has_loop': bool(loop), 'has_content': bool(content)
|
||||
'has_vender_id': bool(vender_id), 'has_loop': bool(loop), 'has_cookies': bool(cookies_str),
|
||||
'has_content': bool(content)
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"[JD Forward] 转发失败: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
def _forward_to_douyin(self, store_id: str, recv_pin: str, content: str, msg_type: str = "text"):
|
||||
"""转发消息到抖音平台
|
||||
|
||||
Reference in New Issue
Block a user