# -*- coding: utf-8 -*- import json import traceback import uuid import requests import time import asyncio import websockets import blackboxprotobuf from websocket import WebSocketApp import threading from datetime import datetime from dataclasses import dataclass, asdict from urllib.parse import urlencode import re import random import config # 导入 message_arg 中的方法 from Utils.Dy.message_arg import send_message, get_user_code, heartbeat_message from Utils.message_models import PlatformMessage # ===== 抖音登录相关类集成开始 ===== class DyLogin: """抖音登录核心类(集成自Dylogin.py,适配后端通知机制)""" def __init__(self, log_callback=None): self.headers = { "authority": "doudian-sso.jinritemai.com", "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9", "cache-control": "no-cache", "content-type": "application/x-www-form-urlencoded", "origin": "https://fxg.jinritemai.com", "pragma": "no-cache", "referer": "https://fxg.jinritemai.com/", "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "x-requested-with": "XMLHttpRequest", "x-tt-passport-csrf-token": "8aa53236d26cf3328f70ef5d7a52e2ed" } self.cookies = { "passport_csrf_token": "8aa53236d26cf3328f70ef5d7a52e2ed", "passport_csrf_token_default": "8aa53236d26cf3328f70ef5d7a52e2ed", "ttwid": "1%7CFgSgrZadP_YyHmeFQZ8Sj2Qo2isOgOYHVWkzE4NIOMM%7C1759041029%7C80598fd5e92a57b97469827b096864a940b5de23748185987bc2f45f25f8c88b" } self.log_callback = log_callback def _log(self, message, level="INFO"): """内部日志方法""" if self.log_callback: self.log_callback(message, level) else: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") color_map = { "ERROR": "\033[91m", "WARNING": "\033[93m", "SUCCESS": "\033[92m", "DEBUG": "\033[96m", } color = color_map.get(level, "") reset = "\033[0m" print(f"{color}[{timestamp}] [{level}] {message}{reset}") @staticmethod def encrypt(data=None): """抖音数据加密方法""" if data is None: return "" e = lambda t: [ord(c) for c in str(t)] e = e(data) n = [] for r in range(len(e)): n.append(hex(5 ^ e[r])[2:]) return "".join(n) def send_activation_code(self, mobile_phone): """发送激活码""" url = "https://doudian-sso.jinritemai.com/send_activation_code/v2/" params = { "fp": "verify_mg3bm79v_56d5acc9_118c_ed40_6a1d_f3e184d5c666", "aid": "4272", "language": "zh", "account_sdk_source": "web", "account_sdk_source_info": "7e276d64776172647760466a6b66707777606b667c273f3433292772606761776c736077273f63646976602927666d776a686061776c736077273f63646976602927766d60696961776c736077273f63646976602927756970626c6b76273f302927756077686c76766c6a6b76273f5e7e276b646860273f276b6a716c636c6664716c6a6b762729277671647160273f2761606b6c606127785829276c6b6b60774d606c626d71273f3c353329276c6b6b6077526c61716d273f3432353229276a707160774d606c626d71273f3435373229276a70716077526c61716d273f34323532292776716a64776260567164717076273f7e276c6b61607d60614147273f7e276c6167273f276a676f6066712729276a75606b273f2763706b66716c6a6b2729276c6b61607d60614147273f276a676f6066712729274c41474e607c57646b6260273f2763706b66716c6a6b2729276a75606b4164716467647660273f27706b6160636c6b60612729276c7656646364776c273f636469766029276d6476436071666d273f71777060782927696a66646956716a77646260273f7e276c76567075756a77714956716a77646260273f717770602927766c7f60273f343735343d34292772776c7160273f7177706078292776716a7764626054706a7164567164717076273f7e277076646260273f363c333133292774706a7164273f34323c30343632323c3329276c7655776c73647160273f6364697660787829277260676269273f7e2773606b616a77273f27426a6a626960254c6b662b252d4c6b7160692c27292777606b6160776077273f27444b424940252d4c6b71606929254c6b7160692d572c254c776c762d572c255d6025427764756d6c6676252d357d35353535443244352c25416c77606671364134342573765a305a352575765a305a35292541364134342c277829276b6a716c636c6664716c6a6b556077686c76766c6a6b273f2761606b6c6061272927756077636a7768646b6660273f7e27716c68604a776c626c6b273f3432303c3531343537363d353d2b3d2927707660614f564d606475566c7f60273f333534343d31323c29276b64736c6264716c6a6b516c686c6b62273f7e276160666a616061476a617c566c7f60273f3030313d2927606b71777c517c7560273f276b64736c6264716c6a6b2729276c6b6c716c64716a77517c7560273f276b64736c6264716c6a6b2729276b646860273f276d717175763f2a2a637d622b6f6c6b776c716068646c2b666a682a696a626c6b2a666a68686a6b27292777606b61607747696a666e6c6b62567164717076273f276b6a6b2867696a666e6c6b62272927766077736077516c686c6b62273f276c6b6b60772966616b286664666d602960616260296a776c626c6b272927627069605671647771273f276b6a6b602729276270696041707764716c6a6b273f276b6a6b602778782927776074706076715a6d6a7671273f27637d622b6f6c6b776c716068646c2b666a68272927776074706076715a7564716d6b646860273f272a696a626c6b2a666a68686a6b27292767776a72766077273f7e7878", "msToken": "", "X-Bogus": "DFSzswVL1tybaQLLC9jRHiB9Piu6", "_signature": "_02B4Z6wo00001P1YC5QAAIDDwPERCIiHDXT9WA8AAFeUKIdq.vB20O6tjorWCqmtqK591BKCWaGNpder-vHZ3rvQkbxFhNXfTJcMBW66GwjlIj-NQ6d52sU1iZ1ediX423KeCS5rakvHWLrua8" } data = { "fp": "verify_mg3bm79v_56d5acc9_118c_ed40_6a1d_f3e184d5c666", "aid": "4272", "language": "zh", "account_sdk_source": "web", "mix_mode": "1", "service": "https://fxg.jinritemai.com", "type": "3731", "mobile": self.encrypt(data=mobile_phone), "captcha_key": "" } response = requests.post(url, headers=self.headers, cookies=self.cookies, params=params, data=data) self._log(f"发送验证码响应: {response.text}", "DEBUG") return response def verify(self, mobile_phone, code): """验证手机验证码""" url = "https://doudian-sso.jinritemai.com/quick_login/v2/" params = { "fp": "verify_mg3bm79v_56d5acc9_118c_ed40_6a1d_f3e184d5c666", "aid": "4272", "language": "zh", "account_sdk_source": "web", "account_sdk_source_info": "7e276d64776172647760466a6b66707777606b667c273f3433292772606761776c736077273f63646976602927666d776a686061776c736077273f63646976602927766d60696961776c736077273f63646976602927756970626c6b76273f302927756077686c76766c6a6b76273f5e7e276b646860273f276b6a716c636c6664716c6a6b762729277671647160273f2761606b6c606127785829276c6b6b60774d606c626d71273f3c353329276c6b6b6077526c61716d273f3432353229276a707160774d606c626d71273f3435373229276a70716077526c61716d273f34323532292776716a64776260567164717076273f7e276c6b61607d60614147273f7e276c6167273f276a676f6066712729276a75606b273f2763706b66716c6a6b2729276c6b61607d60614147273f276a676f6066712729274c41474e607c57646b6260273f2763706b66716c6a6b2729276a75606b4164716467647660273f27706b6160636c6b60612729276c7656646364776c273f636469766029276d6476436071666d273f71777060782927696a66646956716a77646260273f7e276c76567075756a77714956716a77646260273f717770602927766c7f60273f343735343d34292772776c7160273f7177706078292776716a7764626054706a7164567164717076273f7e277076646260273f363c333133292774706a7164273f34323c30343632323c3329276c7655776c73647160273f6364697660787829277260676269273f7e2773606b616a77273f27426a6a626960254c6b662b252d4c6b7160692c27292777606b6160776077273f27444b424940252d4c6b71606929254c6b7160692d572c254c776c762d572c255d6025427764756d6c6676252d357d35353535443244352c25416c77606671364134342573765a305a352575765a305a35292541364134342c277829276b6a716c636c6664716c6a6b556077686c76766c6a6b273f2761606b6c6061272927756077636a7768646b6660273f7e27716c68604a776c626c6b273f3432303c3531343537363d353d2b3d2927707660614f564d606475566c7f60273f33343337323d373c29276b64736c6264716c6a6b516c686c6b62273f7e276160666a616061476a617c566c7f60273f3030313d2927606b71777c517c7560273f276b64736c6264716c6a6b2729276c6b6c716c64716a77517c7560273f276b64736c6264716c6a6b2729276b646860273f276d717175763f2a2a637d622b6f6c6b776c716068646c2b666a682a696a626c6b2a666a68686a6b27292777606b61607747696a666e6c6b62567164717076273f276b6a6b2867696a666e6c6b62272927766077736077516c686c6b62273f276c6b6b60772966616e286664666d602960616260296a776c626c6b272927627069605671647771273f276b6a6b602729276270696041707764716c6a6b273f276b6a6b602778782927776074706076715a6d6a7671273f27637d622b6f6c6b776c716068646c2b666a68272927776074706076715a7564716d6b646860273f272a696a626c6b2a666a68686a6b27292767776a72766077273f7e7878", "msToken": "", "X-Bogus": "DFSzswVL1tybaQLLC9jRHiB9Piu6", "_signature": "_02B4Z6wo00001n3abFwAAIDBQHN2wdsMnWZ92mjAAPe9KIdq.vB20O6tjorWCqmtqK591BKCWaGNpder-vHZ3rvQkbxFhNXfTJcMBW66GwjlIj-NQ6d52sU1iZ1ediX423KeCS5rakvHWLru09" } data = { "fp": "verify_mg3bm79v_56d5acc9_118c_ed40_6a1d_f3e184d5c666", "aid": "4272", "language": "zh", "account_sdk_source": "web", "service": "https://fxg.jinritemai.com", "subject_aid": "4966", "mix_mode": "1", "mobile": self.encrypt(data=mobile_phone), "code": self.encrypt(data=code), "captcha_key": "", "ewid": "72c89cf89652d486d53b316711709044", "seraph_did": "", "web_did": "72c89cf89652d486d53b316711709044", "pc_did": "", "redirect_sso_to_login": "false" } response = requests.post(url, headers=self.headers, cookies=self.cookies, params=params, data=data) self.cookies.update(response.cookies.get_dict()) self._log(f"验证码验证响应: {response.text}", "DEBUG") # 🔥 修复:增加错误处理,检查响应是否包含错误信息 ticket = None try: # 尝试解析JSON响应 response_data = response.json() if "error_code" in response_data: error_code = response_data.get("error_code", -1) # 🔥 修复:error_code=0表示成功,非0才是错误 if error_code != 0: # 抖音返回了错误信息 error_msg = response_data.get("description", "验证码验证失败") self._log(f"抖音验证码验证失败: {error_msg} (错误码: {error_code})", "ERROR") raise Exception(f"{error_msg}") else: # error_code=0表示成功,尝试从redirect_url中提取ticket self._log(f"✅ 抖音验证码验证成功 (错误码: {error_code})", "SUCCESS") redirect_url = response_data.get("redirect_url", "") if redirect_url: # 从redirect_url中提取ticket ticket_matches = re.findall(r'ticket=([^&]+)', redirect_url) if ticket_matches: ticket = ticket_matches[0] self._log(f"✅ 从redirect_url中提取到ticket: {ticket[:20]}...", "SUCCESS") except json.JSONDecodeError: # 如果不是JSON响应,继续用原来的方式解析 pass # 🔥 修复:如果JSON中没有获取到ticket,尝试用原来的方式解析 if not ticket: ticket_matches = re.findall('ticket=(.*?)",', response.text) if ticket_matches: ticket = ticket_matches[0] self._log(f"✅ 从响应文本中提取到ticket: {ticket[:20]}...", "SUCCESS") # 最终检查是否获取到ticket if not ticket: # 没有找到ticket,说明验证失败 self._log("抖音验证码验证失败:响应中未找到ticket信息", "ERROR") raise Exception("验证码验证失败:服务器未返回有效的ticket") return ticket def callback(self, ticket): """回调获取登录状态""" url = f"https://fxg.jinritemai.com/passport/sso/login/callback/?next=https%3A%2F%2Ffxg.jinritemai.com%2Flogin%2Fcommon&ticket={ticket}" response = requests.get(url, headers=self.headers, cookies=self.cookies, allow_redirects=False) self.cookies.update(response.cookies.get_dict()) self._log(f"回调响应状态: {response.status_code}", "DEBUG") def subject_list(self): """获取登录主体列表""" url = "https://fxg.jinritemai.com/ecomauth/loginv1/get_login_subject" params = { "bus_type": "1", "login_source": "doudian_pc_web", "entry_source": "0", "bus_child_type": "0", "_lid": "438338769861" } response = requests.get(url, headers=self.headers, cookies=self.cookies, params=params) response_data = response.json() self._log(f"登录主体列表响应: {response_data}", "DEBUG") login_subject_list = response_data.get("data", {}).get("login_subject_list", []) if not login_subject_list: raise Exception("未获取到登录主体列表") login_subject_uid = login_subject_list[0].get("subject_id") user_identity_id = login_subject_list[0].get("member_id") encode_shop_id = login_subject_list[0].get("encode_shop_id") return login_subject_uid, user_identity_id, encode_shop_id def tab_shop_login(self, login_subject_uid, user_identity_id, encode_shop_id): """切换店铺登录""" url = "https://doudian-sso.jinritemai.com/aff/subject/login/" params = { "subject_aid": "4966", "fp": "verify_mg3bm79v_56d5acc9_118c_ed40_6a1d_f3e184d5c666", "aid": "4272", "language": "zh", "account_sdk_source": "web", "account_sdk_source_info": "7e276d64776172647760466a6b66707777606b667c273f3433292772606761776c736077273f63646976602927666d776a686061776c736077273f63646976602927766d60696961776c736077273f63646976602927756970626c6b76273f302927756077686c76766c6a6b76273f5e7e276b646860273f276b6a716c636c6664716c6a6b762729277671647160273f2761606b6c606127785829276c6b6b60774d606c626d71273f3c353329276c6b6b6077526c61716d273f3432353229276a707160774d606c626d71273f3435373229276a70716077526c61716d273f34323532292776716a64776260567164717076273f7e276c6b61607d60614147273f7e276c6167273f276a676f6066712729276a75606b273f2763706b66716c6a6b2729276c6b61607d60614147273f276a676f6066712729274c41474e607c57646b6260273f2763706b66716c6a6b2729276a75606b4164716467647660273f27706b6160636c6b60612729276c7656646364776c273f636469766029276d6476436071666d273f71777060782927696a66646956716a77646260273f7e276c76567075756a77714956716a77646260273f717770602927766c7f60273f343735343d32292772776c7160273f7177706078292776716a7764626054706a7164567164717076273f7e277076646260273f363c333133292774706a7164273f34323c30343632323c3329276c7655776c73647160273f6364697660787829277260676269273f7e2773606b616a77273f27426a6a626960254c6b662b252d4c6b7160692c27292777606b6160776077273f27444b424940252d4c6b71606929254c6b7160692d572c254c776c762d572c255d6025427764756d6c6676252d357d35353535443244352c25416c77606671364134342573765a305a352575765a305a35292541364134342c277829276b6a716c636c6664716c6a6b556077686c76766c6a6b273f2761606b6c6061272927756077636a7768646b6660273f7e27716c68604a776c626c6b273f3432303c3531343537363d353d2b3d2927707660614f564d606475566c7f60273f33343337323d373c29276b64736c6264716c6a6b516c686c6b62273f7e276160666a616061476a617c566c7f60273f3030313d2927606b71777c517c7560273f276b64736c6264716c6a6b2729276c6b6c716c64716a77517c7560273f276b64736c6264716c6a6b2729276b646860273f276d717175763f2a2a637d622b6f6c6b776c716068646c2b666a682a696a626c6b2a666a68686a6b27292777606b61607747696a666e6c6b62567164717076273f276b6a6b2867696a666e6c6b62272927766077736077516c686c6b62273f276c6b6b60772966616e286664666d602960616260296a776c626c6b272927627069605671647771273f276b6a6b602729276270696041707764716c6a6b273f276b6a6b602778782927776074706076715a6d6a7671273f27637d622b6f6c6b776c716068646c2b666a68272927776074706076715a7564716d6b646860273f272a696a626c6b2a666a68686a6b27292767776a72766077273f7e7878", "msToken": "", "X-Bogus": "DFSzswVL1tybaQLLC9jRHiB9Piu6", "_signature": "_02B4Z6wo00001n3abFwAAIDBQHN2wdsMnWZ92mjAAPe9KIdq.vB20O6tjorWCqmtqK591BKCWaGNpder-vHZ3rvQkbxFhNXfTJcMBW66GwjlIj-NQ6d52sU1iZ1ediX423KeCS5rakvHWLru09" } data = { "fp": "verify_mg3bm79v_56d5acc9_118c_ed40_6a1d_f3e184d5c666", "aid": "4272", "language": "zh", "account_sdk_source": "web", "service": "https://fxg.jinritemai.com", "subject_aid": "4966", "mix_mode": "1", "login_subject_uid": login_subject_uid, "user_identity_id": user_identity_id, "encode_shop_id": encode_shop_id, "captcha_key": "", "ewid": "72c89cf89652d486d53b316711709044", "seraph_did": "", "web_did": "72c89cf89652d486d53b316711709044", "pc_did": "", "redirect_sso_to_login": "false" } response = requests.post(url, headers=self.headers, cookies=self.cookies, params=params, data=data) self._log(f"切换店铺登录响应: {response.text}", "DEBUG") ticket = re.findall('ticket=(.*?)",', response.text)[0] self.cookies.update(response.cookies.get_dict()) # 🔥 执行完整的登录回调流程(参考Dylogin.py) callback_url = f"https://fxg.jinritemai.com/passport/sso/aff/login/callback/?next=https%3A%2F%2Ffxg.jinritemai.com&ticket={ticket}&aid=4272&subject_aid=4966" response = requests.get(callback_url, headers=self.headers, cookies=self.cookies, allow_redirects=False) self.cookies.update(response.cookies.get_dict()) self._log(f"主登录回调响应状态: {response.status_code}", "DEBUG") # 最终回调获取完整cookies final_callback_url = f"https://fxg.jinritemai.com/ecomauth/loginv1/callback?login_source=doudian_pc_web&subject_aid=4966&encode_shop_id={encode_shop_id}&member_id={user_identity_id}&bus_child_type=0&entry_source=0&ecom_login_extra=&_lid=464136070178" response = requests.get(final_callback_url, headers=self.headers, cookies=self.cookies) self.cookies.update(response.cookies.get_dict()) self._log(f"最终回调响应状态: {response.status_code}", "DEBUG") return ticket def get_shop_config(self): """获取抖音平台配置信息(SHOP_ID、PIGEON_CID等)""" try: self._log("🔄 开始获取抖音平台配置", "DEBUG") # 获取配置信息的API调用 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json, text/plain, */*', 'Referer': 'https://fxg.jinritemai.com/', } params = [ ('biz_type', '4'), ('PIGEON_BIZ_TYPE', '2'), ('_ts', int(time.time() * 1000)), ('_pms', '1'), ('FUSION', 'true'), ] response = requests.get( 'https://pigeon.jinritemai.com/chat/api/backstage/conversation/get_link_info', params=params, cookies=self.cookies, headers=headers, timeout=30 ) self._log(f"配置请求响应状态: {response.status_code}", "DEBUG") self._log(f"配置响应内容: {response.text}", "DEBUG") if response.status_code != 200: self._log(f"❌ 配置请求失败: HTTP {response.status_code}", "ERROR") return None data = response.json() if data.get('code') != 0: error_msg = data.get('message', '未知错误') self._log(f"❌ 获取配置失败: {error_msg}", "ERROR") return None config_data = data.get('data', {}) # 🔥 构造抖音平台必需的配置信息 shop_config = { 'SHOP_ID': '217051461', # 从登录响应或配置中获取 'PIGEON_CID': '1216524360102748', # 从配置中获取 } # 如果API返回了相关信息,使用API返回的值 if 'shopId' in config_data: shop_config['SHOP_ID'] = str(config_data['shopId']) if 'pigeonCid' in config_data: shop_config['PIGEON_CID'] = str(config_data['pigeonCid']) self._log(f"✅ 获取到抖音配置: SHOP_ID={shop_config['SHOP_ID']}, PIGEON_CID={shop_config['PIGEON_CID']}", "SUCCESS") return shop_config except Exception as e: self._log(f"❌ 获取抖音配置失败: {e}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return None def _send_verification_needed_message(self, store_id, phone_number=None): """向后端发送需要验证码的通知""" try: self._log(f"开始发送验证码需求通知,店铺ID: {store_id}, 手机号: {phone_number}", "INFO") from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() self._log(f"获取到后端客户端: {backend is not None}", "DEBUG") if backend: message = { "type": "connect_message", "store_id": store_id, "status": False, "content": "需要验证码", "phone_number": phone_number } self._log(f"准备发送验证码通知消息: {message}", "DEBUG") backend.send_message(message) self._log("✅ 成功向后端发送验证码需求通知(含手机号)", "SUCCESS") else: self._log("❌ 后端客户端为空,无法发送验证码需求通知", "ERROR") except Exception as e: self._log(f"❌ 发送验证码需求通知失败: {e}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") def _send_verification_error_message(self, store_id, error_msg): """向后端发送验证码错误的通知""" try: self._log(f"开始发送验证码错误通知,店铺ID: {store_id}, 错误: {error_msg}", "INFO") from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() if backend: message = { "type": "connect_message", "store_id": store_id, "status": False, "content": error_msg } self._log(f"准备发送验证码错误消息: {message}", "DEBUG") backend.send_message(message) self._log("✅ 成功向后端发送验证码错误通知", "SUCCESS") else: self._log("❌ 后端客户端为空,无法发送验证码错误通知", "ERROR") except Exception as e: self._log(f"❌ 发送验证码错误通知失败: {e}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") def _send_login_success_message(self, store_id): """向后端发送登录成功的通知""" try: self._log(f"开始发送登录成功通知,店铺ID: {store_id}", "INFO") from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() if backend: message = { "type": "connect_message", "store_id": store_id, "status": True, # 登录成功 "cookies": self.cookies # 🔥 新增:添加登录生成的cookie信息 } self._log(f"准备发送登录成功消息: {message}", "DEBUG") backend.send_message(message) self._log("✅ 成功向后端发送登录成功通知", "SUCCESS") else: self._log("❌ 获取后端客户端失败", "ERROR") except Exception as e: self._log(f"❌ 发送登录成功通知失败: {e}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") def _send_login_failure_message(self, store_id, error_msg): """向后端发送登录失败的通知""" try: self._log(f"开始发送登录失败通知,店铺ID: {store_id}, 错误: {error_msg}", "INFO") from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() if backend: message = { "type": "connect_message", "store_id": store_id, "status": False, # 登录失败 "content": error_msg # 官方返回的失败原因 } self._log(f"准备发送登录失败消息: {message}", "DEBUG") backend.send_message(message) self._log("✅ 成功向后端发送登录失败通知", "SUCCESS") else: self._log("❌ 获取后端客户端失败", "ERROR") except Exception as e: self._log(f"❌ 发送登录失败通知失败: {e}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") def request_verification_code(self, phone_number, store_id, original_phone=None): """向抖音平台请求发送验证码""" self._log(f"开始请求验证码,手机号: {phone_number}, 店铺ID: {store_id}", "INFO") try: # 🔥 发送验证码到手机(抖音会自动加密原始手机号) response = self.send_activation_code(phone_number) self._log(f"发送验证码请求结果: {response.text}") # 发送消息给后端,告知需要验证码(使用原始手机号) self._log("准备向后端发送验证码需求通知", "INFO") phone_for_backend = original_phone if original_phone else phone_number self._send_verification_needed_message(store_id, phone_for_backend) # 这里需要等待后端重新下发包含验证码的登录参数 return None except Exception as e: self._log(f"❌ 请求验证码失败: {e}", "ERROR") self._send_verification_error_message(store_id, f"发送验证码失败: {str(e)}") return None def login_with_params(self, login_params, store_id=None): """使用后端下发的登录参数进行登录(与拼多多保持一致的实现)""" self._log("🚀 [DyLogin] 开始使用参数登录", "INFO") # 检查验证码字段(兼容 code 和 verification_code) verification_code = login_params.get("verification_code") or login_params.get("code", "") phone_number = login_params.get("phone_number", "") encrypted_phone = login_params.get("encrypted_phone", "") self._log(f"📋 [DyLogin] 登录参数: phone_number={phone_number}, 包含验证码={bool(verification_code)}", "DEBUG") try: if not verification_code: # 第一次登录,需要发送验证码 self._log("检测到需要手机验证码,正在调用发送验证码方法", "INFO") self._log(f"为手机号 {phone_number} 发送验证码", "INFO") # 🔥 抖音需要传递原始手机号让其自己加密,不能传递已经加密的手机号 self.request_verification_code(phone_number, store_id, phone_number) return "need_verification_code" else: # 带验证码登录 self._log(f"开始验证码登录,验证码: {verification_code}", "INFO") try: # 🔥 抖音需要传递原始手机号让其自己加密 ticket = self.verify(phone_number, verification_code) self._log(f"✅ 验证码验证成功,获取到ticket", "SUCCESS") # 执行后续登录流程 self.callback(ticket) login_subject_uid, user_identity_id, encode_shop_id = self.subject_list() self.tab_shop_login(login_subject_uid, user_identity_id, encode_shop_id) # 🔥 获取抖音平台必需的配置信息(SHOP_ID和PIGEON_CID) self._log("🔄 开始获取抖音平台配置信息", "INFO") shop_config = self.get_shop_config() if shop_config: # 将配置信息添加到cookies中 self.cookies.update(shop_config) self._log("🎉 登录成功!配置信息已获取", "SUCCESS") # 🔥 发送登录成功通知给后端(与拼多多保持一致) self._send_login_success_message(store_id) return self.cookies else: error_msg = "获取抖音平台配置信息失败" self._log(f"❌ {error_msg}", "ERROR") self._send_login_failure_message(store_id, error_msg) return "login_failure" except Exception as e: # 验证码错误或其他登录失败 error_msg = f"验证码验证失败: {str(e)}" self._log(f"❌ {error_msg}", "ERROR") self._send_verification_error_message(store_id, error_msg) return "verification_code_error" except Exception as e: # 登录过程中的其他错误 error_msg = f"登录过程出错: {str(e)}" self._log(f"❌ {error_msg}", "ERROR") self._send_login_failure_message(store_id, error_msg) return "login_failure" # ===== 抖音登录相关类集成结束 ===== # 抖音WebSocket管理器类 class DouYinWebsocketManager: _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._store = {} cls._instance._lock = threading.RLock() return cls._instance def on_connect(self, shop_key, douyin_bot, **kwargs): """存储抖音机器人连接信息""" with self._lock: entry = self._store.setdefault(shop_key, { 'platform': None, 'customers': [], 'user_assignments': {} }) entry['platform'] = { 'douyin_bot': douyin_bot, 'message_handler': douyin_bot.message_handler if douyin_bot else None, 'last_heartbeat': datetime.now(), **kwargs } return entry def get_connection(self, shop_key): """获取抖音连接信息""" with self._lock: return self._store.get(shop_key) def remove_connection(self, shop_key): """移除抖音连接""" with self._lock: if shop_key in self._store: del self._store[shop_key] def get_all_connections(self): """获取所有抖音连接""" with self._lock: return dict(self._store) @dataclass class StaffInfo: """客服信息结构体""" staff_id: str name: str status: str department: str = None online: bool = True def to_dict(self): return { "staff_id": self.staff_id, "name": self.name, "status": self.status, "department": self.department or "", "online": self.online } class DouYinBackendService: """抖音后端服务调用类(新版:使用统一后端连接 BackendClient)""" def __init__(self, *args, **kwargs): self.current_store_id = "" async def connect(self, store_id, *args, **kwargs): """连接后端服务(使用统一连接,无需独立连接)""" try: self.current_store_id = str(store_id or "") except Exception: self.current_store_id = "" return True async def send_message_to_backend(self, platform_message): """🔥 改为通过单后端连接发送,与拼多多保持完全一致的逻辑""" try: from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() if not backend: return None # 🔥 确保消息包含store_id(与拼多多一致) if isinstance(platform_message, dict): if 'store_id' not in platform_message and self.current_store_id: platform_message['store_id'] = self.current_store_id # 🔥 通过统一后端连接发送(与拼多多完全一致) backend.send_message(platform_message) return True except Exception: return None async def close(self): """关闭连接(统一连接模式下无需特殊处理)""" pass class DouYinMessageHandler: """抖音消息处理器""" def __init__(self, cookie: dict, ai_service: DouYinBackendService, store_id: str): import uuid self.instance_id = str(uuid.uuid4())[:8] # 添加实例ID用于调试 self.cookie = cookie self.ai_service = ai_service self.store_id = store_id self.user_tokens = {} self.config = None self.wss_url = None self.ws = None self.is_running = True self._loop = None # 添加事件循环 # asyncio.set_event_loop(self._loop) # 设置事件循环 self._thread = None # 添加事件循环线程 # 设置 AI 服务器的消息处理器引用(统一连接模式下无需设置) # self.ai_service.set_message_handler(self) # 添加视频解析相关的headers self.video_headers = { "authority": "pigeon.jinritemai.com", "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9", "cache-control": "no-cache", "origin": "https://im.jinritemai.com", "pragma": "no-cache", "referer": "https://im.jinritemai.com/", "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "x-secsdk-csrf-token": "0001000000017e890e18651b2ef5f6d36d0485a64cae6b0bfc36d69e27fdc20fe7d423670eba1861a5bcb5baaf40,a25cfb6098b498c33ee5f0a5dcafe47b" } # 打印实例创建信息 print(f"[DY Handler] 创建实例 {self.instance_id} for store {store_id}") def get_casl(self): """获取可分配客服列表 - 根据原始代码实现""" headers = { "authority": "pigeon.jinritemai.com", "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9", "cache-control": "no-cache", "origin": "https://im.jinritemai.com", "pragma": "no-cache", "referer": "https://im.jinritemai.com/", "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "x-secsdk-csrf-token": "000100000001fbdad828f64ea30cad11ae4188140964c4e321a7f711791309da962ec586d0471861a72a472781fb,a25cfb6098b498c33ee5f0a5dcafe47b" } url = "https://pigeon.jinritemai.com/backstage/getCanAssignStaffList" params = { "biz_type": "4", "PIGEON_BIZ_TYPE": "2", "_ts": int(time.time() * 1000), "_pms": "1", "FUSION": "true", "verifyFp": "", # 🔥 恢复为空字符串,因为原始代码中也是空的 "_v": "1.0.1.3585" } try: self._log(f"🔄 正在获取客服列表,cookies包含字段: {list(self.cookie.keys())}", "DEBUG") # 🔥 按照原始代码的方式处理响应 response = requests.get(url, headers=headers, cookies=self.cookie, params=params).json() self._log(f"客服列表API响应内容: {response}", "DEBUG") if response.get('code') == 0: staff_data = response.get('data', []) self._log(f"✅ 成功获取客服列表,共 {len(staff_data)} 个客服", "SUCCESS") return staff_data else: error_msg = response.get('message', '未知错误') self._log(f"❌ 客服列表API返回错误: code={response.get('code')}, message={error_msg}", "ERROR") return None except Exception as e: self._log(f"❌ 获取客服列表失败: {e}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return None def transfer_conversation(self, receiver_id, shop_id, staff_id): """转接对话""" headers = { "authority": "pigeon.jinritemai.com", "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9", "cache-control": "no-cache", "origin": "https://im.jinritemai.com", "pragma": "no-cache", "referer": "https://im.jinritemai.com/", "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "x-secsdk-csrf-token": "000100000001fbdad828f64ea30cad11ae4188140964c4e321a7f711791309da962ec586d0471861a72a472781fb,a25cfb6098b498c33ee5f0a5dcafe47b" } url = "https://pigeon.jinritemai.com/chat/api/backstage/conversation/transfer_conversation?PIGEON_BIZ_TYPE=2" params = { "bizConversationId": f"{receiver_id}:{shop_id}::2:1:pigeon", "toCid": f"{staff_id}", "extParams": "{}" } try: response = requests.post(url, headers=headers, cookies=self.cookie, json=params) if response.status_code == 200: result = response.json() if result.get('code') == 0: self._log(f"✅ 成功转接对话到客服 {staff_id}", "SUCCESS") return True self._log(f"❌ 转接对话失败: {response.text}", "ERROR") return False except Exception as e: self._log(f"❌ 转接对话异常: {e}", "ERROR") return False # 新增: 发送客服列表消息到后端 async def send_staff_list_to_backend(self): """发送客服列表到后端""" try: # 获取客服列表 staff_list = self.get_casl() if staff_list is None: self._log("⚠️ 获取客服列表失败", "WARNING") return False # 🔥 处理空客服列表的情况(API成功但无客服) if len(staff_list) == 0: self._log("📋 当前无可分配客服,发送空列表到后端", "INFO") # 继续处理,发送空列表给后端 # 转换客服数据格式 - 🔥 根据原始代码调试实际字段名 staff_infos = [] for staff in staff_list: # 打印原始数据结构用于调试 self._log(f"🔍 原始客服数据结构: {staff}", "DEBUG") staff_info = StaffInfo( staff_id=str(staff.get('staffId', '') or staff.get('staff_id', '') or staff.get('id', '')), name=staff.get('staffName', '') or staff.get('staff_name', '') or staff.get('name', ''), status=str(staff.get('status', 0) or staff.get('state', 0)), department=staff.get('department', '') or staff.get('dept', ''), online=staff.get('online', True) ) staff_infos.append(staff_info.to_dict()) # 创建消息模板 message_template = PlatformMessage( type="staff_list", content="客服列表更新", data={ "staff_list": staff_infos, "total_count": len(staff_infos) }, store_id=self.store_id ) # 发送到后端 await self.ai_service.send_message_to_backend(message_template.to_dict()) self._log(f"发送客服列表消息的结构体为: {message_template.to_json()}") if len(staff_infos) > 0: self._log(f"✅ [DY] 成功发送客服列表到后端,共 {len(staff_infos)} 个客服", "SUCCESS") print(f"🔥 [DY] 客服列表已上传到后端: {len(staff_infos)} 个客服") print(f"[DY] 客服详情: {[{'id': s['staff_id'], 'name': s['name']} for s in staff_infos]}") else: self._log(f"✅ [DY] 成功发送空客服列表到后端(当前无可分配客服)", "SUCCESS") print(f"🔥 [DY] 空客服列表已上传到后端(当前无可分配客服)") return True except Exception as e: self._log(f"❌ 发送客服列表到后端异常: {e}", "ERROR") return False def start_event_loop(self): """启动事件循环线程""" if self._loop is not None and not self._loop.is_closed(): self._loop.close() self._loop = asyncio.new_event_loop() def run_loop(): asyncio.set_event_loop(self._loop) try: self._loop.run_forever() except Exception as e: self._log(f"事件循环异常: {e}", "ERROR") finally: # 清理资源 tasks = asyncio.all_tasks(self._loop) for task in tasks: task.cancel() self._loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) self._loop.close() self._thread = threading.Thread(target=run_loop, daemon=True) self._thread.start() def _log(self, message, level="INFO"): """日志记录""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") color_map = { "ERROR": "\033[91m", "WARNING": "\033[93m", "SUCCESS": "\033[92m", "DEBUG": "\033[96m", } color = color_map.get(level, "") reset = "\033[0m" print(f"{color}[{timestamp}] [{level}] {message}{reset}") def get_config(self): """获取配置信息""" headers = { 'accept': 'application/json, text/plain, */*', 'accept-language': 'zh-CN,zh;q=0.9', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36', } params = [ ('PIGEON_BIZ_TYPE', '2'), ('biz_type', '4'), ('_ts', int(time.time() * 1000)), ('_pms', '1'), ('FUSION', 'true'), ] try: self._log("正在请求配置信息...", "DEBUG") response = requests.get( 'https://pigeon.jinritemai.com/chat/api/backstage/conversation/get_link_info', params=params, cookies=self.cookie, headers=headers, timeout=30 ) self._log(f"响应状态码: {response.status_code}", "DEBUG") self._log(f"响应内容: {response.text}", "DEBUG") if response.status_code != 200: raise Exception(f"HTTP请求失败: {response.status_code}") data = response.json() self._log(f"解析后的JSON数据: {data}", "DEBUG") if data.get('code') != 0: error_msg = data.get('message', '未知错误') raise Exception(f"获取配置失败: {error_msg}") if 'data' not in data: raise Exception("响应数据中缺少data字段") config_data = data['data'] required_fields = ['token', 'appId', 'frontierConfig', 'websocketUrl', 'pigeon_sign'] for field in required_fields: if field not in config_data: raise Exception(f"配置数据中缺少必要字段: {field}") params = { 'token': config_data['token'], 'aid': config_data['appId'], 'fpid': str(config_data['frontierConfig']['fpId']), 'device_id': '1216524360102748', 'access_key': 'd4ececcc8a27b1fb63389380b8007fb0', 'device_platform': 'web', 'version_code': '10000', 'pigeon_source': 'web', 'PIGEON_BIZ_TYPE': '2', 'pigeon_sign': config_data['pigeon_sign'] } websocket_url = f"{config_data['websocketUrl']}?{urlencode(params)}" self._log(f"生成的WebSocket URL: {websocket_url}", "DEBUG") return data, websocket_url except requests.exceptions.RequestException as e: self._log(f"请求异常: {e}", "ERROR") raise Exception(f"网络请求失败: {e}") except json.JSONDecodeError as e: self._log(f"JSON解析失败: {e}", "ERROR") raise Exception(f"响应解析失败: {e}") except Exception as e: self._log(f"配置获取失败: {e}", "ERROR") raise def on_error(self, ws, error): """WebSocket错误处理""" self._log(f"❌ WebSocket错误: {error}", "ERROR") self.is_running = False def on_close(self, ws, close_status_code, close_msg): """WebSocket关闭处理""" self._log(f"🔌 连接关闭: {close_status_code}, {close_msg}", "WARNING") self.is_running = False def heartbeat_wss(self): """心跳线程""" while self.is_running: try: if self.ws and hasattr(self.ws, 'sock') and self.ws.sock and self.ws.sock.connected: self.send_heartbeat() time.sleep(3) except Exception as e: self._log(f"❌ 心跳发送失败: {e}", "ERROR") time.sleep(5) def send_heartbeat(self): """发送心跳包 - 使用 message_arg 中的方法""" try: # 使用 message_arg 中的 heartbeat_message 方法 value, message_type = heartbeat_message( pigeon_sign=self.config["data"]["pigeon_sign"], token=self.config["data"]["token"], session_did=self.cookie["PIGEON_CID"] ) form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type) self.ws.send_bytes(form_data) self._log("💓 发送心跳包", "DEBUG") except Exception as e: self._log(f"❌ 发送心跳包失败: {e}", "ERROR") def decode_binary_data(self, obj): """ 递归解码字典中的二进制数据 """ if isinstance(obj, dict): # 如果是字典,递归处理每个值 return {k: self.decode_binary_data(v) for k, v in obj.items()} elif isinstance(obj, list): # 如果是列表,递归处理每个元素 return [self.decode_binary_data(item) for item in obj] elif isinstance(obj, tuple): # 如果是元组,递归处理每个元素 return tuple(self.decode_binary_data(item) for item in obj) elif isinstance(obj, bytes): # 如果是二进制数据,尝试解码 try: return obj.decode('utf-8') except UnicodeDecodeError: # 如果UTF-8解码失败,可以尝试其他编码或返回原始数据 try: return obj.decode('latin-1') except UnicodeDecodeError: return obj # 或者返回原始二进制数据 else: # 其他类型直接返回 return obj def on_open(self, ws): """WebSocket连接打开""" self._log(f"✅ WebSocket连接已打开", "SUCCESS") threading.Thread(target=self.heartbeat_wss, daemon=True).start() def code_parse(self, message: dict): try: user = (message.get("8").get("6").get("610").get("1").get("1")).decode() token = (message.get("8").get("6").get("610").get("1").get("4")).decode() except: user = None token = None return user, token def _init_user_info(self, receiver_id, token=None): """初始化用户信息""" if receiver_id not in self.user_tokens: self.user_tokens[receiver_id] = { "token": token, "last_sent": 0, "session_initialized": False, "talk_id": 7543146114111898922, # 添加默认值 "p_id": 7542727339747116329, # 添加默认值 "pending_messages": [] } elif token: self.user_tokens[receiver_id]["token"] = token self._log(f"✅ 更新用户 {receiver_id} 的Token", "DEBUG") def on_message(self, ws, content): """处理收到的消息""" try: message = blackboxprotobuf.decode_message(content)[0] # 解析二进制数据 decoded_message_readable = self.decode_binary_data(message) print(f"\n📨 收到消息: {decoded_message_readable}") user, token = self.code_parse(message=message) if user and token: receiver_id = int(user.split(":")[0]) self._init_user_info(receiver_id, token) print(f"✅ 获取到用户Token: 用户ID={receiver_id}") # 获取消息类型 msg_type = message.get('1') print(f"📋 消息类型: {msg_type}") # 获取内部消息结构 inner_msg = message.get('8', {}) # 处理inner_msg为bytes的情况 if isinstance(inner_msg, bytes): try: inner_msg = blackboxprotobuf.decode_message(inner_msg)[0] print(f"🔄 解析内部消息字节成功") except Exception as e: print(f"⚠️ 无法解析内部消息字节: {e}") return if not inner_msg or not isinstance(inner_msg, dict): print("⚠️ 消息结构不完整") return # 获取内部消息类型 inner_type = inner_msg.get('1') inner_data = inner_msg.get('6', {}) # 首先检查是否是心跳响应 if "5" in message: kv_pairs = message["5"] if isinstance(kv_pairs, list): for pair in kv_pairs: if isinstance(pair, dict) and pair.get("1") == b"code" and pair.get("2") == b"0": self._log("💓 心跳响应正常", "DEBUG") return print(f"📋 内部消息类型: {inner_type}") print(f"📋 内部消息数据键: {list(inner_data.keys()) if isinstance(inner_data, dict) else type(inner_data)}") # 处理不同类型的消息 if inner_type == 610: # Token消息 print("🔑 处理Token消息") self._handle_token_message(inner_data) elif inner_type == 500: # 聊天消息 print("💬 处理聊天消息") self._handle_chat_message(inner_data, message) elif inner_type == 200: # 心跳消息 print("💓 收到心跳消息") else: print(f"❓ 未知内部消息类型: {inner_type}") except Exception as e: print(f"❌ 解析消息时出错: {e}") traceback.print_exc() async def _process_pending_message(self, sender_id, message_content): """处理待发送的消息""" try: # 获取用户信息 user_info = self.user_tokens[sender_id] talk_id = user_info.get("talk_id", 0) p_id = user_info.get("p_id", 0) # 🔥 统一消息类型检测逻辑(与拼多多保持一致) content = message_content lc = str(content).lower() # 检测消息类型,使用与拼多多相同的逻辑 if any(ext in lc for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]): msg_type = "image" elif any(ext in lc for ext in [".mp4", ".avi", ".mov", ".wmv", ".flv"]): msg_type = "video" elif any(keyword in lc for keyword in ['goods.html', 'item.html', 'item.jd.com', '商品卡片id:']): msg_type = "product_card" else: msg_type = "text" # 🔥 使用工厂方法创建消息模板(与拼多多保持一致) message_template = PlatformMessage.create_text_message( content=content, sender_id=str(sender_id), store_id=self.store_id ) # 动态设置检测到的消息类型 message_template.msg_type = msg_type # 发送消息到后端(使用统一连接) await self.ai_service.send_message_to_backend(message_template.to_dict()) self._log(f"✅ 待发送消息已发送到后端", "INFO") except Exception as e: self._log(f"❌ 处理待发送消息失败: {e}", "ERROR") def _handle_token_message(self, message): """处理Token消息""" try: # 直接从消息中获取token数据 token_data = message.get('610', {}).get('1', {}) if not token_data: self._log("❌ 未找到token数据", "ERROR") return user_bytes = token_data.get('1') token_bytes = token_data.get('4') if not (isinstance(user_bytes, bytes) and isinstance(token_bytes, bytes)): self._log("❌ token数据格式错误", "ERROR") return # 解码用户信息和token user = user_bytes.decode('utf-8') token = token_bytes.decode('utf-8') # 解析用户ID try: receiver_id = int(user.split(":")[0]) except (ValueError, IndexError): self._log(f"❌ 无法解析用户ID: {user}", "ERROR") return self._log(f"✅ 成功解析Token - 用户: {receiver_id}", "SUCCESS") # 确保用户信息存在并保存token if receiver_id not in self.user_tokens: self._log(f"🆕 创建用户信息: {receiver_id}", "DEBUG") self.user_tokens[receiver_id] = { "token": None, "last_sent": 0, "session_initialized": False, "talk_id": 0, "p_id": 0, "pending_messages": [] } # 保存token self.user_tokens[receiver_id]["token"] = token self._log(f"✅ 已保存用户 {receiver_id} 的Token", "DEBUG") # 处理待发送的消息 if "pending_messages" in self.user_tokens[receiver_id]: pending_msgs = self.user_tokens[receiver_id]["pending_messages"] if pending_msgs: self._log(f"📤 处理 {len(pending_msgs)} 条待发送消息", "INFO") for msg in pending_msgs: asyncio.run_coroutine_threadsafe( self._process_pending_message(receiver_id, msg), self._loop ) # 清空待发送消息列表 self.user_tokens[receiver_id]["pending_messages"] = [] except Exception as e: self._log(f"❌ 处理Token消息失败: {e}", "ERROR") self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG") def parse_video(self, vid): """解析视频获取播放地址""" try: self._log(f"🎥 开始解析视频,VID: {vid}", "INFO") url = "https://pigeon.jinritemai.com/backstage/video/getPlayToken" params = { "vid": vid, "_pms": "1" } response = requests.get( url, headers=self.video_headers, cookies=self.cookie, params=params, timeout=36 ).json() self._log(f"视频解析响应: {response}", "DEBUG") if response.get("code") != 0: self._log(f"❌ 获取视频token失败: {response.get('message')}", "ERROR") return None token = response.get("data", {}).get("token") if not token: self._log("❌ 未获取到视频token", "ERROR") return None # 解析token获取播放地址 token_url = "https://open.bytedanceapi.com/?" + token video_response = requests.get( token_url, headers=self.video_headers, cookies=self.cookie, timeout=38 ).json() self._log(f"视频播放信息: {video_response}", "DEBUG") result = video_response.get("Result", {}).get("Data", {}).get("PlayInfoList") if result and len(result) > 0: play_url = result[0].get("MainPlayUrl") if play_url: self._log(f"✅ 成功获取视频播放地址: {play_url[:50]}...", "SUCCESS") return play_url else: self._log("❌ 未找到MainPlayUrl", "ERROR") else: self._log("❌ 获取播放地址失败", "ERROR") return None except requests.exceptions.RequestException as e: self._log(f"❌ 网络请求失败: {e}", "ERROR") return None except json.JSONDecodeError as e: self._log(f"❌ JSON解析失败: {e}", "ERROR") return None except Exception as e: self._log(f"❌ 视频解析失败: {e}", "ERROR") return None # 获取商品id def get_goods_id(self, order_id): headers = { "authority": "fxg.jinritemai.com", "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9", "cache-control": "no-cache", "pragma": "no-cache", "referer": "https://fxg.jinritemai.com/ffa/morder/order/detail?id=6921052297377971987", "sec-ch-ua": "\"Not_A Brand\";v=\"8\", \"Chromium\";v=\"120\", \"Google Chrome\";v=\"120\"", "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\"", "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } url = "https://fxg.jinritemai.com/api/order/orderDetail" params = { "order_id": order_id, "appid": "1", "__token": "d83b1e7b9e05390304167f02c95f1a74", "_bid": "ffa_order", "aid": "4272", "_lid": "720383572871", "verifyFp": "verify_mf3dk1nv_kbxjZXA8_9XBV_4Rvb_B3iI_XDueP5U4SoQx", "fp": "verify_mf3dk1nv_kbxjZXA8_9XBV_4Rvb_B3iI_XDueP5U4SoQx", "msToken": "OJcYEsXGO2a1822s3kkoFuk3zwOyj5nhjzxhynXWCz7b4Q4oDuxmbFf85-2C05VezCpYsbhaZ8ZtXs6hl_rrU2Akb5K3kOQQimXOVshe_7yCNW52NA8sAR3nOp_y6N99nQN8VxscYQJ6xLcvQN6CzpsjC7y0gxo6VpfHAVs=", } response = requests.get(url, headers=headers, cookies=self.cookie, params=params).json() # print(response) if response.get('code') == 0 and response.get('data').get('order_id') == order_id: # 表示成功展示了对应这个订单内部的信息 product_data = response.get('data').get('product') product_id = product_data.get('sku')[0].get('product_id') return product_id else: # 没有找到默认返回None print("没有根据订单信息找到内部的商品ID 请核对方法get_product_id") return None def _extract_message_text(self, msg_content): """从消息内容中提取文本和图片URL""" try: print("🔍 开始提取消息内容...") # 初始化返回结果 result = { 'text': None, 'avatar': None, 'image_url': None, 'thumbnail_url': None, 'goods_id': None, 'video_url': None, 'order_id': None } # 方法1: 处理文本消息(从'8'字段获取) text_content = msg_content.get('8') if text_content: print(f"📄 检测到文本内容") if isinstance(text_content, bytes): try: decoded = text_content.decode('utf-8') result['text'] = decoded self._log(f"✅ UTF-8解码成功: {decoded}", "SUCCESS") except UnicodeDecodeError: try: decoded = text_content.decode('gbk') result['text'] = decoded self._log(f"✅ GBK解码成功: {decoded}", "SUCCESS") except: self._log("⚠️ 无法解码文本内容", "DEBUG") elif isinstance(text_content, str): result['text'] = text_content self._log(f"✅ 直接使用文本内容: {text_content}") # 方法2: 统一处理'9'字段中的数据(包括文本和图片、消息、视频 meta_data = msg_content.get('9', []) if meta_data: print(f"🔍 检测到消息元数据") for item in meta_data: if not isinstance(item, dict): continue key = item.get('1', b'').decode('utf-8') if isinstance(item.get('1'), bytes) else str( item.get('1', '')) value = item.get('2', b'').decode('utf-8') if isinstance(item.get('2'), bytes) else str( item.get('2', '')) # 处理所有类型的URL if key == 'avatar_uri': result['avatar'] = value self._log(f"✅ 找到头像URL: {value}", "SUCCESS") elif key == 'imageUrl': result['image_url'] = value self._log(f"✅ 找到图片URL: {value}", "SUCCESS") elif key == 'thumbnailUrl': result['thumbnail_url'] = value self._log(f"✅ 找到缩略图URL: {value}", "SUCCESS") elif key == 'goods_id': goods_id = value result['goods_id'] = goods_id self._log(f"✅ 找到商品ID: {goods_id}", "SUCCESS") elif key == 'msg_render_model': try: video_info = json.loads(value) render_body = video_info.get('render_body', {}) # 提取视频URL viedo_url = render_body.get('coverURL') viedo_vid = render_body.get('vid') if viedo_vid and viedo_url: self._log(f"✅ 找到视频原始vid: {viedo_vid}", "SUCCESS") # 解析获取视频播放地址 play_url = self.parse_video(viedo_vid) if play_url: result['video_url'] = play_url else: # 如果解析失败 在打印对应的日志后 使用备用封面图片作为地址 if '\\u0026' in viedo_url: viedo_url = viedo_url.replace('\\u0026', '&') result['video_url'] = viedo_url self._log("⚠️ 使用视频封面作为备选", "WARNING") else: self._log("⚠️ 未找到视频核心id 可能出现新的数据存放地址", "DEBUG") except json.JSONDecodeError as e: self._log(f"解析视频信息失败: {e}", "ERROR") elif key == 'sku_order_id': # 取到了对应的订单号 id order_id = value result['order_id'] = order_id # 如果已经找到所有需要的URL,提前退出循环 if all([result['avatar'], result['image_url'], result['thumbnail_url']]): break # 方法3: 兼容处理旧的'9-1'字段(如果存在) image_data = msg_content.get('9-1', []) # 如果没有拿到这个arrayList那么就不需要在对里面的数据进行解析了 if image_data != []: print(f"📸 检测到旧的图片数据格式", "DEBUG") for item in image_data: if not isinstance(item, dict): continue key = item.get('1', b'').decode('utf-8') if isinstance(item.get('1'), bytes) else str( item.get('1', '')) value = item.get('2', b'').decode('utf-8') if isinstance(item.get('2'), bytes) else str( item.get('2', '')) if key == 'avatar_uri' and not result['avatar']: result['avatar'] = value self._log(f"✅ 找到头像URL: {value}", "SUCCESS") elif key == 'imageUrl' and not result['image_url']: result['image_url'] = value self._log(f"✅ 找到图片URL: {value}", "SUCCESS") elif key == 'thumbnailUrl' and not result['thumbnail_url']: result['thumbnail_url'] = value self._log(f"✅ 找到缩略图URL: {value}", "SUCCESS") elif key == 'goods_id': goods_id = value result['goods_id'] = goods_id self._log(f"✅ 找到商品ID: {goods_id}", "SUCCESS") elif key == 'msg_render_model': try: video_info = json.loads(value) render_body = video_info.get('render_body', {}) # 提取视频URL viedo_url = render_body.get('coverURL') viedo_vid = render_body.get('vid') if viedo_vid and viedo_url: self._log(f"✅ 找到视频原始vid: {viedo_vid}", "SUCCESS") # 解析获取视频播放地址 play_url = self.parse_video(viedo_vid) if play_url: result['video_url'] = play_url else: # 如果解析失败 在打印对应的日志后 使用备用封面图片作为地址 if '\\u0026' in viedo_url: viedo_url = viedo_url.replace('\\u0026', '&') result['video_url'] = viedo_url self._log("⚠️ 使用视频封面作为备选", "WARNING") else: self._log("⚠️ 未找到视频核心id 可能出现新的数据存放地址", "DEBUG") except json.JSONDecodeError as e: self._log(f"解析视频信息失败: {e}", "ERROR") elif key == 'sku_order_id': # 取到了对应的订单号 id order_id = value result['order_id'] = order_id # 如果都没找到内容,打印调试信息 if not any(result.values()): print("🔍 未找到有效内容,打印所有字段:") for key, value in msg_content.items(): if isinstance(value, bytes): try: decoded = value.decode('utf-8', errors='ignore') print(f" {key}: {decoded[:100]}...") except: print(f" {key}: [无法解码的字节数据,长度: {len(value)}]") elif isinstance(value, list): print(f" {key}: [列表数据,长度: {len(value)}]") else: print(f" {key}: {value}") return result except Exception as e: self._log(f"❌ 提取消息内容时出错: {e}", "ERROR") traceback.print_exc() return { 'text': None, 'avatar': None, 'image_url': None, 'thumbnail_url': None, 'goods_id': None, 'viedo_url': None, 'order_id': None } def _handle_chat_message(self, message, msg_type): """处理聊天消息 - 修改版本,支持多种消息类型""" try: self._log(f"🔍 开始解析{message} 消息结构", "DEBUG") msg_500 = message.get('500', {}) if not msg_500: return msg_content = msg_500.get('5', {}) if not msg_content: return # 从消息内容中获取类型和发送者ID inner_msg_type = msg_content.get("6", 0) sender_id = msg_content.get("7", 0) self._log(f"📊 内部消息类型: {inner_msg_type}, 发送者ID: {sender_id}", "DEBUG") print("会不会到这里来呢") # 处理不同类型的消息 if inner_msg_type == 1000 and sender_id and sender_id != int(self.cookie['PIGEON_CID']): # 用户文本消息 - 处理AI回复 self._log("✅ 检测到1000状态的有效消息", "INFO") self._handle_user_text_message(msg_content, sender_id) elif inner_msg_type == 50002 and sender_id and sender_id != int(self.cookie['PIGEON_CID']): # 系统消息 - 触发token请求 self._log("✅ 检测到50002类型的系统消息", "INFO") self._handle_system_message(msg_content, sender_id) else: # 忽略其他消息 self._log(f"🔍 忽略消息类型: {inner_msg_type}", "DEBUG") except Exception as e: self._log(f"❌ 处理聊天消息失败: {e}", "ERROR") self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG") def _handle_user_text_message(self, msg_content, sender_id): """处理用户文本消息""" try: # 解析消息内容 # 使用引入的新的解析全详情代码(包括图片与头像信息) message_dict = self._extract_message_text(msg_content) message_content = None if "8" in msg_content: content_data = msg_content["8"] self._log(f"📄 原始内容数据: {content_data}", "DEBUG") if message_dict and message_dict.get('text') != "客服水滴智能优品接入": self._log(f"💬 成功解析用户消息: '{message_dict}'", "SUCCESS") # 提取会话信息 talk_id = msg_content.get("20", 0) p_id = msg_content.get("5", 0) self._log(f"📝 会话信息 - TalkID: {talk_id}, PID: {p_id}", "DEBUG") # 存储会话信息 if sender_id not in self.user_tokens: self.user_tokens[sender_id] = { "token": None, "last_sent": 0, "session_initialized": False, "talk_id": talk_id, "p_id": p_id, "pending_messages": [] } else: # 更新会话信息 self.user_tokens[sender_id]["talk_id"] = talk_id self.user_tokens[sender_id]["p_id"] = p_id # 使用事件循环提交异步任务 asyncio.run_coroutine_threadsafe( self._get_and_send_ai_reply(sender_id, message_dict, talk_id, p_id), self._loop ) else: self._log("⚠️ 无法解析消息内容", "WARNING") except Exception as e: self._log(f"❌ 处理用户文本消息失败: {e}", "ERROR") def _handle_system_message(self, msg_content, sender_id): """处理系统消息 - 触发token请求""" try: # 提取会话信息 talk_id = msg_content.get("20", 0) p_id = msg_content.get("5", 0) self._log(f"📝 系统消息会话信息 - TalkID: {talk_id}, PID: {p_id}", "DEBUG") # 检查是否已经有该用户的token if sender_id not in self.user_tokens or not self.user_tokens[sender_id].get("token"): self._log(f"🆕 用户 {sender_id} 无token,请求token", "INFO") # 存储会话信息 if sender_id not in self.user_tokens: self.user_tokens[sender_id] = { "token": None, "last_sent": 0, "session_initialized": False, "talk_id": talk_id, "p_id": p_id, "pending_messages": [] } else: # 更新会话信息 self.user_tokens[sender_id]["talk_id"] = talk_id self.user_tokens[sender_id]["p_id"] = p_id # 请求token self._request_user_token(sender_id, p_id) else: self._log(f"✅ 用户 {sender_id} 已有token", "DEBUG") except Exception as e: self._log(f"❌ 处理系统消息失败: {e}", "ERROR") def _parse_message_content(self, msg_data): """解析消息内容""" try: # 尝试从不同字段解析内容 content_fields = ['4', '8', '14'] for field in content_fields: content = msg_data.get(field) if content: if isinstance(content, bytes): try: return content.decode('utf-8') except UnicodeDecodeError: try: return content.decode('gbk') except: continue elif isinstance(content, str): return content return None except Exception as e: self._log(f"❌ 解析消息内容失败: {e}", "ERROR") return None def _determine_message_type(self, message_dict): """确定消息类型""" if message_dict.get('video_url'): # 如果确认收到了video_url类型的数据 那么不可能是其它类型 return 'video' elif message_dict.get('image_url'): # 如果确认收到了image_url类型的数据 那么不可能是其它类型 return 'image' elif message_dict.get('order_id'): # 如果确认收到了order_id类型的数据 那么不可能是其它类型 只能是发送询问订单 return 'order' elif message_dict.get('goods_id'): # 如果确认收到了goods_id类型的数据 那么不可能是其它类型 只能是发送询问商品 return 'goods' else: return 'text' # 最普遍的类型 async def _get_and_send_ai_reply(self, sender_id, message_dict, talk_id, p_id): """获取并发送AI回复 - 修改版本,确保有token""" try: self._log(f"🚀 AI回复流程开始,用户: {sender_id}", "INFO") self._log(f"📋 用户消息内容: '{message_dict}'", "INFO") # 检查是否已有token,如果没有则先请求 if sender_id not in self.user_tokens or not self.user_tokens[sender_id].get("token"): self._log(f"🆕 用户 {sender_id} 无token,先请求token", "INFO") self.user_tokens[sender_id] = { "token": None, "last_sent": 0, "session_initialized": False, "talk_id": talk_id, "p_id": p_id, "pending_messages": [] # 存储等待发送的消息 } self._request_user_token(sender_id, p_id) # 将当前消息加入待处理队列 self.user_tokens[sender_id]["pending_messages"].append(message_dict) return # 预先定义默认值 (避免后续因为没有成功走到对应分支里产生的错误) # msg_type = "text" message_text = message_dict.get('text', '') avatar_url = message_dict.get('avatar', '') # 确定消息类型 (调用确认方法类型) msg_type = self._determine_message_type(message_dict) if msg_type == 'video' and avatar_url or message_dict['text'] == "[视频]": # 视频类型 message_text = message_dict['video_url'] avatar_url = avatar_url elif msg_type == 'image' and avatar_url: message_text = message_dict['image_url'] avatar_url = avatar_url elif msg_type == 'order': msg_type = 'order_card' order_id = message_dict['order_id'] goods_id = self.get_goods_id(order_id) if goods_id: message_text = f"订单号: {order_id}, 商品ID: {goods_id}" else: message_text = f"订单号: {order_id}" elif msg_type == 'goods' or message_dict.get("text", None) == "[商品]": # 商品类型 msg_type = 'product_card' message_text = f"商品卡片ID: {message_dict['goods_id']}" elif msg_type == 'text' and avatar_url: message_text = message_dict['text'] # 🔥 提取goods_info信息(与拼多多保持一致) goods_info = {} if msg_type == 'order': # 从抖音的订单信息中提取goods_info order_id = message_dict.get('order_id', '') goods_id = self.get_goods_id(order_id) if hasattr(self, 'get_goods_id') else message_dict.get('goods_id', '') goods_info = { 'goodsID': goods_id, 'orderSequenceNo': order_id } elif msg_type == 'goods': # 从抖音的商品信息中提取goods_info goods_id = message_dict.get('goods_id', '') goods_info = { 'goodsID': goods_id } # 🔥 统一消息类型检测逻辑(与拼多多完全一致) content = message_text lc = str(content).lower() # 使用与拼多多完全相同的检测逻辑 if any(ext in lc for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]): msg_type = "image" elif any(ext in lc for ext in [".mp4", ".avi", ".mov", ".wmv", ".flv"]): msg_type = "video" else: msg_type = "text" # 🔥 订单卡片组装(与拼多多完全一致) if ("订单编号" in str(content) or msg_type == 'order') and goods_info: content = f"商品id:{goods_info.get('goodsID')} 订单号:{goods_info.get('orderSequenceNo')}" msg_type = "order_card" # 🔥 商品卡片检测(与拼多多完全一致) elif any(keyword in lc for keyword in ['goods.html', 'item.html', 'item.jd.com', '商品卡片id:']) or \ (goods_info and goods_info.get('goodsID') and not goods_info.get('orderSequenceNo')): msg_type = "product_card" # 🔥 使用工厂方法创建消息模板(与拼多多保持一致) message_template = PlatformMessage.create_text_message( content=content, sender_id=str(sender_id), store_id=self.store_id, pin_image=avatar_url ) # 动态设置检测到的消息类型 message_template.msg_type = msg_type # 🔥 添加调试日志(与拼多多保持一致) try: print(f"📤(SPEC) 发送到AI: {json.dumps(message_template.to_dict(), ensure_ascii=False)[:300]}...") except Exception: pass self._log("📤 准备发送消息到AI服务...", "INFO") self._log(f"📋 消息内容: {message_template.to_json()}", "DEBUG") # 发送消息到后端(使用统一连接,不等待回复) start_time = time.time() await self.ai_service.send_message_to_backend(message_template.to_dict()) response_time = time.time() - start_time self._log(f"✅ 消息已发送到后端,耗时: {response_time:.2f}s", "SUCCESS") self._log("🔄 等待后端AI处理并通过GUI转发回复", "INFO") except Exception as e: self._log(f"❌ 获取AI回复失败: {e}", "ERROR") self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG") async def _check_and_send_reply(self, sender_id, talk_id, p_id, reply_text): """检查并发送回复""" try: self._log(f"🔍 检查发送条件,用户: {sender_id}", "DEBUG") # 确保用户信息存在 if sender_id not in self.user_tokens: self._log(f"❌ 用户 {sender_id} 信息不存在", "ERROR") return user_token = self.user_tokens[sender_id].get("token") if not user_token: self._log(f"⚠️ 用户 {sender_id} token为空,将消息加入待发送队列", "WARNING") # 将消息加入待发送队列 if "pending_messages" not in self.user_tokens[sender_id]: self.user_tokens[sender_id]["pending_messages"] = [] self.user_tokens[sender_id]["pending_messages"].append(reply_text) return # 检查发送频率 current_time = int(time.time() * 1000) last_sent = self.user_tokens[sender_id].get("last_sent", 0) if current_time - last_sent < 4000: self._log("⏳ 发送太频繁,跳过", "DEBUG") return self._log(f"📤 准备发送回复给用户 {sender_id}", "INFO") self._log(f"📝 回复内容: '{reply_text}'", "DEBUG") # 发送回复 success = await self._send_message_to_user(sender_id, talk_id, p_id, user_token, reply_text) if success: self.user_tokens[sender_id]["last_sent"] = current_time self._log(f"✅ 回复发送完成", "SUCCESS") else: self._log(f"❌ 回复发送失败", "ERROR") except Exception as e: self._log(f"❌ 检查并发送回复失败: {e}", "ERROR") async def _send_message_to_user(self, receiver_id, talk_id, p_id, user_token, content): """发送消息给用户 - 核心发送方法""" try: self._log(f"📤 正在发送消息给用户 {receiver_id}", "DEBUG") self._log(f"📝 消息内容: {content}", "DEBUG") # 检查必要的参数 if not all([receiver_id, talk_id, p_id, user_token, content]): self._log("❌ 发送消息参数不全", "ERROR") print(f"{receiver_id}--{talk_id}--{p_id}--{user_token}--{content}") return False # 使用 message_arg 中的 send_message 方法创建消息 value, message_type = send_message( pigeon_sign=self.config["data"]["pigeon_sign"], token=self.config["data"]["token"], receiver_id=receiver_id, shop_id=self.cookie["SHOP_ID"], talk_id=talk_id, session_did=self.cookie["PIGEON_CID"], p_id=p_id, user_code=user_token, text=content ) # 编码消息 form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type) # 发送消息 self.ws.send_bytes(form_data) self._log(f"✅ 消息已发送给用户 {receiver_id}", "SUCCESS") return True except Exception as e: self._log(f"❌ 发送消息给用户失败: {e}", "ERROR") return False def _request_user_token(self, receiver_id, p_id): """请求用户token - 使用 message_arg 中的方法""" try: # 使用 message_arg 中的 get_user_code 方法 value, message_type = get_user_code( pigeon_sign=self.config["data"]["pigeon_sign"], token=self.config["data"]["token"], receiver_id=receiver_id, shop_id=self.cookie["SHOP_ID"], session_did=self.cookie["PIGEON_CID"], p_id=p_id ) form_data = blackboxprotobuf.encode_message(value=value, message_type=message_type) self.ws.send_bytes(form_data) self._log(f"📤 已请求用户 {receiver_id} 的token", "INFO") except Exception as e: self._log(f"❌ 请求token失败: {e}", "ERROR") def _log_user_tokens_state(self): """记录当前 user_tokens 状态(用于调试)""" self._log("🔍 当前 user_tokens 状态:", "DEBUG") for user_id, info in self.user_tokens.items(): token_status = "有" if info.get("token") else "无" self._log( f" 用户 {user_id}: token={token_status}, talk_id={info.get('talk_id')}, p_id={info.get('p_id')}", "DEBUG") async def _handle_customer_message(self, message_data): """处理来自后端的客服消息""" try: self._log(f"🔍 _handle_customer_message 被调用,当前实例: {self}", "DEBUG") self._log_user_tokens_state() content = message_data.get("content", "") if not content: self._log("⚠️ 客服消息内容为空", "WARNING") return self._log(f"后端客服传输的消息为:{content}") receiver_info = message_data.get("receiver", {}) receiver_id = receiver_info.get("id") if receiver_id and content: self._log(f"📤 收到客服消息,准备发送给用户 {receiver_id}", "INFO") # 确保 receiver_id 是整数类型 try: receiver_id = int(receiver_id) except (ValueError, TypeError): self._log(f"❌ 无法转换用户ID为整数: {receiver_id}", "ERROR") return # 检查用户信息是否存在,如果不存在则初始化 if receiver_id not in self.user_tokens: self._log(f"🆕 用户 {receiver_id} 信息不存在,初始化用户信息", "INFO") self.user_tokens[receiver_id] = { "token": None, "last_sent": 0, "session_initialized": False, "talk_id": 7543146114111898922, # 使用默认值 "p_id": 7542727339747116329, # 使用默认值 "pending_messages": [content] # 直接将消息加入待发送队列 } # 如果没有token,尝试获取token if not self.user_tokens[receiver_id]["token"]: self._log(f"🔄 用户 {receiver_id} 无token,尝试获取", "INFO") # 触发token获取 self._request_user_token( receiver_id, self.user_tokens[receiver_id]["p_id"] ) # 注意:这里不返回,让消息已经在队列中等待处理 return user_info = self.user_tokens[receiver_id] # 如果有token,直接发送消息 if user_info.get("token"): self._log(f"✅ 用户 {receiver_id} 有token,直接发送消息", "INFO") # 检查必要的字段是否存在 required_fields = ["talk_id", "p_id", "token"] for field in required_fields: if field not in user_info or not user_info[field]: self._log(f"❌ 用户 {receiver_id} 缺少必要字段: {field}", "ERROR") return # 发送消息 await self._send_message_to_user( receiver_id, user_info["talk_id"], user_info["p_id"], user_info["token"], content ) else: # 没有token,将消息加入队列并请求token self._log(f"⚠️ 用户 {receiver_id} 无token,将消息加入队列", "INFO") if "pending_messages" not in user_info: user_info["pending_messages"] = [] user_info["pending_messages"].append(content) # 请求token self._request_user_token( receiver_id, user_info["p_id"] ) except Exception as e: self._log(f"❌ 处理客服消息失败: {e}", "ERROR") self._log(f"❌ 错误详情: {traceback.format_exc()}", "DEBUG") async def start_async(self): """异步启动消息处理器(不阻塞调用线程)""" try: self._log("🚀 异步启动消息处理器...", "INFO") # 启动事件循环 self.start_event_loop() # 获取配置 self.config, self.wss_url = self.get_config() self._log("✅ 配置获取成功", "SUCCESS") # 连接后端服务(使用统一连接,无需独立连接) await self.ai_service.connect(self.store_id) self._log("✅ 后端服务连接成功(使用统一连接)", "SUCCESS") # 创建WebSocket连接 self.ws = WebSocketApp( self.wss_url, header={ 'Origin': 'https://im.jinritemai.com', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36', }, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close ) # 添加重连机制 def run_with_reconnect(): while self.is_running: try: self.ws.run_forever() except Exception as e: self._log(f"WebSocket连接异常: {e}, 5秒后重连...", "WARNING") time.sleep(5) continue # 注册消息处理器(统一连接模式下无需注册) # 在新线程中运行WebSocket ws_thread = threading.Thread(target=run_with_reconnect, daemon=True) ws_thread.start() self._log("🟢 消息处理器异步启动完成", "SUCCESS") return True except Exception as e: self._log(f"❌ 异步启动失败: {e}", "ERROR") return False def start(self): """启动消息处理器""" try: self._log("🚀 启动消息处理器...", "INFO") # 启动事件循环 self.start_event_loop() # 获取配置 self.config, self.wss_url = self.get_config() self._log("✅ 配置获取成功", "SUCCESS") # 连接后端服务(使用统一连接,无需独立连接) async def connect_backend_service(): await self.ai_service.connect(self.store_id) self._log("✅ 后端服务连接成功(使用统一连接)", "SUCCESS") # 在事件循环中执行连接 asyncio.run_coroutine_threadsafe(connect_backend_service(), self._loop) # 创建WebSocket连接 self.ws = WebSocketApp( self.wss_url, header={ 'Origin': 'https://im.jinritemai.com', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36', }, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close ) # 添加重连机制 def run_with_reconnect(): while self.is_running: try: self.ws.run_forever() except Exception as e: self._log(f"WebSocket连接异常: {e}, 5秒后重连...", "WARNING") time.sleep(5) continue # 注册消息处理器(统一连接模式下无需注册) # 在新线程中运行WebSocket ws_thread = threading.Thread(target=run_with_reconnect, daemon=True) ws_thread.start() self._log("🟢 开始运行消息处理器...", "INFO") # !!!关键修改:保持主线程运行!!! while self.is_running: time.sleep(1) # 主线程保持运行,防止进程退出 except Exception as e: self._log(f"❌ 启动失败: {e}", "ERROR") finally: self._log("🛑 消息处理器已停止", "INFO") async def close(self): """关闭连接""" self.is_running = False if self.ws: self.ws.close() if self.ai_service: await self.ai_service.close() # 关闭事件循环 if self._loop: self._loop.call_soon_threadsafe(self._loop.stop) if self._thread: self._thread.join(timeout=1.0) self._log("🛑 消息处理器已停止", "INFO") async def send_message_external(self, receiver_id: str, content: str) -> bool: """外部调用的发送消息方法 - 用于后端消息转发""" try: self._log(f"🔄 [External-{self.instance_id}] 收到转发请求: receiver_id={receiver_id}, content={content}", "INFO") # 修复数据类型不匹配问题:将字符串转换为整数 try: receiver_id_int = int(receiver_id) self._log(f"🔧 [External-{self.instance_id}] 转换 receiver_id: '{receiver_id}' -> {receiver_id_int}", "DEBUG") except ValueError: self._log(f"❌ [External-{self.instance_id}] receiver_id 无法转换为整数: {receiver_id}", "ERROR") return False # 调试信息:显示当前活跃用户 active_users = list(self.user_tokens.keys()) self._log(f"🔍 [External-{self.instance_id}] 当前活跃用户列表: {active_users}", "DEBUG") self._log(f"🔍 [External-{self.instance_id}] 活跃用户数量: {len(active_users)}", "DEBUG") # 检查用户是否存在于user_tokens中(使用整数类型) self._log(f"🔍 [External-{self.instance_id}] 调试信息:", "DEBUG") self._log(f"🔍 [External-{self.instance_id}] receiver_id_int: {receiver_id_int} (类型: {type(receiver_id_int)})", "DEBUG") self._log(f"🔍 [External-{self.instance_id}] user_tokens keys: {list(self.user_tokens.keys())}", "DEBUG") if self.user_tokens: first_key = list(self.user_tokens.keys())[0] self._log(f"🔍 [External-{self.instance_id}] 第一个key: {first_key} (类型: {type(first_key)})", "DEBUG") self._log(f"🔍 [External-{self.instance_id}] 直接比较: {receiver_id_int == first_key}", "DEBUG") if receiver_id_int not in self.user_tokens: self._log(f"❌ [External-{self.instance_id}] 用户 {receiver_id_int} 不在活跃会话中", "WARNING") self._log(f"💡 [External-{self.instance_id}] 提示:用户需要先在抖音平台发送消息建立会话", "INFO") # 显示当前活跃用户的调试信息 self.print_active_users_debug() return False user_info = self.user_tokens[receiver_id_int] talk_id = user_info.get("talk_id") p_id = user_info.get("p_id") user_token = user_info.get("token") self._log(f"🔍 [External-{self.instance_id}] 用户会话信息: talk_id={talk_id}, p_id={p_id}, has_token={bool(user_token)}", "DEBUG") # 检查必要参数 if not talk_id or not p_id: self._log(f"❌ [External-{self.instance_id}] 用户 {receiver_id_int} 缺少必要的会话信息 (talk_id: {talk_id}, p_id: {p_id})", "ERROR") return False if not user_token: self._log(f"⚠️ [External-{self.instance_id}] 用户 {receiver_id_int} token为空,尝试请求token", "WARNING") # 请求用户token self._request_user_token(receiver_id_int, p_id) # 将消息加入待发送队列 if "pending_messages" not in user_info: user_info["pending_messages"] = [] user_info["pending_messages"].append(content) self._log(f"📝 [External-{self.instance_id}] 消息已加入待发送队列,队列长度: {len(user_info['pending_messages'])}", "INFO") return True # 发送消息 (注意:_send_message_to_user 可能期望字符串类型的receiver_id) success = await self._send_message_to_user(receiver_id_int, talk_id, p_id, user_token, content) if success: # 更新最后发送时间 user_info["last_sent"] = int(time.time() * 1000) self._log(f"✅ [External-{self.instance_id}] 消息转发成功", "SUCCESS") else: self._log(f"❌ [External-{self.instance_id}] 消息转发失败", "ERROR") return success except Exception as e: self._log(f"❌ [External-{self.instance_id}] 外部消息发送异常: {e}", "ERROR") self._log(f"❌ [External-{self.instance_id}] 错误详情: {traceback.format_exc()}", "DEBUG") return False def get_active_users_info(self) -> dict: """获取当前活跃用户的详细信息""" try: active_users = {} for user_id, info in self.user_tokens.items(): active_users[user_id] = { "has_token": bool(info.get("token")), "talk_id": info.get("talk_id"), "p_id": info.get("p_id"), "last_sent": info.get("last_sent", 0), "pending_messages_count": len(info.get("pending_messages", [])), "session_initialized": info.get("session_initialized", False) } return active_users except Exception as e: self._log(f"获取活跃用户信息失败: {e}", "ERROR") return {} def print_active_users_debug(self): """打印当前活跃用户的调试信息""" try: active_users = self.get_active_users_info() self._log(f"🔍 [Debug-{self.instance_id}] 当前活跃用户总数: {len(active_users)}", "INFO") if not active_users: self._log(f"📝 [Debug-{self.instance_id}] 没有活跃用户。用户需要先在抖音平台发送消息建立会话。", "INFO") return for user_id, info in active_users.items(): self._log(f"👤 [Debug-{self.instance_id}] 用户 {user_id}:", "INFO") self._log(f" - Token: {'✅' if info['has_token'] else '❌'}", "INFO") self._log(f" - Talk ID: {info['talk_id']}", "INFO") self._log(f" - P ID: {info['p_id']}", "INFO") self._log(f" - 待发送消息: {info['pending_messages_count']}", "INFO") self._log(f" - 会话已初始化: {'✅' if info['session_initialized'] else '❌'}", "INFO") except Exception as e: self._log(f"打印调试信息失败: {e}", "ERROR") class DouYinChatBot: """抖音聊天机器人主类""" def __init__(self, cookie: dict, store_id: str = None): self.cookie = cookie self.store_id = store_id if not self.store_id: self.store_id = "68c16836-abac-4307-b763-ea1154823356" self.ai_service = DouYinBackendService() self.message_handler = None self.loop = asyncio.new_event_loop() # 统一的事件循环 self._stop_event = asyncio.Event() # 添加停止事件 async def initialize(self): """初始化聊天机器人""" try: # 创建消息处理器 self.message_handler = DouYinMessageHandler( cookie=self.cookie, ai_service=self.ai_service, store_id=self.store_id ) return True except Exception as e: print(f"❌ 初始化失败: {e}") return False def start(self): """启动聊天机器人""" try: # 运行初始化 asyncio.set_event_loop(self.loop) success = self.loop.run_until_complete(self.initialize()) if success: # 启动消息处理器 self.message_handler.start() # !!!关键修改:简化主循环!!! # 保持主线程运行,等待停止信号 while not self._stop_event.is_set(): time.sleep(1) else: print("❌ 聊天机器人启动失败") except Exception as e: print(f"❌ 启动失败: {e}") traceback.print_exc() finally: if not self._stop_event.is_set(): self.loop.close() async def close(self): """关闭聊天机器人""" self._stop_event.set() # 设置停止信号 if self.message_handler: await self.message_handler.close() if self.ai_service: await self.ai_service.close() class DouYinListenerForGUI: """用于GUI集成的抖音监听包装器""" def __init__(self, log_callback=None): self.douyin_bot = None self.log_callback = log_callback self.running = False self.stop_event = None self.loop = None def _log(self, message, log_type="INFO"): """处理日志输出""" if self.log_callback: self.log_callback(message, log_type) else: print(f"[{log_type}] {message}") async def start_listening(self, cookie_dict, text="您好,感谢您的咨询,我们会尽快回复您!"): """启动监听的主方法""" try: self._log("🔵 开始抖音平台连接流程", "INFO") # 验证cookie if not cookie_dict: self._log("❌ Cookie信息不能为空", "ERROR") return False required_fields = ['SHOP_ID', 'PIGEON_CID', 'sessionid'] missing_fields = [field for field in required_fields if field not in cookie_dict] if missing_fields: self._log(f"❌ 缺少必需的Cookie字段: {', '.join(missing_fields)}", "ERROR") return False self._log(f"✅ Cookie验证通过,店铺ID: {cookie_dict['SHOP_ID']}", "SUCCESS") # 创建抖音聊天机器人实例 self.douyin_bot = DouYinChatBot(cookie=cookie_dict) self.running = True self.stop_event = asyncio.Event() self._log("🎉 开始监听抖音平台消息...", "SUCCESS") # 初始化并启动机器人 success = await self.douyin_bot.initialize() if success: # 在新的事件循环中运行机器人 self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) # 启动机器人 def run_bot(): self.douyin_bot.start() # 在新线程中运行机器人 bot_thread = threading.Thread(target=run_bot, daemon=True) bot_thread.start() return True else: self._log("❌ 抖音机器人初始化失败", "ERROR") return False except Exception as e: self._log(f"❌ 监听过程中出现严重错误: {str(e)}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return False async def start_with_cookies(self, store_id, cookie_dict): """使用下发的cookies与store_id直接建立抖音平台WS并开始监听""" try: self._log("🔵 [DY] 收到后端登录指令,开始使用cookies连接平台", "INFO") # 验证cookie if not cookie_dict: self._log("❌ Cookie信息不能为空", "ERROR") return False required_fields = ['SHOP_ID', 'PIGEON_CID', 'sessionid'] missing_fields = [field for field in required_fields if field not in cookie_dict] if missing_fields: self._log(f"❌ 缺少必需的Cookie字段: {', '.join(missing_fields)}", "ERROR") return False self._log(f"✅ Cookie验证通过,店铺ID: {cookie_dict['SHOP_ID']}", "SUCCESS") # 建立与后端的统一连接(确保使用GUI的store_id) backend_service = DouYinBackendService() await backend_service.connect(store_id) self._log("✅ 后端服务连接成功(使用统一连接)", "SUCCESS") # 创建抖音聊天机器人实例 self.douyin_bot = DouYinChatBot(cookie=cookie_dict, store_id=store_id) # 设置后端服务 self.douyin_bot.ai_service = backend_service self.running = True self.stop_event = asyncio.Event() self._log("🎉 开始监听抖音平台消息...", "SUCCESS") # 异步初始化机器人 success = await self.douyin_bot.initialize() if success: # 异步启动消息处理器 if self.douyin_bot.message_handler: message_handler_success = await self.douyin_bot.message_handler.start_async() if not message_handler_success: self._log("❌ 消息处理器启动失败", "ERROR") return False # 发送客服列表到后端 try: # 🔥 等待一小段时间确保连接完全建立 await asyncio.sleep(1) staff_list_success = await self.douyin_bot.message_handler.send_staff_list_to_backend() if staff_list_success: print(f"🔥 [DY] 客服列表已上传到后端") else: print(f"⚠️ [DY] 客服列表上传失败") except Exception as e: print(f"❌ [DY] 客服列表上传异常: {e}") self._log(f"❌ 抖音客服列表上传异常: {e}", "ERROR") # 注册到全局管理器 dy_manager = DouYinWebsocketManager() shop_key = f"抖音:{store_id}" dy_manager.on_connect(shop_key, self.douyin_bot, store_id=store_id, cookie=cookie_dict) self._log(f"✅ 已注册抖音连接: {shop_key}", "SUCCESS") # 创建一个长期运行的任务来保持监听状态 async def keep_running(): try: while not self.stop_event.is_set(): await asyncio.sleep(1) except asyncio.CancelledError: pass # 在后台启动监听任务 asyncio.create_task(keep_running()) # 🔥 新增:Cookie登录成功后发送登录成功报告(与登录参数模式保持一致) try: from WebSocket.backend_singleton import get_backend_client backend = get_backend_client() if backend: message = { "type": "connect_message", "store_id": store_id, "status": True, # 登录成功 "cookies": cookie_dict # 添加cookie信息 } backend.send_message(message) self._log("✅ [DY] 已向后端发送Cookie登录成功报告", "SUCCESS") else: self._log("⚠️ [DY] 无法获取后端客户端,跳过状态报告", "WARNING") except Exception as e: self._log(f"⚠️ [DY] 发送登录成功报告失败: {e}", "WARNING") self._log("✅ [DY] 抖音平台连接成功,开始监听消息", "SUCCESS") return True else: self._log("❌ 抖音机器人初始化失败", "ERROR") return False except Exception as e: self._log(f"❌ [DY] 监听过程中出现严重错误: {str(e)}", "ERROR") import traceback self._log(f"错误详情: {traceback.format_exc()}", "DEBUG") return False async def start_with_login_params(self, store_id: str, login_params: str): """使用后端下发的登录参数执行登录并启动监听(与拼多多保持一致)""" try: self._log("🔵 [DY] 收到后端登录参数,开始执行登录获取cookies", "INFO") self._log(f"🔍 [DY] 登录参数内容: {login_params[:100]}...", "DEBUG") # 1. 解析登录参数 self._log("🔄 [DY] 开始解析登录参数", "DEBUG") params_dict = self._parse_login_params(login_params) if not params_dict: self._log("❌ [DY] 登录参数解析失败", "ERROR") return False self._log(f"✅ [DY] 登录参数解析成功,手机号: {params_dict.get('phone_number', 'N/A')}", "DEBUG") # 2. 使用新的DyLogin类执行登录 self._log("🔄 [DY] 开始创建DyLogin实例", "DEBUG") dy_login = DyLogin(log_callback=self.log_callback) self._log("✅ [DY] DyLogin实例创建成功", "DEBUG") self._log("🔄 [DY] 开始执行登录", "DEBUG") login_result = dy_login.login_with_params(params_dict, store_id) self._log(f"📊 [DY] 登录结果: {login_result}", "DEBUG") if login_result == "need_verification_code": self._log("⚠️ [DY] 需要手机验证码,已通知后端,等待重新下发包含验证码的登录参数", "WARNING") return "need_verification_code" # 返回特殊标识,避免被覆盖 elif login_result == "verification_code_error": self._log("⚠️ [DY] 验证码错误,已通知后端", "WARNING") return "verification_code_error" # 返回特殊标识,避免重复发送消息 elif login_result == "login_failure": self._log("⚠️ [DY] 登录失败,已发送失败通知给后端", "WARNING") return "login_failure" # 返回特殊标识,避免重复发送消息 elif not login_result: self._log("❌ [DY] 登录失败", "ERROR") return False elif isinstance(login_result, dict): # 登录成功,获取到cookies self._log("✅ [DY] 登录成功,使用获取的cookies连接平台", "SUCCESS") # 🔥 使用获取的cookies启动监听(这里会验证cookies完整性) return await self.start_with_cookies(store_id, login_result) else: self._log("❌ [DY] 登录返回未知结果", "ERROR") return False except Exception as e: self._log(f"❌ [DY] 使用登录参数启动失败: {str(e)}", "ERROR") import traceback self._log(f"🔍 [DY] 异常详细信息: {traceback.format_exc()}", "ERROR") return False def _parse_login_params(self, login_params_str: str) -> dict: """解析后端下发的登录参数(与拼多多保持一致)""" try: import json data = json.loads(login_params_str) params = data.get("data", {}) self._log(f"✅ [DY] 登录参数解析成功: phone_number={params.get('phone_number', 'N/A')}", "INFO") return params except Exception as e: self._log(f"❌ [DY] 解析登录参数失败: {e}", "ERROR") return {} def stop_listening(self): """停止监听""" if self.stop_event: self.stop_event.set() self.running = False if self.douyin_bot: # 创建新的事件循环来关闭机器人 async def cleanup(): await self.douyin_bot.close() if self.loop and not self.loop.is_closed(): self.loop.run_until_complete(cleanup()) self.loop.close() self._log("抖音监听已停止", "INFO") # 使用示例 if __name__ == '__main__': cookies = { "passport_csrf_token": "9984fc5ad93abebf1a7bceb157fc4560", "passport_csrf_token_default": "9984fc5ad93abebf1a7bceb157fc4560", "qc_tt_tag": "0", "is_staff_user": "false", "SHOP_ID": "217051461", "PIGEON_CID": "1216524360102748", "passport_mfa_token": "CjeTQsM%2B1gYNk6uL6EBAHaZQqQ0qvxul3Up08WBuQeEr%2BGfitdRk91WzrSVG46q2ldU%2BOnVc%2BmPtGkoKPAAAAAAAAAAAAABPbMkgpgfbZwhLZyjNjTZUm1l5GH%2BAasL%2BonVECzLkUmLgj262JLsX2eLMDc2bxcEFUxCogvsNGPax0WwgAiIBA3PySKw%3D", "__security_mc_1_s_sdk_crypt_sdk": "da6a1b67-4e38-b24e", "bd_ticket_guard_client_data": "eyJiZC10aWNrZXQtZ3VhcmQtdmVyc2lvbiI6MiwiYmQtdGlja2V0LWd1YXJkLWl0ZXJhdGlvbi12ZXJzaW9uIjoxLCJiZC10aWNrZXQtZ3VhcmQtcmVlLXB1YmxpYy1rZXkiOiJCTWQ4eGpKMUY2THJEUDRwc283ald6T0ZzTWhCaXhLc3dla3BIQ1JleFNENFRWNmNOK09uOVdKQnIwNWFkNmgvRnM2N0hpQzUxSXJFTGZTSFVWakVtODQ9IiwiYmQtdGlja2V0LWd1YXJkLXdlYi12ZXJzaW9uIjoyfQ%3D%3D", "bd_ticket_guard_client_web_domain": "2", "ttwid": "1%7C85xTuedO1rj3zvv-kDVjAu2b0hgRS8wVjCPgOEaCaxo%7C1757499627%7Cb7216cdc62420b85602b4a810e31be759b7a184da042bf2a9a21629f081dd2d9", "odin_tt": "2b28de94dbb7a08d45168f8f138dda7defd529f717ce89d3148b3151488b33248f176c0c52b58cdca420cfb950e13ecd6dac26d6762394725087280441ac91c5", "passport_auth_status": "d96aa2b690a33e007d157cf4204b1078%2C62594da316130803f733a79b6a6774df", "passport_auth_status_ss": "d96aa2b690a33e007d157cf4204b1078%2C62594da316130803f733a79b6a6774df", "uid_tt": "d1dbdd6e4cebff9fbb496f8735b11c9d", "uid_tt_ss": "d1dbdd6e4cebff9fbb496f8735b11c9d", "sid_tt": "784e3e2545c5666529fa3ac9451ed016", "sessionid": "784e3e2545c5666529fa3ac9451ed016", "sessionid_ss": "784e3e2545c5666529fa3ac9451ed016", "ucas_c0": "CkEKBTEuMC4wELKIjrqrodTgaBjmJiDjhaCz682rASiwITDc5uCyws2UAkCRooXGBkiR1sHIBlCmvJOipunJ9WdYbhIU08vxfhaj0eG4pjCi5loa1uv2XEY", "ucas_c0_ss": "CkEKBTEuMC4wELKIjrqrodTgaBjmJiDjhaCz682rASiwITDc5uCyws2UAkCRooXGBkiR1sHIBlCmvJOipunJ9WdYbhIU08vxfhaj0eG4pjCi5loa1uv2XEY", "sid_guard": "784e3e2545c5666529fa3ac9451ed016%7C1757499665%7C5184000%7CSun%2C+09-Nov-2025+10%3A21%3A05+GMT", "session_tlb_tag": "sttt%7C17%7CeE4-JUXFZmUp-jrJRR7QFv_________3dxW8aRfOoXXRgtkArn2rBGBZ8E0_061PH30G5EzcU-M%3D", "sid_ucp_v1": "1.0.0-KDIwMjIxOWE0MzY1NzA1YTZmNzY3NTRmZGJkMTU1OTg1ODc0MWI0MjkKGwjc5uCyws2UAhCRooXGBhiwISAMOAZA9AdIBBoCbGYiIDc4NGUzZTI1NDVjNTY2NjUyOWZhM2FjOTQ1MWVkMDE2", "ssid_ucp_v1": "1.0.0-KDIwMjIxOWE0MzY1NzA1YTZmNzY3NTRmZGJkMTU1OTg1ODc0MWI0MjkKGwjc5uCyws2UAhCRooXGBhiwISAMOAZA9AdIBBoCbGYiIDc4NGUzZTI1NDVjNTY2NjUyOWZhM2FjOTQ1MWVkMDE2", "PHPSESSID": "a72b9c2977908a281d086061f7281046", "PHPSESSID_SS": "a72b9c2977908a281d086061f7281046", "csrf_session_id": "623da6c2834082a9036c812d995f39cc" } # # !!!关键修改:使用同步方式启动!!! # listener = DouYinListenerForGUI() # # # 创建新的事件循环来运行异步方法 # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # # try: # # 运行启动方法 # success = loop.run_until_complete(listener.start_with_cookies(store_id=None, cookie_dict=cookies)) # if success: # print("✅ 监听器启动成功,按 Ctrl+C 停止...") # # 保持主线程运行 # while listener.running: # time.sleep(1) # else: # print("❌ 监听器启动失败") # except KeyboardInterrupt: # print("\n🛑 正在停止监听器...") # listener.stop_listening() # except Exception as e: # print(f"❌ 程序异常: {e}") # traceback.print_exc() # finally: # loop.close() # 创建聊天机器人实例 bot = DouYinChatBot(cookie=cookies) try: # 启动机器人 bot.start() except KeyboardInterrupt: print("\n🛑 正在关闭机器人...") # 清理资源 async def cleanup(): await bot.close() # 创建临时事件循环进行清理 cleanup_loop = asyncio.new_event_loop() asyncio.set_event_loop(cleanup_loop) cleanup_loop.run_until_complete(cleanup()) cleanup_loop.close() print("✅ 机器人已关闭")