Files
shuidrop_gui/auto_updater.py

469 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自动更新模块 - 支持后台下载静默安装和自动重启
优化版带重试超时自动重启功能
"""
import os
import sys
import subprocess
import urllib.request
import tempfile
import time
import socket
from pathlib import Path
from PyQt5.QtCore import QThread, pyqtSignal
import ctypes
class UpdateDownloader(QThread):
"""
优化版更新下载器线程
特性
- 自动重试3次
- 30秒网络超时
- 文件大小验证
- 进度实时更新
"""
# 信号定义
progress = pyqtSignal(int, int) # (已下载字节, 总字节)
finished = pyqtSignal(str) # 下载完成,返回文件路径
error = pyqtSignal(str) # 下载失败,返回错误信息
retry_info = pyqtSignal(int, int) # 重试信息 (当前重试次数, 总重试次数)
def __init__(self, download_url, version, max_retries=3):
super().__init__()
self.download_url = download_url
self.version = version
self.max_retries = max_retries # 最大重试次数
self.is_cancelled = False
def run(self):
"""执行下载(带重试机制)"""
for attempt in range(self.max_retries):
try:
if attempt > 0:
# 显示重试信息
self.retry_info.emit(attempt + 1, self.max_retries)
time.sleep(2) # 等待2秒后重试
# 执行实际下载
self._download_file()
return # 下载成功,退出
except Exception as e:
error_msg = str(e)
print(f"[UpdateDownloader] 下载失败 (尝试 {attempt + 1}/{self.max_retries}): {error_msg}")
if self.is_cancelled:
# 用户取消,立即退出
self.error.emit("下载已取消")
return
if attempt < self.max_retries - 1:
# 还有重试机会,继续
continue
else:
# 所有重试都失败了
self.error.emit(f"下载失败(已重试{self.max_retries}次)\n错误: {error_msg}")
def _download_file(self):
"""实际下载逻辑(带超时和验证)"""
# 设置网络超时
socket.setdefaulttimeout(30) # 30秒超时
# 准备临时目录和文件名
temp_dir = tempfile.gettempdir()
filename = f"ShuiDi_AI_Assistant_Setup_v{self.version}.exe"
file_path = os.path.join(temp_dir, filename)
print(f"[UpdateDownloader] 开始下载到: {file_path}")
print(f"[UpdateDownloader] 下载地址: {self.download_url}")
# 删除旧文件(如果存在)
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"[UpdateDownloader] 已删除旧文件")
except Exception as e:
print(f"[UpdateDownloader] 删除旧文件失败: {e}")
# 下载文件(带进度回调)
def progress_callback(block_num, block_size, total_size):
if self.is_cancelled:
raise Exception("用户取消下载")
downloaded = block_num * block_size
# 避免超过总大小
if total_size > 0 and downloaded > total_size:
downloaded = total_size
self.progress.emit(downloaded, total_size)
# 执行下载
urllib.request.urlretrieve(
self.download_url,
file_path,
reporthook=progress_callback
)
# 验证下载的文件
if not os.path.exists(file_path):
raise Exception("下载的文件不存在")
file_size = os.path.getsize(file_path)
print(f"[UpdateDownloader] 下载完成,文件大小: {file_size / (1024*1024):.2f} MB")
# 验证文件大小至少1MB避免下载了错误页面
min_file_size = 1024 * 1024 # 1MB正式环境
# 🧪 测试小文件时可以临时改为min_file_size = 10 * 1024 # 10KB
if file_size < min_file_size:
raise Exception(f"下载的文件大小异常: {file_size} bytes可能不是有效的安装包")
# 发送完成信号
self.finished.emit(file_path)
def cancel(self):
"""取消下载"""
print("[UpdateDownloader] 用户取消下载")
self.is_cancelled = True
def run_as_admin(script_path):
"""
以管理员身份运行脚本Windows UAC提升
特点
- 在GUI退出前弹出UAC提示
- 用户点击"允许"脚本以管理员身份运行
- 用户点击"取消"返回False
Args:
script_path: 批处理脚本路径
Returns:
bool: True表示成功启动用户点击允许False表示失败或用户取消
"""
try:
print(f"[RunAsAdmin] 请求管理员权限执行: {script_path}")
# 使用 ShellExecute 以管理员身份运行
# "runas" 参数会触发UAC提示
ret = ctypes.windll.shell32.ShellExecuteW(
None, # hwnd: 父窗口句柄None表示无父窗口
"runas", # lpOperation: 以管理员身份运行
"cmd.exe", # lpFile: 要执行的程序
f'/c "{script_path}"', # lpParameters: 命令行参数
None, # lpDirectory: 工作目录None表示当前目录
0 # nShowCmd: 0=隐藏窗口
)
# 返回值说明:
# >32: 成功
# <=32: 失败(具体错误码)
# - 5: 用户拒绝UAC点击"否"
# - 2: 文件未找到
# - 其他: 其他错误
if ret > 32:
print(f"[RunAsAdmin] ✅ 成功:用户已授权,脚本正在以管理员身份运行")
return True
elif ret == 5:
print(f"[RunAsAdmin] ⚠️ 用户取消了UAC授权返回码: {ret}")
return False
else:
print(f"[RunAsAdmin] ❌ 启动失败(返回码: {ret}")
return False
except Exception as e:
print(f"[RunAsAdmin] ❌ 异常: {e}")
import traceback
traceback.print_exc()
return False
def install_update_and_restart(installer_path):
"""
简化版更新安装文件替换方式不使用NSIS安装
工作流程
1. 将安装包移动到程序目录
2. 创建替换脚本
3. 以管理员身份启动脚本提前弹出UAC
4. 脚本等待程序退出
5. 脚本执行静默安装
6. 重启程序
Args:
installer_path: 安装包路径
Returns:
bool: 是否成功启动安装
"""
try:
# 获取当前程序信息
if getattr(sys, 'frozen', False):
# 打包后的exe
current_exe = sys.executable
current_dir = os.path.dirname(current_exe)
exe_name = os.path.basename(current_exe)
else:
# 开发环境
current_exe = os.path.abspath("main.py")
current_dir = os.path.dirname(os.path.abspath(__file__))
exe_name = "main.exe"
print(f"[Updater] 当前程序: {current_exe}")
print(f"[Updater] 程序目录: {current_dir}")
print(f"[Updater] 主程序名: {exe_name}")
print(f"[Updater] 安装包路径: {installer_path}")
# 🔥 关键改进:将安装包移动到程序目录(避免跨盘符问题)
installer_name = os.path.basename(installer_path)
target_installer_path = os.path.join(current_dir, installer_name)
# 如果安装包不在程序目录,移动过去
if os.path.abspath(installer_path) != os.path.abspath(target_installer_path):
try:
import shutil
shutil.move(installer_path, target_installer_path)
print(f"[Updater] 安装包已移动到程序目录: {target_installer_path}")
installer_path = target_installer_path
except Exception as e:
print(f"[Updater] 移动安装包失败,使用原路径: {e}")
# 创建更新脚本
update_script_path = create_update_installer_script(
installer_path,
current_dir,
exe_name
)
# 🔥 关键修复以管理员身份启动批处理脚本提前触发UAC
print(f"[Updater] 以管理员身份启动更新脚本: {update_script_path}")
success = run_as_admin(update_script_path)
if success:
print(f"[Updater] ✅ 更新脚本已以管理员身份启动")
return True
else:
print(f"[Updater] ❌ 用户取消了UAC授权更新已取消")
return False
except Exception as e:
print(f"[Updater] ❌ 启动更新失败: {e}")
import traceback
traceback.print_exc()
return False
def create_update_installer_script(installer_path, install_dir, exe_name):
"""
创建更新安装脚本在程序目录下执行
工作流程
1. 等待主程序退出
2. 在程序目录下执行NSIS安装
3. 清理安装包
4. 重启程序
Args:
installer_path: 安装包路径应该已经在程序目录下
install_dir: 程序安装目录
exe_name: 主程序文件名
Returns:
str: 脚本路径
"""
# 脚本放在程序目录下,避免权限问题
script_path = os.path.join(install_dir, "update_installer.bat")
# 转换为绝对路径(处理引号问题)
installer_path = os.path.abspath(installer_path)
install_dir = os.path.abspath(install_dir)
script_content = f"""@echo off
REM ShuiDi AI Assistant Auto Update Script
REM Encoding: UTF-8 BOM
title ShuiDi AI Assistant - Auto Update
echo ============================================
echo ShuiDi AI Assistant Auto Update
echo ============================================
echo.
REM Change to program directory
cd /d "{install_dir}"
echo INFO: Program directory: {install_dir}
echo.
REM Wait for main program to exit (max 60 seconds)
echo INFO: Waiting for main program to exit...
set count=0
:wait_main_exit
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
if NOT ERRORLEVEL 1 (
if %count% LSS 60 (
timeout /t 1 /nobreak > nul
set /a count+=1
goto wait_main_exit
) else (
echo WARN: Main program did not exit normally, forcing installation
)
)
echo OK: Main program exited
echo.
REM Execute NSIS silent installation
echo INFO: Starting installation...
echo INFO: Installer: "{installer_path}"
echo INFO: Target directory: "{install_dir}"
echo.
REM Execute installation (with admin rights from NSIS)
"{installer_path}" /S /D={install_dir}
REM Wait for installation to complete
echo INFO: Waiting for installation to complete...
timeout /t 25 /nobreak > nul
REM Cleanup installer
echo INFO: Cleaning up installer...
del /f /q "{installer_path}" 2>nul
REM Start new version (with --after-update flag to show window on top)
echo INFO: Starting new version...
if exist "{install_dir}\\{exe_name}" (
start "" "{install_dir}\\{exe_name}" --after-update
echo OK: Program started successfully
) else (
echo ERROR: Program file not found: {exe_name}
echo INFO: Please start program manually
pause
)
REM Delete this script after delay
timeout /t 2 /nobreak > nul
del /f /q "%~f0" 2>nul
exit
"""
try:
with open(script_path, 'w', encoding='utf-8-sig') as f:
f.write(script_content)
print(f"[Updater] 更新脚本已创建: {script_path}")
except Exception as e:
print(f"[Updater] 创建更新脚本失败: {e}")
return script_path
def create_restart_launcher(install_dir, exe_name):
"""
创建重启启动器脚本
该脚本会
1. 等待当前GUI进程退出
2. 等待NSIS安装完成15
3. 启动新版本GUI
4. 自我删除
Args:
install_dir: 安装目录
exe_name: 主程序文件名
Returns:
str: 脚本路径
"""
script_path = os.path.join(tempfile.gettempdir(), "shuidrop_restart_launcher.bat")
# 构建批处理脚本
script_content = f"""@echo off
chcp 65001 > nul
title 水滴AI客服智能助手 - 更新助手
echo [%time%] 正在更新水滴AI客服智能助手...
echo.
REM 等待主程序退出最多等待10秒
echo [%time%] 等待主程序退出...
set count=0
:wait_main_exit
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
if NOT ERRORLEVEL 1 (
if %count% LSS 10 (
timeout /t 1 /nobreak > nul
set /a count+=1
goto wait_main_exit
)
)
echo [%time%] 主程序已退出
REM 等待安装完成15
echo [%time%] 等待安装完成...
timeout /t 15 /nobreak > nul
REM 清理旧的安装包
echo [%time%] 清理临时文件...
del /f /q "%TEMP%\\ShuiDi_AI_Assistant_Setup_*.exe" 2>nul
REM 启动新版本程序
echo [%time%] 正在启动新版本...
cd /d "{install_dir}"
if exist "{exe_name}" (
start "" "{install_dir}\\{exe_name}"
echo [%time%] 程序已启动
) else (
echo [%time%] 错误: 找不到程序文件 {exe_name}
pause
)
REM 延迟后删除自己
timeout /t 2 /nobreak > nul
del /f /q "%~f0" 2>nul
exit
"""
# 写入脚本文件UTF-8 BOM编码避免中文乱码
try:
with open(script_path, 'w', encoding='utf-8-sig') as f:
f.write(script_content)
print(f"[Updater] 重启启动器已创建: {script_path}")
except Exception as e:
print(f"[Updater] 创建重启启动器失败: {e}")
return script_path
# 兼容性函数(保留旧接口)
def install_update_silently(installer_path):
"""
启动静默安装不自动重启
兼容旧接口建议使用 install_update_and_restart
"""
return install_update_and_restart(installer_path)
if __name__ == "__main__":
# 测试代码
print("自动更新模块测试")
print("=" * 50)
# 测试重启启动器创建
script_path = create_restart_launcher("C:\\Program Files\\ShuiDi AI Assistant", "main.exe")
print(f"测试脚本路径: {script_path}")
if os.path.exists(script_path):
print("✅ 重启启动器创建成功")
with open(script_path, 'r', encoding='utf-8-sig') as f:
print("\n脚本内容预览:")
print("-" * 50)
print(f.read())
else:
print("❌ 重启启动器创建失败")