#!/usr/bin/python3

import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
import logging
import os
import signal
from datetime import datetime, timedelta
import time
import random
import configparser
import sqlite3
from apscheduler.schedulers.background import BackgroundScheduler
from threading import Event
from gettext import gettext as _
import gettext
import getpass
import subprocess
import re
import json
from typing import List, Dict, Union, Optional, Any
from dbus.types import Int32, Int64, String, Boolean, Double, Array,Dictionary
from dbus import service
import socket
import struct
from enum import Enum

class BatchRollbackStatus(Enum):
    TASK_ACQUIRED = 1      # 系统已获取批量回退任务
    CONDITIONS_MET = 2     # 系统已满足回退条件
    CONDITIONS_NOT_MET = 3 # 系统未满足批量回退条件
    ROLLBACK_STARTED = 4   # 开始回退
    ROLLBACK_FAILED = 5    # 回退失败
    ROLLBACK_SUCCESS = 6   # 回退成功

UNATTENDED_UPGRADE_TIMESTAMP = "/var/lib/unattended-upgrades/unattended-upgrades-timestamp"
UNATTENDED_UPGRADE_POLICY_FILE_PATH="/var/lib/unattended-upgrades/unattended-upgrades-policy.conf"
UNATTENDED_UPGRADE_UUID_FILE="/var/lib/unattended-upgrades/unattended-upgrades-uuid"
ACTION_INSTALL = 1
ACTION_CHECK_RESOLVER = 3
ACTION_DOWNLOADONLY = 4

# 回退标志文件路径
NEED_ROLLBACK_FLAG = "/var/lib/kylin-system-updater/need-rollback"
UPDATE_NOT_RESTORE_DIR = "/var/lib/kylin-system-updater/remain"
        
def signal_term_handler(signal,frame):
    # type: (int, object) -> None
    logging.warning("SIGTERM received, will stop")
    os._exit(1)

def ReadValueFromFile(file,section,option):
    config=configparser.ConfigParser(allow_no_value=True)
    config.optionxform = str
    try:
        config.read(file)
        value = config[section][option]
        return value
    except Exception as e:
        logging.error(_("read config file error:%s")%e)
        return None

def WriteValueToFile(file,section,option,value):
    config=configparser.ConfigParser(allow_no_value=True)
    config.optionxform = str
    config.add_section(section)
    config.set(section,option,value)
    config.write(open(file,"w"))   

def UpdateTimeStamp(task_id,last_run_time):
    logging.debug(_("update timestamp:%s %s")%(task_id,last_run_time))
    if os.path.exists(UNATTENDED_UPGRADE_TIMESTAMP):
        config=configparser.ConfigParser(allow_no_value=True)
        config.optionxform = str
        config.read(UNATTENDED_UPGRADE_TIMESTAMP)
        if 'TimeStamp' in config.sections():
            config.set('TimeStamp',task_id,last_run_time)  
            with open(UNATTENDED_UPGRADE_TIMESTAMP,'w') as f:
                config.write(f)


def ini_to_json(ini_path: str) -> dict:
    """
    将INI配置文件转换为JSON格式的字典对象
    
    Args:
        ini_path: INI文件路径
        
    Returns:
        dict: 转换后的JSON格式字典
        
    Raises:
        FileNotFoundError: 当INI文件不存在时抛出
        ValueError: 当文件格式错误时抛出
    """
    # 初始化配置解析器
    config = configparser.ConfigParser()
    
    # 读取INI文件
    if not config.read(ini_path):
        raise FileNotFoundError(f"文件 {ini_path} 未找到")
    
    # 构建嵌套字典
    result = {}
    for section in config.sections():
        result[section] = dict(config.items(section))  # 转换section为字典结构
        
    return result

def json_to_asv(data):
    """
    将 JSON 字典转换为 D-Bus a{sv} 类型
    参数：
        data: dict - 输入字典（需确保键为字符串）
    返回：
        dbus.Dictionary - 符合 a{sv} 签名的 D-Bus 字典
    异常：
        ValueError - 输入类型错误或数据格式不合法
        TypeError - 存在不支持的类型
    """
    def _convert_value(value):
        # 递归处理嵌套结构
        if isinstance(value, dict):
            return Dictionary(
                {k: _convert_value(v) for k, v in value.items()},
                signature=dbus.Signature('sv'),
                variant_level=1  # 隐式创建变体层
            )
        elif isinstance(value, list):
            return Array(
                [_convert_value(v) for v in value],
                signature=dbus.Signature('v'),
                variant_level=1
            )
        elif isinstance(value, bool):
            return Boolean(value)
        elif isinstance(value, int):
            return Int64(value) if abs(value) > 0x7FFFFFFF else Int32(value)
        elif isinstance(value, float):
            return Double(value)
        elif isinstance(value, str):
            return String(value)
        else:
            raise TypeError(f"不支持的类型: {type(value)}")

    try:
        # 输入类型验证
        if not isinstance(data, dict):
            raise ValueError("输入必须是字典类型")
        
        # 键类型验证（需全部为字符串）
        if not all(isinstance(k, str) for k in data.keys()):
            raise ValueError("字典键必须是字符串")
        
        return Dictionary(
            {k: _convert_value(v) for k, v in data.items()},
            signature=dbus.Signature('sv'),
            variant_level=1
        )
    except (TypeError, ValueError) as e:
        raise e
    except Exception as e:
        raise RuntimeError(f"未知错误: {str(e)}") from e
    
def process_time_intervals(data: List[Dict], max_random_minutes: int) -> List[Dict]:
    """
    合并重叠时间区间并生成随机调整后的时间段
    :param data: 输入时间段列表 [{"start":"HH:MM","end":"HH:MM"}]
    :param max_random_minutes: 最大随机偏移分钟数
    :return: 合并调整后的时间段列表
    """
    # 时间转换函数（支持跨天时间）
    def time_to_minutes(t: str) -> int:
        h, m = map(int, t.split(':'))
        return h * 60 + m

    # 分钟数转时间（支持跨天显示）
    def minutes_to_time(m: int) -> str:
        total_m = m % 1440
        return f"{total_m//60:02d}:{total_m%60:02d}"

    # 区间合并算法
    intervals = []
    for period in data:
        start = time_to_minutes(period["start"])
        end = time_to_minutes(period["end"])
        if end < start:  # 处理跨天情况
            intervals.append((start, end + 1440))
        else:
            intervals.append((start, end))

    # 排序并合并
    intervals.sort(key=lambda x: x[0])
    merged = []
    for current in intervals:
        if not merged:
            merged.append(list(current))
        else:
            last = merged[-1]
            if current[0] <= last[1]:
                last[1] = max(last[1], current[1])
            else:
                merged.append(list(current))
    
    now_time = datetime.now()
    now_minutes = now_time.hour * 60 + now_time.minute

    # 生成随机调整后的时间段
    result = []
    for start, end in merged:
        # 如果接受策略的时间在下载时间段内，那就从接受策略时间开始（加随机时间）作为开始下载时间
        if now_minutes > start and now_minutes < end:
            if end - now_minutes > max_random_minutes:
                logging.info("Current time in download range, will start download task.")
                start = now_minutes

        # 计算有效时间跨度（考虑跨天）
        total_minutes = end - start
        random_offset = random.randint(0, min(max_random_minutes, total_minutes))

        # 生成新开始时间
        new_start = (start + random_offset) % 1440

        # 处理跨天显示逻辑
        original_end = end % 1440 if end > 1440 else end
        result.append({
            "start": minutes_to_time(new_start),
            "end": minutes_to_time(original_end)
        })

    return result

def process_timelist(timelist: List[Dict]) -> List[Dict]:
    """处理跨天时间段并生成标准化输出

    Args:
        timelist: 时间区间列表，示例 [{"start":"23:00","end":"11:00"},...]

    Returns:
        包含小时、分钟和超时时间的字典列表，示例 [{"hour":23,"minute":0,"timeout":720}]
    """
    def time_to_minutes(t: str) -> int:
        """将HH:MM转换为分钟数"""
        hours, minutes = map(int, t.split(':'))
        return hours * 60 + minutes

    result = []

    for period in timelist:
        # 解析起止时间
        start = time_to_minutes(period["start"])
        end = time_to_minutes(period["end"])

        # 处理跨天情况
        if end < start:
            end += 1440  # 24小时分钟数

        # 计算超时分钟数
        timeout = end - start

        # 提取小时和分钟
        start_h, start_m = divmod(start, 60)

        result.append({
            "hour": start_h,
            "minute": start_m,
            "timeout": timeout
        })

    return result

def convert_a_sv_to_json(a_sv_data):
    """
    将DBus的a{sv}类型数据转换为JSON对象
    
    Args:
        a_sv_data (dbus.Dictionary): D-Bus类型的a{sv}数据（字典格式）
    
    Returns:
        str: JSON格式字符串
    """
    def _parse_variant(value):
        # 递归解析变体类型中的嵌套数据
        if isinstance(value, dbus.Dictionary):
            return {k: _parse_variant(v) for k, v in value.items()}
        elif isinstance(value, dbus.Array):
            return [_parse_variant(v) for v in value]
        elif isinstance(value, dbus.Struct):
            return tuple(_parse_variant(v) for v in value)
        elif isinstance(value, (dbus.String, dbus.ObjectPath)):
            return str(value)
        elif isinstance(value, dbus.Boolean):
            return bool(value)
        elif isinstance(value, (dbus.Int16, dbus.Int32, dbus.Int64)):
            return int(value)
        elif isinstance(value, (dbus.Double, dbus.UInt16, dbus.UInt32, dbus.UInt64)):
            return float(value) if '.' in str(value) else int(value)
        else:
            return value

    # 转换主逻辑
    json_dict = {}
    for key, variant in a_sv_data.items():
        json_dict[key] = _parse_variant(variant)
    
    return json.dumps(json_dict, indent=2)

def parse_time(time_input: Union[str, List[str]]) -> List[Dict[str, int]]:
    """
    将 HH:MM 格式的时间字符串或字符串列表解析为时间字典列表

    Args:
        time_input:
            - 单个时间字符串，如 "20:00" / "20：00"
            - 时间字符串列表，如 ["08:30", "20:00"]

    Returns:
        时间字典列表，示例:
        [
            {'hour': 20, 'minute': 0}
        ]

    Raises:
        ValueError: 当格式无效、类型错误或数值越界时抛出
    """

    if isinstance(time_input, str):
        time_list = [time_input]
    elif isinstance(time_input, list):
        time_list = time_input
    else:
        raise ValueError(f"无效时间类型: {type(time_input)}，应为 str 或 List[str]")

    result = []

    for time_str in time_list:
        if not isinstance(time_str, str):
            raise ValueError(f"时间元素必须为字符串: {time_str}")

        normalized_str = time_str.replace('：', ':')
        parts = normalized_str.split(':')

        if len(parts) != 2:
            raise ValueError(f"无效时间格式: {time_str}，应为 HH:MM")

        try:
            hour = int(parts[0])
            minute = int(parts[1])
        except ValueError:
            raise ValueError(f"非数字时间值: {time_str}")

        if not (0 <= hour <= 23):
            raise ValueError(f"小时值越界: {hour}，应为 0-23")
        if not (0 <= minute <= 59):
            raise ValueError(f"分钟值越界: {minute}，应为 0-59")

        result.append({'hour': hour, 'minute': minute})

    return result

def parse_time_range(time_str: str) -> dict:
    """
    解析时间字符串并生成时间范围字典
    
    参数：
    time_str - 输入时间字符串，支持三种格式：
               "HH:MM"、"HH:MM:SS"、"HH:MM-HH:MM"
    
    返回：
    {'start': 'HH:MM', 'end': 'HH:MM'}
    
    异常：
    ValueError - 当输入格式无效或时间值越界时抛出
    """
    # 格式验证正则表达式
    time_pattern = r'^([0-2][0-9]:[0-5][0-9])(?::[0-5][0-9])?(?:-([0-2][0-9]:[0-5][0-9]))?$'
    
    if match := re.match(time_pattern, time_str):
        start_time, end_time = match.groups()
        
        # 处理第三种格式（时间区间）
        if end_time:
            return validate_and_format(start_time, end_time)
        
        # 处理前两种格式（单个时间）
        base_time = time_str[:5]  # 截取HH:MM部分
        calculated_end = calculate_end_time(base_time)
        return {'start': base_time, 'end': calculated_end}
    
    raise ValueError(f"无效时间格式：{time_str}")

def validate_and_format(start: str, end: str) -> dict:
    """验证并格式化时间区间"""
    try:
        # 时间有效性验证
        datetime.strptime(start, '%H:%M')
        datetime.strptime(end, '%H:%M')
        return {'start': start, 'end': end}
    except ValueError as e:
        raise ValueError(f"无效时间值：{str(e)}")

def calculate_end_time(base: str) -> str:
    """计算3小时后时间（处理跨天）"""
    try:
        # 时间解析与计算
        base_dt = datetime.strptime(base, '%H:%M')
        end_dt = base_dt + timedelta(hours=3)
        return end_dt.strftime('%H:%M')
    except ValueError as e:
        raise ValueError(f"无效基准时间：{str(e)}")

