Files
shuidrop_gui/auto_updater.py

412 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自动更新模块 - 支持后台下载静默安装和自动重启
优化版带重试超时自动重启功能
"""
import os
import sys
import subprocess
import urllib.request
import tempfile
import time
import socket
from pathlib import Path
from PyQt5.QtCore import QThread, pyqtSignal
class UpdateDownloader(QThread):
"""
优化版更新下载器线程
特性
- 自动重试3次
- 30秒网络超时
- 文件大小验证
- 进度实时更新
"""
# 信号定义
progress = pyqtSignal(int, int) # (已下载字节, 总字节)
finished = pyqtSignal(str) # 下载完成,返回文件路径
error = pyqtSignal(str) # 下载失败,返回错误信息
retry_info = pyqtSignal(int, int) # 重试信息 (当前重试次数, 总重试次数)
def __init__(self, download_url, version, max_retries=3):
super().__init__()
self.download_url = download_url
self.version = version
self.max_retries = max_retries # 最大重试次数
self.is_cancelled = False
def run(self):
"""执行下载(带重试机制)"""
for attempt in range(self.max_retries):
try:
if attempt > 0:
# 显示重试信息
self.retry_info.emit(attempt + 1, self.max_retries)
time.sleep(2) # 等待2秒后重试
# 执行实际下载
self._download_file()
return # 下载成功,退出
except Exception as e:
error_msg = str(e)
print(f"[UpdateDownloader] 下载失败 (尝试 {attempt + 1}/{self.max_retries}): {error_msg}")
if self.is_cancelled:
# 用户取消,立即退出
self.error.emit("下载已取消")
return
if attempt < self.max_retries - 1:
# 还有重试机会,继续
continue
else:
# 所有重试都失败了
self.error.emit(f"下载失败(已重试{self.max_retries}次)\n错误: {error_msg}")
def _download_file(self):
"""实际下载逻辑(带超时和验证)"""
# 设置网络超时
socket.setdefaulttimeout(30) # 30秒超时
# 准备临时目录和文件名
temp_dir = tempfile.gettempdir()
filename = f"ShuiDi_AI_Assistant_Setup_v{self.version}.exe"
file_path = os.path.join(temp_dir, filename)
print(f"[UpdateDownloader] 开始下载到: {file_path}")
print(f"[UpdateDownloader] 下载地址: {self.download_url}")
# 删除旧文件(如果存在)
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"[UpdateDownloader] 已删除旧文件")
except Exception as e:
print(f"[UpdateDownloader] 删除旧文件失败: {e}")
# 下载文件(带进度回调)
def progress_callback(block_num, block_size, total_size):
if self.is_cancelled:
raise Exception("用户取消下载")
downloaded = block_num * block_size
# 避免超过总大小
if total_size > 0 and downloaded > total_size:
downloaded = total_size
self.progress.emit(downloaded, total_size)
# 执行下载
urllib.request.urlretrieve(
self.download_url,
file_path,
reporthook=progress_callback
)
# 验证下载的文件
if not os.path.exists(file_path):
raise Exception("下载的文件不存在")
file_size = os.path.getsize(file_path)
print(f"[UpdateDownloader] 下载完成,文件大小: {file_size / (1024*1024):.2f} MB")
# 验证文件大小至少1MB避免下载了错误页面
min_file_size = 1024 * 1024 # 1MB正式环境
# 🧪 测试小文件时可以临时改为min_file_size = 10 * 1024 # 10KB
if file_size < min_file_size:
raise Exception(f"下载的文件大小异常: {file_size} bytes可能不是有效的安装包")
# 发送完成信号
self.finished.emit(file_path)
def cancel(self):
"""取消下载"""
print("[UpdateDownloader] 用户取消下载")
self.is_cancelled = True
def install_update_and_restart(installer_path):
"""
简化版更新安装文件替换方式不使用NSIS安装
工作流程
1. 将安装包移动到程序目录
2. 创建替换脚本
3. 脚本等待程序退出
4. 脚本解压并替换文件
5. 重启程序
Args:
installer_path: 安装包路径
Returns:
bool: 是否成功启动安装
"""
try:
# 获取当前程序信息
if getattr(sys, 'frozen', False):
# 打包后的exe
current_exe = sys.executable
current_dir = os.path.dirname(current_exe)
exe_name = os.path.basename(current_exe)
else:
# 开发环境
current_exe = os.path.abspath("main.py")
current_dir = os.path.dirname(os.path.abspath(__file__))
exe_name = "main.exe"
print(f"[Updater] 当前程序: {current_exe}")
print(f"[Updater] 程序目录: {current_dir}")
print(f"[Updater] 主程序名: {exe_name}")
print(f"[Updater] 安装包路径: {installer_path}")
# 🔥 关键改进:将安装包移动到程序目录(避免跨盘符问题)
installer_name = os.path.basename(installer_path)
target_installer_path = os.path.join(current_dir, installer_name)
# 如果安装包不在程序目录,移动过去
if os.path.abspath(installer_path) != os.path.abspath(target_installer_path):
try:
import shutil
shutil.move(installer_path, target_installer_path)
print(f"[Updater] 安装包已移动到程序目录: {target_installer_path}")
installer_path = target_installer_path
except Exception as e:
print(f"[Updater] 移动安装包失败,使用原路径: {e}")
# 创建更新脚本
update_script_path = create_update_installer_script(
installer_path,
current_dir,
exe_name
)
# 启动更新脚本(独立进程)
print(f"[Updater] 启动更新脚本: {update_script_path}")
subprocess.Popen(
['cmd', '/c', update_script_path],
creationflags=subprocess.CREATE_NO_WINDOW | subprocess.DETACHED_PROCESS,
close_fds=True
)
print(f"[Updater] ✅ 更新脚本已启动")
return True
except Exception as e:
print(f"[Updater] ❌ 启动更新失败: {e}")
import traceback
traceback.print_exc()
return False
def create_update_installer_script(installer_path, install_dir, exe_name):
"""
创建更新安装脚本在程序目录下执行
工作流程
1. 等待主程序退出
2. 在程序目录下执行NSIS安装
3. 清理安装包
4. 重启程序
Args:
installer_path: 安装包路径应该已经在程序目录下
install_dir: 程序安装目录
exe_name: 主程序文件名
Returns:
str: 脚本路径
"""
# 脚本放在程序目录下,避免权限问题
script_path = os.path.join(install_dir, "update_installer.bat")
# 转换为绝对路径(处理引号问题)
installer_path = os.path.abspath(installer_path)
install_dir = os.path.abspath(install_dir)
script_content = f"""@echo off
chcp 65001 > nul
title 水滴AI客服智能助手 - 自动更新
echo ============================================
echo 水滴AI客服智能助手 - 自动更新
echo ============================================
echo.
REM 切换到程序目录
cd /d "{install_dir}"
echo [INFO] 程序目录: {install_dir}
echo.
REM 等待主程序退出最多等待30秒
echo [INFO] 等待主程序退出...
set count=0
:wait_main_exit
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
if NOT ERRORLEVEL 1 (
if %count% LSS 30 (
timeout /t 1 /nobreak > nul
set /a count+=1
goto wait_main_exit
) else (
echo [WARN] 主程序未正常退出强制继续安装
)
)
echo [OK] 主程序已退出
echo.
REM 执行静默安装在当前目录下
echo [INFO] 开始安装更新...
echo [INFO] 安装包: "{installer_path}"
echo [INFO] 目标目录: "{install_dir}"
echo.
REM 使用完整路径和引号避免空格问题
"{installer_path}" /S /D="{install_dir}"
REM 等待安装完成
echo [INFO] 等待安装完成...
timeout /t 20 /nobreak > nul
REM 清理安装包
echo [INFO] 清理安装包...
del /f /q "{installer_path}" 2>nul
REM 启动新版本程序
echo [INFO] 正在启动新版本...
if exist "{install_dir}\\{exe_name}" (
start "" "{install_dir}\\{exe_name}"
echo [OK] 程序已启动
) else (
echo [ERROR] 找不到程序文件: {exe_name}
echo [INFO] 请手动启动程序
pause
)
REM 延迟后删除自己
timeout /t 2 /nobreak > nul
del /f /q "%~f0" 2>nul
exit
"""
try:
with open(script_path, 'w', encoding='utf-8-sig') as f:
f.write(script_content)
print(f"[Updater] 更新脚本已创建: {script_path}")
except Exception as e:
print(f"[Updater] 创建更新脚本失败: {e}")
return script_path
def create_restart_launcher(install_dir, exe_name):
"""
创建重启启动器脚本
该脚本会
1. 等待当前GUI进程退出
2. 等待NSIS安装完成15
3. 启动新版本GUI
4. 自我删除
Args:
install_dir: 安装目录
exe_name: 主程序文件名
Returns:
str: 脚本路径
"""
script_path = os.path.join(tempfile.gettempdir(), "shuidrop_restart_launcher.bat")
# 构建批处理脚本
script_content = f"""@echo off
chcp 65001 > nul
title 水滴AI客服智能助手 - 更新助手
echo [%time%] 正在更新水滴AI客服智能助手...
echo.
REM 等待主程序退出最多等待10秒
echo [%time%] 等待主程序退出...
set count=0
:wait_main_exit
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
if NOT ERRORLEVEL 1 (
if %count% LSS 10 (
timeout /t 1 /nobreak > nul
set /a count+=1
goto wait_main_exit
)
)
echo [%time%] 主程序已退出
REM 等待安装完成15
echo [%time%] 等待安装完成...
timeout /t 15 /nobreak > nul
REM 清理旧的安装包
echo [%time%] 清理临时文件...
del /f /q "%TEMP%\\ShuiDi_AI_Assistant_Setup_*.exe" 2>nul
REM 启动新版本程序
echo [%time%] 正在启动新版本...
cd /d "{install_dir}"
if exist "{exe_name}" (
start "" "{install_dir}\\{exe_name}"
echo [%time%] 程序已启动
) else (
echo [%time%] 错误: 找不到程序文件 {exe_name}
pause
)
REM 延迟后删除自己
timeout /t 2 /nobreak > nul
del /f /q "%~f0" 2>nul
exit
"""
# 写入脚本文件UTF-8 BOM编码避免中文乱码
try:
with open(script_path, 'w', encoding='utf-8-sig') as f:
f.write(script_content)
print(f"[Updater] 重启启动器已创建: {script_path}")
except Exception as e:
print(f"[Updater] 创建重启启动器失败: {e}")
return script_path
# 兼容性函数(保留旧接口)
def install_update_silently(installer_path):
"""
启动静默安装不自动重启
兼容旧接口建议使用 install_update_and_restart
"""
return install_update_and_restart(installer_path)
if __name__ == "__main__":
# 测试代码
print("自动更新模块测试")
print("=" * 50)
# 测试重启启动器创建
script_path = create_restart_launcher("C:\\Program Files\\ShuiDi AI Assistant", "main.exe")
print(f"测试脚本路径: {script_path}")
if os.path.exists(script_path):
print("✅ 重启启动器创建成功")
with open(script_path, 'r', encoding='utf-8-sig') as f:
print("\n脚本内容预览:")
print("-" * 50)
print(f.read())
else:
print("❌ 重启启动器创建失败")