490 lines
16 KiB
Python
490 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
自动更新模块 - 支持后台下载、静默安装和自动重启
|
||
优化版:带重试、超时、自动重启功能
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import urllib.request
|
||
import tempfile
|
||
import time
|
||
import socket
|
||
from pathlib import Path
|
||
from PyQt5.QtCore import QThread, pyqtSignal
|
||
import ctypes
|
||
|
||
|
||
class UpdateDownloader(QThread):
|
||
"""
|
||
优化版更新下载器线程
|
||
特性:
|
||
- 自动重试3次
|
||
- 30秒网络超时
|
||
- 文件大小验证
|
||
- 进度实时更新
|
||
"""
|
||
|
||
# 信号定义
|
||
progress = pyqtSignal(int, int) # (已下载字节, 总字节)
|
||
finished = pyqtSignal(str) # 下载完成,返回文件路径
|
||
error = pyqtSignal(str) # 下载失败,返回错误信息
|
||
retry_info = pyqtSignal(int, int) # 重试信息 (当前重试次数, 总重试次数)
|
||
|
||
def __init__(self, download_url, version, max_retries=3):
|
||
super().__init__()
|
||
self.download_url = download_url
|
||
self.version = version
|
||
self.max_retries = max_retries # 最大重试次数
|
||
self.is_cancelled = False
|
||
|
||
def run(self):
|
||
"""执行下载(带重试机制)"""
|
||
for attempt in range(self.max_retries):
|
||
try:
|
||
if attempt > 0:
|
||
# 显示重试信息
|
||
self.retry_info.emit(attempt + 1, self.max_retries)
|
||
time.sleep(2) # 等待2秒后重试
|
||
|
||
# 执行实际下载
|
||
self._download_file()
|
||
return # 下载成功,退出
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
print(f"[UpdateDownloader] 下载失败 (尝试 {attempt + 1}/{self.max_retries}): {error_msg}")
|
||
|
||
if self.is_cancelled:
|
||
# 用户取消,立即退出
|
||
self.error.emit("下载已取消")
|
||
return
|
||
|
||
if attempt < self.max_retries - 1:
|
||
# 还有重试机会,继续
|
||
continue
|
||
else:
|
||
# 所有重试都失败了
|
||
self.error.emit(f"下载失败(已重试{self.max_retries}次)\n错误: {error_msg}")
|
||
|
||
def _download_file(self):
|
||
"""实际下载逻辑(带超时和验证)"""
|
||
# 设置网络超时
|
||
socket.setdefaulttimeout(30) # 30秒超时
|
||
|
||
# 准备临时目录和文件名
|
||
temp_dir = tempfile.gettempdir()
|
||
filename = f"ShuiDi_AI_Assistant_Setup_v{self.version}.exe"
|
||
file_path = os.path.join(temp_dir, filename)
|
||
|
||
print(f"[UpdateDownloader] 开始下载到: {file_path}")
|
||
print(f"[UpdateDownloader] 下载地址: {self.download_url}")
|
||
|
||
# 删除旧文件(如果存在)
|
||
if os.path.exists(file_path):
|
||
try:
|
||
os.remove(file_path)
|
||
print(f"[UpdateDownloader] 已删除旧文件")
|
||
except Exception as e:
|
||
print(f"[UpdateDownloader] 删除旧文件失败: {e}")
|
||
|
||
# 下载文件(带进度回调)
|
||
def progress_callback(block_num, block_size, total_size):
|
||
if self.is_cancelled:
|
||
raise Exception("用户取消下载")
|
||
|
||
downloaded = block_num * block_size
|
||
# 避免超过总大小
|
||
if total_size > 0 and downloaded > total_size:
|
||
downloaded = total_size
|
||
|
||
self.progress.emit(downloaded, total_size)
|
||
|
||
# 执行下载
|
||
urllib.request.urlretrieve(
|
||
self.download_url,
|
||
file_path,
|
||
reporthook=progress_callback
|
||
)
|
||
|
||
# 验证下载的文件
|
||
if not os.path.exists(file_path):
|
||
raise Exception("下载的文件不存在")
|
||
|
||
file_size = os.path.getsize(file_path)
|
||
print(f"[UpdateDownloader] 下载完成,文件大小: {file_size / (1024*1024):.2f} MB")
|
||
|
||
# 验证文件大小(至少1MB,避免下载了错误页面)
|
||
min_file_size = 1024 * 1024 # 1MB(正式环境)
|
||
# 🧪 测试小文件时可以临时改为:min_file_size = 10 * 1024 # 10KB
|
||
|
||
if file_size < min_file_size:
|
||
raise Exception(f"下载的文件大小异常: {file_size} bytes(可能不是有效的安装包)")
|
||
|
||
# 发送完成信号
|
||
self.finished.emit(file_path)
|
||
|
||
def cancel(self):
|
||
"""取消下载"""
|
||
print("[UpdateDownloader] 用户取消下载")
|
||
self.is_cancelled = True
|
||
|
||
|
||
def run_as_admin(script_path):
|
||
"""
|
||
以管理员身份运行脚本(Windows UAC提升)
|
||
|
||
特点:
|
||
- 在GUI退出前弹出UAC提示
|
||
- 用户点击"允许"后,脚本以管理员身份运行
|
||
- 用户点击"取消",返回False
|
||
|
||
Args:
|
||
script_path: 批处理脚本路径
|
||
|
||
Returns:
|
||
bool: True表示成功启动(用户点击允许),False表示失败或用户取消
|
||
"""
|
||
try:
|
||
print(f"[RunAsAdmin] 请求管理员权限执行: {script_path}")
|
||
|
||
# 使用 ShellExecute 以管理员身份运行
|
||
# "runas" 参数会触发UAC提示
|
||
ret = ctypes.windll.shell32.ShellExecuteW(
|
||
None, # hwnd: 父窗口句柄(None表示无父窗口)
|
||
"runas", # lpOperation: 以管理员身份运行
|
||
"cmd.exe", # lpFile: 要执行的程序
|
||
f'/c "{script_path}"', # lpParameters: 命令行参数
|
||
None, # lpDirectory: 工作目录(None表示当前目录)
|
||
0 # nShowCmd: 0=隐藏窗口
|
||
)
|
||
|
||
# 返回值说明:
|
||
# >32: 成功
|
||
# <=32: 失败(具体错误码)
|
||
# - 5: 用户拒绝UAC(点击"否")
|
||
# - 2: 文件未找到
|
||
# - 其他: 其他错误
|
||
|
||
if ret > 32:
|
||
print(f"[RunAsAdmin] ✅ 成功:用户已授权,脚本正在以管理员身份运行")
|
||
return True
|
||
elif ret == 5:
|
||
print(f"[RunAsAdmin] ⚠️ 用户取消了UAC授权(返回码: {ret})")
|
||
return False
|
||
else:
|
||
print(f"[RunAsAdmin] ❌ 启动失败(返回码: {ret})")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"[RunAsAdmin] ❌ 异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
def install_update_and_restart(installer_path):
|
||
"""
|
||
简化版更新安装(文件替换方式,不使用NSIS安装)
|
||
|
||
工作流程:
|
||
1. 将安装包移动到程序目录
|
||
2. 创建替换脚本
|
||
3. 以管理员身份启动脚本(提前弹出UAC)
|
||
4. 脚本等待程序退出
|
||
5. 脚本执行静默安装
|
||
6. 重启程序
|
||
|
||
Args:
|
||
installer_path: 安装包路径
|
||
|
||
Returns:
|
||
bool: 是否成功启动安装
|
||
"""
|
||
try:
|
||
# 获取当前程序信息
|
||
if getattr(sys, 'frozen', False):
|
||
# 打包后的exe
|
||
current_exe = sys.executable
|
||
current_dir = os.path.dirname(current_exe)
|
||
exe_name = os.path.basename(current_exe)
|
||
else:
|
||
# 开发环境
|
||
current_exe = os.path.abspath("main.py")
|
||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||
exe_name = "main.exe"
|
||
|
||
print(f"[Updater] 当前程序: {current_exe}")
|
||
print(f"[Updater] 程序目录: {current_dir}")
|
||
print(f"[Updater] 主程序名: {exe_name}")
|
||
print(f"[Updater] 安装包路径: {installer_path}")
|
||
|
||
# 🔥 关键改进:将安装包移动到程序目录(避免跨盘符问题)
|
||
installer_name = os.path.basename(installer_path)
|
||
target_installer_path = os.path.join(current_dir, installer_name)
|
||
|
||
# 如果安装包不在程序目录,移动过去
|
||
if os.path.abspath(installer_path) != os.path.abspath(target_installer_path):
|
||
try:
|
||
import shutil
|
||
shutil.move(installer_path, target_installer_path)
|
||
print(f"[Updater] 安装包已移动到程序目录: {target_installer_path}")
|
||
installer_path = target_installer_path
|
||
except Exception as e:
|
||
print(f"[Updater] 移动安装包失败,使用原路径: {e}")
|
||
|
||
# 创建更新脚本
|
||
update_script_path = create_update_installer_script(
|
||
installer_path,
|
||
current_dir,
|
||
exe_name
|
||
)
|
||
|
||
# 🔥 关键修复:以管理员身份启动批处理脚本,提前触发UAC
|
||
print(f"[Updater] 以管理员身份启动更新脚本: {update_script_path}")
|
||
success = run_as_admin(update_script_path)
|
||
|
||
if success:
|
||
print(f"[Updater] ✅ 更新脚本已以管理员身份启动")
|
||
return True
|
||
else:
|
||
print(f"[Updater] ❌ 用户取消了UAC授权,更新已取消")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"[Updater] ❌ 启动更新失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
def create_update_installer_script(installer_path, install_dir, exe_name):
|
||
"""
|
||
创建更新安装脚本(在程序目录下执行)
|
||
|
||
工作流程:
|
||
1. 等待主程序退出
|
||
2. 在程序目录下执行NSIS安装
|
||
3. 清理安装包
|
||
4. 重启程序
|
||
|
||
Args:
|
||
installer_path: 安装包路径(应该已经在程序目录下)
|
||
install_dir: 程序安装目录
|
||
exe_name: 主程序文件名
|
||
|
||
Returns:
|
||
str: 脚本路径
|
||
"""
|
||
# 脚本放在程序目录下,避免权限问题
|
||
script_path = os.path.join(install_dir, "update_installer.bat")
|
||
|
||
# 转换为绝对路径(处理引号问题)
|
||
installer_path = os.path.abspath(installer_path)
|
||
install_dir = os.path.abspath(install_dir)
|
||
|
||
script_content = f"""@echo off
|
||
REM ShuiDi AI Assistant Auto Update Script
|
||
REM Encoding: UTF-8 BOM
|
||
|
||
title ShuiDi AI Assistant - Auto Update
|
||
|
||
echo ============================================
|
||
echo ShuiDi AI Assistant Auto Update
|
||
echo ============================================
|
||
echo.
|
||
|
||
REM Change to program directory
|
||
cd /d "{install_dir}"
|
||
echo INFO: Program directory: {install_dir}
|
||
echo.
|
||
|
||
REM Wait for main program to exit (max 60 seconds)
|
||
echo INFO: Waiting for main program to exit...
|
||
set count=0
|
||
:wait_main_exit
|
||
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
|
||
if NOT ERRORLEVEL 1 (
|
||
if %count% LSS 60 (
|
||
timeout /t 1 /nobreak > nul
|
||
set /a count+=1
|
||
goto wait_main_exit
|
||
) else (
|
||
echo WARN: Main program did not exit normally, forcing termination
|
||
taskkill /F /IM {exe_name} 2>nul
|
||
)
|
||
)
|
||
echo OK: Main program exited
|
||
echo.
|
||
|
||
REM Critical fix: Wait 2 seconds to ensure all file handles are released
|
||
echo INFO: Waiting for file handles to release...
|
||
timeout /t 2 /nobreak > nul
|
||
echo OK: File handles released
|
||
echo.
|
||
|
||
REM Critical fix: Delete old _internal directory to force clean install
|
||
echo INFO: Removing old _internal directory...
|
||
if exist "{install_dir}\\_internal" (
|
||
rd /s /q "{install_dir}\\_internal" 2>nul
|
||
if exist "{install_dir}\\_internal" (
|
||
echo WARN: Failed to delete _internal directory
|
||
) else (
|
||
echo OK: Old _internal directory removed successfully
|
||
)
|
||
) else (
|
||
echo INFO: No old _internal directory found
|
||
)
|
||
echo.
|
||
|
||
REM Execute NSIS silent installation
|
||
echo INFO: Starting installation...
|
||
echo INFO: Installer: "{installer_path}"
|
||
echo INFO: Target directory: "{install_dir}"
|
||
echo.
|
||
|
||
REM Execute installation (with admin rights from NSIS)
|
||
"{installer_path}" /S /D={install_dir}
|
||
|
||
REM Wait for installation to complete (extended to 35 seconds for full copy)
|
||
echo INFO: Waiting for installation to complete...
|
||
timeout /t 35 /nobreak > nul
|
||
|
||
REM Cleanup installer
|
||
echo INFO: Cleaning up installer...
|
||
del /f /q "{installer_path}" 2>nul
|
||
|
||
REM Start new version (with --after-update flag to show window on top)
|
||
echo INFO: Starting new version...
|
||
if exist "{install_dir}\\{exe_name}" (
|
||
start "" "{install_dir}\\{exe_name}" --after-update
|
||
echo OK: Program started successfully
|
||
) else (
|
||
echo ERROR: Program file not found: {exe_name}
|
||
echo INFO: Please start program manually
|
||
pause
|
||
)
|
||
|
||
REM Delete this script after delay
|
||
timeout /t 2 /nobreak > nul
|
||
del /f /q "%~f0" 2>nul
|
||
exit
|
||
"""
|
||
|
||
try:
|
||
with open(script_path, 'w', encoding='utf-8-sig') as f:
|
||
f.write(script_content)
|
||
print(f"[Updater] 更新脚本已创建: {script_path}")
|
||
except Exception as e:
|
||
print(f"[Updater] 创建更新脚本失败: {e}")
|
||
|
||
return script_path
|
||
|
||
|
||
def create_restart_launcher(install_dir, exe_name):
|
||
"""
|
||
创建重启启动器脚本
|
||
|
||
该脚本会:
|
||
1. 等待当前GUI进程退出
|
||
2. 等待NSIS安装完成(15秒)
|
||
3. 启动新版本GUI
|
||
4. 自我删除
|
||
|
||
Args:
|
||
install_dir: 安装目录
|
||
exe_name: 主程序文件名
|
||
|
||
Returns:
|
||
str: 脚本路径
|
||
"""
|
||
script_path = os.path.join(tempfile.gettempdir(), "shuidrop_restart_launcher.bat")
|
||
|
||
# 构建批处理脚本
|
||
script_content = f"""@echo off
|
||
chcp 65001 > nul
|
||
title ShuiDi AI Assistant - Update Helper
|
||
|
||
echo [%time%] Updating ShuiDi AI Assistant...
|
||
echo.
|
||
|
||
REM Wait for main program to exit (max 10 seconds)
|
||
echo [%time%] Waiting for main program to exit...
|
||
set count=0
|
||
:wait_main_exit
|
||
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
|
||
if NOT ERRORLEVEL 1 (
|
||
if %count% LSS 10 (
|
||
timeout /t 1 /nobreak > nul
|
||
set /a count+=1
|
||
goto wait_main_exit
|
||
)
|
||
)
|
||
echo [%time%] Main program exited
|
||
|
||
REM Wait for installation to complete (15 seconds)
|
||
echo [%time%] Waiting for installation to complete...
|
||
timeout /t 15 /nobreak > nul
|
||
|
||
REM Clean up old installer files
|
||
echo [%time%] Cleaning up temporary files...
|
||
del /f /q "%TEMP%\\ShuiDi_AI_Assistant_Setup_*.exe" 2>nul
|
||
|
||
REM Start new version
|
||
echo [%time%] Starting new version...
|
||
cd /d "{install_dir}"
|
||
if exist "{exe_name}" (
|
||
start "" "{install_dir}\\{exe_name}"
|
||
echo [%time%] Program started successfully
|
||
) else (
|
||
echo [%time%] ERROR: Program file not found: {exe_name}
|
||
pause
|
||
)
|
||
|
||
REM Delete this script after delay
|
||
timeout /t 2 /nobreak > nul
|
||
del /f /q "%~f0" 2>nul
|
||
exit
|
||
"""
|
||
|
||
# Write script file with UTF-8 BOM encoding
|
||
try:
|
||
with open(script_path, 'w', encoding='utf-8-sig') as f:
|
||
f.write(script_content)
|
||
print(f"[Updater] Restart launcher created: {script_path}")
|
||
except Exception as e:
|
||
print(f"[Updater] Failed to create restart launcher: {e}")
|
||
|
||
return script_path
|
||
|
||
|
||
# 兼容性函数(保留旧接口)
|
||
def install_update_silently(installer_path):
|
||
"""
|
||
启动静默安装(不自动重启)
|
||
兼容旧接口,建议使用 install_update_and_restart
|
||
"""
|
||
return install_update_and_restart(installer_path)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 测试代码
|
||
print("自动更新模块测试")
|
||
print("=" * 50)
|
||
|
||
# 测试重启启动器创建
|
||
script_path = create_restart_launcher("C:\\Program Files\\ShuiDi AI Assistant", "main.exe")
|
||
print(f"测试脚本路径: {script_path}")
|
||
|
||
if os.path.exists(script_path):
|
||
print("✅ 重启启动器创建成功")
|
||
with open(script_path, 'r', encoding='utf-8-sig') as f:
|
||
print("\n脚本内容预览:")
|
||
print("-" * 50)
|
||
print(f.read())
|
||
else:
|
||
print("❌ 重启启动器创建失败")
|
||
|