Files
shuidrop_gui/auto_updater.py

412 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自动更新模块 - 支持后台下载、静默安装和自动重启
优化版:带重试、超时、自动重启功能
"""
import os
import sys
import subprocess
import urllib.request
import tempfile
import time
import socket
from pathlib import Path
from PyQt5.QtCore import QThread, pyqtSignal
class UpdateDownloader(QThread):
"""
优化版更新下载器线程
特性:
- 自动重试3次
- 30秒网络超时
- 文件大小验证
- 进度实时更新
"""
# 信号定义
progress = pyqtSignal(int, int) # (已下载字节, 总字节)
finished = pyqtSignal(str) # 下载完成,返回文件路径
error = pyqtSignal(str) # 下载失败,返回错误信息
retry_info = pyqtSignal(int, int) # 重试信息 (当前重试次数, 总重试次数)
def __init__(self, download_url, version, max_retries=3):
super().__init__()
self.download_url = download_url
self.version = version
self.max_retries = max_retries # 最大重试次数
self.is_cancelled = False
def run(self):
"""执行下载(带重试机制)"""
for attempt in range(self.max_retries):
try:
if attempt > 0:
# 显示重试信息
self.retry_info.emit(attempt + 1, self.max_retries)
time.sleep(2) # 等待2秒后重试
# 执行实际下载
self._download_file()
return # 下载成功,退出
except Exception as e:
error_msg = str(e)
print(f"[UpdateDownloader] 下载失败 (尝试 {attempt + 1}/{self.max_retries}): {error_msg}")
if self.is_cancelled:
# 用户取消,立即退出
self.error.emit("下载已取消")
return
if attempt < self.max_retries - 1:
# 还有重试机会,继续
continue
else:
# 所有重试都失败了
self.error.emit(f"下载失败(已重试{self.max_retries}次)\n错误: {error_msg}")
def _download_file(self):
"""实际下载逻辑(带超时和验证)"""
# 设置网络超时
socket.setdefaulttimeout(30) # 30秒超时
# 准备临时目录和文件名
temp_dir = tempfile.gettempdir()
filename = f"ShuiDi_AI_Assistant_Setup_v{self.version}.exe"
file_path = os.path.join(temp_dir, filename)
print(f"[UpdateDownloader] 开始下载到: {file_path}")
print(f"[UpdateDownloader] 下载地址: {self.download_url}")
# 删除旧文件(如果存在)
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"[UpdateDownloader] 已删除旧文件")
except Exception as e:
print(f"[UpdateDownloader] 删除旧文件失败: {e}")
# 下载文件(带进度回调)
def progress_callback(block_num, block_size, total_size):
if self.is_cancelled:
raise Exception("用户取消下载")
downloaded = block_num * block_size
# 避免超过总大小
if total_size > 0 and downloaded > total_size:
downloaded = total_size
self.progress.emit(downloaded, total_size)
# 执行下载
urllib.request.urlretrieve(
self.download_url,
file_path,
reporthook=progress_callback
)
# 验证下载的文件
if not os.path.exists(file_path):
raise Exception("下载的文件不存在")
file_size = os.path.getsize(file_path)
print(f"[UpdateDownloader] 下载完成,文件大小: {file_size / (1024*1024):.2f} MB")
# 验证文件大小至少1MB避免下载了错误页面
min_file_size = 1024 * 1024 # 1MB正式环境
# 🧪 测试小文件时可以临时改为min_file_size = 10 * 1024 # 10KB
if file_size < min_file_size:
raise Exception(f"下载的文件大小异常: {file_size} bytes可能不是有效的安装包")
# 发送完成信号
self.finished.emit(file_path)
def cancel(self):
"""取消下载"""
print("[UpdateDownloader] 用户取消下载")
self.is_cancelled = True
def install_update_and_restart(installer_path):
"""
简化版更新安装文件替换方式不使用NSIS安装
工作流程:
1. 将安装包移动到程序目录
2. 创建替换脚本
3. 脚本等待程序退出
4. 脚本解压并替换文件
5. 重启程序
Args:
installer_path: 安装包路径
Returns:
bool: 是否成功启动安装
"""
try:
# 获取当前程序信息
if getattr(sys, 'frozen', False):
# 打包后的exe
current_exe = sys.executable
current_dir = os.path.dirname(current_exe)
exe_name = os.path.basename(current_exe)
else:
# 开发环境
current_exe = os.path.abspath("main.py")
current_dir = os.path.dirname(os.path.abspath(__file__))
exe_name = "main.exe"
print(f"[Updater] 当前程序: {current_exe}")
print(f"[Updater] 程序目录: {current_dir}")
print(f"[Updater] 主程序名: {exe_name}")
print(f"[Updater] 安装包路径: {installer_path}")
# 🔥 关键改进:将安装包移动到程序目录(避免跨盘符问题)
installer_name = os.path.basename(installer_path)
target_installer_path = os.path.join(current_dir, installer_name)
# 如果安装包不在程序目录,移动过去
if os.path.abspath(installer_path) != os.path.abspath(target_installer_path):
try:
import shutil
shutil.move(installer_path, target_installer_path)
print(f"[Updater] 安装包已移动到程序目录: {target_installer_path}")
installer_path = target_installer_path
except Exception as e:
print(f"[Updater] 移动安装包失败,使用原路径: {e}")
# 创建更新脚本
update_script_path = create_update_installer_script(
installer_path,
current_dir,
exe_name
)
# 启动更新脚本(独立进程)
print(f"[Updater] 启动更新脚本: {update_script_path}")
subprocess.Popen(
['cmd', '/c', update_script_path],
creationflags=subprocess.CREATE_NO_WINDOW | subprocess.DETACHED_PROCESS,
close_fds=True
)
print(f"[Updater] ✅ 更新脚本已启动")
return True
except Exception as e:
print(f"[Updater] ❌ 启动更新失败: {e}")
import traceback
traceback.print_exc()
return False
def create_update_installer_script(installer_path, install_dir, exe_name):
"""
创建更新安装脚本(在程序目录下执行)
工作流程:
1. 等待主程序退出
2. 在程序目录下执行NSIS安装
3. 清理安装包
4. 重启程序
Args:
installer_path: 安装包路径(应该已经在程序目录下)
install_dir: 程序安装目录
exe_name: 主程序文件名
Returns:
str: 脚本路径
"""
# 脚本放在程序目录下,避免权限问题
script_path = os.path.join(install_dir, "update_installer.bat")
# 转换为绝对路径(处理引号问题)
installer_path = os.path.abspath(installer_path)
install_dir = os.path.abspath(install_dir)
script_content = f"""@echo off
chcp 65001 > nul
title 水滴AI客服智能助手 - 自动更新
echo ============================================
echo 水滴AI客服智能助手 - 自动更新
echo ============================================
echo.
REM 切换到程序目录
cd /d "{install_dir}"
echo [INFO] 程序目录: {install_dir}
echo.
REM 等待主程序退出最多等待30秒
echo [INFO] 等待主程序退出...
set count=0
:wait_main_exit
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
if NOT ERRORLEVEL 1 (
if %count% LSS 30 (
timeout /t 1 /nobreak > nul
set /a count+=1
goto wait_main_exit
) else (
echo [WARN] 主程序未正常退出,强制继续安装
)
)
echo [OK] 主程序已退出
echo.
REM 执行静默安装(在当前目录下)
echo [INFO] 开始安装更新...
echo [INFO] 安装包: "{installer_path}"
echo [INFO] 目标目录: "{install_dir}"
echo.
REM 使用完整路径和引号避免空格问题
"{installer_path}" /S /D="{install_dir}"
REM 等待安装完成
echo [INFO] 等待安装完成...
timeout /t 20 /nobreak > nul
REM 清理安装包
echo [INFO] 清理安装包...
del /f /q "{installer_path}" 2>nul
REM 启动新版本程序
echo [INFO] 正在启动新版本...
if exist "{install_dir}\\{exe_name}" (
start "" "{install_dir}\\{exe_name}"
echo [OK] ✅ 程序已启动
) else (
echo [ERROR] ❌ 找不到程序文件: {exe_name}
echo [INFO] 请手动启动程序
pause
)
REM 延迟后删除自己
timeout /t 2 /nobreak > nul
del /f /q "%~f0" 2>nul
exit
"""
try:
with open(script_path, 'w', encoding='utf-8-sig') as f:
f.write(script_content)
print(f"[Updater] 更新脚本已创建: {script_path}")
except Exception as e:
print(f"[Updater] 创建更新脚本失败: {e}")
return script_path
def create_restart_launcher(install_dir, exe_name):
"""
创建重启启动器脚本
该脚本会:
1. 等待当前GUI进程退出
2. 等待NSIS安装完成15秒
3. 启动新版本GUI
4. 自我删除
Args:
install_dir: 安装目录
exe_name: 主程序文件名
Returns:
str: 脚本路径
"""
script_path = os.path.join(tempfile.gettempdir(), "shuidrop_restart_launcher.bat")
# 构建批处理脚本
script_content = f"""@echo off
chcp 65001 > nul
title 水滴AI客服智能助手 - 更新助手
echo [%time%] 正在更新水滴AI客服智能助手...
echo.
REM 等待主程序退出最多等待10秒
echo [%time%] 等待主程序退出...
set count=0
:wait_main_exit
tasklist /FI "IMAGENAME eq {exe_name}" 2>NUL | find /I "{exe_name}" >NUL
if NOT ERRORLEVEL 1 (
if %count% LSS 10 (
timeout /t 1 /nobreak > nul
set /a count+=1
goto wait_main_exit
)
)
echo [%time%] 主程序已退出
REM 等待安装完成15秒
echo [%time%] 等待安装完成...
timeout /t 15 /nobreak > nul
REM 清理旧的安装包
echo [%time%] 清理临时文件...
del /f /q "%TEMP%\\ShuiDi_AI_Assistant_Setup_*.exe" 2>nul
REM 启动新版本程序
echo [%time%] 正在启动新版本...
cd /d "{install_dir}"
if exist "{exe_name}" (
start "" "{install_dir}\\{exe_name}"
echo [%time%] ✅ 程序已启动
) else (
echo [%time%] ❌ 错误: 找不到程序文件 {exe_name}
pause
)
REM 延迟后删除自己
timeout /t 2 /nobreak > nul
del /f /q "%~f0" 2>nul
exit
"""
# 写入脚本文件UTF-8 BOM编码避免中文乱码
try:
with open(script_path, 'w', encoding='utf-8-sig') as f:
f.write(script_content)
print(f"[Updater] 重启启动器已创建: {script_path}")
except Exception as e:
print(f"[Updater] 创建重启启动器失败: {e}")
return script_path
# 兼容性函数(保留旧接口)
def install_update_silently(installer_path):
"""
启动静默安装(不自动重启)
兼容旧接口,建议使用 install_update_and_restart
"""
return install_update_and_restart(installer_path)
if __name__ == "__main__":
# 测试代码
print("自动更新模块测试")
print("=" * 50)
# 测试重启启动器创建
script_path = create_restart_launcher("C:\\Program Files\\ShuiDi AI Assistant", "main.exe")
print(f"测试脚本路径: {script_path}")
if os.path.exists(script_path):
print("✅ 重启启动器创建成功")
with open(script_path, 'r', encoding='utf-8-sig') as f:
print("\n脚本内容预览:")
print("-" * 50)
print(f.read())
else:
print("❌ 重启启动器创建失败")