class NotifyService(service.Object):
    def __init__(self):
        # 连接到会话总线并注册对象路径
        bus = dbus.SystemBus()
        service.Object.__init__(self,bus, '/com/kylin/notifysend')

    @dbus.service.signal(dbus_interface='com.kylin.notifysend', 
                        signature='is')  # 参数类型定义为int
    def RebootNotify(self, status_code,status_description):
        """自定义信号声明：参数为整数状态码"""
        pass  # 函数体为空，通过调用该方法触发信号

    @dbus.service.signal(dbus_interface='com.kylin.notifysend', 
                        signature='is')  # 参数类型定义为int
    def InstallFinishNotify(self, status_code,status_description):
        """自定义信号声明：参数为整数状态码"""
        pass  # 函数体为空，通过调用该方法触发信号

    @dbus.service.signal(dbus_interface='com.kylin.notifysend', 
                        signature='b')  # 参数类型定义为bool
    def ConnectDistUpgrade(self, status):
        """自定义信号声明：参数为是否连接备份和下载完成信号状态码"""
        pass  # 函数体为空，通过调用该方法触发信号

class UnattendedUpgradesShutdown():
    def __init__(self) -> None:
        DBusGMainLoop(set_as_default=True)
        self.loop = GLib.MainLoop()
        self.system_bus = dbus.SystemBus()
        self.update_utils_proxy = self.system_bus.get_object('com.kylin.systemupgrade','/com/kylin/systemupgrade/utils')
        self.update_utils_interface = dbus.Interface(self.update_utils_proxy,dbus_interface='com.kylin.systemupgrade.interface')
        self.update_proxy = self.system_bus.get_object('com.kylin.systemupgrade','/com/kylin/systemupgrade',follow_name_owner_changes=True)
        self.update_interface = dbus.Interface(self.update_proxy,dbus_interface='com.kylin.systemupgrade.interface')
        self.upgrade_strategy_proxy = self.system_bus.get_object('com.kylin.UpgradeStrategies','/com/kylin/UpgradeStrategies',follow_name_owner_changes=True)
        self.upgrade_strategy_interface = dbus.Interface(self.upgrade_strategy_proxy,dbus_interface='com.kylin.UpgradeStrategies.interface')
        self.backup_proxy = self.system_bus.get_object('com.kylin.backupserver','/',follow_name_owner_changes=True)
        self.backup_interface = dbus.Interface(self.backup_proxy,dbus_interface='com.kylin.backup.server')
        self.update_proxy.connect_to_signal('UpdateDetectFinished',self.update_detect_finished_handler)
        self.update_proxy.connect_to_signal('UpdateFixBrokenStatus',self.update_fix_broken_status)
        self.update_proxy.connect_to_signal('UpdateDependResloveStatus',self.update_depend_resolve_status)
        self.update_proxy.connect_to_signal('UpdateDloadAndInstStaChanged',self.update_download_install_status)
        self.update_proxy.connect_to_signal('UpdateInstallFinished',self.update_install_finished)
        self.update_proxy.connect_to_signal('UpdateDownloadFinished',self.update_download_finished)
        self.update_proxy.connect_to_signal("ChangeUpgradePolicy",self.change_upgrade_policy_handler)
        self.update_proxy.connect_to_signal("UpgradeAllNow",self.upgrade_all_now_handler)
        self.upgrade_strategy_proxy.connect_to_signal("AutocheckStatusChanged",self.auto_check_status_changed_handler)
        self.upgrade_strategy_proxy.connect_to_signal("AutoUpgradeStatusChanged",self.autoupgrade_status_changed_handler)
        self.upgrade_strategy_proxy.connect_to_signal("AutoupgradePeriodChanged",self.autoupgrade_period_changed_handler)
        self.upgrade_strategy_proxy.connect_to_signal("AutoupgradeDetailChanged",self.autoupgrade_detail_changed_handler)
        self.upgrade_strategy_proxy.connect_to_signal("UpgradeSchemeChanged",self.change_upgrade_scheme_handler)
        self.upgrade_strategy_proxy.connect_to_signal("UpgradeStrategyChanged",self.change_upgrade_strategy_handler)
        self.upgrade_strategy_proxy.connect_to_signal("PropertyChanged",self.property_changed_handler)
        self.upgrade_strategy_proxy.connect_to_signal("UpgradeAllNow",self.upgrade_all_now_handler)
        self.backup_rollback_sendrate = None   # 备份进度
        self.backup_rollback_sendresult =None  # 备份结果
        self.update_detect_status = False
        self.update_detect_event = Event()
        self.update_list = []
        self.full_update_list = []  # 完整的更新列表
        self.resolve_depend_status = False
        self.resolve_depend_status_event = Event()
        self.remove_pkgs = []
        self.install_finish_status = False
        self.install_finish_status_event = Event()
        self.install_finish_group = []
        self.download_finish_status = False
        self.download_finish_status_event = Event()
        self.download_finish_group = []
        self.backup_finish_result = False
        self.backup_finish_event = Event()
        self.batch_rollback_finish_result = False
        self.batch_rollback_finish_event = Event()
        self.need_check_backup=True
        self.notify_service = NotifyService()
        self.shutdown_install = 'True'
        self.backup_finish_status = ""

    def DataBackendCollect(self,updateinfo,json_file):
        try:
            self.update_utils_interface.DataBackendCollect(updateinfo,json_file)
        except Exception as e:
            logging.error(f"Failed to collect backend data for {updateinfo}: {e}")
            # 不重新抛出异常，避免导致程序崩溃

    def ReportBatchRollback(self, status=BatchRollbackStatus.ROLLBACK_SUCCESS, error_code="", error_string="", extra_data=None):
        """
        上报批量回退流程数据
        :param status: 状态 (BatchRollbackStatus.TASK_ACQUIRED:系统已获取批量回退任务, BatchRollbackStatus.CONDITIONS_MET:系统已满足回退条件,
                      BatchRollbackStatus.CONDITIONS_NOT_MET:系统未满足批量回退条件, BatchRollbackStatus.ROLLBACK_STARTED:开始回退,
                      BatchRollbackStatus.ROLLBACK_FAILED:回退失败, BatchRollbackStatus.ROLLBACK_SUCCESS:回退成功)
        :param error_code: 错误码
        :param error_string: 错误描述
        :param extra_data: 额外数据
        """
        try:
            # 获取batchback的id字段
            batchback_id = self._get_batchback_id()
            
            rollback_info = {
                "batchback_id":     batchback_id,
                "status":           status.value,  # 使用枚举的值
                "error_code":       error_code,
                "error_string":     error_string
            }
            
            if extra_data:
                rollback_info.update(extra_data)
            
            json_file = json.dumps(rollback_info.copy())
            self.DataBackendCollect("batchBack", json_file)
            logging.info(f"Reported batch rollback status: {status}, json_file: {json_file}")
        except Exception as e:
            logging.error(f"Failed to report batch rollback data: {e}")

    def _get_batchback_id(self):
        """
        从合并后的配置中获取batchback的id字段（支持高优先级配置）
        :return: batchback的id值，如果获取失败则返回0
        """
        try:
            import json
            
            # 优先通过D-Bus获取合并后的配置
            try:
                if hasattr(self, 'GetUpgradeStrategy'):
                    strategy = self.GetUpgradeStrategy()
                    # 将D-Bus字典转换为普通Python字典
                    data = {}
                    for key, value in strategy.items():
                        if hasattr(value, 'signature'):
                            if value.signature == 'v':
                                try:
                                    if hasattr(value, 'variant_level') and value.variant_level > 0:
                                        actual_value = value.value
                                    else:
                                        actual_value = value
                                except:
                                    actual_value = str(value)
                            else:
                                actual_value = value
                        else:
                            actual_value = value
                        data[key] = actual_value
                else:
                    # 降级：直接读取文件
                    file_path = "/var/lib/unattended-upgrades/kylin-update-strategy.json"
                    if not os.path.exists(file_path):
                        logging.warning(f"File {file_path} does not exist")
                        return 0
                    with open(file_path, 'r', encoding='utf-8') as file:
                        data = json.load(file)
            except Exception as e:
                logging.error(f"获取配置失败，使用降级方案: {e}")
                file_path = "/var/lib/unattended-upgrades/kylin-update-strategy.json"
                if not os.path.exists(file_path):
                    return 0
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)
                
            # 获取batchback字段的id值
            if "batchback" in data and "id" in data["batchback"]:
                return data["batchback"]["id"]
            else:
                logging.warning("batchback or id field not found in JSON file")
                return 0
        except Exception as e:
            logging.error(f"Failed to get batchback id: {e}")
            return 0
            logging.error(f"Failed to report batch rollback data: {e}")
            # 不重新抛出异常，避免导致程序崩溃

    def GetConfigValue(self,section,option):
        ret1,ret2=self.update_interface.GetConfigValue(section,option)
        return ret2
    
    def GetAutoBackupStatus(self):
        return self.upgrade_strategy_interface.GetAutoBackupStatus()
    
    def SetConfigValue(self,section,option,value):
        return self.update_interface.SetConfigValue(section,option,value)
    
    def GetUpgradeStrategy(self):
        return self.upgrade_strategy_interface.GetUpgradeStrategy()
    
    def SetUpgradeStrategy(self,input):
        return self.upgrade_strategy_interface.SetUpgradeStrategy(input)
    
    def CancelDownload(self):
        return self.update_interface.CancelDownload()
    
    def GetBackendStatus(self):
        return self.update_interface.GetBackendStatus("")

    def init_dbus_connections(self):
        pass

    def init_config(self):
        pass

    def init_event_flags(self):
        self.background_scheduler = BackgroundScheduler(
            job_defaults={
                'max_instances': 3,
                'coalesce': True,
                'misfire_grace_time': 60
            }
        )
        self.background_scheduler.start()

    def init_scheduler(self):
        pass

    def run(self):
        pass

    def prepare_upgrade(self):
        logging.info(_("prepare for upgrade"))
        self.update_detect_event.clear()
        self.update_interface.UpdateDetect()
        self.update_detect_event.wait()
        logging.debug(_("update detect finish:%s,%s")%(self.update_detect_status,",".join(self.update_list)))
        if self.update_detect_status and len(self.update_list)>0:
            pass
        elif not self.update_detect_status and 'kylin-system-updater' in self.update_list:
            logging.info(_("self update finished"))
        else:
            return False
        self.resolve_depend_status_event.clear()
        # 检查是否需要进行部分更新
        # 只有当update_list和full_update_list不一样时才走部分更新
        if hasattr(self, 'update_list') and hasattr(self, 'full_update_list'):
            # 如果两个列表不同，则使用部分更新
            if set(self.update_list) != set(self.full_update_list):
                logging.info("Using partial upgrade for resolver, update_list: %s", ",".join(self.update_list))
                # 使用DistUpgradePartial进行部分更新的依赖解析
                # 注意：需要将列表转换为适合DBus调用的格式
                partial_upgrade_list = dbus.Array(self.update_list, signature='s')
                errcode, errstr = self.update_interface.DistUpgradePartial(False, partial_upgrade_list)
                logging.info("Partial upgrade resolver result: errcode=%d, errstr=%s", errcode, errstr)
                if errcode != 0:
                    logging.error("Partial upgrade resolver failed: %s", errstr)
                    return False
            else:
                # 使用全量更新
                logging.info("Using full upgrade resolver")
                self.update_interface.DistUpgradeForAuto(ACTION_CHECK_RESOLVER)
        else:
            # 默认使用全量更新
            logging.info("Using full upgrade resolver (default)")
            self.update_interface.DistUpgradeForAuto(ACTION_CHECK_RESOLVER)
        self.resolve_depend_status_event.wait()
        logging.debug(_("resolve dependency status:%s,%s")%(self.resolve_depend_status,",".join(self.remove_pkgs)))
        if self.resolve_depend_status and len(self.remove_pkgs)==0:
            pass
        else:
            return False
        logging.info("disconnect notifysend from downloadfinish and backup result")
        self.notify_service.ConnectDistUpgrade(False)
        needbackup=self.upgrade_strategy_interface.GetAutoBackupStatus()
        if(self.need_check_backup):
            logging.debug(_("checking if need backup:%s")%needbackup)
            if needbackup:
                if self.backup():
                    logging.debug(_("backup success"))
                    ret = self.SetConfigValue("UpdateFrontendConf","backup_exist","True")
                    logging.info(_("set config value :%s")%(ret))
                    try:
                        restorable_flag="/tmp/update-backup.success"
                        logging.info("create restorable flag:%s"%(restorable_flag))
                        with open(restorable_flag, 'w') as file:
                            file.write("restore")
                        if os.path.exists(UNATTENDED_UPGRADE_UUID_FILE):
                            os.remove(UNATTENDED_UPGRADE_UUID_FILE)
                        current_uuid = SystemEnv.get_local_uuid()
                        logging.info("recode uuid :%s"%(current_uuid))
                        with open(UNATTENDED_UPGRADE_UUID_FILE, 'w') as file:
                            file.write(current_uuid)
                            file.flush()
                    except Exception as e: 
                        logging.error("Failed to create backup flag or uuid: %s", e)
           
                else:
                    BackupInfo = {}
                    logging.debug(_("backup failed"))
                    BackupInfo.update({"errorCode":str(self.backup_finish_status),
                                    "error_string":"备份失败",
                                    "appname":",".join(self.update_list)})
                    BackupInfo.update({"status":"failed"})
                    json_file = json.dumps(BackupInfo.copy())
                    self.DataBackendCollect("BackupInfo",json_file)
                    return False
        return True

    def download(self,timeout):
        logging.debug(_("start download"))
        logging.info("download timeout:%d"%timeout)
        self.download_finish_status=False
        self.download_finish_group=[]
        self.download_finish_status_event.clear()
        # 检查是否需要进行部分更新
        # 只有当update_list和full_update_list不一样时才走部分更新
        if hasattr(self, 'update_list') and hasattr(self, 'full_update_list'):
            # 如果两个列表不同，则使用部分更新
            if set(self.update_list) != set(self.full_update_list):
                logging.info("Using partial upgrade for download, update_list: %s", ",".join(self.update_list))
                # 使用DistUpgradePartial进行部分更新的下载
                # 注意：需要将列表转换为适合DBus调用的格式
                partial_upgrade_list = dbus.Array(self.update_list, signature='s')
                errcode, errstr = self.update_interface.DistUpgradePartial(True, partial_upgrade_list)
                logging.info("Partial upgrade download result: errcode=%d, errstr=%s", errcode, errstr)
                if errcode != 0:
                    logging.error("Partial upgrade download failed: %s", errstr)
                    return False
            else:
                # 使用全量更新
                logging.info("Using full upgrade download")
                errcode,errstr = self.update_interface.DistUpgradeForAuto(ACTION_DOWNLOADONLY)
        else:
            # 默认使用全量更新
            logging.info("Using full upgrade download (default)")
            errcode,errstr = self.update_interface.DistUpgradeForAuto(ACTION_DOWNLOADONLY)
        logging.info("ret code:%d,ret string:%s"%(errcode,errstr))
        if(errcode!=0):
            return False
        self.download_finish_status_event.wait(timeout=timeout)
        backend_status=self.GetBackendStatus()
        logging.info("backend status:%d"%backend_status)
        if(backend_status==ACTION_DOWNLOADONLY):
            logging.info("cancel download")
            self.CancelDownload()
            time.sleep(1)
        logging.debug(_("download finish status:%s,%s")%(self.download_finish_status,",".join(self.download_finish_group)))
        if self.download_finish_status and len(self.download_finish_group)>0:
            pass
        else:
            return False
        return True 
    
    def install_basic(self):
        logging.debug(_("start install"))
        self.install_finish_status_event.clear()
        
        # 检查是否需要进行部分更新
        # 只有当update_list和full_update_list不一样时才走部分更新
        if hasattr(self, 'update_list') and hasattr(self, 'full_update_list'):
            # 如果两个列表不同，则使用部分更新
            if set(self.update_list) != set(self.full_update_list):
                logging.info("Using partial upgrade for update_list: %s", ",".join(self.update_list))
                # 使用TriggerInstallOnShutdown触发关机安装，mode指定为"unattended"
                errcode, errstr = self.update_interface.TriggerInstallOnShutdown("unattended")
                logging.info("TriggerInstallOnShutdown result: errcode=%d, errstr=%s", errcode, errstr)
                if errcode != 0:
                    logging.error("TriggerInstallOnShutdown failed: %s", errstr)
                    return False
            else:
                # 使用全量更新
                logging.info("Using full upgrade")
                self.update_interface.DistUpgradeForAuto(ACTION_INSTALL)
        else:
            # 默认使用全量更新
            logging.info("Using full upgrade (default)")
            self.update_interface.DistUpgradeForAuto(ACTION_INSTALL)
            
        self.install_finish_status_event.wait()
        logging.debug(_("install finish status:%s,%s")%(self.install_finish_status,",".join(self.install_finish_group)))
        if self.install_finish_status and len(self.install_finish_group)>0:
            pass
        else:
            return False
        return True

    def install(self):
        pass
    
    def download_with_timeout(self,timeout):
        pass
         
    def predownload_with_timeout(self,timeout):
        pass

    def backup(self):
        logging.info(_("start backup"))
        # self.backup_proxy = self.system_bus.get_object('com.kylin.backupserver','/',follow_name_owner_changes=True)
        # self.backup_interface = dbus.Interface(self.backup_proxy,dbus_interface='com.kylin.backup.server')
        # self.backup_proxy.connect_to_signal('sendBackupResult',self.backup_result_handler)
        # self.backup_rollback_sendrate = self.backup_proxy.connect_to_signal('sendRate',self.send_rate_handler) 
        # 清理信号连接
        if self.backup_rollback_sendrate != None:
            self.backup_rollback_sendrate.remove()
        if  self.backup_rollback_sendresult != None:
            self.backup_rollback_sendresult.remove()
        # 重新建立信号连接
        self.backup_rollback_sendrate = self.backup_proxy.connect_to_signal('sendRate', self.send_rate_handler)
        self.backup_rollback_sendresult = self.backup_proxy.connect_to_signal('sendBackupResult', self.backup_result_handler)                                        
        backup_name = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        userName=getpass.getuser()
        uid=os.getuid()
        self.backup_finish_event.clear()
        self.backup_interface.autoBackUpForSystemUpdate_noreturn(backup_name,userName,uid)
        self.backup_finish_event.wait()
        # 断开信号连接
        self.backup_rollback_sendrate.remove()
        self.backup_rollback_sendresult.remove()
        return self.backup_finish_result
    
    def reboot(self):
        subprocess.run(["systemctl", "reboot"], check=True)

    def auto_check_status_changed_handler(self,status):
        pass

    def change_upgrade_strategy_handler(self,diff):
        pass
    
    def change_upgrade_scheme_handler(self,scheme):
        pass
        
    def change_upgrade_policy_handler(self):
        pass

    def autoupgrade_status_changed_handler(self,autoupgradestate):
        pass

    def autoupgrade_detail_changed_handler(self,mask,time):
        pass

    def autoupgrade_period_changed_handler(self,day):
        pass

    def property_changed_handler(self,property, value): 
        pass
        
    def upgrade_all_now_handler(self):
        pass

    def update_detect_finished_handler(self,success,updatelist,error_status,error_cause):
        logging.info(_("update detect finished:success:%s,updatelist:%s,error_status:%s,error_cause:%s")\
            %(success,",".join(updatelist),error_status,error_cause))
        self.update_detect_status = success
        
                # 检测到正常的uuid就记录下来
        if os.path.exists(UNATTENDED_UPGRADE_UUID_FILE):
            os.remove(UNATTENDED_UPGRADE_UUID_FILE)
        current_uuid = SystemEnv.get_local_uuid()
        logging.info("recode uuid :%s"%(current_uuid))
        if len(current_uuid) >= 30:
            with open(UNATTENDED_UPGRADE_UUID_FILE, 'w') as file:
                file.write(current_uuid)
                file.flush()

        # 保存完整的更新列表
        self.full_update_list = updatelist.copy()
        
        updategroup=["kylin-update-desktop-system", "kylin-update-desktop-quality", "kylin-update-desktop-security", "kylin-update-desktop-urgency", "kylin-update-desktop-equipment", "kylin-update-desktop-custom"]
        install_updategroup_tmp = []
        install_updategroup = []  # 需要安装的组件包列表
        install_single = []       # 需要安装的单个包列表
        for elepackage in updatelist:
            if elepackage in updategroup:
                install_updategroup_tmp.append(elepackage)
            else:
                install_single.append(elepackage)
        updatecontent=self.config.get("updatecontent",[])
        for elepackage in install_updategroup_tmp:
            if elepackage in updatecontent:
                install_updategroup.append(elepackage)
        if "single-package" in updatecontent:
            self.update_list = install_updategroup + install_single
        else:
            self.update_list = install_updategroup
        logging.info("need handle update list: %s, full update list: %s",
                     ",".join(self.update_list) if self.update_list else "None",
                     ",".join(self.full_update_list) if hasattr(self, 'full_update_list') and self.full_update_list else "None")
        self.update_detect_event.set()
        
    def update_fix_broken_status(self,resolver_status,remove_status,remove_pkgs,pkg_raw_description,delete_desc,error_string,error_desc):
        logging.info(_("update fix broken status:resolver_status:%s,remove_status:%s,error_string:%s,error_desc:%s")%(resolver_status,remove_status,error_string,error_desc))
        self.update_detect_status = False
        self.update_list = []
        self.update_detect_event.set()    
        
    def update_depend_resolve_status(self,resolver_status,remove_status,remove_pkgs,pkg_raw_description,delete_description,error_string,error_desc):
        logging.info(_("update depend resove status:%s,remove status:%s,remove pkgs:%s,pkg raw description:%s,delete_descrition:%s,error string:%s,error desc:%s")\
            %(resolver_status,remove_status,",".join(remove_pkgs),",".join(pkg_raw_description),",".join(delete_description),error_string,error_desc))
        self.resolve_depend_status = resolver_status
        self.remove_pkgs = remove_pkgs
        self.resolve_depend_status_event.set()
        self.shutdown_install=self.GetConfigValue("InstatllMode","shutdown_install")
        
    def update_download_install_status(self,group,progress,status,details):
        logging.debug(_("%s update progress:%d,status:%s,details:%s")%(",".join(group),progress,status,details))
        
    def update_install_finished(self,success,group,error_string,error_desc):
        logging.info(_("update install finisih success:%s,group:%s,error string:%s,error desc:%s")\
            %(success,",".join(group),error_string,error_desc))
        self.install_finish_status = success
        self.install_finish_group = group
        self.install_finish_status_event.set()
        self.download_finish_status = success
        self.download_finish_group = group
        self.download_finish_status_event.set()
        logging.info("shutdown install:%s"%(self.shutdown_install))
        if(self.shutdown_install=='True'):
            errcodelist=["#0208","#0204"]
            if(error_string in errcodelist):
                logging.info("error string is %s,do not clear the delay count and button hide status")
            else:
                logging.info("reset hideupdate and delaycount")
                strategy_data={
                    "autoupdate":{
                        "hideupdate":False,
                        "delaycount":0                   
                        }
                    } 
                self.SetUpgradeStrategy(json_to_asv(strategy_data))
        
    def update_download_finished(self,success,group,error_string,error_desc):
        logging.info(_("update download finisih success:%s,group:%s,error string:%s,error desc:%s")\
            %(success,",".join(group),error_string,error_desc))
        self.download_finish_status = success    
        self.download_finish_group = group
        self.download_finish_status_event.set()
                        
    def backup_result_handler(self,result,status):
        logging.debug(_("backup result:%s,status:%d")%(result,status))                  
        self.backup_finish_result = result 
        self.backup_finish_status = status
        self.backup_finish_event.set()
        
    def send_rate_handler(self,sta,pro):
        logging.debug(_("backup status:%d,progress:%d")%(sta,pro))
        
    def batch_rollback_result_handler(self, result, status):
        logging.debug(_("batch rollback result:%s,status:%d") % (result, status))

        # 记录回退结果到 system-version-upgrade.conf
        try:
            backup_dir = "/var/lib/kylin-system-updater/transID/"
            backup_file = os.path.join(backup_dir, "system-version-upgrade.conf")
            
            # 确保目录存在
            if not os.path.exists(backup_dir):
                os.makedirs(backup_dir, mode=0o755, exist_ok=True)

            backup_config = configparser.ConfigParser()
            if os.path.exists(backup_file):
                backup_config.read(backup_file, encoding='utf-8')
            
            # 确保 [batchBackStatus] section 存在
            if not backup_config.has_section('batchBackStatus'):
                backup_config.add_section('batchBackStatus')
            
            # 记录回退结果状态：成功为6(ROLLBACK_SUCCESS)，失败为5(ROLLBACK_FAILED)
            rollback_final_status = '6' if result else '5'
            backup_config.set('batchBackStatus', 'status', rollback_final_status)
            
            # 记录失败错误信息（如果失败）
            if not result:
                backup_config.set('batchBackStatus', 'error_code', str(status))
                backup_config.set('batchBackStatus', 'error_string', "Rollback failed")
            else:
                # 如果成功，清除可能的错误信息
                if backup_config.has_option('batchBackStatus', 'error_code'):
                    backup_config.remove_option('batchBackStatus', 'error_code')
                if backup_config.has_option('batchBackStatus', 'error_string'):
                    backup_config.remove_option('batchBackStatus', 'error_string')
            
            with open(backup_file, 'w', encoding='utf-8') as f:
                backup_config.write(f)
            
            logging.info(f"Batch rollback result (status={rollback_final_status}) recorded to {backup_file}")
        except Exception as e:
            logging.error(f"Failed to write batch rollback result to config file: {e}")
        
        self.batch_rollback_finish_result = result
        self.batch_rollback_finish_event.set()
        
    def batch_rollback_rate_handler(self, sta, pro):
        logging.debug(_("batch rollback status:%d,progress:%d") % (sta, pro))

