Files
shuidrop_gui/Utils/QianNiu/QianNiuUtils.py
2025-09-13 19:54:30 +08:00

2766 lines
108 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# python let's go
# 编辑人:kris思成
# sainiu_local_test.py
import asyncio
import json
import os
import uuid
import glob
import time
import hashlib
import subprocess
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import websockets
import aiohttp
import sys
import io
import threading
from Utils.message_models import PlatformMessage
import config
# 设置标准输出编码为UTF-8
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# 千牛WebSocket管理器类
class QianNiuWebsocketManager:
"""千牛WebSocket管理器 - 管理多店铺连接"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._store = {}
cls._instance._lock = threading.RLock()
return cls._instance
def on_connect(self, shop_key, qianniu_client, **kwargs):
"""存储千牛连接信息"""
with self._lock:
entry = self._store.setdefault(shop_key, {
'platform': None,
'customers': [],
'user_assignments': {}
})
entry['platform'] = {
'qianniu_client': qianniu_client,
'message_handler': qianniu_client.message_handler if hasattr(qianniu_client,
'message_handler') else None,
'last_heartbeat': datetime.now(),
**kwargs
}
return entry
def get_connection(self, shop_key):
"""获取千牛连接信息"""
with self._lock:
return self._store.get(shop_key)
def remove_connection(self, shop_key):
"""移除千牛连接"""
with self._lock:
if shop_key in self._store:
del self._store[shop_key]
def get_all_connections(self):
"""获取所有千牛连接"""
with self._lock:
return dict(self._store)
def update_heartbeat(self, shop_key):
"""更新心跳时间"""
with self._lock:
if shop_key in self._store and self._store[shop_key]['platform']:
self._store[shop_key]['platform']['last_heartbeat'] = datetime.now()
# 千牛(淘宝)平台专用配置
QIANNIU_CONFIG = {
# 赛牛插件服务配置
"host": "127.0.0.1",
"port": 3030,
"ws_url": "ws://127.0.0.1:3030",
"http_api_url": "http://127.0.0.1:3030/QianNiu/Function",
"http_api_url_avatar": "http://127.0.0.1:3030/QianNiu/Api",
# 认证配置
"access_type": 1, # 企业版
"access_id": "maguabishop",
"access_key": "bWFndWFfYmlzaG9w",
# 默认签名配置
"sign_key": b'111111',
"sha256": True,
# 测试配置
"test_store_id": "test_store_001",
"test_store_name": "测试店铺",
"test_user_nick": "tb420723827:redboat",
"default_store_id": "4c4025e3-8702-42fc-bdc2-671e335c0ff7",
# 连接配置
"connect_timeout": 60,
"reconnect_attempts": 3,
"reconnect_delay": 5,
"heartbeat_interval": 20,
"message_timeout": 5.0,
# DLL配置
"dll_startup_timeout": 15,
"dll_ready_timeout": 30,
"dll_ready_check_delay": 2,
"service_startup_delay": 2
}
# 配置日志修复Unicode编码问题
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("sainiu_test.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger("SaiNiuTest")
# 赛牛插件配置
QN_WS_URL = "ws://127.0.0.1:3030" # 赛牛插件WebSocket地址
QN_HTTP_API_URL = "http://127.0.0.1:3030/QianNiu/Function"
QN_HTTP_API_URL_AVATAR = 'http://127.0.0.1:3030/QianNiu/Api' # 赛牛插件HTTP API地址
STORE_ID = "test_store_001" # 测试店铺ID
STORE_NAME = "测试店铺" # 测试店铺名称
USER_NICK = "tb420723827:redboat" # 测试账号
import ctypes
import hashlib
import time
from ctypes import c_int, c_bool, c_char_p
class SaiNiuService:
"""赛牛DLL服务管理类"""
def __init__(self):
self.dll_loaded = False
self.sainiu_api = None
self.port = 3030 # 默认端口
self.sign_key = b'' # 默认签名密钥
self.sha256 = True # 使用SHA256签名
# 获取项目根目录(相对于当前文件)
current_dir = os.path.dirname(os.path.abspath(__file__)) # Utils/QianNiu/
project_root = os.path.dirname(os.path.dirname(current_dir)) # shuidrop_gui/
self.python32_path = os.path.join(project_root, "Utils", "PythonNew32", "python32.exe")
self.dll_dir = os.path.join(project_root, "Utils", "PythonNew32")
self.dll_process = None
# 验证关键路径
self._validate_paths()
def _validate_paths(self):
"""验证关键路径是否存在"""
print(f"🔍 验证路径配置...")
print(f"📁 DLL目录: {self.dll_dir}")
print(f"🐍 Python32路径: {self.python32_path}")
if not os.path.exists(self.dll_dir):
print(f"⚠️ DLL目录不存在: {self.dll_dir}")
else:
print(f"✅ DLL目录存在")
if not os.path.exists(self.python32_path):
print(f"⚠️ Python32执行文件不存在: {self.python32_path}")
else:
print(f"✅ Python32执行文件存在")
# 检查DLL文件
dll_path = os.path.join(self.dll_dir, "SaiNiuApi.dll")
if not os.path.exists(dll_path):
print(f"⚠️ SaiNiuApi.dll文件不存在: {dll_path}")
else:
print(f"✅ SaiNiuApi.dll文件存在")
def _read_dll_log(self, log_file_path):
"""读取DLL日志文件"""
try:
if os.path.exists(log_file_path):
with open(log_file_path, 'r', encoding='utf-8') as f:
content = f.read()
if content:
print("📄 DLL启动日志文件内容:")
print("-" * 50)
print(content)
print("-" * 50)
else:
print("📄 DLL日志文件为空")
else:
print("📄 DLL日志文件不存在")
except Exception as e:
print(f"❌ 读取DLL日志文件失败: {e}")
def load_dll(self, dll_path=None):
"""加载DLL文件 - 使用Python32执行"""
try:
# 如果没有指定路径,使用默认的相对路径
if dll_path is None:
dll_path = os.path.join(self.dll_dir, "SaiNiuApi.dll")
# 验证路径是否存在
if not os.path.exists(dll_path):
print(f"❌ DLL文件不存在: {dll_path}")
return False
if not os.path.exists(self.python32_path):
print(f"❌ Python32执行文件不存在: {self.python32_path}")
return False
# 切换到DLL目录
dll_dir = os.path.dirname(dll_path)
original_dir = os.getcwd() # 保存当前目录
os.chdir(dll_dir)
print(f"📁 切换到DLL目录: {dll_dir}")
# 创建DLL启动脚本 - 增强版,带详细状态输出和日志
script_content = '''import ctypes
import hashlib
import time
import json
import sys
import os
# 强制刷新输出缓冲区
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# 创建日志文件
log_file = open("dll_startup.log", "w", encoding="utf-8")
def log_print(msg):
"""同时输出到控制台和日志文件"""
print(msg)
log_file.write(msg + "\\n")
log_file.flush()
sys.stdout.flush()
log_print("=== SaiNiu DLL 启动脚本开始 ===")
log_print(f"Python版本: {sys.version}")
log_print(f"当前工作目录: {os.getcwd()}")
log_print(f"时间戳: {time.strftime('%Y-%m-%d %H:%M:%S')}")
try:
log_print("正在加载 SaiNiuApi.dll...")
# 加载DLL文件
sainiu_api = ctypes.CDLL('SaiNiuApi.dll')
log_print("✅ SaiNiuApi.dll 加载成功")
# 定义函数参数类型和返回值类型
sainiu_api.Access_ServerStart.argtypes = [
ctypes.c_int,
ctypes.c_bool,
ctypes.c_bool,
ctypes.c_bool,
ctypes.c_bool,
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_bool,
ctypes.c_bool
]
sainiu_api.Access_ServerStart.restype = ctypes.c_char_p
log_print("✅ DLL函数类型定义完成")
# 调用函数时传入的参数
port = 3030
web_socket = True
http_server = True
remote = False
log = True
ws_max = 0
sign_key = b''
sha256 = True
version_tip = True
log_print(f"正在启动服务器 - 端口: {port}")
# 启动服务器
result = sainiu_api.Access_ServerStart(
port,
web_socket,
http_server,
remote,
log,
ws_max,
sign_key,
sha256,
version_tip
)
# 解码并打印结果
try:
result_str = result.decode('gbk')
log_print(f"✅ Access_ServerStart 服务器启动结果: {result_str}")
except UnicodeDecodeError:
log_print(f"✅ Access_ServerStart 服务器启动结果: {result}")
log_print("=== DLL服务启动完成进入监控模式 ===")
# 保持进程运行
try:
heartbeat_count = 0
while True:
time.sleep(5)
heartbeat_count += 1
log_print(f"❤️ DLL服务心跳 #{heartbeat_count} - 服务正常运行中...")
except KeyboardInterrupt:
log_print("收到中断信号,正在停止服务...")
except Exception as e:
log_print(f"❌ DLL启动失败: {e}")
import traceback
error_trace = traceback.format_exc()
log_print(f"错误详情: {error_trace}")
log_file.close()
sys.exit(1)
finally:
if 'log_file' in locals():
log_file.close()
'''
# 保存脚本(先清理旧的)
script_path = os.path.join(dll_dir, "dll_loader.py")
if os.path.exists(script_path):
print(f"🧹 清理旧的启动脚本: {script_path}")
os.remove(script_path)
print(f"📝 创建新的启动脚本: {script_path}")
with open(script_path, "w", encoding="utf-8") as f:
f.write(script_content)
# 使用Python32启动脚本
print(f"🚀 启动Python32进程: {self.python32_path}")
print(f"📄 脚本路径: {script_path}")
self.dll_process = subprocess.Popen(
[self.python32_path, "-u", script_path], # -u 参数强制不缓冲输出
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
cwd=dll_dir,
encoding='gbk',
text=True,
bufsize=0 # 不缓冲
)
# 等待进程启动(改进版)
print("⏰ 等待Python32进程启动DLL服务...")
# 给进程一些时间启动
time.sleep(3)
# 检查进程是否还在运行
if self.dll_process.poll() is not None:
# 进程已结束,获取输出
stdout, stderr = self.dll_process.communicate()
print(f"❌ Python32进程意外退出")
print(f"标准输出: {stdout}")
print(f"错误输出: {stderr}")
return False
# 进程还在运行,检查进程状态和日志文件(增强版)
print("🔍 监控Python32进程状态和DLL日志...")
log_file_path = os.path.join(dll_dir, "dll_startup.log")
# 等待一段时间让DLL有机会启动
for i in range(15): # 延长到15秒
if self.dll_process.poll() is not None:
# 进程已退出
stdout, stderr = self.dll_process.communicate()
print(f"❌ Python32进程退出 (退出码: {self.dll_process.returncode})")
if stdout:
print(f"📋 标准输出: {stdout}")
if stderr:
print(f"❌ 错误输出: {stderr}")
# 尝试读取日志文件了解详细情况
self._read_dll_log(log_file_path)
return False
# 检查日志文件是否有新内容
if os.path.exists(log_file_path):
try:
with open(log_file_path, 'r', encoding='utf-8') as f:
log_content = f.read()
if log_content:
print(f"📋 DLL日志 (第{i + 1}秒):")
for line in log_content.strip().split('\n'):
if line.strip():
print(f" {line}")
# 检查是否有启动成功的标志
if "Access_ServerStart 服务器启动结果:" in log_content:
if any(keyword in log_content for keyword in ["成功", "SUCCESS", "200"]):
print("✅ 检测到DLL启动成功标志")
self.dll_loaded = True
return True
# 检查是否有错误
if "❌ DLL启动失败:" in log_content:
print("❌ 检测到DLL启动失败")
return False
except Exception as e:
print(f"⚠️ 读取日志文件失败: {e}")
print(f"⏰ 等待DLL启动... ({i + 1}/15)")
time.sleep(1)
# 15秒后进程仍在运行检查最终状态
if self.dll_process.poll() is None:
print("✅ Python32进程仍在运行")
# 最后读取一次日志文件
self._read_dll_log(log_file_path)
self.dll_loaded = True
return True
else:
print("❌ 进程已退出,启动失败")
self._read_dll_log(log_file_path)
return False
except Exception as e:
print(f"❌ 加载SaiNiu DLL失败: {e}")
# 恢复原始目录
try:
os.chdir(original_dir)
except:
pass
return False
finally:
# 确保恢复原始目录
try:
os.chdir(original_dir)
except:
pass
def sign_get_sign(self, post, timestamp):
"""计算签名 - 按照官方demo的逻辑修改"""
if not self.sign_key: # 如果签名为空,直接返回空字符串
return ""
# 拼接字符串
data = post + str(timestamp) + self.sign_key.decode('utf-8')
# 将字符串转换为字节集
byte_data = data.encode('utf-8')
# 根据sha256参数选择哈希算法
if self.sha256:
hash_object = hashlib.sha256(byte_data)
else:
hash_object = hashlib.md5(byte_data)
# 获取十六进制表示并转换为小写
return hash_object.hexdigest().lower()
def start_server(self, port=3030, sign_key=b'111111', sha256=True):
"""启动赛牛服务器已在load_dll中完成"""
if not self.dll_loaded:
print("❌ DLL未加载请先调用load_dll()")
return False
return True
def _check_port_listening(self):
"""检查端口是否在监听状态"""
try:
import socket
# 尝试连接端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
result = s.connect_ex(('127.0.0.1', self.port))
if result == 0:
logger.info(f"端口 {self.port} 已在监听")
return True
else:
logger.warning(f"端口 {self.port} 未在监听 (错误码: {result})")
return False
except Exception as e:
logger.error(f"检查端口状态时出错: {e}")
return False
def _wait_for_port(self, timeout=30):
"""等待端口可用"""
start_time = time.time()
while time.time() - start_time < timeout:
if self._check_port_listening():
# 额外等待一下确保服务完全启动
time.sleep(2)
# 再次检查
if self._check_port_listening():
return True
time.sleep(1)
return False
def cleanup_old_process(self):
"""清理旧的进程和端口"""
try:
import os
import psutil
# 1. 终止所有相关Python进程
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if proc.info['name'] == 'python32.exe':
cmdline = proc.info['cmdline']
if any('SaiNiuApi.dll' in cmd for cmd in cmdline):
proc.kill() # 使用kill而不是terminate
logger.info(f"已强制终止旧进程: {proc.info['pid']}")
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 2. 清理端口占用
for conn in psutil.net_connections():
if conn.laddr.port == self.port:
try:
proc = psutil.Process(conn.pid)
proc.kill()
logger.info(f"已终止占用端口{self.port}的进程: {conn.pid}")
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 3. 等待进程完全退出
time.sleep(2)
# 4. 清理可能的临时文件
temp_files = [
os.path.join(self.dll_dir, "dll_loader.py"),
os.path.join(self.dll_dir, "*.tmp")
]
for pattern in temp_files:
for f in glob.glob(pattern):
try:
os.remove(f)
logger.info(f"已清理临时文件: {f}")
except:
pass
except Exception as e:
logger.error(f"清理旧进程时出错: {e}")
def close(self):
"""关闭DLL服务"""
if self.dll_process:
self.dll_process.terminate()
try:
self.dll_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.dll_process.kill()
# 简单的内存缓存实现替换Django的cache
class SimpleCache:
def __init__(self):
self._cache = {}
def get(self, key: str) -> Optional[Any]:
data = self._cache.get(key)
if data and data['expire'] > datetime.now():
return data['value']
return None
def set(self, key: str, value: Any, timeout: int):
self._cache[key] = {
'value': value,
'expire': datetime.now() + timedelta(seconds=timeout)
}
# 创建全局缓存实例
cache = SimpleCache()
class MockStore:
"""模拟店铺对象"""
def __init__(self, pk, name, account, password):
self.pk = pk
self.store_name = name
self.store_account = account
self.store_password = password
class AIServiceConnector:
"""AI服务连接器 - 负责与AI后端服务通信单连接多店铺架构"""
def __init__(self, backend_host=config.BACKEND_HOST, backend_port=config.BACKEND_PORT):
# 使用新的单连接多店铺架构
self.backend_host = backend_host
self.backend_port = backend_port
self.websocket = None
self.store_id = None
self.connected = False
self.connect_timeout = QIANNIU_CONFIG["connect_timeout"]
self.service_cookie = None
self.message_handlers = {} # 消息处理器字典
self.pending_ai_replies = {} # 等待AI回复的结果存储
self._cleanup_task = None # 清理任务
self.reconnect_attempts = 0
self.max_reconnect_attempts = QIANNIU_CONFIG["reconnect_attempts"]
self.reconnect_delay = QIANNIU_CONFIG["reconnect_delay"]
async def connect_with_token(self, exe_token):
"""使用用户token连接后端AI服务新架构"""
try:
# 使用新的单连接多店铺URL格式
full_url = config.get_gui_ws_url(exe_token)
print(f"🔗 连接到后端服务: {full_url}")
# 添加连接超时和重试机制
self.websocket = await asyncio.wait_for(
websockets.connect(full_url),
timeout=self.connect_timeout
)
# 等待连接确认(设置超时)
try:
response = await asyncio.wait_for(self.websocket.recv(), timeout=16.0)
data = json.loads(response)
print(f"🔗 收到后端连接响应: {data}")
# 处理新架构的连接成功消息
if data.get("type") == "success" and data.get("content") == "connected":
self.connected = True
self._log(f"✅ 后端服务连接成功(单连接多店铺架构)", "SUCCESS")
# 启动清理任务
self._start_cleanup_task()
return True
elif data.get("type") == "connect_success":
# 兼容旧版连接成功消息
self.connected = True
self._log(f"✅ AI服务连接成功: {data.get('content')}", "SUCCESS")
# 从连接成功消息中提取认证信息
content = data.get("content", "")
if content:
try:
# 解析认证信息
self.service_cookie = {
'cookie': content,
'status': True
}
self._log(f"✅ 从服务获取到认证信息", "SUCCESS")
except Exception as e:
self._log(f"❌ 解析认证信息失败: {e}", "ERROR")
self.service_cookie = None
# 启动清理任务
self._start_cleanup_task()
return True
elif data.get("type") == "error":
# 处理错误消息
error_data = data.get("data", {})
if error_data.get('verify_link'):
error_msg = error_data.get('verify_link')
self._log(f"❌ 服务器返回错误,需要验证: {error_msg}", "ERROR")
self.service_cookie = {
'status': False,
'verify_link': error_msg
}
else:
error_msg = data.get("message", "未知错误")
self._log(f"❌ 服务器返回错误: {error_msg}", "ERROR")
self.service_cookie = {
'status': False,
'verify_link': None
}
return False
else:
self._log(f"❌ 服务连接失败: {data.get('content')}", "ERROR")
return False
except asyncio.TimeoutError:
self._log("⏰ 等待连接确认超时,但连接可能已建立", "WARNING")
self.connected = True
return True
except asyncio.TimeoutError:
self._log(f"❌ 连接服务超时({self.connect_timeout}秒)", "ERROR")
return False
except Exception as e:
self._log(f"❌ 连接服务失败: {e}", "ERROR")
return False
async def start_message_loop(self):
"""启动统一的消息循环处理所有后端消息"""
self._log("🔵 启动统一消息循环...", "INFO")
while True: # 改为无限循环,支持自动重连
try:
if not self.connected or not self.websocket:
self._log("🔌 连接断开,尝试重连...", "WARNING")
if not await self._reconnect():
await asyncio.sleep(self.reconnect_delay)
continue
try:
message = await asyncio.wait_for(self.websocket.recv(), timeout=30.0)
data = json.loads(message)
message_type = data.get("type")
self._log(f"📥 收到消息类型: {message_type}", "DEBUG")
# 分发消息给对应的处理器
await self._dispatch_message(data, message_type)
except asyncio.TimeoutError:
continue
except websockets.exceptions.ConnectionClosed as e:
self._log(f"🔌 WebSocket连接关闭: {e}", "WARNING")
self.connected = False
await self._handle_connection_closed()
continue
except Exception as e:
self._log(f"❌ 消息接收异常: {e}", "ERROR")
await asyncio.sleep(1)
except Exception as e:
self._log(f"❌ 消息循环异常: {e}", "ERROR")
await asyncio.sleep(1)
async def _reconnect(self, exe_token=None):
"""重连机制(新架构)"""
if self.reconnect_attempts >= self.max_reconnect_attempts:
self._log("❌ 重连次数超过限制,停止重连", "ERROR")
return False
try:
self.reconnect_attempts += 1
self._log(f"🔄 尝试第 {self.reconnect_attempts} 次重连...", "WARNING")
# 关闭旧连接
if self.websocket:
await self.websocket.close()
# 重新连接需要exe_token
if exe_token:
success = await self.connect_with_token(exe_token)
else:
self._log("❌ 重连失败缺少exe_token", "ERROR")
return False
if success:
self.reconnect_attempts = 0 # 重置重试计数
self._log("✅ 重连成功", "SUCCESS")
return True
else:
self._log(f"❌ 第 {self.reconnect_attempts} 次重连失败", "WARNING")
return False
except Exception as e:
self._log(f"❌ 重连过程中出现异常: {e}", "ERROR")
return False
async def send_message_to_backend(self, message_template):
"""发送消息到后端并获取AI回复"""
# 简化连接检查主要依赖websockets库自身的错误处理
if not hasattr(self, 'websocket') or self.websocket is None:
self._log("❌ AI服务未连接尝试重连...", "WARNING")
if not await self._reconnect():
self._log("❌ 重连失败,无法发送消息", "ERROR")
return None
try:
message_dict = message_template.to_dict()
# 使用sender.id作为匹配键因为AI回复会使用receiver.id
sender_info = message_dict.get("sender", {})
sender_id = sender_info.get("id") if sender_info else None
if not sender_id:
self._log("❌ 消息中没有发送者ID", "ERROR")
return None
# 详细的调试信息
print("=" * 60)
print("🔍 调试信息 - 准备发送消息到后端:")
print(f"连接状态: {self.connected}")
print(f"WebSocket状态: {self.websocket is not None}")
print(f"消息ID: {sender_id}")
print(f"消息内容: {message_dict.get('content', '')[:100]}...")
print("=" * 60)
# 创建回复等待记录(添加时间戳用于清理)
self.pending_ai_replies[sender_id] = {
'reply': None,
'received': False,
'created_at': asyncio.get_event_loop().time()
}
await self.websocket.send(json.dumps(message_dict))
self._log(f"📤 已发送消息到AI服务: {message_template.content[:50]}...", "DEBUG")
# 轮询等待AI回复设置超时
try:
timeout = 15.0
start_time = asyncio.get_event_loop().time()
while True:
# 检查是否收到回复
if sender_id in self.pending_ai_replies:
reply_info = self.pending_ai_replies[sender_id]
if reply_info.get('received'):
ai_reply = reply_info.get('reply')
self.pending_ai_replies.pop(sender_id, None)
self._log(f"✅ 成功获取AI回复: {ai_reply[:50]}...", "SUCCESS")
return ai_reply
# 检查超时
if asyncio.get_event_loop().time() - start_time > timeout:
self._log("⏰ 等待AI回复超时15秒", "WARNING")
self.pending_ai_replies.pop(sender_id, None)
return None
# 短暂等待后继续检查
await asyncio.sleep(0.1)
except Exception as e:
self._log(f"❌ 等待AI回复异常: {e}", "ERROR")
self.pending_ai_replies.pop(sender_id, None)
return None
except ConnectionError as e:
self._log(f"❌ 连接已断开: {e}", "ERROR")
self.pending_ai_replies.pop(sender_id, None)
return None
except websockets.exceptions.ConnectionClosed:
self._log("❌ 发送消息时连接已关闭", "ERROR")
self.connected = False
self.pending_ai_replies.pop(sender_id, None)
return None
except Exception as e:
self._log(f"❌ 发送消息到AI服务失败: {e}", "ERROR")
self.pending_ai_replies.pop(sender_id, None)
return None
async def _dispatch_message(self, data, message_type):
"""分发消息到对应的处理器(更新为新协议)"""
# 处理message类型统一消息处理
if message_type == "message":
# 使用receiver.id作为匹配键
receiver_info = data.get("receiver", {})
receiver_id = receiver_info.get("id") if receiver_info else None
# 添加调试日志
self._log(f"处理AI回复 - receiver_id: {receiver_id}", "DEBUG")
self._log(f"当前等待的回复列表: {list(self.pending_ai_replies.keys())}", "DEBUG")
if receiver_id and receiver_id in self.pending_ai_replies:
# 这是对买家消息的AI回复设置回复结果
reply_info = self.pending_ai_replies.get(receiver_id)
if reply_info and not reply_info.get('received'):
content = data.get("content", "")
self._log(f"设置AI回复结果: {content[:50]}...", "DEBUG")
reply_info['reply'] = content
reply_info['received'] = True
else:
self._log("回复已接收或已取消", "WARNING")
else:
# 这是后端主动推送的消息,需要转发给买家
self._log(f"后端主动推送消息: {receiver_id}", "WARNING")
# 打印所有等待中的回复,用于调试
for mid, info in self.pending_ai_replies.items():
self._log(f"等待中的回复 - {mid}: {info}", "DEBUG")
# 只有在没有对应等待时才调用消息处理器(避免重复处理)
handler = self.message_handlers.get("message")
if handler and callable(handler):
# 添加任务管理避免泄露
task = asyncio.create_task(handler(data))
task.add_done_callback(self._handle_task_completion)
# 处理transfer类型消息
elif message_type == "transfer":
receiver_info = data.get("receiver", {})
receiver_id = receiver_info.get("id") if receiver_info else None
if receiver_id and receiver_id in self.pending_ai_replies:
reply_info = self.pending_ai_replies.get(receiver_id)
if reply_info and not reply_info.get('received'):
reply_info['reply'] = data
reply_info['received'] = True
# 处理其他协议消息类型
elif message_type in ["connect_success", "error", "staff_list"]:
handler = self.message_handlers.get(message_type)
if handler and callable(handler):
task = asyncio.create_task(handler(data))
task.add_done_callback(self._handle_task_completion)
# 记录未处理的消息类型
else:
self._log(f"📨 未处理的消息类型: {message_type}", "DEBUG")
def _handle_task_completion(self, task):
"""处理异步任务完成(避免任务泄露)"""
try:
if task.exception():
self._log(f"❌ 异步任务异常: {task.exception()}", "ERROR")
except Exception as e:
self._log(f"❌ 处理任务完成时异常: {e}", "ERROR")
def register_message_handler(self, message_type, handler):
"""注册消息处理器"""
self.message_handlers[message_type] = handler
self._log(f"✅ 注册消息处理器: {message_type}", "DEBUG")
async def _parse_ai_response(self, response_data):
"""解析后端返回的AI回复"""
try:
# 处理连接成功消息
if response_data.get("type") == "connect_success":
content = response_data.get("content", "")
if content:
self.service_cookie = {
'cookie': content,
'status': True
}
self._log("✅ 从连接成功消息中提取到认证信息", "SUCCESS")
return content
# 检查是否为AI回复消息
if response_data.get("type") == "message":
content = response_data.get("content", "")
if content:
self._log(f"✅ 成功解析AI回复: {content}", "SUCCESS")
return content
# 检查其他可能的格式
if "content" in response_data:
content = response_data.get("content")
if content:
self._log(f"✅ 从content字段获取到回复: {content}", "SUCCESS")
return content
self._log(f"⚠️ 无法识别的回复格式: {response_data}", "WARNING")
return None
except Exception as e:
self._log(f"❌ 解析AI回复失败: {e}", "ERROR")
return None
def _start_cleanup_task(self):
"""启动清理任务"""
if self._cleanup_task and not self._cleanup_task.done():
self._cleanup_task.cancel()
self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
async def _periodic_cleanup(self):
"""定期清理超时的回复等待"""
while self.connected:
try:
current_time = asyncio.get_event_loop().time()
expired_ids = []
# 检查超时的回复等待30秒超时
for message_id, reply_info in self.pending_ai_replies.items():
if isinstance(reply_info, dict):
create_time = reply_info.get('created_at', 0)
if current_time - create_time > 30: # 30秒超时
expired_ids.append(message_id)
else:
# 兼容旧格式
expired_ids.append(message_id)
# 清理超时的回复等待
for message_id in expired_ids:
self.pending_ai_replies.pop(message_id, None)
self._log(f"🧹 清理超时回复等待: {message_id}", "DEBUG")
# 每10秒清理一次
await asyncio.sleep(10)
except asyncio.CancelledError:
break
except Exception as e:
self._log(f"❌ 清理异常: {e}", "ERROR")
await asyncio.sleep(10)
async def close(self):
"""关闭连接"""
# 停止清理任务
if self._cleanup_task and not self._cleanup_task.done():
self._cleanup_task.cancel()
# 清理所有等待的回复
self.pending_ai_replies.clear()
if self.websocket:
await self.websocket.close()
self.connected = False
self._log("🔌 已关闭AI服务连接", "INFO")
def _log(self, message, level="INFO"):
"""日志记录"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 根据日志级别添加颜色
if level == "ERROR":
print(f"\033[91m[{timestamp}] [{level}] {message}\033[0m") # 红色
elif level == "WARNING":
print(f"\033[93m[{timestamp}] [{level}] {message}\033[0m") # 黄色
elif level == "SUCCESS":
print(f"\033[92m[{timestamp}] [{level}] {message}\033[0m") # 绿色
elif level == "DEBUG":
print(f"\033[96m[{timestamp}] [{level}] {message}\033[0m") # 蓝色
else:
print(f"[{timestamp}] [{level}] {message}")
def get_service_cookie(self):
"""获取服务认证信息"""
return self.service_cookie
class AIServiceIntegration:
"""AI服务集成类 - 负责处理AI回复相关功能单连接多店铺架构"""
def __init__(self, store_id=None, exe_token=None):
self.store_id = store_id or QIANNIU_CONFIG["default_store_id"]
self.exe_token = exe_token # 新增:用户执行令牌
self.ai_service = AIServiceConnector()
self.loop = None
self._ensure_event_loop()
self.retry_count = 0
self.max_retries = 3
def _ensure_event_loop(self):
"""确保事件循环存在"""
try:
self.loop = asyncio.get_event_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
async def initialize_ai_service(self):
"""初始化AI服务新架构"""
try:
self._log("正在初始化AI服务单连接多店铺架构...", "INFO")
if not self.exe_token:
self._log("❌ 缺少exe_token无法初始化AI服务", "ERROR")
return False
success = await self.ai_service.connect_with_token(self.exe_token)
if success:
self._log("AI服务初始化成功", "SUCCESS")
self.retry_count = 0
# 启动统一的消息循环
asyncio.create_task(self.ai_service.start_message_loop())
self._log("✅ 消息循环已启动", "SUCCESS")
return True
else:
self._log("AI服务初始化失败", "ERROR")
return False
except Exception as e:
self._log(f"AI服务初始化异常: {e}", "ERROR")
return False
def register_message_handlers(self, handlers):
"""注册消息处理器"""
for message_type, handler in handlers.items():
self.ai_service.register_message_handler(message_type, handler)
async def get_ai_reply(self, message_content, sender_nick, avatar_url=""):
"""获取AI回复"""
try:
# 确保连接正常
if not await self.ensure_connection():
return "您好,感谢您的咨询,我们会尽快回复您!"
if message_content == "当前用户来自 商品详情页":
return "查看商品中~~~~"
# 检测消息类型
msg_type = "text" # 默认文本类型
control_type = str(message_content)
if control_type.__contains__("http") and (
control_type.__contains__(".jpg") or control_type.__contains__(".png")):
msg_type = "image"
elif control_type.__contains__("http") and (
control_type.__contains__(".mp4") or control_type.__contains__(".mov")):
msg_type = "video"
elif control_type.__contains__("https://h5.m.taobao.com/awp"):
msg_type = "product_card"
elif control_type.__contains__("订单号"):
msg_type = "order_card"
# 创建消息模板对象(避免重复创建)
message_template = PlatformMessage(
type="message",
content=message_content,
msg_type=msg_type,
sender={
"id": sender_nick,
},
pin_image=avatar_url,
store_id=self.store_id,
)
print(f"最终发送给后端的数据📕:{message_template.to_dict()}")
# 获取AI回复
ai_reply = await self.ai_service.send_message_to_backend(
message_template=message_template
)
if ai_reply:
self._log(f"成功获取AI回复: {ai_reply[:50]}...", "SUCCESS")
else:
self._log("未获取到AI回复", "WARNING")
# 未获取到正常回复 增加重试计数
self.retry_count += 1
if self.retry_count >= self.max_retries:
self._log(f"重试次数超过限制,停止重试", "ERROR")
return ai_reply
except Exception as e:
logger.error(f"获取AI回复失败: {e}")
return "您好,感谢您的咨询,我们会尽快回复您!"
# async def start_backend_listening(self, message_callback):
# """启动后端消息监听"""
# if not await self.ensure_connection():
# self._log("❌ AI服务连接失败无法启动后端监听", "ERROR")
# return False
#
# try:
# # 直接调用AIServiceConnector的监听方法
# asyncio.create_task(
# self.ai_service.listen_for_backend_messages(message_callback)
# )
# self._log("✅ 后端消息监听已启动", "SUCCESS")
# return True
# except Exception as e:
# self._log(f"❌ 启动后端监听失败: {e}", "ERROR")
# return False
def _log(self, message, level="INFO"):
"""日志记录"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 根据日志级别添加颜色
if level == "ERROR":
print(f"\033[91m[{timestamp}] [{level}] {message}\033[0m") # 红色
elif level == "WARNING":
print(f"\033[93m[{timestamp}] [{level}] {message}\033[0m") # 黄色
elif level == "SUCCESS":
print(f"\033[92m[{timestamp}] [{level}] {message}\033[0m") # 绿色
elif level == "DEBUG":
print(f"\033[96m[{timestamp}] [{level}] {message}\033[0m") # 蓝色
else:
print(f"[{timestamp}] [{level}] {message}")
async def check_connection(self):
"""检查AI服务连接状态"""
if not self.ai_service.connected:
self._log("AI服务连接已断开尝试重新连接...", "WARNING")
return await self.initialize_ai_service()
return True
async def ensure_connection(self):
"""确保AI服务连接正常"""
try:
# 如果标记为未连接,直接尝试重连
if not self.ai_service.connected:
self._log("AI服务连接已断开尝试重新连接...", "WARNING")
success = await self.ai_service.connect(self.store_id)
return success
# 简单检查如果websocket对象存在且不是None就认为连接正常
# 实际的连接状态会在发送消息时由websockets库自动处理
if (hasattr(self.ai_service, 'websocket') and
self.ai_service.websocket is not None):
return True
else:
self._log("WebSocket连接不存在", "WARNING")
self.ai_service.connected = False
return await self.ensure_connection()
except Exception as e:
self._log(f"检查连接状态异常: {e}", "ERROR")
return False
async def close(self):
"""关闭AI服务"""
try:
self._log("正在关闭AI服务...", "INFO")
if self.ai_service:
await self.ai_service.close()
self._log("AI服务已关闭", "SUCCESS")
except Exception as e:
self._log(f"关闭AI服务时发生错误: {e}", "ERROR")
class QianNiuClient:
"""千牛客户端类 - 负责与赛牛插件通信"""
def __init__(self, user_nick=None):
self.user_nick: str = user_nick
self.qn_ws = None
self.pending: Dict[str, dict] = {}
self.is_connected = False
self.is_authenticated = False # 改为连接成功即认证
self.message_handler = None
self._http_session = None # 添加会话
# 新增:正式账号认证信息
self.access_type = 1 # 企业版
self.access_id = "maguabishop" # 你的AccessId
self.access_key = "bWFndWFfYmlzaG9w" # 你的AccessKey
self.auth_token = None # 认证成功后获取的token
self.udid = None # 设备唯一标识
# 赛牛服务
self.sainiu_service = SaiNiuService()
async def __aenter__(self):
"""异步上下文管理器入口"""
self._http_session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
if self._http_session:
await self._http_session.close()
# 在 QianNiuClient 类中添加HTTP API调用方法 (新增)
async def _http_api_call(self, post_name: str, data_obj: Dict[str, Any]) -> Dict[str, Any]:
"""按官方文档以 form-data 方式调用 QianNiu/Api 接口"""
form = aiohttp.FormData()
form.add_field("post", post_name)
form.add_field("data", json.dumps(data_obj))
logger.debug(f"[TB-DIAG] HTTP CALL -> {post_name} url={QN_HTTP_API_URL_AVATAR} data={data_obj}")
try:
async with aiohttp.ClientSession() as session:
async with session.post(QN_HTTP_API_URL_AVATAR, data=form, timeout=10) as resp:
text = await resp.text()
logger.debug(f"[TB-DIAG] HTTP RESP <- {post_name} status={resp.status} body={text[:300]}")
if resp.status != 200:
logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用失败: status={resp.status}")
return {}
try:
return json.loads(text)
except Exception:
logger.error(f"[TB-DIAG] HTTP RESP JSON decode error on {post_name}")
return {}
except Exception as e:
logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用异常: {e}")
return {}
async def fetch_buyer_avatar_by_http(self, buyer_nick: str) -> str:
"""通过 HTTP GetUserIcon 获取买家头像"""
if not buyer_nick:
return ""
cache_key = f"avatar:taobao:nick:{buyer_nick}"
cached = cache.get(cache_key)
if cached:
logger.info(f"[TB-头像调试] 💾 缓存命中: {cached}")
return cached
# 使用正确的 GetUserIcon API
logger.info(f"[TB-头像调试] 📡 调用GetUserIcon API获取头像 - 买家昵称: {buyer_nick}")
resp = await self._http_api_call("GetUserIcon", {
"userNick": self.user_nick,
"buyerNick": buyer_nick,
"siteid": "cntaobao"
})
if not resp:
logger.warning(f"[TB-头像调试] ❌ HTTP GetUserIcon 调用失败")
return ""
# 完整记录API返回数据
logger.info(f"[TB-头像调试] 📥 GetUserIcon 完整返回数据: {json.dumps(resp, ensure_ascii=False, indent=2)}")
# 解析返回数据获取头像
try:
if resp.get("code") != 200:
logger.error(f"[TB-头像调试] ❌ API返回错误: code={resp.get('code')}, msg={resp.get('msg')}")
return ""
data_block = resp.get("data") or {}
logger.info(f"[TB-头像调试] 🔍 data块内容: {json.dumps(data_block, ensure_ascii=False, indent=2)}")
# 根据API文档返回格式为: {"cntaobaotb266770389534":{"iconUrl":"https://..."}}
# 构建key: siteid + buyerNick
avatar_key = f"cntaobao{buyer_nick}"
avatar_data = data_block.get(avatar_key, {})
avatar_url = avatar_data.get("iconUrl", "")
if avatar_url:
logger.info(f"[TB-头像调试] ✅ 成功获取头像: {avatar_url}")
# 处理相对协议的URL
if avatar_url.startswith("//"):
avatar_url = "https:" + avatar_url
# 缓存头像URL24小时
cache.set(cache_key, avatar_url, 60 * 60 * 24)
return avatar_url
else:
logger.warning(f"[TB-头像调试] ❌ 未找到头像信息key={avatar_key}")
return ""
except Exception as e:
logger.error(f"[TB-头像调试] GetUserIcon 解析错误: {e}")
return ""
# 新增连接的初始化
async def _request_authorization(self):
"""请求授权 - 按照官方demo修改"""
trace_id = "Access_Init" # 使用固定的traceId
try:
# 获取当前的13位时间戳毫秒级
current_time = int(time.time() * 1000)
# 计算JSON数据的签名参数post参数为"Init"
json_sign = self.sainiu_service.sign_get_sign("Init", current_time)
auth_msg = {
"type": "Invoke_SaiNiu",
"traceId": trace_id,
"codeType": "Access",
"post": "Init",
"data": {
"AccessType": 1,
"AccessId": "maguabishop",
"AccessKey": "bWFndWFfYmlzaG9w",
"ForceLogin": True
},
"time": current_time,
"sign": json_sign
}
await self.qn_ws.send(json.dumps(auth_msg))
logger.info("已发送授权请求")
print(f"Sent message: {auth_msg}")
# 不再等待响应,让消息循环处理响应
return True
except Exception as e:
logger.error(f"授权请求异常: {e}")
return False
async def _system_initialization(self):
"""系统初始化 - 按照官方demo修改"""
trace_id = "0470bf9489729b2e8a2126a04ab3e272" # 使用固定的traceId
try:
# 获取当前的13位时间戳毫秒级
current_time = int(time.time() * 1000)
# 计算新的JSON数据的签名参数post参数为"SysInit"
sys_init_sign = self.sainiu_service.sign_get_sign("SysInit", current_time)
init_msg = {
"type": "Invoke_SaiNiu",
"traceId": trace_id,
"codeType": "Access",
"post": "SysInit",
"data": {
"number": 20,
"port": 2030,
"token": True,
"connect": True,
"disconnect": True,
"newMessage": True,
"groupMessage": True,
"notice": True,
"event": True,
"assistant": True
},
"time": current_time,
"sign": sys_init_sign
}
await self.qn_ws.send(json.dumps(init_msg))
logger.info("已发送系统初始化请求")
print(f"Sent message: {init_msg}")
# 不再等待响应,让消息循环处理
return True
except Exception as e:
logger.error(f"系统初始化异常: {e}")
return False
# async def connect(self):
# """连接到赛牛插件 - 修改为不发送初始化消息"""
# logger.info(f"尝试连接到赛牛插件: {QN_WS_URL}")
# print("=== 断点1: 开始连接赛牛插件 ===")
#
# try:
# # 连接WebSocket
# self.qn_ws = await websockets.connect(QN_WS_URL, ping_interval=20, ping_timeout=10)
# self.is_connected = True
# logger.info("成功连接到赛牛插件")
# print("=== 断点2: 成功连接到赛牛插件 ===")
#
# # 正式账号需要执行授权流程
# print("=== 开始正式账号授权流程 ===")
#
# # 1. 请求授权
# auth_success = await self._request_authorization()
# if not auth_success:
# logger.error("授权请求失败")
# return False
#
# # 2. 系统初始化
# init_success = await self._system_initialization()
# if not init_success:
# logger.error("系统初始化失败")
# return False
#
#
# # 测试账号不需要初始化,连接成功即认证成功
# self.is_authenticated = True
# logger.info("✅ 正式账号认证和初始化成功")
#
# return True
#
# except Exception as e:
# logger.error(f"连接赛牛插件失败: {e}")
# print(f"=== 断点3: 连接失败 - {e} ===")
# self.is_connected = False
# return False
async def connect(self):
"""连接到赛牛插件 - 完整流程"""
logger.info("开始完整的赛牛连接流程")
try:
# 1. 加载并启动DLL服务
if not await self._start_sainiu_service():
logger.error("启动赛牛DLL服务失败")
return False
# 2. 连接到WebSocket
if not await self._connect_websocket():
logger.error("连接WebSocket失败")
return False
# 3. 执行授权流程
if not await self._request_authorization():
logger.error("授权请求失败")
return False
# 4. 系统初始化
if not await self._system_initialization():
logger.error("系统初始化失败")
return False
self.is_authenticated = True
logger.info("✅ 完整的赛牛连接流程成功")
return True
except Exception as e:
logger.error(f"连接赛牛插件失败: {e}")
self.is_connected = False
return False
async def _start_sainiu_service(self):
"""启动赛牛DLL服务"""
try:
# 加载DLL使用默认相对路径
if not self.sainiu_service.load_dll():
return False
success = True
if success:
logger.info("✅ 赛牛DLL服务启动成功")
# 等待服务完全启动
await asyncio.sleep(2)
return True
else:
logger.error("❌ 赛牛DLL服务启动失败")
return False
except Exception as e:
logger.error(f"启动赛牛服务异常: {e}")
return False
async def _connect_websocket(self):
"""连接到WebSocket服务器"""
max_retries = 5
retry_delay = 3
for attempt in range(max_retries):
try:
# 获取当前的13位时间戳毫秒级
current_time = int(time.time() * 1000)
# 计算签名
ws_sign = self.sainiu_service.sign_get_sign("ws", current_time)
# 构建URI
if ws_sign: # 如果有签名
uri = f"ws://127.0.0.1:{self.sainiu_service.port}?time={current_time}&sign={ws_sign}"
else: # 如果没有签名
uri = f"ws://127.0.0.1:{self.sainiu_service.port}"
print(f"连接URI: {uri}")
# 连接WebSocket
self.qn_ws = await websockets.connect(uri)
self.is_connected = True
print("✅ WebSocket连接成功")
# 连接成功后立即执行授权和初始化(只执行一次)
if not self.is_authenticated:
# 1. 发送授权请求
current_time = int(time.time() * 1000)
json_sign = self.sainiu_service.sign_get_sign("Init", current_time)
auth_msg = {
"type": "Invoke_SaiNiu",
"traceId": "Access_Init",
"codeType": "Access",
"post": "Init",
"data": {
"AccessType": 1,
"AccessId": self.access_id,
"AccessKey": self.access_key,
"ForceLogin": True
},
"time": current_time,
"sign": json_sign
}
await self.qn_ws.send(json.dumps(auth_msg))
print(f"已发送授权请求: {auth_msg}")
# 2. 等待授权响应
response = await self.qn_ws.recv()
response_data = json.loads(response)
print(f"收到授权响应: {response}")
if response_data.get("traceId") == "Access_Init":
# 解析授权响应
return_data = response_data.get("returnData", {})
code = return_data.get("code", 0)
if code == 200:
self.auth_token = return_data.get("token")
self.udid = return_data.get("udid")
logger.info(f"✅ 授权成功 - token: {self.auth_token}")
# 3. 发送系统初始化请求
current_time = int(time.time() * 1000)
sys_init_sign = self.sainiu_service.sign_get_sign("SysInit", current_time)
init_msg = {
"type": "Invoke_SaiNiu",
"traceId": "0470bf9489729b2e8a2126a04ab3e272",
"codeType": "Access",
"post": "SysInit",
"data": {
"number": 20,
"port": 3030,
"token": True,
"connect": True,
"disconnect": True,
"newMessage": True,
"groupMessage": True,
"notice": True,
"event": True,
"assistant": True
},
"time": current_time,
"sign": sys_init_sign
}
await self.qn_ws.send(json.dumps(init_msg))
print(f"已发送系统初始化请求: {init_msg}")
# 4. 等待初始化响应
response = await self.qn_ws.recv()
response_data = json.loads(response)
print(f"收到初始化响应: {response}")
return_data = response_data.get("returnData", {})
code = return_data.get("code", 0)
if code == 200:
self.is_authenticated = True
print("✅ 授权和初始化成功")
# 初始化完成后再等待一下确保服务完全就绪
await asyncio.sleep(2)
return True
else:
error_msg = return_data.get("msg", "未知错误")
print(f"❌ 系统初始化失败: {error_msg}")
return False
else:
error_msg = return_data.get("msg", "未知错误")
print(f"❌ 授权失败: {error_msg}")
return False
return True
except Exception as e:
print(f"连接失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
continue
return False
return False
async def _check_port_available(self):
"""检查端口是否可用"""
try:
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
result = s.connect_ex(('127.0.0.1', self.sainiu_service.port))
return result == 0
except:
return False
async def _sys_init(self):
"""系统初始化"""
logger.info("开始赛牛插件初始化")
print("=== 断点4: 开始初始化赛牛插件 ===")
# 发送初始化消息
init_msg = {
"type": "Invoke_SaiNiu",
"traceId": str(uuid.uuid4()),
"codeType": "Access",
"post": "Init",
"data": {
"userNick": self.user_nick
}
}
await self.qn_ws.send(json.dumps(init_msg))
logger.info("已发送初始化消息")
print("=== 断点5: 初始化消息已发送 ===")
print(f"初始化消息内容: {json.dumps(init_msg, indent=2, ensure_ascii=False)}")
async def start_listening(self, message_handler):
"""开始监听消息"""
self.message_handler = message_handler
# 启动消息监听和心跳任务
listen_task = asyncio.create_task(self._message_loop())
heartbeat_task = asyncio.create_task(self._heartbeat_loop())
logger.info("开始监听千牛消息...")
print("=== 断点6: 开始监听千牛消息 ===")
return listen_task, heartbeat_task
async def _heartbeat_loop(self):
"""心跳循环"""
print("=== 断点7: 心跳循环开始 ===")
while self.is_connected:
try:
# 发送心跳
heartbeat_msg = {"type": "ping"}
await self.qn_ws.send(json.dumps(heartbeat_msg))
print("=== 断点8: 心跳已发送 ===")
await asyncio.sleep(20)
except Exception as e:
self.is_connected = False
break
async def _message_loop(self):
"""消息监听循环"""
print("=== 断点9: 消息监听循环开始 ===")
try:
# # 确保正确启动后端监听
# if (self.message_handler and
# hasattr(self.message_handler, 'ai_service') and
# self.message_handler.ai_service and
# hasattr(self.message_handler.ai_service.ai_service, 'listen_for_backend_messages')):
#
# print("🚀 启动后端消息监听任务", "DEBUG")
# # 创建监听任务
# asyncio.create_task(
# self.message_handler.ai_service.ai_service.listen_for_backend_messages(
# self.message_handler.handle_customer_message
# )
# )
# else:
# print("⚠️ 无法启动后端监听:缺少必要的属性或方法", "WARNING")
async for message in self.qn_ws:
print("=== 断点10: 收到原始消息 ===")
await self._handle_message(message)
except Exception as e:
logger.error(f"消息监听异常: {e}")
self.is_connected = False
async def _handle_message(self, raw_message):
"""处理接收到的消息"""
print("=== 断点11: 开始处理消息 ===")
try:
data = json.loads(raw_message)
msg_type = data.get("type")
logger.info(f"收到消息类型: {msg_type}")
print(f"=== 断点12: 解析消息类型 - {msg_type} ===")
print(f"收到的全消息体结构为: {data}")
# 处理心跳响应
if msg_type == "pong":
print("=== 断点13: 收到心跳响应(pong) ===")
return
# 处理连接成功消息 - 新增这个处理
if msg_type == "FUNCTION_CONNECTED":
print("=== 函数连接已建立 ===")
print(f"连接详情: {json.dumps(data, indent=2, ensure_ascii=False)}")
# 检查是否有认证信息
if hasattr(self, 'auth_token') and self.auth_token:
self.is_authenticated = True
logger.info("✅ 检测到认证token设置认证状态为 True")
else:
logger.warning("⚠️ 未检测到认证token认证状态保持为 False")
return
# 处理API响应
elif msg_type in ["Invoke_SaiNiu", "Invoke_QianNiu"]:
code_type = data.get("codeType")
trace_id = data.get("traceId")
print(f"=== API响应: type={msg_type}, codeType={code_type} ===")
print(f"响应内容: {json.dumps(data, indent=2, ensure_ascii=False)}")
# 处理等待的响应
if trace_id and trace_id in self.pending:
pending_info = self.pending[trace_id]
if not pending_info.get('received'):
pending_info['response'] = data
pending_info['received'] = True
return
# 处理买家消息
elif msg_type == "CHAT_DIALOGUE_MSG":
msg_data = data.get("data", {})
code_type = msg_data.get("codeType")
if code_type == "CHAT_RECEIVE_MSG":
message_content = msg_data.get("message", "")
if msg_data.get("data", {}) != {}:
try:
# krishao
store_msg_id_list = msg_data.get("data", {}).get("E3_keyValueDescArray", [])
if store_msg_id_list:
store_msg_id = store_msg_id_list[0].get("desc", "")
if store_msg_id:
dingdan_id = "738740221403"
logger.info("👌👌 👌👌👌👌👌👌👌👌👌👌👌👌👌👌👌👌👌 👌👌")
message_content = f"订单号: {store_msg_id} 商品ID: {dingdan_id}"
else:
pass
except Exception as e:
message_content = f"有需求要询问订单 优先回复"
sender_nick = msg_data.get("senderNick", "")
#### 核心处理逻辑!!!!
logger.info(f"收到买家消息: {sender_nick} -> {message_content}")
print(f"=== 断点14: 收到买家消息 - {sender_nick}: {message_content} ===")
# 构造标准格式消息(但保留原始数据)
processed_message = {
"type": "message",
"content": message_content,
"msg_type": "text",
"sender": {
"id": sender_nick,
"name": f"淘宝用户_{sender_nick}",
"is_customer": True
},
"store_id": STORE_ID,
"message_id": str(uuid.uuid4()),
"platform": "淘宝",
# 新增:保存原始数据
"raw_data": msg_data
}
print("=== 转换后的消息 ===")
print(json.dumps(processed_message, indent=2, ensure_ascii=False))
print("===================")
await self.message_handler(processed_message)
# 记录未处理的消息类型
print(f"=== 未处理的消息类型: {msg_type} ===")
print(f"消息内容: {json.dumps(data, indent=2, ensure_ascii=False)}")
except json.JSONDecodeError:
logger.error(f"消息JSON解析失败: {raw_message}")
print("=== 断点16: JSON解析失败 ===")
except Exception as e:
logger.error(f"处理消息异常: {e}")
print(f"=== 断点17: 处理消息异常 - {e} ===")
async def ensure_connected(self):
"""确保连接正常"""
if not self.is_connected or not self.qn_ws:
print("=== 连接已断开,尝试重连 ===")
return await self.connect()
return True
async def send_message(self, to_nick: str, message: str) -> bool:
"""发送消息给买家"""
logger.info(f"准备发送消息 -> {to_nick}: {message}")
print(f"=== 断点18: 准备发送消息给 {to_nick} ===")
print(f"消息内容: {message}")
# 确保连接正常
if not self.is_connected or not self.qn_ws:
print("=== 连接已断开,尝试重连 ===")
if not await self.connect():
return False
# 修改认证检查逻辑
if not self.is_authenticated:
logger.error("认证未完成,无法发送消息")
print("=== 认证状态检查失败 ===")
# 添加调试信息
print(f"当前认证状态: {self.is_authenticated}")
print(f"是否有auth_token: {hasattr(self, 'auth_token')}")
if hasattr(self, 'auth_token'):
print(f"auth_token值: {self.auth_token}")
return False
try:
trace_id = str(uuid.uuid4())
self.pending[trace_id] = {
'response': None,
'received': False,
'created_at': time.time()
}
# 构建发送消息请求
send_msg = {
"type": "Invoke_QianNiu",
"traceId": trace_id,
"codeType": "Function",
"post": "SendMessages",
"data": {
"userNick": self.user_nick,
"buyerNick": to_nick,
"text": message,
"siteid": "cntaobao",
"waitingTime": 5000
}
}
# 如果有认证token添加到消息中
if self.auth_token:
send_msg["token"] = self.auth_token
print(f"=== 添加认证token到消息中: {self.auth_token}")
await self.qn_ws.send(json.dumps(send_msg))
logger.info("已发送消息到赛牛插件")
print("=== 断点19: 消息已发送到赛牛插件 ===")
print(f"发送的消息内容: {json.dumps(send_msg, indent=2, ensure_ascii=False)}")
# 轮询等待响应
try:
timeout = 5.0
start_time = time.time()
while True:
# 检查是否收到响应
if trace_id in self.pending:
pending_info = self.pending[trace_id]
if pending_info.get('received'):
response = pending_info.get('response')
self.pending.pop(trace_id, None)
return_data = response.get("returnData") or response.get("data", {})
code = return_data.get("code", 0)
if code in [200, 0, 403]: # 403也视为成功测试账号特殊处理
logger.info("消息发送成功")
print("=== 断点20: 消息发送成功 ===")
return True
else:
error_msg = return_data.get("msg", "未知错误")
logger.warning(f"消息发送返回非成功码: {code}, 错误信息: {error_msg}")
print(f"=== 断点21: 消息发送失败 - 错误码: {code}, 错误信息: {error_msg} ===")
return False
# 检查超时
if time.time() - start_time > timeout:
logger.warning("消息发送超时,但可能已成功")
print("=== 断点22: 消息发送超时 ===")
self.pending.pop(trace_id, None)
return True
# 短暂等待后继续检查
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"等待响应异常: {e}")
self.pending.pop(trace_id, None)
return False
except Exception as e:
logger.error(f"发送消息异常: {e}")
print(f"=== 断点23: 发送消息异常 - {e} ===")
return False
async def close(self):
"""关闭连接"""
self.is_connected = False
self.is_authenticated = False
if self.qn_ws:
await self.qn_ws.close()
logger.info("千牛客户端已关闭")
class TestMessageHandler:
"""测试消息处理器(单连接多店铺架构)"""
def __init__(self, qn_client, store_id=None, exe_token=None):
self.qn_client = qn_client
self.received_messages = []
self.ai_service = AIServiceIntegration(store_id=store_id, exe_token=exe_token)
self.backend_listening_task = None # 保存监听任务
self.store_id = store_id
self.exe_token = exe_token
# 消息去重缓存
self.message_cache = {} # 缓存最近处理的消息
self.cache_expire_time = 300 # 缓存过期时间5分钟
async def initialize(self):
"""初始化消息处理器和AI服务"""
try:
# 初始化AI服务
success = await self.ai_service.initialize_ai_service()
if not success:
logger.error("AI服务初始化失败")
return False
# 注册消息处理器(更新为新协议)
self.ai_service.register_message_handlers({
"message": self.handle_customer_message, # 处理后端主动推送的消息
"connect_success": self.handle_connect_success,
"error": self.handle_error_message
})
return True
except Exception as e:
logger.error(f"消息处理器初始化失败: {e}")
return False
async def start_backend_listening(self):
"""启动后端消息监听"""
try:
# 简化检查逻辑
if not hasattr(self.ai_service, 'ai_service'):
self._log("⚠️ AI服务未正确初始化", "WARNING")
return False
# 直接启动监听
self.backend_listening_task = asyncio.create_task(
self.ai_service.ai_service.listen_for_backend_messages(
self.handle_customer_message
)
)
self._log("✅ 后端消息监听已启动", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ 启动后端监听失败: {e}", "ERROR")
return False
async def handle_ai_message(self, message_data):
"""处理AI回复消息用于监控、日志记录等目的"""
try:
content = message_data.get("content", "")
sender_info = message_data.get("sender", {})
receiver_info = message_data.get("receiver", {})
sender_name = sender_info.get("name", "未知发送者")
receiver_name = receiver_info.get("name", "未知接收者")
# 记录AI回复信息
self._log(f"🤖 AI回复 [{sender_name}{receiver_name}]: {content[:100]}...", "DEBUG")
# 这里可以添加额外的监控和处理逻辑
if len(content) < 5:
self._log("⚠️ AI回复过短可能需要优化", "WARNING")
return True
except Exception as e:
self._log(f"❌ 处理AI消息异常: {e}", "ERROR")
return False
def _generate_message_hash(self, message):
"""生成消息的唯一标识符(用于去重)"""
try:
# 使用发送者、内容、消息类型生成hash
sender_id = message.get("sender", {}).get("id", "")
content = message.get("content", "")
msg_type = message.get("msg_type", "")
# 创建唯一标识字符串
unique_string = f"{sender_id}:{content}:{msg_type}"
# 生成MD5哈希
return hashlib.md5(unique_string.encode('utf-8')).hexdigest()
except Exception as e:
self._log(f"生成消息哈希失败: {e}", "ERROR")
return None
def _is_duplicate_message(self, message_hash):
"""检查是否为重复消息"""
if not message_hash:
return False
current_time = time.time()
# 清理过期的缓存
expired_keys = []
for key, timestamp in self.message_cache.items():
if current_time - timestamp > self.cache_expire_time:
expired_keys.append(key)
for key in expired_keys:
del self.message_cache[key]
# 检查是否为重复消息
if message_hash in self.message_cache:
return True
# 添加到缓存
self.message_cache[message_hash] = current_time
return False
async def handle_message(self, message):
"""处理接收到的买家消息"""
try:
# 消息去重检查
message_hash = self._generate_message_hash(message)
if self._is_duplicate_message(message_hash):
self._log(f"⚠️ 检测到重复消息,跳过处理: {message.get('content', '')[:50]}...", "WARNING")
return
self.received_messages.append(message)
# 新增:打印完整的接收消息体
print("=" * 60)
print("📨 收到买家完整消息体:")
print(json.dumps(message, indent=2, ensure_ascii=False))
print("=" * 60)
self.received_messages.append(message)
# 新增:分别打印原始数据和转换后的数据
print("=" * 60)
print("📨 收到买家消息 - 转换后格式:")
print(json.dumps(message, indent=2, ensure_ascii=False))
if "raw_data" in message:
print("\n📨 收到买家消息 - 原始格式:")
print(json.dumps(message["raw_data"], indent=2, ensure_ascii=False))
print("=" * 60)
# 使用转换后的数据
content = message.get("content", "")
sender_nick = message.get("sender", {}).get("id", "")
# 获取头像URL如果存在 先拿到对应的sendernick
raw_data = message.get("raw_data", {})
if raw_data:
sender_nick = raw_data.get("senderNick", "")
avatar_url = await self.qn_client.fetch_buyer_avatar_by_http(sender_nick)
else:
pass
if not content or not sender_nick:
self._log("消息内容或发送者昵称为空", "WARNING")
return
self._log(f"处理买家消息: {sender_nick} -> {content}", "INFO")
# 获取AI回复带重试机制
max_retries = 2 # 减少重试次数,避免长时间等待
for retry in range(max_retries):
try:
ai_reply = await self.ai_service.get_ai_reply(
message_content=content,
sender_nick=sender_nick,
avatar_url=avatar_url # 新增: 传递 头像url
)
if ai_reply:
self._log(f"AI回复: {ai_reply}", "INFO")
success = await self.qn_client.send_message(sender_nick, ai_reply)
if success:
self._log("AI回复发送成功", "SUCCESS")
return
else:
self._log("AI回复发送失败", "ERROR")
if retry < max_retries - 1:
await asyncio.sleep(1)
continue
else:
self._log("未获取到AI回复", "WARNING")
if retry < max_retries - 1:
await asyncio.sleep(1)
continue
except Exception as e:
self._log(f"处理消息异常 (尝试 {retry + 1}/{max_retries}): {e}", "ERROR")
if retry < max_retries - 1:
await asyncio.sleep(1)
continue
# 如果所有重试都失败,发送默认回复
default_reply = "您好,感谢您的咨询,我们会尽快回复您!"
await self.qn_client.send_message(sender_nick, default_reply)
self._log("已发送默认回复", "WARNING")
except Exception as e:
self._log(f"处理消息异常: {e}", "ERROR")
async def handle_customer_message(self, message_data):
"""处理来自后端的客服消息(必须保留)"""
try:
content = message_data.get("content", "")
if not content:
self._log("❌ 客服消息内容为空", "WARNING")
return False
# 从receiver中获取接收者ID
receiver_info = message_data.get("receiver", {})
receiver_id = receiver_info.get("id", "") if receiver_info else ""
if not receiver_id:
self._log("❌ 无法确定消息接收者", "WARNING")
return False
self._log(f"📤 发送客服消息给 {receiver_id}: {content[:50]}...", "INFO")
# 发送客服消息给买家
success = await self.qn_client.send_message(receiver_id, content)
if success:
self._log(f"✅ 客服消息发送成功", "SUCCESS")
else:
self._log(f"❌ 客服消息发送失败", "ERROR")
return success
except Exception as e:
self._log(f"❌ 处理客服消息异常: {e}", "ERROR")
return False
async def handle_connect_success(self, message_data):
"""处理连接成功消息"""
try:
content = message_data.get("content", "")
self._log(f"✅ 收到连接成功确认: {content}", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ 处理连接成功消息异常: {e}", "ERROR")
return False
async def handle_error_message(self, message_data):
"""处理错误消息"""
try:
content = message_data.get("content", "")
verify_link = message_data.get("data", {}).get("verify_link", "")
error_msg = f"收到错误消息: {content}"
if verify_link:
error_msg += f", 验证链接: {verify_link}"
self._log(error_msg, "ERROR")
return True
except Exception as e:
self._log(f"❌ 处理错误消息异常: {e}", "ERROR")
return False
async def close(self):
"""关闭消息处理器"""
if self.ai_service:
await self.ai_service.close()
def _log(self, message, level="INFO"):
"""日志记录"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 根据日志级别添加颜色
if level == "ERROR":
print(f"\033[91m[{timestamp}] [{level}] {message}\033[0m") # 红色
elif level == "WARNING":
print(f"\033[93m[{timestamp}] [{level}] {message}\033[0m") # 黄色
elif level == "SUCCESS":
print(f"\033[92m[{timestamp}] [{level}] {message}\033[0m") # 绿色
elif level == "DEBUG":
print(f"\033[96m[{timestamp}] [{level}] {message}\033[0m") # 蓝色
else:
print(f"[{timestamp}] [{level}] {message}")
class QianNiuListenerForGUI:
"""用于GUI集成的千牛监听包装器类"""
def __init__(self, log_callback=None):
"""初始化千牛监听包装器"""
self.qn_client = None
self.message_handler = None
self.running = False
self.stop_event = None
self.log_callback = log_callback
self.log_signal = None # 添加日志信号属性
self.tasks = [] # 新增:任务列表
self.store_id = None
self.user_nick = None
@property
def message_handler(self):
"""获取消息处理器实例"""
return self._message_handler
@message_handler.setter
def message_handler(self, value):
"""设置消息处理器实例"""
self._message_handler = value
def _log(self, message, log_type="INFO"):
"""内部日志方法"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] [{log_type}] {message}"
if hasattr(self, 'log_signal') and self.log_signal:
self.log_signal.emit(message, log_type)
elif self.log_callback:
self.log_callback(message, log_type)
else:
color_map = {
"ERROR": "\033[91m",
"WARNING": "\033[93m",
"SUCCESS": "\033[92m",
"DEBUG": "\033[96m",
}
color = color_map.get(log_type, "")
print(f"{color}[{timestamp}] [{log_type}] {message}\033[0m")
async def _get_first_user(self, max_retries=2, retry_delay=2):
"""获取第一个可用的账号(初始化完成后调用,减少重试)"""
for retry in range(max_retries):
try:
self._log(f"🔵 获取所有连接的账号... (尝试 {retry + 1}/{max_retries})", "INFO")
connected_users = await self.get_all_connected_users()
if not connected_users:
if retry < max_retries - 1:
self._log(f"❌ 未找到可用的千牛账号,{retry_delay}秒后重试...", "WARNING")
await asyncio.sleep(retry_delay)
continue
else:
self._log("❌ 未找到可用的千牛账号(所有重试已耗尽)", "ERROR")
self._log("💡 请确认1) 千牛客户端已启动 2) 已登录账号 3) DLL初始化已完成", "INFO")
return None
# 获取第一个账号
first_user = next(iter(connected_users.values()))
user_nick = first_user.get("userNick")
if not user_nick:
self._log("❌ 账号信息中缺少userNick字段", "ERROR")
if retry < max_retries - 1:
await asyncio.sleep(retry_delay)
continue
return None
self._log(f"✅ 获取到账号: {user_nick}", "SUCCESS")
self._log(f"账号详情: UID={first_user.get('userUid')}, 版本={first_user.get('version')}", "DEBUG")
return user_nick
except Exception as e:
if retry < max_retries - 1:
self._log(f"❌ 获取账号失败: {e}{retry_delay}秒后重试...", "WARNING")
await asyncio.sleep(retry_delay)
else:
self._log(f"❌ 获取账号失败(所有重试已耗尽): {e}", "ERROR")
return None
async def _wait_for_dll_ready(self, max_attempts=15, delay=2):
"""等待DLL服务完全就绪改进版"""
import socket
for attempt in range(max_attempts):
try:
self._log(f"🔍 检查DLL服务状态 (尝试 {attempt + 1}/{max_attempts})", "DEBUG")
# 首先检查端口是否在监听
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(2)
result = s.connect_ex(('127.0.0.1', 3030))
if result != 0:
self._log(f"⏰ 端口3030尚未开启等待 {delay} 秒...", "DEBUG")
await asyncio.sleep(delay)
continue
except Exception as e:
self._log(f"🔍 检查端口时出错: {e}", "DEBUG")
await asyncio.sleep(delay)
continue
self._log("✅ 端口3030已开启", "SUCCESS")
# 端口开启后再等待一下确保服务完全启动
await asyncio.sleep(3)
return True
except Exception as e:
self._log(f"🔍 检查DLL服务时出错: {e}", "DEBUG")
await asyncio.sleep(delay)
self._log("❌ DLL服务等待超时", "ERROR")
return False
async def get_all_connected_users(self) -> Dict[str, Dict]:
"""获取所有已连接的千牛账号
返回格式: {"user_nick": {"userNick": "xxx", "userUid": "xxx", ...}}
"""
logger.info("开始获取所有已连接的千牛账号")
try:
# 通过HTTP API调用
response = await self._http_api_call("GetAllUser", {})
# 区分API调用失败None和返回空结果{}
if response is None:
logger.error("获取连接账号失败API调用失败")
return {}
logger.debug(f"获取连接账号响应: {response}")
# 检查返回数据格式
if isinstance(response, dict):
# 直接返回账号信息字典(可能为空)
logger.info(f"成功获取到 {len(response)} 个已连接账号")
if len(response) == 0:
logger.warning("API调用成功但未返回任何账号信息可能原因")
logger.warning("1. 千牛客户端未启动")
logger.warning("2. 千牛客户端已启动但未登录任何账号")
logger.warning("3. DLL服务与千牛客户端连接异常")
return response
else:
logger.error(f"获取连接账号失败:返回数据格式错误 - {type(response)}: {response}")
return {}
except Exception as e:
logger.error(f"获取连接账号异常: {e}")
return {}
async def _http_api_call(self, post_name: str, data_obj: Dict[str, Any]) -> Dict[str, Any]:
"""按官方文档以 form-data 方式调用 QianNiu/Api 接口(改进版)"""
form = aiohttp.FormData()
form.add_field("post", post_name)
form.add_field("data", json.dumps(data_obj))
logger.debug(f"[TB-DIAG] HTTP CALL -> {post_name} url={QN_HTTP_API_URL} data={data_obj}")
try:
async with aiohttp.ClientSession() as session:
async with session.post(QN_HTTP_API_URL, data=form, timeout=26) as resp:
text = await resp.text()
logger.debug(f"[TB-DIAG] HTTP RESP <- {post_name} status={resp.status} body={text[:300]}")
if resp.status != 200:
logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用失败: status={resp.status}")
# 返回None而不是空字典便于区分真正的空结果和错误
return None
try:
result = json.loads(text)
# 即使结果为空也是成功的API调用
logger.debug(f"[TB-DIAG] HTTP API {post_name} 调用成功,返回数据类型: {type(result)}")
return result if result is not None else {}
except json.JSONDecodeError as e:
logger.error(f"[TB-DIAG] HTTP RESP JSON decode error on {post_name}: {e}")
return None
except asyncio.TimeoutError:
logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用超时")
return None
except aiohttp.ClientError as e:
logger.error(f"[QianNiuClient] HTTP接口 {post_name} 网络错误: {e}")
return None
except Exception as e:
logger.error(f"[QianNiuClient] HTTP接口 {post_name} 调用异常: {e}")
return None
# 新增供给架构调用 ---- 测试中
async def start_listening_with_store(self, store_id, exe_token=None):
"""带店铺ID的监听启动方法单连接多店铺架构"""
try:
self._log("🔵 开始千牛平台连接流程(单连接多店铺架构)", "INFO")
self.store_id = store_id # 保存店铺ID
self.exe_token = exe_token # 保存用户令牌
# 1. 先创建临时千牛客户端用于启动DLL服务
temp_client = QianNiuClient()
self._log("🔵 步骤1: 启动DLL服务...", "INFO")
# 2. 启动DLL服务必须在所有API调用之前
if not await temp_client._start_sainiu_service():
self._log("❌ DLL服务启动失败", "ERROR")
return False
self._log("✅ DLL服务启动成功", "SUCCESS")
# 3. 等待DLL服务完全就绪
self._log("🔵 等待DLL服务完全就绪...", "INFO")
await asyncio.sleep(5) # 增加等待时间确保服务完全启动
# 4. 验证DLL服务是否可用仅检查端口
if not await self._wait_for_dll_ready():
self._log("❌ DLL服务未能完全就绪", "ERROR")
return False
self._log("✅ DLL服务已完全就绪", "SUCCESS")
# 5. 创建正式的千牛客户端(先不获取用户信息)
self._log("🔵 步骤2: 创建千牛客户端...", "INFO")
self.qn_client = QianNiuClient()
# 复用已启动的DLL服务
self.qn_client.sainiu_service = temp_client.sainiu_service
self._log("✅ 千牛客户端创建成功", "DEBUG")
# 6. 建立WebSocket连接并完成完整初始化流程
self._log("🔵 步骤3: 建立WebSocket连接并初始化...", "INFO")
if not await self.qn_client._connect_websocket():
self._log("❌ WebSocket连接失败", "ERROR")
return False
self._log("✅ WebSocket连接和初始化成功", "SUCCESS")
# 7. 现在可以安全地获取已连接的账号(初始化完成后)
self._log("🔵 步骤4: 获取可用账号...", "INFO")
self.user_nick = await self._get_first_user()
if not self.user_nick:
self._log("❌ 无法获取可用账号", "ERROR")
return False
self._log(f"✅ 获取到账号: {self.user_nick}", "SUCCESS")
# 更新客户端的用户信息
self.qn_client.user_nick = self.user_nick
# 8. 创建消息处理器
self._log("🔵 步骤5: 创建消息处理器...", "INFO")
self.message_handler = TestMessageHandler(self.qn_client, store_id, exe_token)
self._log("✅ 消息处理器创建成功", "DEBUG")
# 9. 初始化AI服务
self._log("🔵 步骤6: 初始化AI服务...", "INFO")
success = await self.message_handler.initialize()
if not success:
self._log("❌ AI服务初始化失败", "ERROR")
return False
self._log("✅ AI服务初始化成功", "SUCCESS")
# 10. 注册到全局管理器
qn_manager = QianNiuWebsocketManager()
shop_key = f"千牛:{store_id}"
qn_manager.on_connect(shop_key, self.qn_client, store_id=store_id, exe_token=exe_token)
self._log(f"✅ 已注册千牛连接: {shop_key}", "SUCCESS")
# 10. 开始监听消息
self._log("🔵 步骤7: 开始监听消息...", "INFO")
self.stop_event = asyncio.Event()
self.running = True
# 11. 启动监听任务
listen_task, heartbeat_task = await self.qn_client.start_listening(
self.message_handler.handle_message
)
self.tasks.extend([listen_task, heartbeat_task])
self._log("✅ 监听任务启动成功", "SUCCESS")
# 12. 等待任务完成或停止信号
self._log("🎉 千牛平台已完全启动,开始监听消息", "SUCCESS")
try:
await asyncio.gather(*self.tasks, return_exceptions=True)
except asyncio.CancelledError:
self._log("🔵 监听任务被取消", "WARNING")
except Exception as e:
self._log(f"❌ 监听过程中出现错误: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return True
except Exception as e:
self._log(f"❌ 启动监听失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
async def start_listening(self, user_nick):
"""启动监听的主方法"""
try:
self._log("🔵 开始千牛平台连接流程", "INFO")
# 1. 创建千牛客户端
self.qn_client = QianNiuClient()
self._log("✅ 千牛客户端创建成功", "DEBUG")
# 2. 测试DLL加载和启动与main方法一致
self._log("🔵 步骤1: 测试DLL加载和启动...", "INFO")
if not await self.qn_client._start_sainiu_service():
self._log("❌ DLL服务启动失败", "ERROR")
return False
self._log("✅ DLL服务启动成功", "SUCCESS")
# 3. 测试WebSocket连接与main方法一致
self._log("🔵 步骤2: 测试WebSocket连接...", "INFO")
if not await self.qn_client._connect_websocket():
self._log("❌ WebSocket连接失败", "ERROR")
return False
self._log("✅ WebSocket连接成功", "SUCCESS")
# 4. 创建消息处理器
self.message_handler = TestMessageHandler(self.qn_client)
self._log("✅ 消息处理器创建成功", "DEBUG")
# 5. 初始化AI服务
success = await self.message_handler.initialize()
if not success:
self._log("❌ AI服务初始化失败", "ERROR")
return False
self._log("✅ AI服务初始化成功", "SUCCESS")
# 6. 开始监听消息
self._log("🔵 步骤3: 开始监听消息...", "INFO")
self.stop_event = asyncio.Event()
self.running = True
# 7. 启动监听任务
listen_task, heartbeat_task = await self.qn_client.start_listening(
self.message_handler.handle_message
)
self.tasks.extend([listen_task, heartbeat_task])
self._log("✅ 监听任务启动成功", "SUCCESS")
# 8. 等待任务完成或停止信号
try:
await asyncio.gather(*self.tasks, return_exceptions=True)
except asyncio.CancelledError:
self._log("🔵 监听任务被取消", "WARNING")
except Exception as e:
self._log(f"❌ 监听过程中出现错误: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
self._log("🎉 千牛平台消息监听已启动", "SUCCESS")
return True
except Exception as e:
self._log(f"❌ 启动监听失败: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
return False
def stop_listening(self):
"""停止监听"""
try:
self._log("🔵 开始停止监听...", "INFO")
self.running = False
# 取消所有任务
for task in self.tasks:
if not task.done():
task.cancel()
self.tasks.clear()
# 关闭千牛客户端
if self.qn_client:
self._log("🔵 关闭千牛客户端...", "DEBUG")
asyncio.create_task(self.qn_client.close())
# 关闭消息处理器
if self.message_handler:
self._log("🔵 关闭消息处理器...", "DEBUG")
asyncio.create_task(self.message_handler.close())
# 设置停止事件
if self.stop_event:
self.stop_event.set()
self._log("✅ 监听已停止", "SUCCESS")
except Exception as e:
self._log(f"❌ 停止监听时出现错误: {e}", "ERROR")
import traceback
self._log(f"错误详情: {traceback.format_exc()}", "DEBUG")
def is_running(self):
"""检查是否正在运行"""
return self.running
def set_log_signal(self, signal):
"""设置日志信号"""
self.log_signal = signal
async def main(store_id=None):
"""主测试函数"""
logger.info("=== 赛牛接口本地测试开始 ===")
print("=== 断点A: 测试开始 ===")
# 如果没有提供store_id使用默认值
if store_id is None:
store_id = STORE_ID
listener = QianNiuListenerForGUI()
try:
success = await listener.start_listening_with_store(store_id)
if not success:
print('启动监听失败')
return False
# 获取message_handler实例以便后续操作
message_handler = listener.message_handler
if message_handler is None:
print("❌ 消息处理器未正确初始化")
return False
# 测试发送消息功能
print("\n📋 步骤5: 测试发送消息功能...")
test_nick = "test_buyer"
test_message = "这是一条测试消息"
print(f"=== 准备发送测试消息给 {test_nick} ===")
success = await listener.qn_client.send_message(test_nick, test_message)
if success:
print("✅ 测试消息发送成功")
else:
print("❌ 测试消息发送失败")
# 保持运行,直到用户中断
logger.info("测试程序已启动按Ctrl+C停止...")
print("=== 测试程序运行中,等待消息... ===")
# 等待停止信号
while listener.is_running():
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("用户中断测试")
print("=== 断点D: 用户中断测试 ===")
except Exception as e:
logger.error(f"测试异常: {e}")
print(f"=== 断点E: 测试异常 - {e} ===")
import traceback
print(f"错误详情: {traceback.format_exc()}")
finally:
listener.stop_listening()
logger.info("=== 赛牛接口本地测试结束 ===")
print("=== 断点F: 测试结束 ===")
# # 创建千牛客户端
# qn_client = QianNiuClient()
# message_handler = None # 提前定义变量
#
# try:
# # 1. 测试DLL加载和启动
# print("\n📋 步骤1: 测试DLL加载和启动...")
# if not await qn_client._start_sainiu_service():
# print("❌ DLL服务启动失败")
# return False
# print("✅ DLL服务启动成功")
#
# # 2. 测试WebSocket连接包含授权和初始化
# print("\n📋 步骤2: 测试WebSocket连接...")
# if not await qn_client._connect_websocket():
# print("❌ WebSocket连接失败")
# return False
# print("✅ WebSocket连接成功")
#
# # 3. 创建消息处理器
# message_handler = TestMessageHandler(qn_client)
#
# # 4. 初始化消息处理器和AI服务
# print("\n📋 步骤3: 初始化AI服务...")
# if not await message_handler.initialize():
# logger.error("AI服务初始化失败")
# print("❌ AI服务初始化失败")
# return
# print("✅ AI服务初始化成功")
#
# # 5. 开始监听消息
# print("\n📋 步骤4: 开始监听消息...")
# listen_task, heartbeat_task = await qn_client.start_listening(message_handler.handle_message)
#
# # 6. 测试发送消息功能
# print("\n📋 步骤5: 测试发送消息功能...")
# test_nick = "test_buyer"
# test_message = "这是一条测试消息"
#
# print(f"=== 准备发送测试消息给 {test_nick} ===")
# success = await qn_client.send_message(test_nick, test_message)
#
# if success:
# print("✅ 测试消息发送成功")
# else:
# print("❌ 测试消息发送失败")
#
# # 保持运行,直到用户中断
# logger.info("测试程序已启动按Ctrl+C停止...")
# print("=== 测试程序运行中,等待消息... ===")
#
# # 等待任务完成或用户中断
# await asyncio.gather(listen_task, heartbeat_task, return_exceptions=True)
#
# except KeyboardInterrupt:
# logger.info("用户中断测试")
# print("=== 断点D: 用户中断测试 ===")
# except Exception as e:
# logger.error(f"测试异常: {e}")
# print(f"=== 断点E: 测试异常 - {e} ===")
# finally:
# if message_handler: # 检查变量是否已定义
# await message_handler.close() # 关闭消息处理器和AI服务
# await qn_client.close()
# logger.info("=== 赛牛接口本地测试结束 ===")
# print("=== 断点F: 测试结束 ===")
if __name__ == "__main__":
# 运行测试
asyncio.run(main(store_id="4c4025e3-8702-42fc-bdc2-671e335c0ff7"))
# asyncio.run(test_auth())