1953 lines
82 KiB
Python
1953 lines
82 KiB
Python
# -*- coding: utf-8 -*-
|
||
import json
|
||
import traceback
|
||
import uuid
|
||
import requests
|
||
import time
|
||
import asyncio
|
||
import websockets
|
||
import blackboxprotobuf
|
||
from websocket import WebSocketApp
|
||
import threading
|
||
from datetime import datetime
|
||
from dataclasses import dataclass, asdict
|
||
from urllib.parse import urlencode
|
||
|
||
import config
|
||
# 导入 message_arg 中的方法
|
||
from Utils.Dy.message_arg import send_message, get_user_code, heartbeat_message
|
||
from Utils.message_models import PlatformMessage
|
||
|
||
|
||
# 抖音WebSocket管理器类
|
||
class DouYinWebsocketManager:
|
||
_instance = None
|
||
_lock = threading.Lock()
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
with cls._lock:
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
cls._instance._store = {}
|
||
cls._instance._lock = threading.RLock()
|
||
return cls._instance
|
||
|
||
def on_connect(self, shop_key, douyin_bot, **kwargs):
|
||
"""存储抖音机器人连接信息"""
|
||
with self._lock:
|
||
entry = self._store.setdefault(shop_key, {
|
||
'platform': None,
|
||
'customers': [],
|
||
'user_assignments': {}
|
||
})
|
||
entry['platform'] = {
|
||
'douyin_bot': douyin_bot,
|
||
'message_handler': douyin_bot.message_handler if douyin_bot else None,
|
||
'last_heartbeat': datetime.now(),
|
||
**kwargs
|
||
}
|
||
return entry
|
||
|
||
def get_connection(self, shop_key):
|
||
"""获取抖音连接信息"""
|
||
with self._lock:
|
||
return self._store.get(shop_key)
|
||
|
||
def remove_connection(self, shop_key):
|
||
"""移除抖音连接"""
|
||
with self._lock:
|
||
if shop_key in self._store:
|
||
del self._store[shop_key]
|
||
|
||
def get_all_connections(self):
|
||
"""获取所有抖音连接"""
|
||
with self._lock:
|
||
return dict(self._store)
|
||
|
||
|
||
@dataclass
|
||
class StaffInfo:
|
||
"""客服信息结构体"""
|
||
staff_id: str
|
||
name: str
|
||
status: str
|
||
department: str = None
|
||
online: bool = True
|
||
|
||
def to_dict(self):
|
||
return {
|
||
"staff_id": self.staff_id,
|
||
"name": self.name,
|
||
"status": self.status,
|
||
"department": self.department or "",
|
||
"online": self.online
|
||
}
|
||
|
||
|
||
class DouYinBackendService:
|
||
"""抖音后端服务调用类(新版:使用统一后端连接 BackendClient)"""
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
self.current_store_id = ""
|
||
|
||
async def connect(self, store_id, *args, **kwargs):
|
||
"""连接后端服务(使用统一连接,无需独立连接)"""
|
||
try:
|
||
self.current_store_id = str(store_id or "")
|
||
except Exception:
|
||
self.current_store_id = ""
|
||
return True
|
||
|
||
async def send_message_to_backend(self, platform_message):
|
||
"""改为通过单后端连接发送,需携带store_id"""
|
||
try:
|
||
from WebSocket.backend_singleton import get_backend_client
|
||
backend = get_backend_client()
|
||
if not backend:
|
||
return None
|
||
|
||
# 从platform_message中构造统一上行结构,并附加store_id
|
||
body = platform_message.get('body', {}) if isinstance(platform_message, dict) else {}
|
||
sender_id = platform_message.get('sender', {}).get('id', '') if isinstance(platform_message, dict) else ''
|
||
|
||
# 优先取消息内的store_id,其次取body内,再次退回当前会话store_id
|
||
store_id = (platform_message.get('store_id')
|
||
or body.get('store_id')
|
||
or self.current_store_id
|
||
or '')
|
||
|
||
msg_type = platform_message.get('msg_type', 'text')
|
||
content_for_backend = platform_message.get('content', '')
|
||
|
||
pin_image = platform_message.get('pin_image')
|
||
if not pin_image:
|
||
pin_image = ""
|
||
else:
|
||
pass
|
||
# 构造标准消息格式
|
||
msg = {
|
||
'type': 'message',
|
||
'content': content_for_backend,
|
||
'pin_image': pin_image,
|
||
'msg_type': msg_type,
|
||
'sender': {'id': sender_id},
|
||
'store_id': store_id
|
||
}
|
||
|
||
backend.send_message(msg)
|
||
return None
|
||
except Exception:
|
||
return None
|
||
|
||
async def close(self):
|
||
"""关闭连接(统一连接模式下无需特殊处理)"""
|
||
pass
|
||
|
||
|
||
class DouYinMessageHandler:
|
||
"""抖音消息处理器"""
|
||
|
||
def __init__(self, cookie: dict, ai_service: DouYinBackendService,
|
||
store_id: str):
|
||
import uuid
|
||
self.instance_id = str(uuid.uuid4())[:8] # 添加实例ID用于调试
|
||
self.cookie = cookie
|
||
self.ai_service = ai_service
|
||
self.store_id = store_id
|
||
self.user_tokens = {}
|
||
self.config = None
|
||
self.wss_url = None
|
||
self.ws = None
|
||
self.is_running = True
|
||
|
||
self._loop = None # 添加事件循环
|
||
# asyncio.set_event_loop(self._loop) # 设置事件循环
|
||
self._thread = None # 添加事件循环线程
|
||
|
||
# 设置 AI 服务器的消息处理器引用(统一连接模式下无需设置)
|
||
# self.ai_service.set_message_handler(self)
|
||
|
||
# 添加视频解析相关的headers
|
||
self.video_headers = {
|
||
"authority": "pigeon.jinritemai.com",
|
||
"accept": "application/json, text/plain, */*",
|
||
"accept-language": "zh-CN,zh;q=0.9",
|
||
"cache-control": "no-cache",
|
||
"origin": "https://im.jinritemai.com",
|
||
"pragma": "no-cache",
|
||
"referer": "https://im.jinritemai.com/",
|
||
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\"",
|
||
"sec-fetch-dest": "empty",
|
||
"sec-fetch-mode": "cors",
|
||
"sec-fetch-site": "same-site",
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"x-secsdk-csrf-token": "0001000000017e890e18651b2ef5f6d36d0485a64cae6b0bfc36d69e27fdc20fe7d423670eba1861a5bcb5baaf40,a25cfb6098b498c33ee5f0a5dcafe47b"
|
||
}
|
||
|
||
# 打印实例创建信息
|
||
print(f"[DY Handler] 创建实例 {self.instance_id} for store {store_id}")
|
||
|
||
def get_casl(self):
|
||
"""获取可分配客服列表"""
|
||
headers = {
|
||
"authority": "pigeon.jinritemai.com",
|
||
"accept": "application/json, text/plain, */*",
|
||
"accept-language": "zh-CN,zh;q=0.9",
|
||
"cache-control": "no-cache",
|
||
"origin": "https://im.jinritemai.com",
|
||
"pragma": "no-cache",
|
||
"referer": "https://im.jinritemai.com/",
|
||
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\"",
|
||
"sec-fetch-dest": "empty",
|
||
"sec-fetch-mode": "cors",
|
||
"sec-fetch-site": "same-site",
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"x-secsdk-csrf-token": "000100000001fbdad828f64ea30cad11ae4188140964c4e321a7f711791309da962ec586d0471861a72a472781fb,a25cfb6098b498c33ee5f0a5dcafe47b"
|
||
}
|
||
url = "https://pigeon.jinritemai.com/backstage/getCanAssignStaffList"
|
||
params = {
|
||
"biz_type": "4",
|
||
"PIGEON_BIZ_TYPE": "2",
|
||
"_ts": int(time.time() * 1000),
|
||
"_pms": "1",
|
||
"FUSION": "true",
|
||
"verifyFp": "",
|
||
"_v": "1.0.1.3585"
|
||
}
|
||
try:
|
||
response = requests.get(url, headers=headers, cookies=self.cookie, params=params).json()
|
||
if response.get('code') == 0:
|
||
return response.get('data', [])
|
||
return None
|
||
except Exception as e:
|
||
self._log(f"❌ 获取客服列表失败: {e}", "ERROR")
|
||
return None
|
||
|
||
def transfer_conversation(self, receiver_id, shop_id, staff_id):
|
||
"""转接对话"""
|
||
headers = {
|
||
"authority": "pigeon.jinritemai.com",
|
||
"accept": "application/json, text/plain, */*",
|
||
"accept-language": "zh-CN,zh;q=0.9",
|
||
"cache-control": "no-cache",
|
||
"origin": "https://im.jinritemai.com",
|
||
"pragma": "no-cache",
|
||
"referer": "https://im.jinritemai.com/",
|
||
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\"",
|
||
"sec-fetch-dest": "empty",
|
||
"sec-fetch-mode": "cors",
|
||
"sec-fetch-site": "same-site",
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"x-secsdk-csrf-token": "000100000001fbdad828f64ea30cad11ae4188140964c4e321a7f711791309da962ec586d0471861a72a472781fb,a25cfb6098b498c33ee5f0a5dcafe47b"
|
||
}
|
||
url = "https://pigeon.jinritemai.com/chat/api/backstage/conversation/transfer_conversation?PIGEON_BIZ_TYPE=2"
|
||
params = {
|
||
"bizConversationId": f"{receiver_id}:{shop_id}::2:1:pigeon",
|
||
"toCid": f"{staff_id}",
|
||
"extParams": "{}"
|
||
}
|
||
try:
|
||
response = requests.post(url, headers=headers, cookies=self.cookie, json=params)
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('code') == 0:
|
||
self._log(f"✅ 成功转接对话到客服 {staff_id}", "SUCCESS")
|
||
return True
|
||
self._log(f"❌ 转接对话失败: {response.text}", "ERROR")
|
||
return False
|
||
except Exception as e:
|
||
self._log(f"❌ 转接对话异常: {e}", "ERROR")
|
||
return False
|
||
|
||
# 新增: 发送客服列表消息到后端
|
||
async def send_staff_list_to_backend(self):
|
||
"""发送客服列表到后端"""
|
||
try:
|
||
# 获取客服列表
|
||
staff_list = self.get_casl()
|
||
if not staff_list:
|
||
self._log("⚠️ 获取客服列表失败", "WARNING")
|
||
return False
|
||
|
||
# 转换客服数据格式
|
||
staff_infos = []
|
||
for staff in staff_list:
|
||
staff_info = StaffInfo(
|
||
staff_id=str(staff.get('staffId', '')),
|
||
name=staff.get('staffName', ''),
|
||
status=staff.get('status', 0),
|
||
department=staff.get('department', ''),
|
||
online=staff.get('online', True)
|
||
)
|
||
staff_infos.append(staff_info.to_dict())
|
||
|
||
# 创建消息模板
|
||
message_template = PlatformMessage(
|
||
type="staff_list",
|
||
content="客服列表更新",
|
||
data={
|
||
"staff_list": staff_infos,
|
||
"total_count": len(staff_infos)
|
||
},
|
||
store_id=self.store_id
|
||
)
|
||
|
||
# 发送到后端
|
||
await self.ai_service.send_message_to_backend(message_template.to_dict())
|
||
self._log(f"发送客服列表消息的结构体为: {message_template.to_json()}")
|
||
self._log(f"✅ 成功发送客服列表到后端,共 {len(staff_infos)} 个客服", "SUCCESS")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR")
|
||
return False
|
||
|
||
def start_event_loop(self):
|
||
"""启动事件循环线程"""
|
||
if self._loop is not None and not self._loop.is_closed():
|
||
self._loop.close()
|
||
|
||
self._loop = asyncio.new_event_loop()
|
||
|
||
def run_loop():
|
||
asyncio.set_event_loop(self._loop)
|
||
try:
|
||
self._loop.run_forever()
|
||
except Exception as e:
|
||
self._log(f"事件循环异常: {e}", "ERROR")
|
||
finally:
|
||
# 清理资源
|
||
tasks = asyncio.all_tasks(self._loop)
|
||
for task in tasks:
|
||
task.cancel()
|
||
self._loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
|
||
self._loop.close()
|
||
|
||
self._thread = threading.Thread(target=run_loop, daemon=True)
|
||
self._thread.start()
|
||
|
||
def _log(self, message, level="INFO"):
|
||
"""日志记录"""
|
||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
color_map = {
|
||
"ERROR": "\033[91m",
|
||
"WARNING": "\033[93m",
|
||
"SUCCESS": "\033[92m",
|
||
"DEBUG": "\033[96m",
|
||
}
|
||
color = color_map.get(level, "")
|
||
reset = "\033[0m"
|
||
print(f"{color}[{timestamp}] [{level}] {message}{reset}")
|
||
|
||
def get_config(self):
|
||
"""获取配置信息"""
|
||
headers = {
|
||
'accept': 'application/json, text/plain, */*',
|
||
'accept-language': 'zh-CN,zh;q=0.9',
|
||
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
|
||
}
|
||
params = [
|
||
('PIGEON_BIZ_TYPE', '2'),
|
||
('biz_type', '4'),
|
||
('_ts', int(time.time() * 1000)),
|
||
('_pms', '1'),
|
||
('FUSION', 'true'),
|
||
]
|
||
|
||
try:
|
||
self._log("正在请求配置信息...", "DEBUG")
|
||
response = requests.get(
|
||
'https://pigeon.jinritemai.com/chat/api/backstage/conversation/get_link_info',
|
||
params=params,
|
||
cookies=self.cookie,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
self._log(f"响应状态码: {response.status_code}", "DEBUG")
|
||
self._log(f"响应内容: {response.text}", "DEBUG")
|
||
|
||
if response.status_code != 200:
|
||
raise Exception(f"HTTP请求失败: {response.status_code}")
|
||
|
||
data = response.json()
|
||
self._log(f"解析后的JSON数据: {data}", "DEBUG")
|
||
|
||
if data.get('code') != 0:
|
||
error_msg = data.get('message', '未知错误')
|
||
raise Exception(f"获取配置失败: {error_msg}")
|
||
|
||
if 'data' not in data:
|
||
raise Exception("响应数据中缺少data字段")
|
||
|
||
config_data = data['data']
|
||
required_fields = ['token', 'appId', 'frontierConfig', 'websocketUrl', 'pigeon_sign']
|
||
for field in required_fields:
|
||
if field not in config_data:
|
||
raise Exception(f"配置数据中缺少必要字段: {field}")
|
||
|
||
params = {
|
||
'token': config_data['token'],
|
||
'aid': config_data['appId'],
|
||
'fpid': str(config_data['frontierConfig']['fpId']),
|
||
'device_id': '1216524360102748',
|
||
'access_key': 'd4ececcc8a27b1fb63389380b8007fb0',
|
||
'device_platform': 'web',
|
||
'version_code': '10000',
|
||
'pigeon_source': 'web',
|
||
'PIGEON_BIZ_TYPE': '2',
|
||
'pigeon_sign': config_data['pigeon_sign']
|
||
}
|
||
|
||
websocket_url = f"{config_data['websocketUrl']}?{urlencode(params)}"
|
||
self._log(f"生成的WebSocket URL: {websocket_url}", "DEBUG")
|
||
|
||
return data, websocket_url
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
self._log(f"请求异常: {e}", "ERROR")
|
||
raise Exception(f"网络请求失败: {e}")
|
||
except json.JSONDecodeError as e:
|
||
self._log(f"JSON解析失败: {e}", "ERROR")
|
||
raise Exception(f"响应解析失败: {e}")
|
||
except Exception as e:
|
||
self._log(f"配置获取失败: {e}", "ERROR")
|
||
raise
|
||
|
||
def on_error(self, ws, error):
|
||
"""WebSocket错误处理"""
|
||
self._log(f"❌ WebSocket错误: {error}", "ERROR")
|
||
self.is_running = False
|
||
|
||
def on_close(self, ws, close_status_code, close_msg):
|
||
"""WebSocket关闭处理"""
|
||
self._log(f"🔌 连接关闭: {close_status_code}, {close_msg}", "WARNING")
|
||
self.is_running = False
|
||
|
||
def heartbeat_wss(self):
|
||
"""心跳线程"""
|
||
while self.is_running:
|
||
try:
|
||
if self.ws and hasattr(self.ws, 'sock') and self.ws.sock and self.ws.sock.connected:
|
||
self.send_heartbeat()
|
||
time.sleep(3)
|
||
except Exception as e:
|
||
self._log(f"❌ 心跳发送失败: {e}", "ERROR")
|
||
time.sleep(5)
|
||
|
||
def send_heartbeat(self):
|
||
"""发送心跳包 - 使用 message_arg 中的方法"""
|
||
try:
|
||
# 使用 message_arg 中的 heartbeat_message 方法
|
||
value, message_type = heartbeat_message(
|
||
pigeon_sign=self.config["data"]["pigeon_sign"],
|
||
token=self.config["data"]["token"],
|
||
session_did=self.cookie["PIGEON_CID"]
|
||
)
|
||
form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type)
|
||
self.ws.send_bytes(form_data)
|
||
self._log("💓 发送心跳包", "DEBUG")
|
||
except Exception as e:
|
||
self._log(f"❌ 发送心跳包失败: {e}", "ERROR")
|
||
|
||
def decode_binary_data(self, obj):
|
||
"""
|
||
递归解码字典中的二进制数据
|
||
"""
|
||
if isinstance(obj, dict):
|
||
# 如果是字典,递归处理每个值
|
||
return {k: self.decode_binary_data(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
# 如果是列表,递归处理每个元素
|
||
return [self.decode_binary_data(item) for item in obj]
|
||
elif isinstance(obj, tuple):
|
||
# 如果是元组,递归处理每个元素
|
||
return tuple(self.decode_binary_data(item) for item in obj)
|
||
elif isinstance(obj, bytes):
|
||
# 如果是二进制数据,尝试解码
|
||
try:
|
||
return obj.decode('utf-8')
|
||
except UnicodeDecodeError:
|
||
# 如果UTF-8解码失败,可以尝试其他编码或返回原始数据
|
||
try:
|
||
return obj.decode('latin-1')
|
||
except UnicodeDecodeError:
|
||
return obj # 或者返回原始二进制数据
|
||
else:
|
||
# 其他类型直接返回
|
||
return obj
|
||
|
||
def on_open(self, ws):
|
||
"""WebSocket连接打开"""
|
||
self._log(f"✅ WebSocket连接已打开", "SUCCESS")
|
||
threading.Thread(target=self.heartbeat_wss, daemon=True).start()
|
||
|
||
def code_parse(self, message: dict):
|
||
try:
|
||
user = (message.get("8").get("6").get("610").get("1").get("1")).decode()
|
||
token = (message.get("8").get("6").get("610").get("1").get("4")).decode()
|
||
except:
|
||
user = None
|
||
token = None
|
||
return user, token
|
||
|
||
def _init_user_info(self, receiver_id, token=None):
|
||
"""初始化用户信息"""
|
||
if receiver_id not in self.user_tokens:
|
||
self.user_tokens[receiver_id] = {
|
||
"token": token,
|
||
"last_sent": 0,
|
||
"session_initialized": False,
|
||
"talk_id": 7543146114111898922, # 添加默认值
|
||
"p_id": 7542727339747116329, # 添加默认值
|
||
"pending_messages": []
|
||
}
|
||
elif token:
|
||
self.user_tokens[receiver_id]["token"] = token
|
||
self._log(f"✅ 更新用户 {receiver_id} 的Token", "DEBUG")
|
||
|
||
def on_message(self, ws, content):
|
||
"""处理收到的消息"""
|
||
try:
|
||
message = blackboxprotobuf.decode_message(content)[0]
|
||
|
||
# 解析二进制数据
|
||
decoded_message_readable = self.decode_binary_data(message)
|
||
print(f"\n📨 收到消息: {decoded_message_readable}")
|
||
|
||
user, token = self.code_parse(message=message)
|
||
if user and token:
|
||
receiver_id = int(user.split(":")[0])
|
||
self._init_user_info(receiver_id, token)
|
||
|
||
print(f"✅ 获取到用户Token: 用户ID={receiver_id}")
|
||
|
||
# 获取消息类型
|
||
msg_type = message.get('1')
|
||
print(f"📋 消息类型: {msg_type}")
|
||
|
||
# 获取内部消息结构
|
||
inner_msg = message.get('8', {})
|
||
|
||
# 处理inner_msg为bytes的情况
|
||
if isinstance(inner_msg, bytes):
|
||
try:
|
||
inner_msg = blackboxprotobuf.decode_message(inner_msg)[0]
|
||
print(f"🔄 解析内部消息字节成功")
|
||
except Exception as e:
|
||
print(f"⚠️ 无法解析内部消息字节: {e}")
|
||
return
|
||
|
||
if not inner_msg or not isinstance(inner_msg, dict):
|
||
print("⚠️ 消息结构不完整")
|
||
return
|
||
|
||
# 获取内部消息类型
|
||
inner_type = inner_msg.get('1')
|
||
inner_data = inner_msg.get('6', {})
|
||
|
||
# 首先检查是否是心跳响应
|
||
if "5" in message:
|
||
kv_pairs = message["5"]
|
||
if isinstance(kv_pairs, list):
|
||
for pair in kv_pairs:
|
||
if isinstance(pair, dict) and pair.get("1") == b"code" and pair.get("2") == b"0":
|
||
self._log("💓 心跳响应正常", "DEBUG")
|
||
return
|
||
print(f"📋 内部消息类型: {inner_type}")
|
||
print(f"📋 内部消息数据键: {list(inner_data.keys()) if isinstance(inner_data, dict) else type(inner_data)}")
|
||
|
||
# 处理不同类型的消息
|
||
if inner_type == 610: # Token消息
|
||
print("🔑 处理Token消息")
|
||
self._handle_token_message(inner_data)
|
||
|
||
elif inner_type == 500: # 聊天消息
|
||
print("💬 处理聊天消息")
|
||
self._handle_chat_message(inner_data, message)
|
||
|
||
elif inner_type == 200: # 心跳消息
|
||
print("💓 收到心跳消息")
|
||
|
||
else:
|
||
print(f"❓ 未知内部消息类型: {inner_type}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 解析消息时出错: {e}")
|
||
traceback.print_exc()
|
||
|
||
async def _process_pending_message(self, sender_id, message_content):
|
||
"""处理待发送的消息"""
|
||
try:
|
||
# 获取用户信息
|
||
user_info = self.user_tokens[sender_id]
|
||
talk_id = user_info.get("talk_id", 0)
|
||
p_id = user_info.get("p_id", 0)
|
||
|
||
# 准备发送者和接收者信息
|
||
sender_info = {
|
||
"id": str(sender_id),
|
||
"name": f"用户_{sender_id}",
|
||
"is_customer": True
|
||
}
|
||
receiver_info = {
|
||
"id": self.store_id,
|
||
"name": "店铺客服",
|
||
"is_merchant": True
|
||
}
|
||
|
||
# 创建消息模板
|
||
message_template = PlatformMessage(
|
||
type="message",
|
||
content=message_content,
|
||
msg_type="text",
|
||
sender={"id": sender_info.get("id", "")},
|
||
store_id=self.store_id
|
||
)
|
||
|
||
# 发送消息到后端(使用统一连接)
|
||
await self.ai_service.send_message_to_backend(message_template.to_dict())
|
||
self._log(f"✅ 待发送消息已发送到后端", "INFO")
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 处理待发送消息失败: {e}", "ERROR")
|
||
|
||
def _handle_token_message(self, message):
|
||
"""处理Token消息"""
|
||
try:
|
||
# 直接从消息中获取token数据
|
||
token_data = message.get('610', {}).get('1', {})
|
||
if not token_data:
|
||
self._log("❌ 未找到token数据", "ERROR")
|
||
return
|
||
|
||
user_bytes = token_data.get('1')
|
||
token_bytes = token_data.get('4')
|
||
|
||
if not (isinstance(user_bytes, bytes) and isinstance(token_bytes, bytes)):
|
||
self._log("❌ token数据格式错误", "ERROR")
|
||
return
|
||
|
||
# 解码用户信息和token
|
||
user = user_bytes.decode('utf-8')
|
||
token = token_bytes.decode('utf-8')
|
||
|
||
# 解析用户ID
|
||
try:
|
||
receiver_id = int(user.split(":")[0])
|
||
except (ValueError, IndexError):
|
||
self._log(f"❌ 无法解析用户ID: {user}", "ERROR")
|
||
return
|
||
|
||
self._log(f"✅ 成功解析Token - 用户: {receiver_id}", "SUCCESS")
|
||
|
||
# 确保用户信息存在并保存token
|
||
if receiver_id not in self.user_tokens:
|
||
self._log(f"🆕 创建用户信息: {receiver_id}", "DEBUG")
|
||
self.user_tokens[receiver_id] = {
|
||
"token": None,
|
||
"last_sent": 0,
|
||
"session_initialized": False,
|
||
"talk_id": 0,
|
||
"p_id": 0,
|
||
"pending_messages": []
|
||
}
|
||
|
||
# 保存token
|
||
self.user_tokens[receiver_id]["token"] = token
|
||
self._log(f"✅ 已保存用户 {receiver_id} 的Token", "DEBUG")
|
||
|
||
# 处理待发送的消息
|
||
if "pending_messages" in self.user_tokens[receiver_id]:
|
||
pending_msgs = self.user_tokens[receiver_id]["pending_messages"]
|
||
if pending_msgs:
|
||
self._log(f"📤 处理 {len(pending_msgs)} 条待发送消息", "INFO")
|
||
for msg in pending_msgs:
|
||
asyncio.run_coroutine_threadsafe(
|
||
self._process_pending_message(receiver_id, msg),
|
||
self._loop
|
||
)
|
||
# 清空待发送消息列表
|
||
self.user_tokens[receiver_id]["pending_messages"] = []
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 处理Token消息失败: {e}", "ERROR")
|
||
self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG")
|
||
|
||
def parse_video(self, vid):
|
||
"""解析视频获取播放地址"""
|
||
try:
|
||
self._log(f"🎥 开始解析视频,VID: {vid}", "INFO")
|
||
|
||
url = "https://pigeon.jinritemai.com/backstage/video/getPlayToken"
|
||
params = {
|
||
"vid": vid,
|
||
"_pms": "1"
|
||
}
|
||
|
||
response = requests.get(
|
||
url,
|
||
headers=self.video_headers,
|
||
cookies=self.cookie,
|
||
params=params,
|
||
timeout=36
|
||
).json()
|
||
|
||
self._log(f"视频解析响应: {response}", "DEBUG")
|
||
|
||
if response.get("code") != 0:
|
||
self._log(f"❌ 获取视频token失败: {response.get('message')}", "ERROR")
|
||
return None
|
||
|
||
token = response.get("data", {}).get("token")
|
||
if not token:
|
||
self._log("❌ 未获取到视频token", "ERROR")
|
||
return None
|
||
|
||
# 解析token获取播放地址
|
||
token_url = "https://open.bytedanceapi.com/?" + token
|
||
video_response = requests.get(
|
||
token_url,
|
||
headers=self.video_headers,
|
||
cookies=self.cookie,
|
||
timeout=38
|
||
).json()
|
||
|
||
self._log(f"视频播放信息: {video_response}", "DEBUG")
|
||
|
||
result = video_response.get("Result", {}).get("Data", {}).get("PlayInfoList")
|
||
if result and len(result) > 0:
|
||
play_url = result[0].get("MainPlayUrl")
|
||
if play_url:
|
||
self._log(f"✅ 成功获取视频播放地址: {play_url[:50]}...", "SUCCESS")
|
||
return play_url
|
||
else:
|
||
self._log("❌ 未找到MainPlayUrl", "ERROR")
|
||
else:
|
||
self._log("❌ 获取播放地址失败", "ERROR")
|
||
|
||
return None
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
self._log(f"❌ 网络请求失败: {e}", "ERROR")
|
||
return None
|
||
except json.JSONDecodeError as e:
|
||
self._log(f"❌ JSON解析失败: {e}", "ERROR")
|
||
return None
|
||
except Exception as e:
|
||
self._log(f"❌ 视频解析失败: {e}", "ERROR")
|
||
return None
|
||
|
||
# 获取商品id
|
||
def get_goods_id(self, order_id):
|
||
headers = {
|
||
"authority": "fxg.jinritemai.com",
|
||
"accept": "application/json, text/plain, */*",
|
||
"accept-language": "zh-CN,zh;q=0.9",
|
||
"cache-control": "no-cache",
|
||
"pragma": "no-cache",
|
||
"referer": "https://fxg.jinritemai.com/ffa/morder/order/detail?id=6921052297377971987",
|
||
"sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"",
|
||
"sec-ch-ua-mobile": "?0",
|
||
"sec-ch-ua-platform": "\"Windows\"",
|
||
"sec-fetch-dest": "empty",
|
||
"sec-fetch-mode": "cors",
|
||
"sec-fetch-site": "same-origin",
|
||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||
}
|
||
url = "https://fxg.jinritemai.com/api/order/orderDetail"
|
||
params = {
|
||
"order_id": order_id,
|
||
"appid": "1",
|
||
"__token": "d83b1e7b9e05390304167f02c95f1a74",
|
||
"_bid": "ffa_order",
|
||
"aid": "4272",
|
||
"_lid": "720383572871",
|
||
"verifyFp": "verify_mf3dk1nv_kbxjZXA8_9XBV_4Rvb_B3iI_XDueP5U4SoQx",
|
||
"fp": "verify_mf3dk1nv_kbxjZXA8_9XBV_4Rvb_B3iI_XDueP5U4SoQx",
|
||
"msToken": "OJcYEsXGO2a1822s3kkoFuk3zwOyj5nhjzxhynXWCz7b4Q4oDuxmbFf85-2C05VezCpYsbhaZ8ZtXs6hl_rrU2Akb5K3kOQQimXOVshe_7yCNW52NA8sAR3nOp_y6N99nQN8VxscYQJ6xLcvQN6CzpsjC7y0gxo6VpfHAVs=",
|
||
}
|
||
response = requests.get(url, headers=headers, cookies=self.cookie, params=params).json()
|
||
# print(response)
|
||
if response.get('code') == 0 and response.get('data').get('order_id') == order_id:
|
||
# 表示成功展示了对应这个订单内部的信息
|
||
product_data = response.get('data').get('product')
|
||
product_id = product_data.get('sku')[0].get('product_id')
|
||
return product_id
|
||
else:
|
||
# 没有找到默认返回None
|
||
print("没有根据订单信息找到内部的商品ID 请核对方法get_product_id")
|
||
return None
|
||
|
||
def _extract_message_text(self, msg_content):
|
||
"""从消息内容中提取文本和图片URL"""
|
||
try:
|
||
print("🔍 开始提取消息内容...")
|
||
|
||
# 初始化返回结果
|
||
result = {
|
||
'text': None,
|
||
'avatar': None,
|
||
'image_url': None,
|
||
'thumbnail_url': None,
|
||
'goods_id': None,
|
||
'video_url': None,
|
||
'order_id': None
|
||
}
|
||
|
||
# 方法1: 处理文本消息(从'8'字段获取)
|
||
text_content = msg_content.get('8')
|
||
if text_content:
|
||
print(f"📄 检测到文本内容")
|
||
if isinstance(text_content, bytes):
|
||
try:
|
||
decoded = text_content.decode('utf-8')
|
||
result['text'] = decoded
|
||
self._log(f"✅ UTF-8解码成功: {decoded}", "SUCCESS")
|
||
except UnicodeDecodeError:
|
||
try:
|
||
decoded = text_content.decode('gbk')
|
||
result['text'] = decoded
|
||
self._log(f"✅ GBK解码成功: {decoded}", "SUCCESS")
|
||
except:
|
||
self._log("⚠️ 无法解码文本内容", "DEBUG")
|
||
elif isinstance(text_content, str):
|
||
result['text'] = text_content
|
||
self._log(f"✅ 直接使用文本内容: {text_content}")
|
||
|
||
# 方法2: 统一处理'9'字段中的数据(包括文本和图片、消息、视频
|
||
meta_data = msg_content.get('9', [])
|
||
if meta_data:
|
||
print(f"🔍 检测到消息元数据")
|
||
for item in meta_data:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
|
||
key = item.get('1', b'').decode('utf-8') if isinstance(item.get('1'), bytes) else str(
|
||
item.get('1', ''))
|
||
value = item.get('2', b'').decode('utf-8') if isinstance(item.get('2'), bytes) else str(
|
||
item.get('2', ''))
|
||
|
||
# 处理所有类型的URL
|
||
if key == 'avatar_uri':
|
||
result['avatar'] = value
|
||
self._log(f"✅ 找到头像URL: {value}", "SUCCESS")
|
||
elif key == 'imageUrl':
|
||
result['image_url'] = value
|
||
self._log(f"✅ 找到图片URL: {value}", "SUCCESS")
|
||
elif key == 'thumbnailUrl':
|
||
result['thumbnail_url'] = value
|
||
self._log(f"✅ 找到缩略图URL: {value}", "SUCCESS")
|
||
elif key == 'goods_id':
|
||
goods_id = value
|
||
result['goods_id'] = goods_id
|
||
self._log(f"✅ 找到商品ID: {goods_id}", "SUCCESS")
|
||
elif key == 'msg_render_model':
|
||
try:
|
||
video_info = json.loads(value)
|
||
render_body = video_info.get('render_body', {})
|
||
|
||
# 提取视频URL
|
||
viedo_url = render_body.get('coverURL')
|
||
viedo_vid = render_body.get('vid')
|
||
if viedo_vid and viedo_url:
|
||
self._log(f"✅ 找到视频原始vid: {viedo_vid}", "SUCCESS")
|
||
|
||
# 解析获取视频播放地址
|
||
play_url = self.parse_video(viedo_vid)
|
||
if play_url:
|
||
result['video_url'] = play_url
|
||
else:
|
||
# 如果解析失败 在打印对应的日志后 使用备用封面图片作为地址
|
||
if '\\u0026' in viedo_url:
|
||
viedo_url = viedo_url.replace('\\u0026', '&')
|
||
result['video_url'] = viedo_url
|
||
self._log("⚠️ 使用视频封面作为备选", "WARNING")
|
||
else:
|
||
self._log("⚠️ 未找到视频核心id 可能出现新的数据存放地址", "DEBUG")
|
||
except json.JSONDecodeError as e:
|
||
self._log(f"解析视频信息失败: {e}", "ERROR")
|
||
elif key == 'sku_order_id':
|
||
# 取到了对应的订单号 id
|
||
order_id = value
|
||
result['order_id'] = order_id
|
||
|
||
# 如果已经找到所有需要的URL,提前退出循环
|
||
if all([result['avatar'], result['image_url'], result['thumbnail_url']]):
|
||
break
|
||
|
||
# 方法3: 兼容处理旧的'9-1'字段(如果存在)
|
||
image_data = msg_content.get('9-1', []) # 如果没有拿到这个arrayList那么就不需要在对里面的数据进行解析了
|
||
if image_data != []:
|
||
print(f"📸 检测到旧的图片数据格式", "DEBUG")
|
||
for item in image_data:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
|
||
key = item.get('1', b'').decode('utf-8') if isinstance(item.get('1'), bytes) else str(
|
||
item.get('1', ''))
|
||
value = item.get('2', b'').decode('utf-8') if isinstance(item.get('2'), bytes) else str(
|
||
item.get('2', ''))
|
||
|
||
if key == 'avatar_uri' and not result['avatar']:
|
||
result['avatar'] = value
|
||
self._log(f"✅ 找到头像URL: {value}", "SUCCESS")
|
||
elif key == 'imageUrl' and not result['image_url']:
|
||
result['image_url'] = value
|
||
self._log(f"✅ 找到图片URL: {value}", "SUCCESS")
|
||
elif key == 'thumbnailUrl' and not result['thumbnail_url']:
|
||
result['thumbnail_url'] = value
|
||
self._log(f"✅ 找到缩略图URL: {value}", "SUCCESS")
|
||
elif key == 'goods_id':
|
||
goods_id = value
|
||
result['goods_id'] = goods_id
|
||
self._log(f"✅ 找到商品ID: {goods_id}", "SUCCESS")
|
||
elif key == 'msg_render_model':
|
||
try:
|
||
video_info = json.loads(value)
|
||
render_body = video_info.get('render_body', {})
|
||
|
||
# 提取视频URL
|
||
viedo_url = render_body.get('coverURL')
|
||
viedo_vid = render_body.get('vid')
|
||
if viedo_vid and viedo_url:
|
||
self._log(f"✅ 找到视频原始vid: {viedo_vid}", "SUCCESS")
|
||
|
||
# 解析获取视频播放地址
|
||
play_url = self.parse_video(viedo_vid)
|
||
if play_url:
|
||
result['video_url'] = play_url
|
||
else:
|
||
# 如果解析失败 在打印对应的日志后 使用备用封面图片作为地址
|
||
if '\\u0026' in viedo_url:
|
||
viedo_url = viedo_url.replace('\\u0026', '&')
|
||
result['video_url'] = viedo_url
|
||
self._log("⚠️ 使用视频封面作为备选", "WARNING")
|
||
else:
|
||
self._log("⚠️ 未找到视频核心id 可能出现新的数据存放地址", "DEBUG")
|
||
except json.JSONDecodeError as e:
|
||
self._log(f"解析视频信息失败: {e}", "ERROR")
|
||
elif key == 'sku_order_id':
|
||
# 取到了对应的订单号 id
|
||
order_id = value
|
||
result['order_id'] = order_id
|
||
|
||
# 如果都没找到内容,打印调试信息
|
||
if not any(result.values()):
|
||
print("🔍 未找到有效内容,打印所有字段:")
|
||
for key, value in msg_content.items():
|
||
if isinstance(value, bytes):
|
||
try:
|
||
decoded = value.decode('utf-8', errors='ignore')
|
||
print(f" {key}: {decoded[:100]}...")
|
||
except:
|
||
print(f" {key}: [无法解码的字节数据,长度: {len(value)}]")
|
||
elif isinstance(value, list):
|
||
print(f" {key}: [列表数据,长度: {len(value)}]")
|
||
else:
|
||
print(f" {key}: {value}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 提取消息内容时出错: {e}", "ERROR")
|
||
traceback.print_exc()
|
||
return {
|
||
'text': None,
|
||
'avatar': None,
|
||
'image_url': None,
|
||
'thumbnail_url': None,
|
||
'goods_id': None,
|
||
'viedo_url': None,
|
||
'order_id': None
|
||
}
|
||
|
||
def _handle_chat_message(self, message, msg_type):
|
||
"""处理聊天消息 - 修改版本,支持多种消息类型"""
|
||
try:
|
||
self._log(f"🔍 开始解析{message} 消息结构", "DEBUG")
|
||
|
||
msg_500 = message.get('500', {})
|
||
if not msg_500:
|
||
return
|
||
|
||
msg_content = msg_500.get('5', {})
|
||
if not msg_content:
|
||
return
|
||
|
||
# 从消息内容中获取类型和发送者ID
|
||
inner_msg_type = msg_content.get("6", 0)
|
||
sender_id = msg_content.get("7", 0)
|
||
|
||
self._log(f"📊 内部消息类型: {inner_msg_type}, 发送者ID: {sender_id}", "DEBUG")
|
||
|
||
print("会不会到这里来呢")
|
||
|
||
# 处理不同类型的消息
|
||
if inner_msg_type == 1000 and sender_id and sender_id != int(self.cookie['PIGEON_CID']):
|
||
# 用户文本消息 - 处理AI回复
|
||
self._log("✅ 检测到1000状态的有效消息", "INFO")
|
||
self._handle_user_text_message(msg_content, sender_id)
|
||
|
||
elif inner_msg_type == 50002 and sender_id and sender_id != int(self.cookie['PIGEON_CID']):
|
||
# 系统消息 - 触发token请求
|
||
self._log("✅ 检测到50002类型的系统消息", "INFO")
|
||
self._handle_system_message(msg_content, sender_id)
|
||
|
||
else:
|
||
# 忽略其他消息
|
||
self._log(f"🔍 忽略消息类型: {inner_msg_type}", "DEBUG")
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 处理聊天消息失败: {e}", "ERROR")
|
||
self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG")
|
||
|
||
def _handle_user_text_message(self, msg_content, sender_id):
|
||
"""处理用户文本消息"""
|
||
try:
|
||
# 解析消息内容
|
||
|
||
# 使用引入的新的解析全详情代码(包括图片与头像信息)
|
||
message_dict = self._extract_message_text(msg_content)
|
||
|
||
message_content = None
|
||
if "8" in msg_content:
|
||
content_data = msg_content["8"]
|
||
self._log(f"📄 原始内容数据: {content_data}", "DEBUG")
|
||
|
||
|
||
if message_dict and message_dict.get('text') != "客服水滴智能优品接入":
|
||
self._log(f"💬 成功解析用户消息: '{message_dict}'", "SUCCESS")
|
||
|
||
# 提取会话信息
|
||
talk_id = msg_content.get("20", 0)
|
||
p_id = msg_content.get("5", 0)
|
||
|
||
self._log(f"📝 会话信息 - TalkID: {talk_id}, PID: {p_id}", "DEBUG")
|
||
|
||
# 存储会话信息
|
||
if sender_id not in self.user_tokens:
|
||
self.user_tokens[sender_id] = {
|
||
"token": None,
|
||
"last_sent": 0,
|
||
"session_initialized": False,
|
||
"talk_id": talk_id,
|
||
"p_id": p_id,
|
||
"pending_messages": []
|
||
}
|
||
else:
|
||
# 更新会话信息
|
||
self.user_tokens[sender_id]["talk_id"] = talk_id
|
||
self.user_tokens[sender_id]["p_id"] = p_id
|
||
|
||
# 使用事件循环提交异步任务
|
||
asyncio.run_coroutine_threadsafe(
|
||
self._get_and_send_ai_reply(sender_id, message_dict, talk_id, p_id),
|
||
self._loop
|
||
)
|
||
else:
|
||
self._log("⚠️ 无法解析消息内容", "WARNING")
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 处理用户文本消息失败: {e}", "ERROR")
|
||
|
||
def _handle_system_message(self, msg_content, sender_id):
|
||
"""处理系统消息 - 触发token请求"""
|
||
try:
|
||
# 提取会话信息
|
||
talk_id = msg_content.get("20", 0)
|
||
p_id = msg_content.get("5", 0)
|
||
|
||
self._log(f"📝 系统消息会话信息 - TalkID: {talk_id}, PID: {p_id}", "DEBUG")
|
||
|
||
# 检查是否已经有该用户的token
|
||
if sender_id not in self.user_tokens or not self.user_tokens[sender_id].get("token"):
|
||
self._log(f"🆕 用户 {sender_id} 无token,请求token", "INFO")
|
||
|
||
# 存储会话信息
|
||
if sender_id not in self.user_tokens:
|
||
self.user_tokens[sender_id] = {
|
||
"token": None,
|
||
"last_sent": 0,
|
||
"session_initialized": False,
|
||
"talk_id": talk_id,
|
||
"p_id": p_id,
|
||
"pending_messages": []
|
||
}
|
||
else:
|
||
# 更新会话信息
|
||
self.user_tokens[sender_id]["talk_id"] = talk_id
|
||
self.user_tokens[sender_id]["p_id"] = p_id
|
||
|
||
# 请求token
|
||
self._request_user_token(sender_id, p_id)
|
||
else:
|
||
self._log(f"✅ 用户 {sender_id} 已有token", "DEBUG")
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 处理系统消息失败: {e}", "ERROR")
|
||
|
||
def _parse_message_content(self, msg_data):
|
||
"""解析消息内容"""
|
||
try:
|
||
# 尝试从不同字段解析内容
|
||
content_fields = ['4', '8', '14']
|
||
for field in content_fields:
|
||
content = msg_data.get(field)
|
||
if content:
|
||
if isinstance(content, bytes):
|
||
try:
|
||
return content.decode('utf-8')
|
||
except UnicodeDecodeError:
|
||
try:
|
||
return content.decode('gbk')
|
||
except:
|
||
continue
|
||
elif isinstance(content, str):
|
||
return content
|
||
return None
|
||
except Exception as e:
|
||
self._log(f"❌ 解析消息内容失败: {e}", "ERROR")
|
||
return None
|
||
|
||
def _determine_message_type(self, message_dict):
|
||
"""确定消息类型"""
|
||
if message_dict.get('video_url'): # 如果确认收到了video_url类型的数据 那么不可能是其它类型
|
||
return 'video'
|
||
elif message_dict.get('image_url'): # 如果确认收到了image_url类型的数据 那么不可能是其它类型
|
||
return 'image'
|
||
elif message_dict.get('order_id'): # 如果确认收到了order_id类型的数据 那么不可能是其它类型 只能是发送询问订单
|
||
return 'order'
|
||
elif message_dict.get('goods_id'): # 如果确认收到了goods_id类型的数据 那么不可能是其它类型 只能是发送询问商品
|
||
return 'goods'
|
||
else:
|
||
return 'text' # 最普遍的类型
|
||
|
||
async def _get_and_send_ai_reply(self, sender_id, message_dict, talk_id, p_id):
|
||
"""获取并发送AI回复 - 修改版本,确保有token"""
|
||
try:
|
||
self._log(f"🚀 AI回复流程开始,用户: {sender_id}", "INFO")
|
||
self._log(f"📋 用户消息内容: '{message_dict}'", "INFO")
|
||
|
||
# 检查是否已有token,如果没有则先请求
|
||
if sender_id not in self.user_tokens or not self.user_tokens[sender_id].get("token"):
|
||
self._log(f"🆕 用户 {sender_id} 无token,先请求token", "INFO")
|
||
self.user_tokens[sender_id] = {
|
||
"token": None,
|
||
"last_sent": 0,
|
||
"session_initialized": False,
|
||
"talk_id": talk_id,
|
||
"p_id": p_id,
|
||
"pending_messages": [] # 存储等待发送的消息
|
||
}
|
||
self._request_user_token(sender_id, p_id)
|
||
|
||
# 将当前消息加入待处理队列
|
||
self.user_tokens[sender_id]["pending_messages"].append(message_dict)
|
||
return
|
||
|
||
# 预先定义默认值 (避免后续因为没有成功走到对应分支里产生的错误)
|
||
# msg_type = "text"
|
||
message_text = message_dict.get('text', '')
|
||
avatar_url = message_dict.get('avatar', '')
|
||
|
||
# 确定消息类型 (调用确认方法类型)
|
||
msg_type = self._determine_message_type(message_dict)
|
||
|
||
if msg_type == 'video' and avatar_url or message_dict['text'] == "[视频]":
|
||
# 视频类型
|
||
message_text = message_dict['video_url']
|
||
avatar_url = avatar_url
|
||
elif msg_type == 'image' and avatar_url:
|
||
message_text = message_dict['image_url']
|
||
avatar_url = avatar_url
|
||
elif msg_type == 'order':
|
||
msg_type = 'order_card'
|
||
order_id = message_dict['order_id']
|
||
goods_id = self.get_goods_id(order_id)
|
||
if goods_id:
|
||
message_text = f"订单号: {order_id}, 商品ID: {goods_id}"
|
||
else:
|
||
message_text = f"订单号: {order_id}"
|
||
elif msg_type == 'goods' or message_dict.get("text", None) == "[商品]":
|
||
# 商品类型
|
||
msg_type = 'product_card'
|
||
message_text = f"商品卡片ID: {message_dict['goods_id']}"
|
||
elif msg_type == 'text' and avatar_url:
|
||
message_text = message_dict['text']
|
||
|
||
# 准备发送者和接收者信息
|
||
sender_info = {
|
||
"id": str(sender_id),
|
||
"name": f"用户_{sender_id}",
|
||
"is_customer": True
|
||
}
|
||
|
||
# 创建消息模板
|
||
message_template = PlatformMessage(
|
||
type="message",
|
||
content=message_text,
|
||
pin_image=avatar_url,
|
||
msg_type=msg_type,
|
||
sender={"id": sender_info.get("id", "")},
|
||
store_id=self.store_id
|
||
)
|
||
|
||
self._log("📤 准备发送消息到AI服务...", "INFO")
|
||
self._log(f"📋 消息内容: {message_template.to_json()}", "DEBUG")
|
||
|
||
# 发送消息到后端(使用统一连接,不等待回复)
|
||
start_time = time.time()
|
||
await self.ai_service.send_message_to_backend(message_template.to_dict())
|
||
response_time = time.time() - start_time
|
||
|
||
self._log(f"✅ 消息已发送到后端,耗时: {response_time:.2f}s", "SUCCESS")
|
||
self._log("🔄 等待后端AI处理并通过GUI转发回复", "INFO")
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 获取AI回复失败: {e}", "ERROR")
|
||
self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG")
|
||
|
||
async def _check_and_send_reply(self, sender_id, talk_id, p_id, reply_text):
|
||
"""检查并发送回复"""
|
||
try:
|
||
self._log(f"🔍 检查发送条件,用户: {sender_id}", "DEBUG")
|
||
|
||
# 确保用户信息存在
|
||
if sender_id not in self.user_tokens:
|
||
self._log(f"❌ 用户 {sender_id} 信息不存在", "ERROR")
|
||
return
|
||
|
||
user_token = self.user_tokens[sender_id].get("token")
|
||
if not user_token:
|
||
self._log(f"⚠️ 用户 {sender_id} token为空,将消息加入待发送队列", "WARNING")
|
||
# 将消息加入待发送队列
|
||
if "pending_messages" not in self.user_tokens[sender_id]:
|
||
self.user_tokens[sender_id]["pending_messages"] = []
|
||
self.user_tokens[sender_id]["pending_messages"].append(reply_text)
|
||
return
|
||
|
||
# 检查发送频率
|
||
current_time = int(time.time() * 1000)
|
||
last_sent = self.user_tokens[sender_id].get("last_sent", 0)
|
||
|
||
if current_time - last_sent < 4000:
|
||
self._log("⏳ 发送太频繁,跳过", "DEBUG")
|
||
return
|
||
|
||
self._log(f"📤 准备发送回复给用户 {sender_id}", "INFO")
|
||
self._log(f"📝 回复内容: '{reply_text}'", "DEBUG")
|
||
|
||
# 发送回复
|
||
success = await self._send_message_to_user(sender_id, talk_id, p_id, user_token, reply_text)
|
||
if success:
|
||
self.user_tokens[sender_id]["last_sent"] = current_time
|
||
self._log(f"✅ 回复发送完成", "SUCCESS")
|
||
else:
|
||
self._log(f"❌ 回复发送失败", "ERROR")
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 检查并发送回复失败: {e}", "ERROR")
|
||
|
||
async def _send_message_to_user(self, receiver_id, talk_id, p_id, user_token, content):
|
||
"""发送消息给用户 - 核心发送方法"""
|
||
try:
|
||
self._log(f"📤 正在发送消息给用户 {receiver_id}", "DEBUG")
|
||
self._log(f"📝 消息内容: {content}", "DEBUG")
|
||
|
||
# 检查必要的参数
|
||
if not all([receiver_id, talk_id, p_id, user_token, content]):
|
||
self._log("❌ 发送消息参数不全", "ERROR")
|
||
print(f"{receiver_id}--{talk_id}--{p_id}--{user_token}--{content}")
|
||
return False
|
||
|
||
# 使用 message_arg 中的 send_message 方法创建消息
|
||
value, message_type = send_message(
|
||
pigeon_sign=self.config["data"]["pigeon_sign"],
|
||
token=self.config["data"]["token"],
|
||
receiver_id=receiver_id,
|
||
shop_id=self.cookie["SHOP_ID"],
|
||
talk_id=talk_id,
|
||
session_did=self.cookie["PIGEON_CID"],
|
||
p_id=p_id,
|
||
user_code=user_token,
|
||
text=content
|
||
)
|
||
|
||
# 编码消息
|
||
form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type)
|
||
|
||
# 发送消息
|
||
self.ws.send_bytes(form_data)
|
||
self._log(f"✅ 消息已发送给用户 {receiver_id}", "SUCCESS")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 发送消息给用户失败: {e}", "ERROR")
|
||
return False
|
||
|
||
def _request_user_token(self, receiver_id, p_id):
|
||
"""请求用户token - 使用 message_arg 中的方法"""
|
||
try:
|
||
# 使用 message_arg 中的 get_user_code 方法
|
||
|
||
value, message_type = get_user_code(
|
||
pigeon_sign=self.config["data"]["pigeon_sign"],
|
||
token=self.config["data"]["token"],
|
||
receiver_id=receiver_id,
|
||
shop_id=self.cookie["SHOP_ID"],
|
||
session_did=self.cookie["PIGEON_CID"],
|
||
p_id=p_id
|
||
)
|
||
form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type)
|
||
self.ws.send_bytes(form_data)
|
||
self._log(f"📤 已请求用户 {receiver_id} 的token", "INFO")
|
||
except Exception as e:
|
||
self._log(f"❌ 请求token失败: {e}", "ERROR")
|
||
|
||
def _log_user_tokens_state(self):
|
||
"""记录当前 user_tokens 状态(用于调试)"""
|
||
self._log("🔍 当前 user_tokens 状态:", "DEBUG")
|
||
for user_id, info in self.user_tokens.items():
|
||
token_status = "有" if info.get("token") else "无"
|
||
self._log(
|
||
f" 用户 {user_id}: token={token_status}, talk_id={info.get('talk_id')}, p_id={info.get('p_id')}",
|
||
"DEBUG")
|
||
|
||
async def _handle_customer_message(self, message_data):
|
||
"""处理来自后端的客服消息"""
|
||
try:
|
||
self._log(f"🔍 _handle_customer_message 被调用,当前实例: {self}", "DEBUG")
|
||
self._log_user_tokens_state()
|
||
|
||
content = message_data.get("content", "")
|
||
if not content:
|
||
self._log("⚠️ 客服消息内容为空", "WARNING")
|
||
return
|
||
self._log(f"后端客服传输的消息为:{content}")
|
||
|
||
receiver_info = message_data.get("receiver", {})
|
||
receiver_id = receiver_info.get("id")
|
||
|
||
if receiver_id and content:
|
||
self._log(f"📤 收到客服消息,准备发送给用户 {receiver_id}", "INFO")
|
||
|
||
# 确保 receiver_id 是整数类型
|
||
try:
|
||
receiver_id = int(receiver_id)
|
||
except (ValueError, TypeError):
|
||
self._log(f"❌ 无法转换用户ID为整数: {receiver_id}", "ERROR")
|
||
return
|
||
|
||
# 检查用户信息是否存在,如果不存在则初始化
|
||
if receiver_id not in self.user_tokens:
|
||
self._log(f"🆕 用户 {receiver_id} 信息不存在,初始化用户信息", "INFO")
|
||
self.user_tokens[receiver_id] = {
|
||
"token": None,
|
||
"last_sent": 0,
|
||
"session_initialized": False,
|
||
"talk_id": 7543146114111898922, # 使用默认值
|
||
"p_id": 7542727339747116329, # 使用默认值
|
||
"pending_messages": [content] # 直接将消息加入待发送队列
|
||
}
|
||
|
||
# 如果没有token,尝试获取token
|
||
if not self.user_tokens[receiver_id]["token"]:
|
||
self._log(f"🔄 用户 {receiver_id} 无token,尝试获取", "INFO")
|
||
# 触发token获取
|
||
self._request_user_token(
|
||
receiver_id,
|
||
self.user_tokens[receiver_id]["p_id"]
|
||
)
|
||
# 注意:这里不返回,让消息已经在队列中等待处理
|
||
return
|
||
|
||
user_info = self.user_tokens[receiver_id]
|
||
|
||
# 如果有token,直接发送消息
|
||
if user_info.get("token"):
|
||
self._log(f"✅ 用户 {receiver_id} 有token,直接发送消息", "INFO")
|
||
# 检查必要的字段是否存在
|
||
required_fields = ["talk_id", "p_id", "token"]
|
||
for field in required_fields:
|
||
if field not in user_info or not user_info[field]:
|
||
self._log(f"❌ 用户 {receiver_id} 缺少必要字段: {field}", "ERROR")
|
||
return
|
||
|
||
# 发送消息
|
||
await self._send_message_to_user(
|
||
receiver_id,
|
||
user_info["talk_id"],
|
||
user_info["p_id"],
|
||
user_info["token"],
|
||
content
|
||
)
|
||
else:
|
||
# 没有token,将消息加入队列并请求token
|
||
self._log(f"⚠️ 用户 {receiver_id} 无token,将消息加入队列", "INFO")
|
||
if "pending_messages" not in user_info:
|
||
user_info["pending_messages"] = []
|
||
user_info["pending_messages"].append(content)
|
||
# 请求token
|
||
self._request_user_token(
|
||
receiver_id,
|
||
user_info["p_id"]
|
||
)
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 处理客服消息失败: {e}", "ERROR")
|
||
self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG")
|
||
|
||
async def start_async(self):
|
||
"""异步启动消息处理器(不阻塞调用线程)"""
|
||
try:
|
||
self._log("🚀 异步启动消息处理器...", "INFO")
|
||
|
||
# 启动事件循环
|
||
self.start_event_loop()
|
||
|
||
# 获取配置
|
||
self.config, self.wss_url = self.get_config()
|
||
self._log("✅ 配置获取成功", "SUCCESS")
|
||
|
||
# 连接后端服务(使用统一连接,无需独立连接)
|
||
await self.ai_service.connect(self.store_id)
|
||
self._log("✅ 后端服务连接成功(使用统一连接)", "SUCCESS")
|
||
|
||
# 创建WebSocket连接
|
||
self.ws = WebSocketApp(
|
||
self.wss_url,
|
||
header={
|
||
'Origin': 'https://im.jinritemai.com',
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
|
||
},
|
||
on_open=self.on_open,
|
||
on_message=self.on_message,
|
||
on_error=self.on_error,
|
||
on_close=self.on_close
|
||
)
|
||
|
||
# 添加重连机制
|
||
def run_with_reconnect():
|
||
while self.is_running:
|
||
try:
|
||
self.ws.run_forever()
|
||
except Exception as e:
|
||
self._log(f"WebSocket连接异常: {e}, 5秒后重连...", "WARNING")
|
||
time.sleep(5)
|
||
continue
|
||
|
||
# 注册消息处理器(统一连接模式下无需注册)
|
||
|
||
# 在新线程中运行WebSocket
|
||
ws_thread = threading.Thread(target=run_with_reconnect, daemon=True)
|
||
ws_thread.start()
|
||
|
||
self._log("🟢 消息处理器异步启动完成", "SUCCESS")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 异步启动失败: {e}", "ERROR")
|
||
return False
|
||
|
||
def start(self):
|
||
"""启动消息处理器"""
|
||
try:
|
||
self._log("🚀 启动消息处理器...", "INFO")
|
||
|
||
# 启动事件循环
|
||
self.start_event_loop()
|
||
|
||
# 获取配置
|
||
self.config, self.wss_url = self.get_config()
|
||
self._log("✅ 配置获取成功", "SUCCESS")
|
||
|
||
# 连接后端服务(使用统一连接,无需独立连接)
|
||
async def connect_backend_service():
|
||
await self.ai_service.connect(self.store_id)
|
||
self._log("✅ 后端服务连接成功(使用统一连接)", "SUCCESS")
|
||
|
||
# 在事件循环中执行连接
|
||
asyncio.run_coroutine_threadsafe(connect_backend_service(), self._loop)
|
||
|
||
# 创建WebSocket连接
|
||
self.ws = WebSocketApp(
|
||
self.wss_url,
|
||
header={
|
||
'Origin': 'https://im.jinritemai.com',
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
|
||
},
|
||
on_open=self.on_open,
|
||
on_message=self.on_message,
|
||
on_error=self.on_error,
|
||
on_close=self.on_close
|
||
)
|
||
|
||
# 添加重连机制
|
||
def run_with_reconnect():
|
||
while self.is_running:
|
||
try:
|
||
self.ws.run_forever()
|
||
except Exception as e:
|
||
self._log(f"WebSocket连接异常: {e}, 5秒后重连...", "WARNING")
|
||
time.sleep(5)
|
||
continue
|
||
|
||
# 注册消息处理器(统一连接模式下无需注册)
|
||
|
||
# 在新线程中运行WebSocket
|
||
ws_thread = threading.Thread(target=run_with_reconnect, daemon=True)
|
||
ws_thread.start()
|
||
|
||
self._log("🟢 开始运行消息处理器...", "INFO")
|
||
|
||
# !!!关键修改:保持主线程运行!!!
|
||
while self.is_running:
|
||
time.sleep(1) # 主线程保持运行,防止进程退出
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 启动失败: {e}", "ERROR")
|
||
finally:
|
||
self._log("🛑 消息处理器已停止", "INFO")
|
||
|
||
async def close(self):
|
||
"""关闭连接"""
|
||
self.is_running = False
|
||
if self.ws:
|
||
self.ws.close()
|
||
if self.ai_service:
|
||
await self.ai_service.close()
|
||
|
||
# 关闭事件循环
|
||
if self._loop:
|
||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||
if self._thread:
|
||
self._thread.join(timeout=1.0)
|
||
|
||
self._log("🛑 消息处理器已停止", "INFO")
|
||
|
||
async def send_message_external(self, receiver_id: str, content: str) -> bool:
|
||
"""外部调用的发送消息方法 - 用于后端消息转发"""
|
||
try:
|
||
self._log(f"🔄 [External-{self.instance_id}] 收到转发请求: receiver_id={receiver_id}, content={content}", "INFO")
|
||
|
||
# 修复数据类型不匹配问题:将字符串转换为整数
|
||
try:
|
||
receiver_id_int = int(receiver_id)
|
||
self._log(f"🔧 [External-{self.instance_id}] 转换 receiver_id: '{receiver_id}' -> {receiver_id_int}", "DEBUG")
|
||
except ValueError:
|
||
self._log(f"❌ [External-{self.instance_id}] receiver_id 无法转换为整数: {receiver_id}", "ERROR")
|
||
return False
|
||
|
||
# 调试信息:显示当前活跃用户
|
||
active_users = list(self.user_tokens.keys())
|
||
self._log(f"🔍 [External-{self.instance_id}] 当前活跃用户列表: {active_users}", "DEBUG")
|
||
self._log(f"🔍 [External-{self.instance_id}] 活跃用户数量: {len(active_users)}", "DEBUG")
|
||
|
||
# 检查用户是否存在于user_tokens中(使用整数类型)
|
||
self._log(f"🔍 [External-{self.instance_id}] 调试信息:", "DEBUG")
|
||
self._log(f"🔍 [External-{self.instance_id}] receiver_id_int: {receiver_id_int} (类型: {type(receiver_id_int)})", "DEBUG")
|
||
self._log(f"🔍 [External-{self.instance_id}] user_tokens keys: {list(self.user_tokens.keys())}", "DEBUG")
|
||
if self.user_tokens:
|
||
first_key = list(self.user_tokens.keys())[0]
|
||
self._log(f"🔍 [External-{self.instance_id}] 第一个key: {first_key} (类型: {type(first_key)})", "DEBUG")
|
||
self._log(f"🔍 [External-{self.instance_id}] 直接比较: {receiver_id_int == first_key}", "DEBUG")
|
||
|
||
if receiver_id_int not in self.user_tokens:
|
||
self._log(f"❌ [External-{self.instance_id}] 用户 {receiver_id_int} 不在活跃会话中", "WARNING")
|
||
self._log(f"💡 [External-{self.instance_id}] 提示:用户需要先在抖音平台发送消息建立会话", "INFO")
|
||
|
||
# 显示当前活跃用户的调试信息
|
||
self.print_active_users_debug()
|
||
|
||
return False
|
||
|
||
user_info = self.user_tokens[receiver_id_int]
|
||
talk_id = user_info.get("talk_id")
|
||
p_id = user_info.get("p_id")
|
||
user_token = user_info.get("token")
|
||
|
||
self._log(f"🔍 [External-{self.instance_id}] 用户会话信息: talk_id={talk_id}, p_id={p_id}, has_token={bool(user_token)}", "DEBUG")
|
||
|
||
# 检查必要参数
|
||
if not talk_id or not p_id:
|
||
self._log(f"❌ [External-{self.instance_id}] 用户 {receiver_id_int} 缺少必要的会话信息 (talk_id: {talk_id}, p_id: {p_id})", "ERROR")
|
||
return False
|
||
|
||
if not user_token:
|
||
self._log(f"⚠️ [External-{self.instance_id}] 用户 {receiver_id_int} token为空,尝试请求token", "WARNING")
|
||
# 请求用户token
|
||
self._request_user_token(receiver_id_int, p_id)
|
||
# 将消息加入待发送队列
|
||
if "pending_messages" not in user_info:
|
||
user_info["pending_messages"] = []
|
||
user_info["pending_messages"].append(content)
|
||
self._log(f"📝 [External-{self.instance_id}] 消息已加入待发送队列,队列长度: {len(user_info['pending_messages'])}", "INFO")
|
||
return True
|
||
|
||
# 发送消息 (注意:_send_message_to_user 可能期望字符串类型的receiver_id)
|
||
success = await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, content)
|
||
if success:
|
||
# 更新最后发送时间
|
||
user_info["last_sent"] = int(time.time() * 1000)
|
||
self._log(f"✅ [External-{self.instance_id}] 消息转发成功", "SUCCESS")
|
||
else:
|
||
self._log(f"❌ [External-{self.instance_id}] 消息转发失败", "ERROR")
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ [External-{self.instance_id}] 外部消息发送异常: {e}", "ERROR")
|
||
self._log(f"❌ [External-{self.instance_id}] 错误详情: {traceback.format_exc()}", "DEBUG")
|
||
return False
|
||
|
||
def get_active_users_info(self) -> dict:
|
||
"""获取当前活跃用户的详细信息"""
|
||
try:
|
||
active_users = {}
|
||
for user_id, info in self.user_tokens.items():
|
||
active_users[user_id] = {
|
||
"has_token": bool(info.get("token")),
|
||
"talk_id": info.get("talk_id"),
|
||
"p_id": info.get("p_id"),
|
||
"last_sent": info.get("last_sent", 0),
|
||
"pending_messages_count": len(info.get("pending_messages", [])),
|
||
"session_initialized": info.get("session_initialized", False)
|
||
}
|
||
return active_users
|
||
except Exception as e:
|
||
self._log(f"获取活跃用户信息失败: {e}", "ERROR")
|
||
return {}
|
||
|
||
def print_active_users_debug(self):
|
||
"""打印当前活跃用户的调试信息"""
|
||
try:
|
||
active_users = self.get_active_users_info()
|
||
self._log(f"🔍 [Debug-{self.instance_id}] 当前活跃用户总数: {len(active_users)}", "INFO")
|
||
|
||
if not active_users:
|
||
self._log(f"📝 [Debug-{self.instance_id}] 没有活跃用户。用户需要先在抖音平台发送消息建立会话。", "INFO")
|
||
return
|
||
|
||
for user_id, info in active_users.items():
|
||
self._log(f"👤 [Debug-{self.instance_id}] 用户 {user_id}:", "INFO")
|
||
self._log(f" - Token: {'✅' if info['has_token'] else '❌'}", "INFO")
|
||
self._log(f" - Talk ID: {info['talk_id']}", "INFO")
|
||
self._log(f" - P ID: {info['p_id']}", "INFO")
|
||
self._log(f" - 待发送消息: {info['pending_messages_count']}", "INFO")
|
||
self._log(f" - 会话已初始化: {'✅' if info['session_initialized'] else '❌'}", "INFO")
|
||
|
||
except Exception as e:
|
||
self._log(f"打印调试信息失败: {e}", "ERROR")
|
||
|
||
|
||
class DouYinChatBot:
|
||
"""抖音聊天机器人主类"""
|
||
|
||
def __init__(self, cookie: dict, store_id: str = None):
|
||
self.cookie = cookie
|
||
self.store_id = store_id
|
||
if not self.store_id:
|
||
self.store_id = "68c16836-abac-4307-b763-ea1154823356"
|
||
self.ai_service = DouYinBackendService()
|
||
self.message_handler = None
|
||
self.loop = asyncio.new_event_loop() # 统一的事件循环
|
||
self._stop_event = asyncio.Event() # 添加停止事件
|
||
|
||
async def initialize(self):
|
||
"""初始化聊天机器人"""
|
||
try:
|
||
# 创建消息处理器
|
||
self.message_handler = DouYinMessageHandler(
|
||
cookie=self.cookie,
|
||
ai_service=self.ai_service,
|
||
store_id=self.store_id
|
||
)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 初始化失败: {e}")
|
||
return False
|
||
|
||
def start(self):
|
||
"""启动聊天机器人"""
|
||
try:
|
||
# 运行初始化
|
||
asyncio.set_event_loop(self.loop)
|
||
success = self.loop.run_until_complete(self.initialize())
|
||
|
||
if success:
|
||
# 启动消息处理器
|
||
self.message_handler.start()
|
||
|
||
# !!!关键修改:简化主循环!!!
|
||
# 保持主线程运行,等待停止信号
|
||
while not self._stop_event.is_set():
|
||
time.sleep(1)
|
||
|
||
else:
|
||
print("❌ 聊天机器人启动失败")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 启动失败: {e}")
|
||
traceback.print_exc()
|
||
finally:
|
||
if not self._stop_event.is_set():
|
||
self.loop.close()
|
||
|
||
async def close(self):
|
||
"""关闭聊天机器人"""
|
||
self._stop_event.set() # 设置停止信号
|
||
|
||
if self.message_handler:
|
||
await self.message_handler.close()
|
||
if self.ai_service:
|
||
await self.ai_service.close()
|
||
|
||
|
||
class DouYinListenerForGUI:
|
||
"""用于GUI集成的抖音监听包装器"""
|
||
|
||
def __init__(self, log_callback=None):
|
||
self.douyin_bot = None
|
||
self.log_callback = log_callback
|
||
self.running = False
|
||
self.stop_event = None
|
||
self.loop = None
|
||
|
||
def _log(self, message, log_type="INFO"):
|
||
"""处理日志输出"""
|
||
if self.log_callback:
|
||
self.log_callback(message, log_type)
|
||
else:
|
||
print(f"[{log_type}] {message}")
|
||
|
||
async def start_listening(self, cookie_dict, text="您好,感谢您的咨询,我们会尽快回复您!"):
|
||
"""启动监听的主方法"""
|
||
try:
|
||
self._log("🔵 开始抖音平台连接流程", "INFO")
|
||
|
||
# 验证cookie
|
||
if not cookie_dict:
|
||
self._log("❌ Cookie信息不能为空", "ERROR")
|
||
return False
|
||
|
||
required_fields = ['SHOP_ID', 'PIGEON_CID', 'sessionid']
|
||
missing_fields = [field for field in required_fields if field not in cookie_dict]
|
||
if missing_fields:
|
||
self._log(f"❌ 缺少必需的Cookie字段: {', '.join(missing_fields)}", "ERROR")
|
||
return False
|
||
|
||
self._log(f"✅ Cookie验证通过,店铺ID: {cookie_dict['SHOP_ID']}", "SUCCESS")
|
||
|
||
# 创建抖音聊天机器人实例
|
||
self.douyin_bot = DouYinChatBot(cookie=cookie_dict)
|
||
self.running = True
|
||
self.stop_event = asyncio.Event()
|
||
|
||
self._log("🎉 开始监听抖音平台消息...", "SUCCESS")
|
||
|
||
# 初始化并启动机器人
|
||
success = await self.douyin_bot.initialize()
|
||
if success:
|
||
# 在新的事件循环中运行机器人
|
||
self.loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self.loop)
|
||
|
||
# 启动机器人
|
||
def run_bot():
|
||
self.douyin_bot.start()
|
||
|
||
# 在新线程中运行机器人
|
||
bot_thread = threading.Thread(target=run_bot, daemon=True)
|
||
bot_thread.start()
|
||
|
||
return True
|
||
else:
|
||
self._log("❌ 抖音机器人初始化失败", "ERROR")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ 监听过程中出现严重错误: {str(e)}", "ERROR")
|
||
import traceback
|
||
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
|
||
return False
|
||
|
||
async def start_with_cookies(self, store_id, cookie_dict):
|
||
"""使用下发的cookies与store_id直接建立抖音平台WS并开始监听"""
|
||
try:
|
||
self._log("🔵 [DY] 收到后端登录指令,开始使用cookies连接平台", "INFO")
|
||
|
||
# 验证cookie
|
||
if not cookie_dict:
|
||
self._log("❌ Cookie信息不能为空", "ERROR")
|
||
return False
|
||
|
||
required_fields = ['SHOP_ID', 'PIGEON_CID', 'sessionid']
|
||
missing_fields = [field for field in required_fields if field not in cookie_dict]
|
||
if missing_fields:
|
||
self._log(f"❌ 缺少必需的Cookie字段: {', '.join(missing_fields)}", "ERROR")
|
||
return False
|
||
|
||
self._log(f"✅ Cookie验证通过,店铺ID: {cookie_dict['SHOP_ID']}", "SUCCESS")
|
||
|
||
# 建立与后端的统一连接(确保使用GUI的store_id)
|
||
backend_service = DouYinBackendService()
|
||
await backend_service.connect(store_id)
|
||
self._log("✅ 后端服务连接成功(使用统一连接)", "SUCCESS")
|
||
|
||
# 创建抖音聊天机器人实例
|
||
self.douyin_bot = DouYinChatBot(cookie=cookie_dict, store_id=store_id)
|
||
# 设置后端服务
|
||
self.douyin_bot.ai_service = backend_service
|
||
self.running = True
|
||
self.stop_event = asyncio.Event()
|
||
|
||
self._log("🎉 开始监听抖音平台消息...", "SUCCESS")
|
||
|
||
# 异步初始化机器人
|
||
success = await self.douyin_bot.initialize()
|
||
if success:
|
||
# 异步启动消息处理器
|
||
if self.douyin_bot.message_handler:
|
||
message_handler_success = await self.douyin_bot.message_handler.start_async()
|
||
if not message_handler_success:
|
||
self._log("❌ 消息处理器启动失败", "ERROR")
|
||
return False
|
||
|
||
# 注册到全局管理器
|
||
dy_manager = DouYinWebsocketManager()
|
||
shop_key = f"抖音:{store_id}"
|
||
dy_manager.on_connect(shop_key, self.douyin_bot, store_id=store_id, cookie=cookie_dict)
|
||
self._log(f"✅ 已注册抖音连接: {shop_key}", "SUCCESS")
|
||
|
||
# 创建一个长期运行的任务来保持监听状态
|
||
async def keep_running():
|
||
try:
|
||
while not self.stop_event.is_set():
|
||
await asyncio.sleep(1)
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
# 在后台启动监听任务
|
||
asyncio.create_task(keep_running())
|
||
|
||
self._log("✅ [DY] 抖音平台连接成功,开始监听消息", "SUCCESS")
|
||
return True
|
||
else:
|
||
self._log("❌ 抖音机器人初始化失败", "ERROR")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self._log(f"❌ [DY] 监听过程中出现严重错误: {str(e)}", "ERROR")
|
||
import traceback
|
||
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
|
||
return False
|
||
|
||
def stop_listening(self):
|
||
"""停止监听"""
|
||
if self.stop_event:
|
||
self.stop_event.set()
|
||
self.running = False
|
||
if self.douyin_bot:
|
||
# 创建新的事件循环来关闭机器人
|
||
async def cleanup():
|
||
await self.douyin_bot.close()
|
||
|
||
if self.loop and not self.loop.is_closed():
|
||
self.loop.run_until_complete(cleanup())
|
||
self.loop.close()
|
||
self._log("抖音监听已停止", "INFO")
|
||
|
||
|
||
# 使用示例
|
||
if __name__ == '__main__':
|
||
cookies = {
|
||
"passport_csrf_token": "9984fc5ad93abebf1a7bceb157fc4560",
|
||
"passport_csrf_token_default": "9984fc5ad93abebf1a7bceb157fc4560",
|
||
"qc_tt_tag": "0",
|
||
"is_staff_user": "false",
|
||
"SHOP_ID": "217051461",
|
||
"PIGEON_CID": "1216524360102748",
|
||
"passport_mfa_token": "CjeTQsM%2B1gYNk6uL6EBAHaZQqQ0qvxul3Up08WBuQeEr%2BGfitdRk91WzrSVG46q2ldU%2BOnVc%2BmPtGkoKPAAAAAAAAAAAAABPbMkgpgfbZwhLZyjNjTZUm1l5GH%2BAasL%2BonVECzLkUmLgj262JLsX2eLMDc2bxcEFUxCogvsNGPax0WwgAiIBA3PySKw%3D",
|
||
"__security_mc_1_s_sdk_crypt_sdk": "da6a1b67-4e38-b24e",
|
||
"bd_ticket_guard_client_data": "eyJiZC10aWNrZXQtZ3VhcmQtdmVyc2lvbiI6MiwiYmQtdGlja2V0LWd1YXJkLWl0ZXJhdGlvbi12ZXJzaW9uIjoxLCJiZC10aWNrZXQtZ3VhcmQtcmVlLXB1YmxpYy1rZXkiOiJCTWQ4eGpKMUY2THJEUDRwc283ald6T0ZzTWhCaXhLc3dla3BIQ1JleFNENFRWNmNOK09uOVdKQnIwNWFkNmgvRnM2N0hpQzUxSXJFTGZTSFVWakVtODQ9IiwiYmQtdGlja2V0LWd1YXJkLXdlYi12ZXJzaW9uIjoyfQ%3D%3D",
|
||
"bd_ticket_guard_client_web_domain": "2",
|
||
"ttwid": "1%7C85xTuedO1rj3zvv-kDVjAu2b0hgRS8wVjCPgOEaCaxo%7C1757499627%7Cb7216cdc62420b85602b4a810e31be759b7a184da042bf2a9a21629f081dd2d9",
|
||
"odin_tt": "2b28de94dbb7a08d45168f8f138dda7defd529f717ce89d3148b3151488b33248f176c0c52b58cdca420cfb950e13ecd6dac26d6762394725087280441ac91c5",
|
||
"passport_auth_status": "d96aa2b690a33e007d157cf4204b1078%2C62594da316130803f733a79b6a6774df",
|
||
"passport_auth_status_ss": "d96aa2b690a33e007d157cf4204b1078%2C62594da316130803f733a79b6a6774df",
|
||
"uid_tt": "d1dbdd6e4cebff9fbb496f8735b11c9d",
|
||
"uid_tt_ss": "d1dbdd6e4cebff9fbb496f8735b11c9d",
|
||
"sid_tt": "784e3e2545c5666529fa3ac9451ed016",
|
||
"sessionid": "784e3e2545c5666529fa3ac9451ed016",
|
||
"sessionid_ss": "784e3e2545c5666529fa3ac9451ed016",
|
||
"ucas_c0": "CkEKBTEuMC4wELKIjrqrodTgaBjmJiDjhaCz682rASiwITDc5uCyws2UAkCRooXGBkiR1sHIBlCmvJOipunJ9WdYbhIU08vxfhaj0eG4pjCi5loa1uv2XEY",
|
||
"ucas_c0_ss": "CkEKBTEuMC4wELKIjrqrodTgaBjmJiDjhaCz682rASiwITDc5uCyws2UAkCRooXGBkiR1sHIBlCmvJOipunJ9WdYbhIU08vxfhaj0eG4pjCi5loa1uv2XEY",
|
||
"sid_guard": "784e3e2545c5666529fa3ac9451ed016%7C1757499665%7C5184000%7CSun%2C+09-Nov-2025+10%3A21%3A05+GMT",
|
||
"session_tlb_tag": "sttt%7C17%7CeE4-JUXFZmUp-jrJRR7QFv_________3dxW8aRfOoXXRgtkArn2rBGBZ8E0_061PH30G5EzcU-M%3D",
|
||
"sid_ucp_v1": "1.0.0-KDIwMjIxOWE0MzY1NzA1YTZmNzY3NTRmZGJkMTU1OTg1ODc0MWI0MjkKGwjc5uCyws2UAhCRooXGBhiwISAMOAZA9AdIBBoCbGYiIDc4NGUzZTI1NDVjNTY2NjUyOWZhM2FjOTQ1MWVkMDE2",
|
||
"ssid_ucp_v1": "1.0.0-KDIwMjIxOWE0MzY1NzA1YTZmNzY3NTRmZGJkMTU1OTg1ODc0MWI0MjkKGwjc5uCyws2UAhCRooXGBhiwISAMOAZA9AdIBBoCbGYiIDc4NGUzZTI1NDVjNTY2NjUyOWZhM2FjOTQ1MWVkMDE2",
|
||
"PHPSESSID": "a72b9c2977908a281d086061f7281046",
|
||
"PHPSESSID_SS": "a72b9c2977908a281d086061f7281046",
|
||
"csrf_session_id": "623da6c2834082a9036c812d995f39cc"
|
||
}
|
||
|
||
# # !!!关键修改:使用同步方式启动!!!
|
||
# listener = DouYinListenerForGUI()
|
||
#
|
||
# # 创建新的事件循环来运行异步方法
|
||
# loop = asyncio.new_event_loop()
|
||
# asyncio.set_event_loop(loop)
|
||
#
|
||
# try:
|
||
# # 运行启动方法
|
||
# success = loop.run_until_complete(listener.start_with_cookies(store_id=None, cookie_dict=cookies))
|
||
# if success:
|
||
# print("✅ 监听器启动成功,按 Ctrl+C 停止...")
|
||
# # 保持主线程运行
|
||
# while listener.running:
|
||
# time.sleep(1)
|
||
# else:
|
||
# print("❌ 监听器启动失败")
|
||
# except KeyboardInterrupt:
|
||
# print("\n🛑 正在停止监听器...")
|
||
# listener.stop_listening()
|
||
# except Exception as e:
|
||
# print(f"❌ 程序异常: {e}")
|
||
# traceback.print_exc()
|
||
# finally:
|
||
# loop.close()
|
||
|
||
# 创建聊天机器人实例
|
||
bot = DouYinChatBot(cookie=cookies)
|
||
|
||
try:
|
||
# 启动机器人
|
||
bot.start()
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n🛑 正在关闭机器人...")
|
||
|
||
|
||
# 清理资源
|
||
async def cleanup():
|
||
await bot.close()
|
||
|
||
|
||
# 创建临时事件循环进行清理
|
||
cleanup_loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(cleanup_loop)
|
||
cleanup_loop.run_until_complete(cleanup())
|
||
cleanup_loop.close()
|
||
print("✅ 机器人已关闭")
|