class UnattendedUpgradeBackend(UnattendedUpgradesShutdown):
    def __init__(self):
        super().__init__()
        self.rollback_manager = BatchRollbackManager(self)
    
    def check_and_report_rollback_status(self):
        """
        检查回退状态文件，如果发现回退任务状态为 ROLLBACK_STARTED、ROLLBACK_SUCCESS 或 ROLLBACK_FAILED，
        则补充上报对应状态，并清除状态记录。
        """
        try:
            backup_dir = "/var/lib/kylin-system-updater/transID/"
            backup_file = os.path.join(backup_dir, "system-version-upgrade.conf")
            
            if not os.path.exists(backup_file):
                return False

            backup_config = configparser.ConfigParser()
            backup_config.read(backup_file, encoding='utf-8')
            
            # 检查是否有 [batchBackStatus] 段
            if not backup_config.has_section('batchBackStatus'):
                return False
            
            # 检查 status 字段
            if not backup_config.has_option('batchBackStatus', 'status'):
                return False
            
            status = backup_config.get('batchBackStatus', 'status')
            
            # 根据状态值进行上报
            if status == '4':  # ROLLBACK_STARTED - 回退已开始但未完成
                logging.info("Detected incomplete rollback (status=ROLLBACK_STARTED), reporting failure...")
                
                # 上报回退失败
                if hasattr(self, 'ReportBatchRollback'):
                    self.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_FAILED,
                                            error_string=_("Rollback was interrupted or incomplete"))
                
                # 清除状态记录
                backup_config.remove_section('batchBackStatus')
                with open(backup_file, 'w', encoding='utf-8') as f:
                    backup_config.write(f)
                
                logging.info("Incomplete rollback status cleared from config file")
                return True
            
            elif status == '6':  # ROLLBACK_SUCCESS - 回退成功
                logging.info("Detected completed rollback (status=ROLLBACK_SUCCESS), reporting success...")
                
                # 上报回退成功
                if hasattr(self, 'ReportBatchRollback'):
                    self.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)
                
                # 清除状态记录
                backup_config.remove_section('batchBackStatus')
                with open(backup_file, 'w', encoding='utf-8') as f:
                    backup_config.write(f)
                
                logging.info("Rollback success status cleared from config file")
                return True
            
            elif status == '5':  # ROLLBACK_FAILED - 回退失败
                logging.info("Detected completed rollback (status=ROLLBACK_FAILED), reporting failure...")
                
                # 读取错误信息
                error_code = ""
                error_string = _("Rollback failed")
                if backup_config.has_option('batchBackStatus', 'error_code'):
                    error_code = backup_config.get('batchBackStatus', 'error_code')
                if backup_config.has_option('batchBackStatus', 'error_string'):
                    error_string = backup_config.get('batchBackStatus', 'error_string')
                
                # 上报回退失败
                if hasattr(self, 'ReportBatchRollback'):
                    self.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_FAILED,
                                            error_code=error_code,
                                            error_string=error_string)
                
                # 清除状态记录
                backup_config.remove_section('batchBackStatus')
                with open(backup_file, 'w', encoding='utf-8') as f:
                    backup_config.write(f)
                
                logging.info("Rollback failure status cleared from config file")
                return True
            
            return False
        except Exception as e:
            logging.error(f"Error checking rollback status: {e}")
            return False

    def init_config(self):
        updagradestrategy=self.GetUpgradeStrategy()
        configjsonstr=convert_a_sv_to_json(updagradestrategy)
        logging.info(configjsonstr)
        self.config={}
        self.config=json.loads(configjsonstr)
    
    def init_scheduler(self):
        self.schedule_task()
        with open("/var/log/kylin-unattended-upgrades/unattended-upgrades-shutdown.log",'a+') as f:
            self.background_scheduler.print_jobs(out=f)  
        
    def schedule_task(self):
        try:
            updatemode=self.config.get('updatemode','off')
            logging.info("upgrade mode:%s"%updatemode)

            if(updatemode=="off"):
                pass

            elif(updatemode=="autoupdate"):
                autoupdateconfig    = self.config.get("autoupdate",{})

                period              = autoupdateconfig.get("period",1)
                # 确保 period 不为 0
                if not period or period <= 0:
                    period = 1
                    logging.warning(f"Invalid period value, reset to {period}")
                
                downloadMode        = autoupdateconfig.get("downloadMode","timeWindow")     # 下载模式
                downloadrandom      = autoupdateconfig.get("downloadtimerandom",120)
                mode                = autoupdateconfig.get("mode","timing")                 # 安装模式

                # 创建定时下载任务
                if downloadMode == "timeWindow":
                    downloadtime        = autoupdateconfig.get("downloadtime",[])
                    timelist            = process_timelist(process_time_intervals(downloadtime,downloadrandom))
                    logging.info("Generate download timelist: %s", timelist)

                    downloadtimestamp=ReadValueFromFile(UNATTENDED_UPGRADE_TIMESTAMP,'TimeStamp','download')
                    if not downloadtimestamp:
                        now = datetime.now()
                        downloadtimestamp = now.strftime("%Y-%m-%d %H:%M:%S")
                    dtime=datetime.strptime(downloadtimestamp, "%Y-%m-%d %H:%M:%S")
                    
                    # 如果 dtime 是过去的日期，重置为当前时间
                    now = datetime.now()
                    if dtime < now:
                        dtime = now
                        logging.info(f"Download timestamp is in the past, reset to now: {dtime}")

                    for t in timelist:
                        taskdate=(dtime+timedelta(days=float(period))).replace(hour=t.get("hour",0),minute=t.get("minute",0))
                        task_id="download_task_%s"%(taskdate.strftime("%Y-%m-%d %H:%M:%S"))
                        self.background_scheduler.add_job(self.download_with_timeout,args=[t.get("timeout",30)*60],trigger='interval',days=period,start_date=taskdate,id=task_id,replace_existing=True,max_instances=1)

                elif downloadMode == "periodic":
                    # 使用一次性任务，完成后重新调度
                    self.schedule_next_periodic_download()

                else:
                    pass

                # 创建定时安装任务
                if(mode=="timing"):
                    installtime     =autoupdateconfig.get("installtime",[])
                    installtimeList =parse_time(installtime)
                    logging.info("Generate install timeList: %s", installtimeList)

                    # 确保 period 不为 0
                    if not period or period <= 0:
                        period = 1
                        logging.warning(f"Invalid period value, reset to {period}")
                    
                    installtimestamp=ReadValueFromFile(UNATTENDED_UPGRADE_TIMESTAMP,'TimeStamp','install')
                    if not installtimestamp:
                        now = datetime.now()
                        installtimestamp = now.strftime("%Y-%m-%d %H:%M:%S")
                    itime=datetime.strptime(installtimestamp, "%Y-%m-%d %H:%M:%S")
                    
                    # 如果 itime 是过去的日期，重置为当前时间
                    now = datetime.now()
                    if itime < now:
                        itime = now
                        logging.info(f"Install timestamp is in the past, reset to now: {itime}")

                    for t in installtimeList:
                        # 计算任务执行时间
                        taskdate = (itime + timedelta(days=float(period))).replace(
                            hour=t.get("hour", 0),
                            minute=t.get("minute", 0)
                        )
                        
                        # 如果计算出的时间已经过去，调整到下一天
                        if taskdate <= now:
                            taskdate = taskdate + timedelta(days=int(period))
                            logging.info(f"Install task time has passed, adjusted to next period: {taskdate}")
                        
                        logging.info(f"Install task scheduled: taskdate={taskdate}, period={period} days")
                        
                        task_id = "install_task_%s" % (taskdate.strftime("%Y-%m-%d %H:%M:%S"))
                        self.background_scheduler.add_job(
                            self.install,
                            trigger='interval',
                            days=period,
                            start_date=taskdate,
                            id=task_id,
                            replace_existing=True,
                            max_instances=1)

            elif(updatemode=="predownload"):
                autoupdateconfig=self.config.get("predownload",{})
                downloadtime=autoupdateconfig.get("downloadtime",[])
                downloadrandom=autoupdateconfig.get("downloadtimerandom",120)
                period=autoupdateconfig.get("period",1)
                # 确保 period 不为 0
                if not period or period <= 0:
                    period = 1
                    logging.warning(f"Invalid period value, reset to {period}")
                
                timelist=process_timelist(process_time_intervals(downloadtime,downloadrandom))
                logging.info(timelist)
                downloadtimestamp=ReadValueFromFile(UNATTENDED_UPGRADE_TIMESTAMP,'TimeStamp','predownload')
                if not downloadtimestamp:
                    now = datetime.now()
                    downloadtimestamp = now.strftime("%Y-%m-%d %H:%M:%S")
                dtime=datetime.strptime(downloadtimestamp, "%Y-%m-%d %H:%M:%S")
                
                # 如果 dtime 是过去的日期，重置为当前时间
                now = datetime.now()
                if dtime < now:
                    dtime = now
                    logging.info(f"Predownload timestamp is in the past, reset to now: {dtime}")
                
                for t in timelist:
                    taskdate=(dtime+timedelta(days=float(period))).replace(hour=t.get("hour",0),minute=t.get("minute",0))
                    task_id="predownload_task_%s"%(taskdate.strftime("%Y-%m-%d %H:%M:%S"))
                    self.background_scheduler.add_job(self.predownload_with_timeout,args=[t.get("timeout",30)*60],trigger='interval',days=period,start_date=taskdate,id=task_id,replace_existing=True,max_instances=1)

            elif(updatemode=="updatenow"):
                autoupdateconfig=self.config.get("updatenow",{})
                downloadrandom=autoupdateconfig.get("downloadtimerandom",120)
                delay = random.uniform(0, downloadrandom)
                delta = timedelta(minutes=delay)
                taskdate=datetime.now() + delta
                self.background_scheduler.add_job(self.install,'date',run_date=taskdate)

            else:
                pass 
        except Exception as e:
            logging.error("schedule_task error: %s", e) 

    def install(self):
        self.need_check_backup=True
        if(self.prepare_upgrade()):
            if(self.install_basic()):
                updatemode=self.config.get("updatemode","autoupdate")
                logging.info("updatemode:%s"%updatemode)
                if(updatemode=="autoupdate"):
                    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    UpdateTimeStamp('install',now)
                    postaction=self.config.get("autoupdate",{}).get("postaction",'no')
                    autoupdatemode = self.config.get("autoupdate", {}).get("mode","timing")
                    logging.info("install success, postaction:%s"%postaction)
                    if(autoupdatemode=="timing" or autoupdatemode=="now"):
                        if(postaction=="notify"):
                            try:
                                with BackgroundNotification(timeout=None) as client:
                                    ret = client.send_command("UPGRADE_DIALOG_INSTALL_SUCCESS_REBOOT_REQUIRED", {"mode": "notify"})
                                    logging.info(f"BackgroundNotification send_command result: {ret}")
                                    self.rollback_manager.handle_popup_result(ret)
                            except Exception as e:
                                logging.error(f"BackgroundNotification failed: {e}")
                        elif(postaction=="reboot"):
                            try:
                                with BackgroundNotification(timeout=None) as client:
                                    ret = client.send_command("UPGRADE_DIALOG_INSTALL_SUCCESS_REBOOT_REQUIRED", {"mode": "reboot"})
                                    logging.info(f"BackgroundNotification send_command result: {ret}")
                            except Exception as e:
                                logging.error(f"BackgroundNotification failed: {e}")
                            
                            # 从autoupdate配置中读取restartDelayCountAfterInstall，阻塞对应分钟后再执行重启操作
                            try:
                                timeout_minutes = self.config.get("autoupdate", {}).get("restartDelayCountAfterInstall", "0")
                                if timeout_minutes > 0:
                                    logging.info(f"Waiting for {timeout_minutes} minutes before reboot...")
                                    time.sleep(timeout_minutes * 60)
                            except Exception as e:
                                logging.error(f"Error while waiting for timeout: {e}")
                            SystemEnv.system_reboot()
                elif(updatemode=="updatenow"):
                    postaction=self.config.get("autoupdate",{}).get("postaction",'no')
                    logging.info("postaction:%s"%postaction)
                    if(postaction=="notify"):
                        self.notify_service.InstallFinishNotify(0,"install finished")
                    elif(postaction=="reboot"):
                        self.reboot()  
            else:
                # 优先从合并后的配置中获取 autobackup
                autobackflag = self.config.get("autobackup")
                
                if autobackflag is None:
                    # 合并后的配置中没有 autobackup，直接读取低优先级配置文件
                    try:
                        low_priority_file = "/var/lib/unattended-upgrades/kylin-update-strategy.json"
                        if os.path.exists(low_priority_file):
                            with open(low_priority_file, 'r') as f:
                                low_priority_config = json.load(f)
                            autobackflag = low_priority_config.get("autobackup", "true")
                            logging.info("autobackup from low priority config file: %s", autobackflag)
                        else:
                            autobackflag = "true"
                            logging.info("low priority config file not found, autobackup: %s", autobackflag)
                    except Exception as e:
                        logging.error("Failed to read low priority config for autobackup: %s", e)
                        autobackflag = "true"
                else:
                    logging.info("autobackup from merged config: %s", autobackflag)
                
                # 将字符串转换为布尔值进行判断
                if isinstance(autobackflag, bool):
                    autoback_enabled = autobackflag
                elif isinstance(autobackflag, str):
                    autoback_enabled = autobackflag.lower() == "true"
                else:
                    autoback_enabled = bool(autobackflag)
                
                if not autoback_enabled:
                    logging.info("autoback setting is close.")
                    return False
                autobackmode=self.config.get("autoback",{}).get("mode","now")
                if(autobackmode=="now"):
                    # 安装失败，立即回退
                    BackupServerInterface.execute_batch_rollback(
                        self.system_bus,
                        self.batch_rollback_result_handler,
                        self.batch_rollback_rate_handler,
                        self.batch_rollback_finish_event
                    )
                    restartmode = self.config.get("autoback", {}).get("restartMode", "no")
                    logging.info("rollback success, restart mode:%s",restartmode)
                    if restartmode == "no":
                        self.reboot()
                    elif restartmode == "reboot":
                        try:
                            with BackgroundNotification(timeout=None) as client:
                                ret = client.send_command("UPGRADE_DIALOG_INSTALL_FAILED_ROLLBACK_COMPLETED", {"mode": "reboot"})
                                logging.info(f"BackgroundNotification send_command result: {ret}")
                        except Exception as e:
                            logging.error(f"BackgroundNotification failed: {e}")
                        
                        try:
                            timeout_minutes = self.config.get("autoback", {}).get("delayCount", "0")
                            if timeout_minutes > 0:
                                logging.info(f"Waiting for {timeout_minutes} minutes before reboot...")
                                time.sleep(timeout_minutes * 60)
                        except Exception as e:
                            logging.error(f"Error while waiting for timeout: {e}")
                        SystemEnv.system_reboot()
                    elif restartmode == "notify":
                        try:
                            with BackgroundNotification(timeout=None) as client:
                                ret = client.send_command("UPGRADE_DIALOG_INSTALL_FAILED_ROLLBACK_COMPLETED", {"mode": "notify"})
                                logging.info(f"BackgroundNotification send_command result: {ret}")
                                self.rollback_manager.handle_popup_result(ret)
                        except Exception as e:
                            logging.error(f"BackgroundNotification failed: {e}")
   
    def download_with_timeout(self,timeout):
        self.need_check_backup  = False
        wait_timeout            = timeout
        start_time              = time.time()

        if(self.prepare_upgrade()):
            end_time    = time.time()
            time_diff   = end_time - start_time

            if(wait_timeout>time_diff):
                wait_timeout=wait_timeout-time_diff
                logging.info("timeout:%d,time diff:%d"%(wait_timeout,time_diff))

            if(self.download(wait_timeout)):
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                UpdateTimeStamp('download',now)
                mode=self.config.get("autoupdate",{}).get("mode","timing")
                logging.info("auto update install mode:%s"%mode)
                if(mode=="shutdown"):
                    imode=self.GetConfigValue("InstallMode","shutdown_install")
                    logging.info("install mode:%s"%imode)
                    if(imode=="True"):
                        logging.info("send reboot notify signal")
                        self.notify_service.RebootNotify(0,"download finish")
                        pass
                elif(mode=="now"):
                    logging.info("install mode:%s"%mode)
                    self.install()

            else: # TODO: 注意超出下载时间段时的False信号需处理
                logging.info("download failed ...")
                self.download_retry(start_time, timeout)
        
        # 任务完成后，重新调度下一个周期下载任务
        self.schedule_next_periodic_download()

    def schedule_next_periodic_download(self):
        """调度下一个周期下载任务（带随机值），确保同时只有一个任务执行"""
        try:
            autoupdateconfig = self.config.get("autoupdate", {})
            downloadMode = autoupdateconfig.get("downloadMode", "timeWindow")
            
            if downloadMode != "periodic":
                return
            
            # 检查是否已有任务在运行
            existing_job = self.background_scheduler.get_job("download_task_periodic")
            if existing_job and existing_job.next_run_time:
                # 任务已在调度中，跳过
                logging.info("Periodic download task already scheduled, skip reschedule")
                return
            
            downloadCheckPeriod = autoupdateconfig.get("downloadCheckPeriod", 60)
            downloadCheckPeriod = 60 if downloadCheckPeriod == 0 else downloadCheckPeriod
            downloadrandom = autoupdateconfig.get("downloadtimerandom", 120)
            
            # 生成随机偏移
            random_max = max(0, downloadrandom)
            if random_max >= downloadCheckPeriod:
                random_max = downloadCheckPeriod - 1
            random_offset = random.randint(0, random_max)
            
            # 计算下次执行时间
            taskdate = datetime.now() + timedelta(minutes=downloadCheckPeriod - random_offset)
            logging.info("Schedule next periodic download: random_offset=%d, next_run=%s",
                        random_offset, taskdate.strftime("%Y-%m-%d %H:%M:%S"))
            
            # 使用 date 触发器（一次性任务），任务完成后会重新调度
            self.background_scheduler.add_job(
                self.download_with_timeout,
                trigger='date',
                run_date=taskdate,
                args=[7*24*60*60],  # 超时时间
                id="download_task_periodic",
                replace_existing=True,
                max_instances=1  # 最多同时运行1个实例
            )
        except Exception as e:
            logging.error("Failed to schedule next periodic download: %s", e)

    def download_retry(self, start_time, timeout):
        autoupdateconfig    = self.config.get("autoupdate",{})

        attempt             = 0
        max_retries         = 10
        downloadMode        = autoupdateconfig.get("downloadMode","timeWindow")     # 下载模式
        downloadRetryEnable = autoupdateconfig.get("downloadRetryEnable",False)
        downloadRetryPeriod = autoupdateconfig.get("downloadRetryPeriod",30)
        downloadRetryPeriodTime = downloadRetryPeriod * 60
        if downloadMode == "timeWindow" and downloadRetryEnable:

            stop_event = Event()

            while attempt < max_retries:
                attempt += 1
            
                download_time = time.time() - start_time
                if timeout - download_time > downloadRetryPeriodTime:
                    logging.info("Wait %smin, retry download.", downloadRetryPeriod)
                    stop_event.wait(timeout=downloadRetryPeriodTime)
                    self.prepare_upgrade()

                    if (self.download(timeout-download_time-downloadRetryPeriodTime)): # TODO: event阻塞时收到策略修改信号需处理
                        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        UpdateTimeStamp('download',now)
                        mode=self.config.get("autoupdate",{}).get("mode","timing")
                        logging.info("auto update install mode:%s"%mode)
                        if(mode=="shutdown"):
                            imode=self.GetConfigValue("InstallMode","shutdown_install")
                            logging.info("install mode:%s"%imode)
                            if(imode=="True"):
                                logging.info("send reboot notify signal")
                                self.notify_service.RebootNotify(0,"download finish")
                        elif(mode=="now"):
                            logging.info("install mode:%s"%mode)
                            self.install()
                        break
                    else:
                        self.download_retry(start_time, timeout)
                else:
                    logging.info("Insufficient remaining download time in the current time period, stop retry download")

    def predownload_with_timeout(self,timeout):
        self.need_check_backup=False
        wait_timeout=timeout
        start_time=time.time()
        if(self.prepare_upgrade()):
            end_time=time.time()
            time_diff=end_time-start_time
            logging.info("timeout:%d,time diff:%d"%(wait_timeout,time_diff))
            if(wait_timeout>time_diff):
                wait_timeout=wait_timeout-time_diff
            if(self.download(wait_timeout)):
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                UpdateTimeStamp('predownload',now)

    def change_upgrade_strategy_handler(self, diff):
        logging.info("upgrade strategy changed:%s"%diff)
        json_diff=json.loads(diff)
        updagradestrategy=self.GetUpgradeStrategy()
        configjsonstr=convert_a_sv_to_json(updagradestrategy)
        logging.info(configjsonstr)
        self.config=json.loads(configjsonstr)
        updatemode=self.config.get("updatemode","off")
        logging.info("upgrade mode:%s"%updatemode)
        added=json_diff.get("added",{})
        modified=json_diff.get("modified",{})
        if(not added and not modified):
            logging.info("no changes of strategy")
        else:
            # 配置已更新，清除rollback_manager的配置缓存
            if hasattr(self, 'rollback_manager') and hasattr(self.rollback_manager, '_clear_config_cache'):
                self.rollback_manager._clear_config_cache()
            
            # 检查是否需要刷新定时任务（只有updatecontent、autoupdate、autoback相关变动才需要）
            need_reschedule = self._need_reschedule_for_strategy_change(json_diff)
            
            if need_reschedule:
                logging.info("strategy changed (updatecontent/autoupdate/autoback), reschedule tasks")
                self.background_scheduler.remove_all_jobs()
                self.schedule_task()
            else:
                logging.info("strategy changed but no need to reschedule tasks (only batchback or other fields)")
            
            # 检查是否包含批量回退策略修改
            if self._contains_batch_rollback_modified(json_diff):
                logging.info("Contains batch rollback policy modified, executing rollback policy")
                self.rollback_manager.check_batch_rollback_policy()
        
        with open("/var/log/kylin-unattended-upgrades/unattended-upgrades-shutdown.log",'a+') as f:
            self.background_scheduler.print_jobs(out=f)
    
    def _need_reschedule_for_strategy_change(self, json_diff):
        """
        检查策略变动是否需要重新调度定时任务
        
        只有以下字段的变动才需要重新调度：
        - updatecontent
        - autoupdate（包括其子字段）
        - autoback（包括其子字段）
        
        参数:
            json_diff: 策略变动的diff信息
            
        返回:
            bool: 是否需要重新调度
        """
        added = json_diff.get("added", {})
        modified = json_diff.get("modified", {})
        
        # 需要监控的字段列表
        reschedule_fields = [
            "updatecontent",
            "autoupdate",
            "autoback"
        ]
        
        # 检查added中的字段
        for key in added.keys():
            for field in reschedule_fields:
                if key == field or key.startswith(f"{field}."):
                    return True
        
        # 检查modified中的字段
        for key in modified.keys():
            for field in reschedule_fields:
                if key == field or key.startswith(f"{field}."):
                    return True
        
        return False

    def _contains_batch_rollback_modified(self, json_diff):
        """
        检查是否包含批量回退策略修改
        :param json_diff: 策略变化的diff信息
        :return: 如果包含批量回退策略修改返回True，否则返回False
        """
        added = json_diff.get("added", {})
        modified = json_diff.get("modified", {})
        
        # 检查新增项中是否包含batchback相关的修改
        if added:
            for key in added.keys():
                if key == "batchback" or key.startswith("batchback."):
                    return True
        
        # 检查修改项中是否包含batchback相关的修改
        if modified:
            for key in modified.keys():
                if key == "batchback" or key.startswith("batchback."):
                    return True

        logging.info("No changes of batch rollback strategy")
        return False

    def property_changed_handler(self, property, value):
        logging.info("property changed %s:%s"%(property,value))
        self.change_upgrade_policy_handler()

    def change_upgrade_policy_handler(self):
        logging.info("upgrade policy changed")
        json_data=ini_to_json(UNATTENDED_UPGRADE_POLICY_FILE_PATH)
        logging.info(json_data)
        data={}
        autoupgradepolicy=json_data.get('autoUpgradePolicy',{})
        updatedays=int(autoupgradepolicy.get('updatedays',1))
        randomrange=int(autoupgradepolicy.get('randomrange',1))
        if(autoupgradepolicy.get('autoupgradestate','off')=='on'):
            data.update({'updatemode':'autoupdate'})
            downloadmode=autoupgradepolicy.get('downloadmode','timing')
            downloadtime=parse_time_range(autoupgradepolicy.get('downloadtime','11:00-12:00'))
            installmode=autoupgradepolicy.get('installmode','timing')
            installtime=parse_time_range(autoupgradepolicy.get('installtime','10:00-11:00'))
            autoupdate={}
            if(downloadmode=='timing'):
                autoupdate.update({'downloadtime':[downloadtime]})
            if(installmode=='timing'):
                autoupdate.update({'mode':'timing'})
                autoupdate.update({'installtime':installtime.get('start',"10:00")})
            elif(installmode=='bshutdown'):
                autoupdate.update({'mode':'shutdown'})
            autoupdate.update({'downloadtimerandom': randomrange})
            autoupdate.update({'period':updatedays})
            data.update({'autoupdate':autoupdate})
        elif(autoupgradepolicy.get('predownload','off')=='on'):
            data.update({'updatemode':'predownload'})
            predownloadtime=parse_time_range(autoupgradepolicy.get('predownloadtime','10:00-11:00'))
            predownload={}
            predownload.update({"downloadtime":[predownloadtime]})
            predownload.update({'downloadtimerandom': randomrange})
            predownload.update({'period':updatedays})
            data.update({"predownload":predownload})
        else:
            data.update({'updatemode':'off'})
        self.SetUpgradeStrategy(json_to_asv(data))
    
    def change_upgrade_scheme_handler(self,scheme):
        def generate_time_output(input_data: dict) -> dict:
            """
            处理时间字段并生成指定格式的字典输出
            
            参数：
            input_data - 包含download_time和install_time的原始数据
            
            返回：
            动态生成的字典，仅包含有效数据项
            
            规则：
            1. 当download_time列表非空时，生成downloadtime字段（键名转换）
            2. 当install_time列表非空时，提取第一个元素的start_time生成installtime字段
            3. 空列表或不存在字段时自动过滤
            """
            output = {}
            
            # 处理download_time（键名转换逻辑）
            download_periods = [
                {"start": item["start_time"], "end": item["end_time"]}
                for item in input_data.get("download_time", [])
                if isinstance(item, dict) and "start_time" in item and "end_time" in item
            ]
            if download_periods:  # 仅当列表非空时生成字段
                output["downloadtime"] = download_periods
            
            # 处理install_time（首项提取逻辑）
            install_items = input_data.get("install_time", [])
            if install_items and isinstance(install_items[0], dict):  # 检查列表非空且首项有效
                first_install = install_items[0]
                if "start_time" in first_install:
                    output["installtime"] = first_install["start_time"]
            
            return output        
        configjsonstr=convert_a_sv_to_json(scheme)
        logging.info("upgrade scheme changed")
        logging.info(configjsonstr)
        json_data=json.loads(configjsonstr)
        data={}
        if(json_data.get('auto_download','true')=='true'):
            data.update({'updatemode':'autoupdate'})
            autoupdate=generate_time_output(json_data)
            period=int(json_data.get('download_period',{}).get('day',1))
            mode=json_data.get('mode','shutdown')
            autoupdate.update({"mode":mode}) 
            autoupdate.update({"period":period})
            data.update({"autoupdate":autoupdate})
        elif(json_data.get('auto_download','false')=='false'):
            data.update({'updatemode':'off'})
        self.SetUpgradeStrategy(json_to_asv(data))

    def auto_check_status_changed_handler(self, status):
        logging.info("is instance of bool:%s"%isinstance(status,bool))
        data={}
        if(status):
            data.update({"notification":bool(1)})    
        else:
            data.update({"notification":bool(0)})
        self.SetUpgradeStrategy(json_to_asv(data))

    def autoupgrade_status_changed_handler(self,autoupgradestate):
        logging.info("auto upgrade status changed:%s"%autoupgradestate)

    def autoupgrade_detail_changed_handler(self,mask,time):
        logging.info("auto upgrade detail changed:%s,%s"%(mask,time))

    def autoupgrade_period_changed_handler(self,day):
        logging.info("auto upgrade period changed:%d"%day)
        
    def upgrade_all_now_handler(self):
        logging.info("upgrade all now")
        data={}
        data.update({'updatemode':'updatenow'})
        self.SetUpgradeStrategy(json_to_asv(data))

    def run(self):
        logging.debug(_("Waiting for signal to start operation"))
        self.loop.run() 

    def init_project_config(self):
        try:
            pkg="kylin-update-desktop-config"
            logging.info("check %s version"%pkg)
            success, result = self.get_package_special_id(pkg)
            logging.info("check project info result:%s,id:%s"%(success,result))
            version="none"
            version=result.lower()
            if(version=="hg" or version=="hw.hg"):
                logging.info("set hg project json config")
                refreshjsonflag="/var/lib/hg-json-refresh-flag"        
                if(os.path.exists(refreshjsonflag)):
                    logging.info("/var/lib/hg-json-refresh-flag exists,already set kylin-update-strategy.json,pass")
                else:
                    logging.info("set hg project json config")
                    data={}
                    with open("/var/lib/unattended-upgrades/kylin-update-strategy.json", 'r') as file1:
                        data = json.load(file1)
                    data["updatemode"]="autoupdate"
                    data["autoupdate"]["mode"]="shutdown"
                    data["autoupdate"]["period"]=1
                    data["autoupdate"]["downloadtimerandom"]=120
                    data["autoupdate"]["downloadtime"]=[{"start":"11:00", "end":"14:00"}]                 
                    with open('/var/lib/unattended-upgrades/kylin-update-strategy.json', 'w') as file2:
                        json.dump(data, file2, indent=4) 
                    with open(refreshjsonflag,'w') as f:
                        f.write("kylin-update-strategy.json already refreshed") 
                    cmd="/usr/share/kylin-system-updater/SystemUpdater/Core/quitStrategiesService.py"
                    subret=subprocess.run(cmd,shell=True,capture_output=True,text=True,check=True) 
                    logging.info(subret)
        except Exception as e:
            logging.info("init project config error",e)  

    def get_package_special_id(self,package_name):
        """
        获取Ubuntu系统软件包版本号中的特殊项目标识
        
        :param package_name: 软件包名称（如"kylin-update-desktop-config"）
        :return: (状态, 结果)元组，成功返回(True, 标识符)，失败返回(False, 错误信息)
        """
        try:
            # 使用dpkg-query查询软件包版本号
            result = subprocess.run(
                ['dpkg-query', '-W', '-f=${Version}', package_name],
                capture_output=True,
                text=True,
                check=True,
                timeout=5  # 设置5秒超时
            )
            
            version = result.stdout.strip()
            if not version:
                return False, "获取的版本号为空"
            elif "hw.hg" in version:
                return True,"hg"
            elif "hg" in version:
                return True,"hg"
            # 正则表达式匹配特殊标识符
            # 规则：匹配长度≥2的字母组合，前后必须有数字或边界
            pattern = r'(?:^|[.-])\d*([a-zA-Z]{2,})(?=\d|\.|$)'
            match = re.search(pattern, version)
            
            return True, match.group(1) if match else "none"
        
        except Exception as e:
            # 处理包不存在或查询失败
            error_msg = e.stderr.strip() or "软件包查询失败"
            if "no packages found" in error_msg.lower():
                return False, f"软件包'{package_name}'未安装"
            return False, error_msg
        
        except subprocess.TimeoutExpired:
            return False, "查询超时，请检查dpkg-query是否可用"
        
        except Exception as e:
            return False, f"未知错误: {str(e)}"



