Files
shuidrop_gui/auto_updater.py

312 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自动更新模块 - 支持后台下载、静默安装和自动重启
优化版:带重试、超时、自动重启功能
"""
import os
import sys
import subprocess
import urllib.request
import tempfile
import time
import socket
from pathlib import Path
from PyQt5.QtCore import QThread, pyqtSignal
class UpdateDownloader(QThread):
"""
优化版更新下载器线程
特性:
- 自动重试3次
- 30秒网络超时
- 文件大小验证
- 进度实时更新
"""
# 信号定义
progress = pyqtSignal(int, int) # (已下载字节, 总字节)
finished = pyqtSignal(str) # 下载完成,返回文件路径
error = pyqtSignal(str) # 下载失败,返回错误信息
retry_info = pyqtSignal(int, int) # 重试信息 (当前重试次数, 总重试次数)
def __init__(self, download_url, version, max_retries=3):
super().__init__()
self.download_url = download_url
self.version = version
self.max_retries = max_retries # 最大重试次数
self.is_cancelled = False
def run(self):
"""执行下载(带重试机制)"""
for attempt in range(self.max_retries):
try:
if attempt > 0:
# 显示重试信息
self.retry_info.emit(attempt + 1, self.max_retries)
time.sleep(2) # 等待2秒后重试
# 执行实际下载
self._download_file()
return # 下载成功,退出
except Exception as e:
error_msg = str(e)
print(f"[UpdateDownloader] 下载失败 (尝试 {attempt + 1}/{self.max_retries}): {error_msg}")
if self.is_cancelled:
# 用户取消,立即退出
self.error.emit("下载已取消")
return
if attempt < self.max_retries - 1:
# 还有重试机会,继续
continue
else:
# 所有重试都失败了
self.error.emit(f"下载失败(已重试{self.max_retries}次)\n错误: {error_msg}")
def _download_file(self):
"""实际下载逻辑(带超时和验证)"""
# 设置网络超时
socket.setdefaulttimeout(30) # 30秒超时
# 准备临时目录和文件名
temp_dir = tempfile.gettempdir()
filename = f"ShuiDi_AI_Assistant_Setup_v{self.version}.exe"
file_path = os.path.join(temp_dir, filename)
print(f"[UpdateDownloader] 开始下载到: {file_path}")
print(f"[UpdateDownloader] 下载地址: {self.download_url}")
# 删除旧文件(如果存在)
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"[UpdateDownloader] 已删除旧文件")
except Exception as e:
print(f"[UpdateDownloader] 删除旧文件失败: {e}")
# 下载文件(带进度回调)
def progress_callback(block_num, block_size, total_size):
if self.is_cancelled:
raise Exception("用户取消下载")
downloaded = block_num * block_size
# 避免超过总大小
if total_size > 0 and downloaded > total_size:
downloaded = total_size
self.progress.emit(downloaded, total_size)
# 执行下载
urllib.request.urlretrieve(
self.download_url,
file_path,
reporthook=progress_callback
)
# 验证下载的文件
if not os.path.exists(file_path):
raise Exception("下载的文件不存在")
file_size = os.path.getsize(file_path)
print(f"[UpdateDownloader] 下载完成,文件大小: {file_size / (1024*1024):.2f} MB")
# 验证文件大小至少1MB避免下载了错误页面
min_file_size = 1024 * 1024 # 1MB正式环境
# 🧪 测试小文件时可以临时改为min_file_size = 10 * 1024 # 10KB
if file_size < min_file_size:
raise Exception(f"下载的文件大小异常: {file_size} bytes可能不是有效的安装包")
# 发送完成信号
self.finished.emit(file_path)
def cancel(self):
"""取消下载"""
print("[UpdateDownloader] 用户取消下载")
self.is_cancelled = True
def install_update_and_restart(installer_path):
"""
启动静默安装并自动重启程序
工作流程:
1. 启动NSIS安装包静默模式
2. 启动重启启动器(等待安装完成后重启)
3. 当前程序退出
Args:
installer_path: 安装包路径
Returns:
bool: 是否成功启动安装
"""
try:
# 获取当前程序信息
if getattr(sys, 'frozen', False):
# 打包后的exe
current_exe = sys.executable
current_dir = os.path.dirname(current_exe)
exe_name = os.path.basename(current_exe)
else:
# 开发环境
current_exe = os.path.abspath("main.py")
current_dir = os.getcwd()
exe_name = "main.exe"
print(f"[Updater] 当前程序: {current_exe}")
print(f"[Updater] 安装目录: {current_dir}")
print(f"[Updater] 主程序名: {exe_name}")
# 创建重启启动器脚本
restart_script_path = create_restart_launcher(current_dir, exe_name)
# 启动NSIS安装包
# 参数说明:
# /S - 静默安装
# /D=目录 - 指定安装目录(必须是最后一个参数)
install_cmd = [
installer_path,
'/S', # 静默安装
f'/D={current_dir}' # 安装到当前目录(覆盖)
]
print(f"[Updater] 启动安装命令: {' '.join(install_cmd)}")
# 启动安装进程(独立进程,不受父进程影响)
subprocess.Popen(
install_cmd,
creationflags=subprocess.CREATE_NO_WINDOW | subprocess.DETACHED_PROCESS,
close_fds=True
)
print(f"[Updater] ✅ 安装包已启动")
# 同时启动重启启动器延迟15秒启动确保安装完成
subprocess.Popen(
['cmd', '/c', restart_script_path],
creationflags=subprocess.CREATE_NO_WINDOW | subprocess.DETACHED_PROCESS,
close_fds=True
)
print(f"[Updater] ✅ 重启启动器已启动")
return True
except Exception as e:
print(f"[Updater] ❌ 启动安装失败: {e}")
import traceback
traceback.print_exc()
return False
def create_restart_launcher(install_dir, exe_name):
"""
创建重启启动器脚本
该脚本会:
1. 等待当前GUI进程退出
2. 等待NSIS安装完成15秒
3. 启动新版本GUI
4. 自我删除
Args:
install_dir: 安装目录
exe_name: 主程序文件名
Returns:
str: 脚本路径
"""
script_path = os.path.join(tempfile.gettempdir(), "shuidrop_restart_launcher.bat")
# 构建批处理脚本
script_content = f"""@echo off
chcp 65001 > nul
title 水滴AI客服智能助手 - 更新助手
echo [%time%] 正在更新水滴AI客服智能助手...
echo.
REM 等待主程序退出最多等待10秒
echo [%time%] 等待主程序退出...
set count=0
:wait_main_exit
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
if NOT ERRORLEVEL 1 (
if %count% LSS 10 (
timeout /t 1 /nobreak > nul
set /a count+=1
goto wait_main_exit
)
)
echo [%time%] 主程序已退出
REM 等待安装完成15秒
echo [%time%] 等待安装完成...
timeout /t 15 /nobreak > nul
REM 清理旧的安装包
echo [%time%] 清理临时文件...
del /f /q "%TEMP%\\ShuiDi_AI_Assistant_Setup_*.exe" 2>nul
REM 启动新版本程序
echo [%time%] 正在启动新版本...
cd /d "{install_dir}"
if exist "{exe_name}" (
start "" "{install_dir}\\{exe_name}"
echo [%time%] ✅ 程序已启动
) else (
echo [%time%] ❌ 错误: 找不到程序文件 {exe_name}
pause
)
REM 延迟后删除自己
timeout /t 2 /nobreak > nul
del /f /q "%~f0" 2>nul
exit
"""
# 写入脚本文件UTF-8 BOM编码避免中文乱码
try:
with open(script_path, 'w', encoding='utf-8-sig') as f:
f.write(script_content)
print(f"[Updater] 重启启动器已创建: {script_path}")
except Exception as e:
print(f"[Updater] 创建重启启动器失败: {e}")
return script_path
# 兼容性函数(保留旧接口)
def install_update_silently(installer_path):
"""
启动静默安装(不自动重启)
兼容旧接口,建议使用 install_update_and_restart
"""
return install_update_and_restart(installer_path)
if __name__ == "__main__":
# 测试代码
print("自动更新模块测试")
print("=" * 50)
# 测试重启启动器创建
script_path = create_restart_launcher("C:\\Program Files\\ShuiDi AI Assistant", "main.exe")
print(f"测试脚本路径: {script_path}")
if os.path.exists(script_path):
print("✅ 重启启动器创建成功")
with open(script_path, 'r', encoding='utf-8-sig') as f:
print("\n脚本内容预览:")
print("-" * 50)
print(f.read())
else:
print("❌ 重启启动器创建失败")