Files
RemoveWeeklyShapshot/core/getVmsSnapshots.py
2026-02-20 21:36:58 +08:00

238 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
import yaml
from pyVmomi import vim
from datetime import datetime, timedelta
from pyVim.connect import SmartConnect, Disconnect
from openpyxl.styles import Border, Side, Font, PatternFill
from utils.logger import logger
from config.settings import MANAGEMENT_NODES, SNAPSHOT_RETENTION_DAYS, EXCEL_OUTPUT_PATH, YAML_OUTPUT_PATH
def get_all_vms():
""" 主函数,负责流程控制"""
vm_list = []
for node in MANAGEMENT_NODES:
try:
si = SmartConnect(
host=node['host'],
user=node['user'],
pwd=node['password'],
disableSslCertValidation=True
)
content = si.RetrieveContent()
vm_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
for vm in vm_view.view:
vm_info = build_vm_info(vm, node['host'])
vm_list.append(vm_info)
except vim.fault.InvalidLogin as e:
logger.info(f"登录 {node['host']} 失败,请检查用户名和密码:{e.msg}")
except Exception as e:
logger.error(f"无法连接到 {node['host']}{e}")
finally:
if 'si' in locals(): # 确保 si 是定义过的
Disconnect(si) # 确保连接被断开
print(f"获取到 {len(vm_list)} 台VM")
return vm_list
def build_vm_info(vm, node_host):
"""构造虚拟机信息字典"""
vm_info = {
'NodeHost': node_host,
'name': vm.name,
'moId': vm._moId,
'powerState': vm.runtime.powerState,
'system': vm.config.guestFullName,
'ipAddress': vm.guest.ipAddress,
'hostName': vm.guest.hostName,
'diskSpaceGB': get_virtual_disk_size(vm),
'createDate': (vm.config.createDate + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') if vm.runtime.bootTime else None, # 虚拟机的创建时间
'bootTime': (vm.runtime.bootTime + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') if vm.runtime.bootTime else None, # 虚拟机上次启动的时间 'createDate': (vm.config.createDate + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'), # 虚拟机的创建时间
# 'createDate': vm.config.createDate, # 虚拟机的创建时间
# 'bootTime': vm.runtime.bootTime, # 虚拟机上次启动的时间
'snapshots': [],
'Host': vm.runtime.host.name,
'vmPath': vm.config.files.vmPathName
}
# 判断虚拟机是否有快照如果有就构造快照结构并记录当前快照ID如果没有就设为 None。
if vm.snapshot:
current_snapshot = vm.snapshot.currentSnapshot
root_snapshots = vm.snapshot.rootSnapshotList
for snapshot in root_snapshots:
vm_info['snapshots'].extend(build_snapshot_dict(snapshot))
vm_info['currentSnapshotId'] = (
current_snapshot._moId if current_snapshot else None
)
else:
vm_info['snapshots'] = None
vm_info['currentSnapshotId'] = None
return vm_info
def build_snapshot_dict(snapshot):
"""递归构造快照结构"""
snapshot_info = {
'name': snapshot.name,
'description': snapshot.description,
'createTime': (snapshot.createTime + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'),
'state': str(snapshot.state),
'id': snapshot.id,
'moId': snapshot.snapshot._moId,
'quiesced': getattr(snapshot, 'quiesced', None),
'children': []
}
if snapshot.childSnapshotList:
for child in snapshot.childSnapshotList:
snapshot_info['children'].extend(build_snapshot_dict(child))
return [snapshot_info]
"""设置表格样式"""
def style_sheet(sheet, is_old_data=None):
"""设置工作表样式:列宽、边框、加粗标题、冻结首行和设置背景颜色"""
# 定义边框样式
thin = Side(border_style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
# 设置列宽和边框
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
cell.border = border # 为每个单元格添加边框
except Exception as e:
pass
adjusted_width = (max_length + 2)
sheet.column_dimensions[column_letter].width = adjusted_width
# 加粗标题并冻结首行
for cell in sheet[1]: # 加粗标题
cell.font = Font(bold=True)
sheet.freeze_panes = 'A2' # 冻结首行
# 设置背景颜色
if is_old_data is not None:
for row in range(2, len(is_old_data) + 2):
if is_old_data[row - 2]:
for col in range(1, sheet.max_column + 1): # 使用 sheet.max_column
cell = sheet.cell(row=row, column=col)
cell.fill = PatternFill(start_color='ADD8E6', end_color='ADD8E6', fill_type='solid')
"""获取虚拟机的总磁盘大小(仅是虚拟磁盘的分配空间)"""
def get_virtual_disk_size(vm):
total_size = 0
for device in vm.config.hardware.device:
if isinstance(device, vim.vm.device.VirtualDisk):
# 获取每个虚拟磁盘的容量
total_size += device.capacityInKB / (1024 * 1024) # 转换为GB
return total_size
def collect_snapshot_data(snapshot, vm, snapshot_data, old_snapshots):
"""递归扁平化快照数据用于Excel并收集旧快照"""
create_time = datetime.strptime(snapshot['createTime'], '%Y-%m-%d %H:%M:%S')
is_old = create_time < (datetime.now() - timedelta(days=SNAPSHOT_RETENTION_DAYS))
snapshot_info = {
'NodeHost': vm['NodeHost'],
'VMName': vm['name'],
'Snapshot Name': snapshot['name'],
'Description': snapshot['description'],
'CreateTime': snapshot['createTime'],
'State': snapshot['state'],
'ID': snapshot['id'],
'MOID': snapshot['moId'],
'Quiesced': snapshot['quiesced'],
'is_old': is_old
}
# 如果是旧快照,添加到旧快照列表
if is_old:
old_snapshots.append(snapshot_info)
snapshot_data.append(snapshot_info)
for child in snapshot['children']:
collect_snapshot_data(child, vm, snapshot_data, old_snapshots)
# 输出数据到 Excel 文件
def create_excel_report(vms):
vm_data = []
snapshot_data = []
old_snapshots = [] # 用于存储旧快照的信息
for vm in vms:
vm_data.append({
'NodeHost': vm['NodeHost'],
'VMName': vm['name'],
'MOID': vm['moId'],
'PowerState': vm['powerState'],
'System': vm['system'],
'IPAddress': vm['ipAddress'],
'HostName': vm['hostName'],
'CurrentSnapshotID': vm.get('currentSnapshotId'),
'DiskSpace/GB': vm['diskSpaceGB'],
'createDate': vm['createDate'], # 虚拟机的创建时间
'bootTime': vm['bootTime'], # 虚拟机上次启动的时间
'Host': vm['Host'],
'VMPath': vm['vmPath']
})
if vm['snapshots']:
for snapshot in vm['snapshots']:
collect_snapshot_data(snapshot, vm, snapshot_data, old_snapshots)
vm_df = pd.DataFrame(vm_data)
snapshot_df = pd.DataFrame(snapshot_data)
with pd.ExcelWriter(EXCEL_OUTPUT_PATH, engine='openpyxl') as writer:
vm_df.to_excel(writer, sheet_name='VMs', index=False)
snapshot_df.to_excel(writer, sheet_name='Snapshots', index=False)
workbook = writer.book
style_sheet(workbook['VMs'])
# style_sheet(workbook['Snapshots'], snapshot_df['is_old'].tolist())
# 判断 DataFrame 是否有数据,没有数据则跳过,有数据则写入 excel 文件
if 'is_old' in snapshot_df.columns and snapshot_df['is_old'].tolist():
style_sheet(workbook['Snapshots'], snapshot_df['is_old'].tolist())
logger.debug(f"Excel 文件已生成: {EXCEL_OUTPUT_PATH}")
# 返回旧快照的数据
return old_snapshots
# 输出待删除的旧快照到 YAML 文件
def export_yaml(old_snapshots):
print(old_snapshots)
# 将旧快照信息存储到 YAML 文件
with open(YAML_OUTPUT_PATH, 'w', encoding='utf-8') as yaml_file:
# allow_unicode输出 Unicode 字符中文等allow_unicode使用块样式多行缩进sort_keys不按键名排序保留原始插入顺序
yaml.dump(old_snapshots, yaml_file, allow_unicode=True, default_flow_style=False, sort_keys=False)
logger.debug(f"YAML 文件已生成: {YAML_OUTPUT_PATH}")
if __name__ == '__main__':
vms = get_all_vms() # 主函数入口,获取虚拟机信息
# print(vms)
old_snapshots = create_excel_report(vms) # 生成Excel报告并获取旧快照
export_yaml(old_snapshots)
# print(old_snapshots)