class BackgroundNotification():
    socket_file = "/tmp/kylin-background-upgrade-strategies.sock"

    def __init__(self, socket_path=socket_file, timeout=5):
        self.socket_path = socket_path
        self.timeout = timeout
        self.sock = None

    def connect(self):
        if self.sock: return
        if not os.path.exists(self.socket_path):
            raise FileNotFoundError(f"Socket文件未找到: {self.socket_path}")

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.settimeout(self.timeout)
        self.sock.connect(self.socket_path)

    def close(self):
        if self.sock:
            self.sock.close()
            self.sock = None

    def send_command(self, op_type, params=None):
        """
        发送命令的核心方法
        :param op_type: 操作类型字符串，例如 "check_update"
        :param params: 字典类型的参数，例如 {"force": True}
        """
        if params is None:
            params = {}
            
        try:
            self.connect()
            
            # 1. 构造 JSON
            request = {
                "type": op_type,
                "data": params
            }
            json_bytes = json.dumps(request).encode('utf-8')

            # 2. 发送协议头 (4字节大端长度) + JSON内容
            # >I 表示 Big-Endian Unsigned Int (4 bytes)
            header = struct.pack('>I', len(json_bytes))
            self.sock.sendall(header + json_bytes)

            # 3. 接收响应 (同样是 4字节头 + JSON)
            # 3.1 读头
            resp_header = self._recv_n_bytes(4)
            if not resp_header: return None
            resp_len = struct.unpack('>I', resp_header)[0]

            # 3.2 读体
            resp_body = self._recv_n_bytes(resp_len)
            return json.loads(resp_body.decode('utf-8'))

        except Exception as e:
            logging.info(f"[Python Client Error] {e}")
            self.close()
            return None

    def _recv_n_bytes(self, n):
        data = b''
        while len(data) < n:
            chunk = self.sock.recv(n - len(data))
            if not chunk: return None
            data += chunk
        return data

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class BackupServerInterface:
    @staticmethod
    def execute_batch_rollback(system_bus, batch_rollback_result_handler, batch_rollback_rate_handler, batch_rollback_finish_event):
        """实际的回退逻辑"""
        logging.info("Executing batch rollback operation ...")
        # 创建独立的备份服务代理实例
        # 这样可以避免与其他操作共享信号连接，确保信号处理的独立性
        backup_proxy = system_bus.get_object('com.kylin.backupserver','/',follow_name_owner_changes=True)
        backup_interface = dbus.Interface(backup_proxy,dbus_interface='com.kylin.backup.server')
        
        # 连接信号处理器
        # 由于使用独立的代理实例，这里的信号连接不会影响其他操作
        backup_proxy.connect_to_signal('sendAutoRestoreResult', batch_rollback_result_handler)
        backup_proxy.connect_to_signal('sendRate', batch_rollback_rate_handler)
        
        # 清除事件
        batch_rollback_finish_event.clear()
        
        # 调用回退方法
        userName = getpass.getuser()
        uid = os.getuid()
        backup_interface.autoRestoreForSystemUpdateWithoutReboot(userName, uid)
        
        # 等待回退操作完成
        batch_rollback_finish_event.wait()
        
        logging.info("Rollback operation completed.")

    @staticmethod
    def set_rollback_flag():
        """
        设置回退标志位. 旧回退方案，需要备份还原主动检测标志位并触发.
        同时记录回退状态到 system-version-upgrade.conf 文件中。
        """
        # 创建目录（如果不存在）
        if not os.path.exists(UPDATE_NOT_RESTORE_DIR):
            os.makedirs(UPDATE_NOT_RESTORE_DIR)
        
        # 创建回退标志文件
        with open(NEED_ROLLBACK_FLAG, 'w') as file:
            pass  # 不写入任何内容，只创建文件
            
        # 记录回退状态到 system-version-upgrade.conf
        try:
            backup_dir = "/var/lib/kylin-system-updater/transID/"
            backup_file = os.path.join(backup_dir, "system-version-upgrade.conf")
            
            if not os.path.exists(backup_dir):
                os.makedirs(backup_dir, mode=0o755, exist_ok=True)

            backup_config = configparser.ConfigParser()
            if os.path.exists(backup_file):
                backup_config.read(backup_file, encoding='utf-8')
            
            # 确保 [batchBackStatus] section 存在
            if not backup_config.has_section('batchBackStatus'):
                backup_config.add_section('batchBackStatus')
            
            # 记录回退状态为 ROLLBACK_STARTED (4)
            backup_config.set('batchBackStatus', 'status', '4')
            
            with open(backup_file, 'w', encoding='utf-8') as f:
                backup_config.write(f)
            
            logging.info(f"Batch rollback status (ROLLBACK_STARTED) recorded to {backup_file}")
        except Exception as e:
            logging.error(f"Failed to write batch rollback status to config file: {e}")
            
        logging.info("Fallback flag has been generated")


