#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 自动更新模块 - 支持后台下载、静默安装和自动重启 优化版:带重试、超时、自动重启功能 """ import os import sys import subprocess import urllib.request import tempfile import time import socket from pathlib import Path from PyQt5.QtCore import QThread, pyqtSignal import ctypes class UpdateDownloader(QThread): """ 优化版更新下载器线程 特性: - 自动重试3次 - 30秒网络超时 - 文件大小验证 - 进度实时更新 """ # 信号定义 progress = pyqtSignal(int, int) # (已下载字节, 总字节) finished = pyqtSignal(str) # 下载完成,返回文件路径 error = pyqtSignal(str) # 下载失败,返回错误信息 retry_info = pyqtSignal(int, int) # 重试信息 (当前重试次数, 总重试次数) def __init__(self, download_url, version, max_retries=3): super().__init__() self.download_url = download_url self.version = version self.max_retries = max_retries # 最大重试次数 self.is_cancelled = False def run(self): """执行下载(带重试机制)""" for attempt in range(self.max_retries): try: if attempt > 0: # 显示重试信息 self.retry_info.emit(attempt + 1, self.max_retries) time.sleep(2) # 等待2秒后重试 # 执行实际下载 self._download_file() return # 下载成功,退出 except Exception as e: error_msg = str(e) print(f"[UpdateDownloader] 下载失败 (尝试 {attempt + 1}/{self.max_retries}): {error_msg}") if self.is_cancelled: # 用户取消,立即退出 self.error.emit("下载已取消") return if attempt < self.max_retries - 1: # 还有重试机会,继续 continue else: # 所有重试都失败了 self.error.emit(f"下载失败(已重试{self.max_retries}次)\n错误: {error_msg}") def _download_file(self): """实际下载逻辑(带超时和验证)""" # 设置网络超时 socket.setdefaulttimeout(30) # 30秒超时 # 准备临时目录和文件名 temp_dir = tempfile.gettempdir() filename = f"ShuiDi_AI_Assistant_Setup_v{self.version}.exe" file_path = os.path.join(temp_dir, filename) print(f"[UpdateDownloader] 开始下载到: {file_path}") print(f"[UpdateDownloader] 下载地址: {self.download_url}") # 删除旧文件(如果存在) if os.path.exists(file_path): try: os.remove(file_path) print(f"[UpdateDownloader] 已删除旧文件") except Exception as e: print(f"[UpdateDownloader] 删除旧文件失败: {e}") # 下载文件(带进度回调) def progress_callback(block_num, block_size, total_size): if self.is_cancelled: raise Exception("用户取消下载") downloaded = block_num * block_size # 避免超过总大小 if total_size > 0 and downloaded > total_size: downloaded = total_size self.progress.emit(downloaded, total_size) # 执行下载 urllib.request.urlretrieve( self.download_url, file_path, reporthook=progress_callback ) # 验证下载的文件 if not os.path.exists(file_path): raise Exception("下载的文件不存在") file_size = os.path.getsize(file_path) print(f"[UpdateDownloader] 下载完成,文件大小: {file_size / (1024*1024):.2f} MB") # 验证文件大小(至少1MB,避免下载了错误页面) min_file_size = 1024 * 1024 # 1MB(正式环境) # 🧪 测试小文件时可以临时改为:min_file_size = 10 * 1024 # 10KB if file_size < min_file_size: raise Exception(f"下载的文件大小异常: {file_size} bytes(可能不是有效的安装包)") # 发送完成信号 self.finished.emit(file_path) def cancel(self): """取消下载""" print("[UpdateDownloader] 用户取消下载") self.is_cancelled = True def run_as_admin(script_path): """ 以管理员身份运行脚本(Windows UAC提升) 特点: - 在GUI退出前弹出UAC提示 - 用户点击"允许"后,脚本以管理员身份运行 - 用户点击"取消",返回False Args: script_path: 批处理脚本路径 Returns: bool: True表示成功启动(用户点击允许),False表示失败或用户取消 """ try: print(f"[RunAsAdmin] 请求管理员权限执行: {script_path}") # 使用 ShellExecute 以管理员身份运行 # "runas" 参数会触发UAC提示 ret = ctypes.windll.shell32.ShellExecuteW( None, # hwnd: 父窗口句柄(None表示无父窗口) "runas", # lpOperation: 以管理员身份运行 "cmd.exe", # lpFile: 要执行的程序 f'/c "{script_path}"', # lpParameters: 命令行参数 None, # lpDirectory: 工作目录(None表示当前目录) 0 # nShowCmd: 0=隐藏窗口 ) # 返回值说明: # >32: 成功 # <=32: 失败(具体错误码) # - 5: 用户拒绝UAC(点击"否") # - 2: 文件未找到 # - 其他: 其他错误 if ret > 32: print(f"[RunAsAdmin] ✅ 成功:用户已授权,脚本正在以管理员身份运行") return True elif ret == 5: print(f"[RunAsAdmin] ⚠️ 用户取消了UAC授权(返回码: {ret})") return False else: print(f"[RunAsAdmin] ❌ 启动失败(返回码: {ret})") return False except Exception as e: print(f"[RunAsAdmin] ❌ 异常: {e}") import traceback traceback.print_exc() return False def install_update_and_restart(installer_path): """ 简化版更新安装(文件替换方式,不使用NSIS安装) 工作流程: 1. 将安装包移动到程序目录 2. 创建替换脚本 3. 以管理员身份启动脚本(提前弹出UAC) 4. 脚本等待程序退出 5. 脚本执行静默安装 6. 重启程序 Args: installer_path: 安装包路径 Returns: bool: 是否成功启动安装 """ try: # 获取当前程序信息 if getattr(sys, 'frozen', False): # 打包后的exe current_exe = sys.executable current_dir = os.path.dirname(current_exe) exe_name = os.path.basename(current_exe) else: # 开发环境 current_exe = os.path.abspath("main.py") current_dir = os.path.dirname(os.path.abspath(__file__)) exe_name = "main.exe" print(f"[Updater] 当前程序: {current_exe}") print(f"[Updater] 程序目录: {current_dir}") print(f"[Updater] 主程序名: {exe_name}") print(f"[Updater] 安装包路径: {installer_path}") # 🔥 关键改进:将安装包移动到程序目录(避免跨盘符问题) installer_name = os.path.basename(installer_path) target_installer_path = os.path.join(current_dir, installer_name) # 如果安装包不在程序目录,移动过去 if os.path.abspath(installer_path) != os.path.abspath(target_installer_path): try: import shutil shutil.move(installer_path, target_installer_path) print(f"[Updater] 安装包已移动到程序目录: {target_installer_path}") installer_path = target_installer_path except Exception as e: print(f"[Updater] 移动安装包失败,使用原路径: {e}") # 创建更新脚本 update_script_path = create_update_installer_script( installer_path, current_dir, exe_name ) # 🔥 关键修复:以管理员身份启动批处理脚本,提前触发UAC print(f"[Updater] 以管理员身份启动更新脚本: {update_script_path}") success = run_as_admin(update_script_path) if success: print(f"[Updater] ✅ 更新脚本已以管理员身份启动") return True else: print(f"[Updater] ❌ 用户取消了UAC授权,更新已取消") return False except Exception as e: print(f"[Updater] ❌ 启动更新失败: {e}") import traceback traceback.print_exc() return False def create_update_installer_script(installer_path, install_dir, exe_name): """ 创建更新安装脚本(在程序目录下执行) 工作流程: 1. 等待主程序退出 2. 在程序目录下执行NSIS安装 3. 清理安装包 4. 重启程序 Args: installer_path: 安装包路径(应该已经在程序目录下) install_dir: 程序安装目录 exe_name: 主程序文件名 Returns: str: 脚本路径 """ # 脚本放在程序目录下,避免权限问题 script_path = os.path.join(install_dir, "update_installer.bat") # 转换为绝对路径(处理引号问题) installer_path = os.path.abspath(installer_path) install_dir = os.path.abspath(install_dir) script_content = f"""@echo off REM ShuiDi AI Assistant Auto Update Script REM Encoding: UTF-8 BOM title ShuiDi AI Assistant - Auto Update echo ============================================ echo ShuiDi AI Assistant Auto Update echo ============================================ echo. REM Change to program directory cd /d "{install_dir}" echo INFO: Program directory: {install_dir} echo. REM Wait for main program to exit (max 60 seconds) echo INFO: Waiting for main program to exit... set count=0 :wait_main_exit tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL if NOT ERRORLEVEL 1 ( if %count% LSS 60 ( timeout /t 1 /nobreak > nul set /a count+=1 goto wait_main_exit ) else ( echo WARN: Main program did not exit normally, forcing installation ) ) echo OK: Main program exited echo. REM Execute NSIS silent installation echo INFO: Starting installation... echo INFO: Installer: "{installer_path}" echo INFO: Target directory: "{install_dir}" echo. REM Execute installation (with admin rights from NSIS) "{installer_path}" /S /D={install_dir} REM Wait for installation to complete echo INFO: Waiting for installation to complete... timeout /t 25 /nobreak > nul REM Cleanup installer echo INFO: Cleaning up installer... del /f /q "{installer_path}" 2>nul REM Start new version (with --after-update flag to show window on top) echo INFO: Starting new version... if exist "{install_dir}\\{exe_name}" ( start "" "{install_dir}\\{exe_name}" --after-update echo OK: Program started successfully ) else ( echo ERROR: Program file not found: {exe_name} echo INFO: Please start program manually pause ) REM Delete this script after delay timeout /t 2 /nobreak > nul del /f /q "%~f0" 2>nul exit """ try: with open(script_path, 'w', encoding='utf-8-sig') as f: f.write(script_content) print(f"[Updater] 更新脚本已创建: {script_path}") except Exception as e: print(f"[Updater] 创建更新脚本失败: {e}") return script_path def create_restart_launcher(install_dir, exe_name): """ 创建重启启动器脚本 该脚本会: 1. 等待当前GUI进程退出 2. 等待NSIS安装完成(15秒) 3. 启动新版本GUI 4. 自我删除 Args: install_dir: 安装目录 exe_name: 主程序文件名 Returns: str: 脚本路径 """ script_path = os.path.join(tempfile.gettempdir(), "shuidrop_restart_launcher.bat") # 构建批处理脚本 script_content = f"""@echo off chcp 65001 > nul title 水滴AI客服智能助手 - 更新助手 echo [%time%] 正在更新水滴AI客服智能助手... echo. REM 等待主程序退出(最多等待10秒) echo [%time%] 等待主程序退出... set count=0 :wait_main_exit tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL if NOT ERRORLEVEL 1 ( if %count% LSS 10 ( timeout /t 1 /nobreak > nul set /a count+=1 goto wait_main_exit ) ) echo [%time%] 主程序已退出 REM 等待安装完成(15秒) echo [%time%] 等待安装完成... timeout /t 15 /nobreak > nul REM 清理旧的安装包 echo [%time%] 清理临时文件... del /f /q "%TEMP%\\ShuiDi_AI_Assistant_Setup_*.exe" 2>nul REM 启动新版本程序 echo [%time%] 正在启动新版本... cd /d "{install_dir}" if exist "{exe_name}" ( start "" "{install_dir}\\{exe_name}" echo [%time%] ✅ 程序已启动 ) else ( echo [%time%] ❌ 错误: 找不到程序文件 {exe_name} pause ) REM 延迟后删除自己 timeout /t 2 /nobreak > nul del /f /q "%~f0" 2>nul exit """ # 写入脚本文件(UTF-8 BOM编码,避免中文乱码) try: with open(script_path, 'w', encoding='utf-8-sig') as f: f.write(script_content) print(f"[Updater] 重启启动器已创建: {script_path}") except Exception as e: print(f"[Updater] 创建重启启动器失败: {e}") return script_path # 兼容性函数(保留旧接口) def install_update_silently(installer_path): """ 启动静默安装(不自动重启) 兼容旧接口,建议使用 install_update_and_restart """ return install_update_and_restart(installer_path) if __name__ == "__main__": # 测试代码 print("自动更新模块测试") print("=" * 50) # 测试重启启动器创建 script_path = create_restart_launcher("C:\\Program Files\\ShuiDi AI Assistant", "main.exe") print(f"测试脚本路径: {script_path}") if os.path.exists(script_path): print("✅ 重启启动器创建成功") with open(script_path, 'r', encoding='utf-8-sig') as f: print("\n脚本内容预览:") print("-" * 50) print(f.read()) else: print("❌ 重启启动器创建失败")