class SystemEnv:
    @staticmethod
    def get_local_uuid() -> str:
        """
        获取节点UUID
        从/var/lib/kylin-software-properties/config/updateID.conf中读取uuid字段
        """
        config_path = "/var/lib/kylin-software-properties/config/updateID.conf"
        try:
            if os.path.exists(config_path):
                config = configparser.ConfigParser(allow_no_value=True)
                config.optionxform = str
                config.read(config_path)
                if "update" in config and "uuid" in config["update"]:
                    return config["update"]["uuid"].strip()
            # 如果配置文件不存在或读取失败，返回默认值
            return ""
        except Exception as e:
            logging.error("Failed to read uuid from config: %s", e)
            return ""

    @staticmethod
    def get_os_version() -> str:
        """
        获取系统版本号
        从/etc/kylin-version/kylin-system-version.conf中读取[SYSTEM]部分的os_version字段
        """
        try:
            config_path = "/etc/kylin-version/kylin-system-version.conf"
            if os.path.exists(config_path):
                config = configparser.ConfigParser(allow_no_value=True)
                config.optionxform = str
                config.read(config_path)
                if "SYSTEM" in config and "os_version" in config["SYSTEM"]:
                    return config["SYSTEM"]["os_version"]
            # 如果配置文件不存在或读取失败，返回默认值
            return ""
        except Exception as e:
            logging.error("Failed to read os_version from config: %s", e)
            return ""

    @staticmethod
    def get_update_version(patch_type: str = None) -> Union[list, str]:
        """
        获取补丁版本号列表或特定类型的补丁版本号
        包含: update_version, quality_version, security_version, urgency_version, equipment_version, custom_version
        格式分别为: .F, .Q, .S, .U, .E, .C (作为版本号中间的一部分)
        例如: 68241.F250006, 68441.E250001
        
        :param patch_type: 可选，指定补丁类型 ('update', 'quality', 'security', 'urgency', 'equipment', 'custom')
                          如果未指定，则返回所有有效补丁版本的列表
        :return: 如果指定了patch_type，返回对应类型的补丁版本号字符串（如果存在）或空字符串
                 如果未指定patch_type，返回所有有效补丁版本的列表
        """
        config_path = "/etc/kylin-version/kylin-system-version.conf"
        
        # 定义版本类型映射
        version_mapping = {
            "update": ("update_version", ".F"),
            "quality": ("quality_version", ".Q"),
            "security": ("security_version", ".S"),
            "urgency": ("urgency_version", ".U"),
            "equipment": ("equipment_version", ".E"),
            "custom": ("custom_version", ".C")
        }
        
        try:
            if os.path.exists(config_path):
                config = configparser.ConfigParser(allow_no_value=True)
                config.optionxform = str
                config.read(config_path)
                
                if "SYSTEM" in config:
                    # 如果指定了补丁类型，只返回该类型的版本号
                    if patch_type and patch_type in version_mapping:
                        version_key, identifier = version_mapping[patch_type]
                        if version_key in config["SYSTEM"]:
                            version_value = config["SYSTEM"][version_key].strip()
                            # 验证格式是否符合要求
                            if version_value and SystemEnv._is_valid_patch_version(version_value, identifier):
                                return version_value
                        # 如果没有找到或格式不正确，返回空字符串
                        return ""
                    
                    # 如果未指定补丁类型，返回所有有效补丁版本的列表
                    version_list = []
                    version_types = [
                        ("update_version", ".F"),
                        ("quality_version", ".Q"),
                        ("security_version", ".S"),
                        ("urgency_version", ".U"),
                        ("equipment_version", ".E"),
                        ("custom_version", ".C")
                    ]
                    
                    for version_key, identifier in version_types:
                        if version_key in config["SYSTEM"]:
                            version_value = config["SYSTEM"][version_key].strip()
                            # 即使版本号为空也要添加到列表中，以满足"本地为空时也需要返回"的要求
                            if version_value:
                                # 验证格式是否符合要求
                                if SystemEnv._is_valid_patch_version(version_value, identifier):
                                    version_list.append(version_value)
                            else:
                                # 添加空字符串以表示该类型的版本不存在
                                version_list.append("")
                    return version_list
            
            # 如果配置文件不存在，根据是否指定了补丁类型返回相应结果
            if patch_type:
                return ""
            else:
                # 返回空列表，但包含6个空字符串以表示所有类型的版本都不存在
                return ["", "", "", "", "", ""]
                
        except Exception as e:
            logging.error("Failed to read update versions from config: %s", e)
            # 异常情况下也遵循相同的行为：指定类型时返回空字符串，未指定时返回空列表
            if patch_type:
                return ""
            else:
                return []

    @staticmethod
    def _is_valid_patch_version(version: str, expected_identifier: str) -> bool:
        """
        验证补丁版本号格式是否正确
        :param version: 版本号字符串
        :param expected_identifier: 期望的标识符 (.F, .Q, .S, .U, .E, .C)
        :return: 格式是否正确
        """
        if not version:
            return False
        # 标识符只是在正常情况下能匹配，本地的版本号可能不包含这些标识符
        # 只要version不为空总是返回True
        return True

    @staticmethod
    def system_reboot():
        logging.warning("system will reboot ...")
        subprocess.run(["systemctl", "reboot"], check=True)

class BatchRollbackManager:
    """
    Dedicated interpreter and executor for 'batchback' configuration.
    Features: Configuration-driven, blocking execution, dynamic condition checking.
    """
    def __init__(self, parent=None):
        self.parent = parent
        # 缓存合并后的配置，避免频繁D-Bus调用
        self._cached_merged_config = None

    def _get_merged_config(self) -> dict:
        """
        获取合并后的配置（普通配置 + 高优先级配置）
        通过D-Bus调用GetUpgradeStrategy获取已合并的配置
        """
        try:
            if self._cached_merged_config is not None:
                return self._cached_merged_config
            
            if self.parent and hasattr(self.parent, 'GetUpgradeStrategy'):
                # 通过D-Bus接口获取合并后的配置
                strategy = self.parent.GetUpgradeStrategy()
                # 将D-Bus字典转换为普通Python字典
                config_dict = {}
                for key, value in strategy.items():
                    if hasattr(value, 'signature'):
                        # 处理D-Bus类型
                        if value.signature == 'v':
                            # 对于变体类型，尝试提取实际值
                            try:
                                if hasattr(value, 'variant_level') and value.variant_level > 0:
                                    actual_value = value.value
                                else:
                                    actual_value = value
                            except:
                                actual_value = str(value)
                        else:
                            actual_value = value
                    else:
                        actual_value = value
                    config_dict[key] = actual_value
                
                self._cached_merged_config = config_dict
                return config_dict
        except Exception as e:
            logging.error(f"获取合并配置失败: {e}")
        
        # 降级方案：直接读取普通配置文件
        try:
            file_path = "/var/lib/unattended-upgrades/kylin-update-strategy.json"
            if os.path.exists(file_path):
                with open(file_path, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                    self._cached_merged_config = config
                    return config
        except Exception as e:
            logging.error(f"读取普通配置文件失败: {e}")
        
        return {}

    def _clear_config_cache(self):
        """清除配置缓存，用于配置更新后"""
        self._cached_merged_config = None
        
    def _get_last_updateinfo_status(self) -> Optional[str]:
        """
        获取本地数据库 updateinfos 表最后一条记录的 status 值
        返回: status 值（字符串类型），如果获取失败返回 None
        """
        db_file = "/var/lib/kylin-system-updater/kylin-system-updater.db"
        try:
            if not os.path.exists(db_file):
                logging.warning(f"Database file not found: {db_file}")
                return None
            
            conn = sqlite3.connect(db_file)
            cursor = conn.cursor()
            
            # 查询 updateinfos 表最后一条记录的 status
            cursor.execute("SELECT status FROM updateinfos ORDER BY id DESC LIMIT 1")
            result = cursor.fetchone()
            conn.close()
            
            if result is None:
                logging.info("updateinfos table is empty")
                return None
            
            # status 字段是字符串类型，直接返回
            return result[0]
                
        except Exception as e:
            logging.error(f"Failed to get last updateinfo status: {e}")
            return None

    def _check_single_condition(self, cond: Dict[str, Any]) -> bool:
        """
        Parse a single condition dictionary.
        Example: {"os_version": 2503, "compare": "eq"}
        Example with status: {"txnMtmNodeUuids": [{"status": 1, "uuid": "uuid1"}, {"status": 2, "uuid": "uuid2"}]}
        """
        operator = cond.get("compare", "eq")
        
        actual_val = None
        target_val = None
        field_name = "unknown"

        # Routing logic: Decide what to check based on keys in the config dict
        # 如果 txnMtmNodeUuids 存在但值为空，则跳过 txnMtmNodeUuids 的判断
        if "txnMtmNodeUuids" in cond:
            field_name = "UUID"
            # The config provides a list of UUIDs; check if the local UUID is in that list
            target_list = cond.get("txnMtmNodeUuids")

            if not target_list or (isinstance(target_list, list) and len(target_list) == 0):
                logging.info(f"    txnMtmNodeUuids is empty, skipping UUID check.")
            else:
                if os.path.exists(UNATTENDED_UPGRADE_UUID_FILE):
                    with open(UNATTENDED_UPGRADE_UUID_FILE,'r') as file:
                        actual_uuid = file.read().strip()
                else:
                    actual_uuid = SystemEnv.get_local_uuid()
                
                # 处理新的数据结构：txnMtmNodeUuids 是包含 status 和 uuid 的字典列表
                # 查找本地 UUID 是否在列表中，并检查对应的 status
                is_match = False
                matched_node_status = None
                
                for node in target_list:
                    if isinstance(node, dict):
                        node_uuid = node.get('uuid', node.get('UUID', ''))
                        if node_uuid == actual_uuid:
                            is_match = True
                            matched_node_status = node.get('status')
                            break
                    elif isinstance(node, str):
                        # 兼容旧格式：字符串列表
                        if node == actual_uuid:
                            is_match = True
                            break
                
                status = "in" if is_match else "not in"
                logging.info(f"    Checking {field_name}: Local={actual_uuid} is [{status}] target list.")

                if is_match and matched_node_status is not None:
                    logging.info(f"    Checking upgrade status: Target_status={matched_node_status}")
                    
                    # 获取本地数据库中 updateinfos 表最后一条记录的 status
                    local_status = self._get_last_updateinfo_status()
                    if local_status is not None:
                        logging.info(f"    Local database last updateinfo status: {local_status}")
                        # 将数字 status 转换为字符串 status 进行比较
                        # 1 -> "success", 2 -> "failed"
                        target_status_str = None
                        if matched_node_status == 1:
                            target_status_str = "success"
                        elif matched_node_status == 2:
                            target_status_str = "failed"
                        
                        if target_status_str is not None and local_status != target_status_str:
                            logging.warning(f"    Status mismatch! Target={target_status_str}, Local={local_status}")
                            return False
                        else:
                            logging.info(f"    Status match! Target={target_status_str}, Local={local_status}")

                return is_match

        elif "os_version" in cond:
            field_name = "os_version"
            actual_val = SystemEnv.get_os_version()
            target_val = cond["os_version"]

        elif "patch_version" in cond:
            # 处理兼容性问题：服务端使用"patch_version"，本地有6种补丁版本类型
            field_name = "patch_version"
            target_val = cond["patch_version"]
            
            # 先通过版本标识判断补丁类型
            patch_type = None
            if ".F" in target_val:
                patch_type = "update"
            elif ".Q" in target_val:
                patch_type = "quality"
            elif ".S" in target_val:
                patch_type = "security"
            elif ".U" in target_val:
                patch_type = "urgency"
            elif ".E" in target_val:
                patch_type = "equipment"
            elif ".C" in target_val:
                patch_type = "custom"
            
            # 再从获取到的本地补丁类型进行匹配
            if patch_type:
                actual_val = SystemEnv.get_update_version(patch_type)
            else:
                # 如果无法识别补丁类型，获取所有补丁版本
                actual_val = SystemEnv.get_update_version()
            
        else:
            logging.warning(f"Encountered unknown condition keys: {cond.keys()}")
            return False

        logging.info(f"    Checking {field_name}: Actual={actual_val} | Target={target_val} | Op={operator}")
        
        # NOTE: 暂时仅支持等于判断
        if operator == "eq":
            # 如果actual_val是列表（获取所有补丁版本），则检查target_val是否在列表中
            if isinstance(actual_val, list):
                return target_val in actual_val
            # 如果actual_val是字符串（获取特定类型的补丁版本），则直接比较
            else:
                return str(actual_val) == str(target_val)

        return False

    def evaluate_all_conditions(self, conditions_list: List[Dict]) -> bool:
        """
        Iterate through the condition list.
        All conditions must be met (AND logic).
        """
        logging.info("Checking rollback condition validation ...")
        if not conditions_list:
            logging.info("Condition list is empty. Defaulting to FAIL.")
            return False

        # Check if there's only one condition and it's os_version
        if len(conditions_list) == 1 and "os_version" in conditions_list[0]:
            logging.info("Only one os_version condition exists. Defaulting to FAIL.")
            return False

        for idx, cond in enumerate(conditions_list):
            if not self._check_single_condition(cond):
                logging.warning(f"Condition at index {idx} failed. Aborting rollback.")
                return False
        
        logging.info("All conditions passed.")
        return True

    # --- Internal Tool: Schedule Calculation ---

    def _get_seconds_to_next_schedule(self, time_str_list: List[str]) -> float:
        """
        Calculate seconds remaining until the next nearest schedule time.
        Input: ["06:00", "18:00", "20:00"]
        """
        now = datetime.now()
        candidates = []

        for t_str in time_str_list:
            try:
                # Parse HH:MM
                h, m = map(int, t_str.split(':'))
                # Construct a datetime object for today at that time
                target_time = now.replace(hour=h, minute=m, second=0, microsecond=0)
                
                if target_time <= now:
                    # If time has passed today, schedule for tomorrow
                    target_time += timedelta(days=1)
                
                candidates.append(target_time)
            except ValueError:
                logging.error(f"Invalid time format in config: {t_str}")
                continue

        if not candidates:
            return 0

        # Find the earliest future time
        next_run = min(candidates)
        wait_seconds = (next_run - now).total_seconds()

        logging.info(f"Next Schedule: {next_run.strftime('%Y-%m-%d %H:%M:%S')} (Wait {wait_seconds:.1f}s)")
        
        return wait_seconds

    def check_batch_rollback_policy(self):
        """
        Check if batch rollback policy is enabled at service startup.
        """
        try:
            # 使用合并后的配置（包含高优先级配置）
            raw_config = self._get_merged_config()
            batchback = raw_config.get("batchback", {})

            # 如果策略为空，不上报
            if not batchback:
                logging.info("Batch rollback policy is empty, skipping...")
                return False

            # 上报获取到批量回退策略
            self.parent.ReportBatchRollback(BatchRollbackStatus.TASK_ACQUIRED)

            if not batchback.get("enable", True):
                logging.info("BatchRollbackManager policy is disable, skipp ...")
                # 上报策略禁用
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_FAILED, error_string=_("Conditions not met"))
                return False

            # 匹配所有判断条件
            conditions = batchback.get("conditions", [])
            if not self.parent.rollback_manager.evaluate_all_conditions(conditions):
                # 上报条件不满足
                self.parent.ReportBatchRollback(BatchRollbackStatus.CONDITIONS_NOT_MET, error_string=_("Conditions not met"))
                return False
            else:
                # 上报条件满足
                self.parent.ReportBatchRollback(BatchRollbackStatus.CONDITIONS_MET)

            # 启动一个30s后执行的定时器，30s后执行rollback
            run_time = datetime.now() + timedelta(seconds=30)
            task_id = f"rollback_once_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            self.parent.background_scheduler.add_job(
                func=self.execute_rollback,
                trigger='date',
                run_date=run_time,
                args=[
                    self.parent.system_bus,
                    self.parent.batch_rollback_result_handler,
                    self.parent.batch_rollback_rate_handler,
                    self.parent.batch_rollback_finish_event,
                    self.parent.background_scheduler
                ],
                id=task_id,
                replace_existing=True
            )
            logging.info("Batch rollback policy will be executed after 30 seconds.")
            return True
        except Exception as e:
            logging.error(f"Error checking batch rollback policy: {e}")
            return False

    def batch_rollback_modified(self, modified_conf = {}):
        logging.info(f"batch_rollback_modified: modified_conf {modified_conf}")
        
        # 检查是否满足执行条件
        try:
            # 使用合并后的配置（包含高优先级配置）
            raw_config = self._get_merged_config()
            batchback = raw_config.get("batchback", {})
            
            # 如果策略为空，不上报任何状态
            if not batchback:
                logging.info("Batch rollback policy is empty, skipping...")
                return

            # 上报获取到批量回退策略
            self.parent.ReportBatchRollback(BatchRollbackStatus.TASK_ACQUIRED)

            # 检查batchback是否启用
            if not batchback.get("enable", True):
                logging.info("Batch rollback policy is disabled, skipping execution.")
                # 上报策略禁用
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_FAILED, error_string=_("Conditions not met"))
                return
            
            # 匹配所有判断条件
            conditions = batchback.get("conditions", [])
            if not self.parent.rollback_manager.evaluate_all_conditions(conditions):
                # 上报条件不满足
                self.parent.ReportBatchRollback(BatchRollbackStatus.CONDITIONS_NOT_MET, error_string=_("Conditions not met"))
                return
            else:
                # 上报条件满足
                self.parent.ReportBatchRollback(BatchRollbackStatus.CONDITIONS_MET)
            
            # 条件满足，执行回退
            logging.info("Batch rollback conditions met, executing rollback...")
            self.execute_rollback(
                self.parent.system_bus,
                self.parent.batch_rollback_result_handler,
                self.parent.batch_rollback_rate_handler,
                self.parent.batch_rollback_finish_event,
                scheduler=self.parent.background_scheduler
            )
                
        except Exception as e:
            logging.error(f"Error in batch_rollback_modified: {e}")
    
    def execute_rollback(self, system_bus, result_handler, rate_handler, finish_event, scheduler=None):
        """
        执行批量回退操作
        :param system_bus: DBus系统总线
        :param result_handler: 回退结果处理器
        :param rate_handler: 回退进度处理器
        :param finish_event: 完成事件
        :param scheduler: BackgroundScheduler实例，用于定时任务调度
        """
        # 使用合并后的配置（包含高优先级配置）
        raw_config = self._get_merged_config()
        batchback = raw_config.get("batchback", {})
        raw_config["updatemode"] = "autoupdate"

        mode = batchback.get("mode", "shutdown")
        restartMode = batchback.get("restartMode", "notify")

        logging.info(f"Starting Rollback Strategy, Mode: {mode}, restartMode: {restartMode}")

        # ----------------------------------------
        # Strategy A: Shutdown Rollback (shutdown)
        # ----------------------------------------
        if mode == "shutdown":
            # 上报执行回退动作 - shutdown模式
            if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_STARTED)
            BackupServerInterface.set_rollback_flag()
            logging.info("Rollback flag set. Rollback will trigger on next system shutdown/reboot.")

        # ----------------------------------------
        # Strategy B: Immediate Rollback (now)
        # ----------------------------------------
        elif mode == "now":
            # 上报执行回退动作 - now模式
            if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_STARTED)
            BackupServerInterface.execute_batch_rollback(
                system_bus,
                result_handler,
                rate_handler,
                finish_event
            )

            if restartMode == "no":
                # 上报回退完成
                if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                    self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)
                SystemEnv.system_reboot()
            
            elif restartMode == "notify":
                try:
                    with BackgroundNotification(timeout=None) as client:
                        ret = client.send_command("UPGRADE_DIALOG_BATCH_ROLLBACK_REBOOT_REQUIRED", {"mode": "notify"})
                        logging.info(f"BackgroundNotification send_command result: {ret}")
                        self.handle_popup_result(ret)
                except Exception as e:
                    logging.error(f"BackgroundNotification failed: {e}")
                
                # 上报回退完成（在 with 块外面，确保即使 BackgroundNotification 失败也能上报）
                if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                    self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)

            elif restartMode == "reboot":
                try:
                    with BackgroundNotification(timeout=None) as client:
                        ret = client.send_command("UPGRADE_DIALOG_BATCH_ROLLBACK_REBOOT_REQUIRED", {"mode": "reboot"})
                        logging.info(f"BackgroundNotification send_command result: {ret}, system will reboot ...")
                except Exception as e:
                    logging.error(f"BackgroundNotification failed: {e}")
                
                # 从batchback配置中读取timeout时间，阻塞对应分钟后再执行重启操作
                try:
                    timeout_minutes = batchback.get("timeout", 0)
                    if timeout_minutes > 0:
                        logging.info(f"Waiting for {timeout_minutes} minutes before reboot...")
                        time.sleep(timeout_minutes * 60)
                except Exception as e:
                    logging.error(f"Error while waiting for timeout: {e}")
                
                # 上报回退完成（在 with 块外面，确保即使 BackgroundNotification 失败也能上报）
                if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                    self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)
                    SystemEnv.system_reboot()

        # ----------------------------------------
        # Strategy C: Timing Rollback (timing) - 使用scheduler非阻塞调度
        # ----------------------------------------
        elif mode == "timing":
            time_list = batchback.get("time", [])
            if not time_list:
                logging.error("Timing mode selected but 'time' list is empty.")
                return

            if not scheduler:
                logging.error("Scheduler is required for timing mode but not provided.")
                return

            # 计算定时回退时间
            wait_sec = self._get_seconds_to_next_schedule(time_list)

            if wait_sec > 0:
                logging.info(f"Scheduling rollback to execute after {wait_sec:.1f} seconds using BackgroundScheduler...")
                # 使用scheduler调度非阻塞定时任务
                run_time = datetime.now() + timedelta(seconds=wait_sec)
                task_id = f"rollback_timing_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                
                scheduler.add_job(
                    func=self._execute_timing_rollback,
                    trigger='date',
                    run_date=run_time,
                    args=[system_bus, result_handler, rate_handler, finish_event, restartMode, batchback],
                    id=task_id,
                    replace_existing=True
                )
                # logging.info(f"Rollback task scheduled with ID: {task_id}, will run at: {run_time}")
                with open("/var/log/kylin-unattended-upgrades/unattended-upgrades-shutdown.log",'a+') as f:
                    self.parent.background_scheduler.print_jobs(out=f)
            else:
                logging.warning("Invalid wait time calculated for timing mode, skipping scheduling.")

        else:
            logging.error(f"Unknown rollback mode: {mode}")

    def _execute_timing_rollback(self, system_bus, result_handler, rate_handler, finish_event, restartMode, batchback):
        """
        执行timing模式的回退逻辑（由scheduler调度调用）
        """
        logging.info("Timed rollback executed by scheduler.")
        
        # 上报执行回退动作 - timing模式
        if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
            self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_STARTED)
        
        BackupServerInterface.execute_batch_rollback(
            system_bus,
            result_handler,
            rate_handler,
            finish_event
        )

        if restartMode == "no":
            # 上报回退完成
            if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)
            SystemEnv.system_reboot()
        
        elif restartMode == "notify":
            try:
                with BackgroundNotification(timeout=None) as client:
                    ret = client.send_command("UPGRADE_DIALOG_BATCH_ROLLBACK_REBOOT_REQUIRED", {"mode": "notify"})
                    logging.info(f"BackgroundNotification send_command result: {ret}")
                    self.handle_popup_result(ret)
            except Exception as e:
                logging.error(f"BackgroundNotification failed: {e}")
            
            # 上报回退完成（在 with 块外面，确保即使 BackgroundNotification 失败也能上报）
            if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)

        elif restartMode == "reboot":
            try:
                with BackgroundNotification(timeout=None) as client:
                    ret = client.send_command("UPGRADE_DIALOG_BATCH_ROLLBACK_REBOOT_REQUIRED", {"mode": "reboot"})
                    logging.info(f"BackgroundNotification send_command result: {ret}, system will reboot ...")
            except Exception as e:
                logging.error(f"BackgroundNotification failed: {e}")
            
            # 从batchback配置中读取timeout时间，阻塞对应分钟后再执行重启操作
            try:
                timeout_minutes = batchback.get("timeout", 0)
                if timeout_minutes > 0:
                    logging.info(f"Waiting for {timeout_minutes} minutes before reboot...")
                    time.sleep(timeout_minutes * 60)
            except Exception as e:
                logging.error(f"Error while waiting for timeout: {e}")
            
            # 上报回退完成（在 with 块外面，确保即使 BackgroundNotification 失败也能上报）
            if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)
            SystemEnv.system_reboot()

    def handle_popup_result(self, result: Dict[str, str]):
        """
        Centralized logic to handle user choice from the popup.
        Input Example: {'choice': 'delay_1_day', 'type': '...'}
        """
        if not result or 'choice' not in result:
            logging.warning("Popup returned empty or invalid result.")
            return

        choice = result['choice']
        logging.info(f"User selected choice: [{choice}]")

        # 1. Define actions for static choices
        actions = {
            'yes': lambda: SystemEnv.system_reboot(),
            'ok':  lambda: SystemEnv.system_reboot(),
            'no':  lambda: logging.info("User canceled reboot."),
            'cancel': lambda: logging.info("User canceled the dialog.")
        }

        # 2. Execute if it matches a static action
        if choice in actions:
            # 上报回退完成
            if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)
            actions[choice]()
            return

        # 3. Handle Dynamic "Delay" choices
        if choice.startswith("delay_"):
            seconds = self._parse_delay_string(choice)
            if seconds > 0:
                target_time = datetime.now() + timedelta(seconds=seconds)
                logging.info(f"Reboot delayed by user. Target: {target_time} ({seconds}s)")
                
                # Logic Decision:
                # Since this is a blocking module, sleeping for 1 day is risky.
                # Usually, you would set a flag/file for a scheduler.
                # For this specific blocking requirement, we can wait or exit.
                
                # Example: If delay is short (< 1 hour), wait. If long, maybe just log or exit.
                if seconds <= 3600:
                    time.sleep(seconds)
                    # 上报回退完成
                    if self.parent and hasattr(self.parent, 'ReportBatchRollback'):
                        self.parent.ReportBatchRollback(BatchRollbackStatus.ROLLBACK_SUCCESS)
                    SystemEnv.system_reboot()
                else:
                    logging.warning("Delay is too long for blocking wait. Scheduler update required (Mock).")
            else:
                logging.error("Invalid delay format selected.")
            return

        logging.warning(f"Unhandled choice: {choice}")

    def _parse_delay_string(self, delay_str: str) -> int:
        """
        Parses strings like 'delay_1_day', 'delay_30_minutes' into seconds.
        Returns 0 if parsing fails.
        
        调试模式映射（当 DEBUG_DELAY_ENABLED 为 True 时）：
        - delay_30_minutes -> 5分钟 (300秒)
        - delay_1_day -> 10分钟 (600秒)
        - delay_7_day -> 30分钟 (1800秒)
        """
        # 调试模式开关：从环境变量或配置文件读取（默认开启）
        debug_delay_enabled = True
        
        if debug_delay_enabled:
            # 调试模式：使用固定的短时间
            debug_delay_map = {
                "delay_30_minutes": 300,   # 5分钟
                "delay_1_day": 600,        # 10分钟
                "delay_7_day": 1800        # 30分钟
            }
            
            if delay_str in debug_delay_map:
                debug_seconds = debug_delay_map[delay_str]
                logging.info(f"DEBUG MODE: Using debug delay {debug_seconds}s for '{delay_str}' instead of actual delay")
                return debug_seconds
            else:
                logging.info(f"DEBUG MODE: No debug mapping for '{delay_str}', using normal parsing")
        
        # 正常模式：解析实际的延迟时间
        try:
            # Format: delay_{amount}_{unit}
            parts = delay_str.split('_')
            if len(parts) != 3:
                return 0
            
            amount = int(parts[1])
            unit = parts[2]
            
            if "min" in unit:
                return amount * 60
            elif "hour" in unit:
                return amount * 3600
            elif "day" in unit:
                return amount * 86400
            return 0
        except Exception as e:
            logging.error(f"Failed to parse delay string '{delay_str}': {e}")
            return 0

if __name__ == "__main__":
    gettext.bindtextdomain("unattended-upgrades","/usr/share/locale")
    gettext.textdomain("unattended-upgrades")
    logdir = "/var/log/kylin-unattended-upgrades/"
    if not os.path.exists(logdir):
        os.makedirs(logdir)
    logfile = os.path.join(logdir, "unattended-upgrades-shutdown.log")
    logging.basicConfig(format='%(asctime)s-%(name)s-%(levelname)s-%(message)s',datefmt='%Y-%m-%d,%H:%M:%S',level=logging.DEBUG,filename=logfile)
    signal.signal(signal.SIGTERM, signal_term_handler)
    unattendedupgrade=UnattendedUpgradeBackend()
    unattendedupgrade.init_project_config()
    unattendedupgrade.init_dbus_connections()
    unattendedupgrade.init_config()
    unattendedupgrade.init_event_flags()
    unattendedupgrade.check_and_report_rollback_status()
    rollback_flag = unattendedupgrade.rollback_manager.check_batch_rollback_policy()
    if not rollback_flag:
        unattendedupgrade.init_scheduler()
    unattendedupgrade.run()
    
