未添加删除快照逻辑

This commit is contained in:
panjunlan
2026-02-20 16:55:53 +08:00
parent 199cbab4aa
commit 7a0a51be4e
10 changed files with 305 additions and 581 deletions

2
.env
View File

@@ -10,5 +10,5 @@ MAX_DELETE_CONCURRENT=4
# EXCEL_OUTPUT_PATH=/tmp/vm_snapshots_report.xlsx
EXCEL_OUTPUT_PATH=./vm_snapshots_report.xlsx
# 日志文件路径
# LOG_FILE_PATH=/var/log/vm_snapshot_cleanup.log
# LOG_FILE_PATH=/var/logs/vm_snapshot_cleanup.logs
LOG_FILE_PATH=./log/vm_snapshot_cleanup.log

View File

@@ -1,19 +1,24 @@
# RemoveWeeklySnapshot
> 以下需求需要每周执行一次
Vmware 虚拟机自动化程序:自动化导出虚拟机和快照信息,自动化删除旧快照。
## Todo List
- [x] 连接vCenter/Esxi/Hyper-V
- [x] 获取所有 vms
- [x] 获取所有 snapshots
- [x] 筛选出15天半个月前的 snapshots
- [x] 以上内容以 Excel 表格的形式导出,超出 15 天的快照填充蓝色底标识
- [ ] Outlook 邮箱发送超出 15 天的快照
- [ ] 增加排除不能删除的快照
- [ ] 需要控制每台 vCenter 不可以同时删除超过 4 个快照
- [x] 以上内容以 Excel 表格的形式导出,超出 15 天的快照蓝色底填充标识
- [ ] 增加排除不能删除的快照信息,用红色底填充标识
- [ ] Outlook 邮箱发送超出 15 天的快照信息
- [ ] 需要控制每台 vCenter 不可以同时删除超过 4 个快照(需要同时获取删除成功的信息)
- [ ] 最后删除 15 天前的 snapshot并同时记录删除的 snapshot 日志信息
- [ ] 设置计划任务,每 2 周(半个月)执行一次
## 输出所有可用的属性和方法
| 你想获取 | 代码 | 示例输出 |
| ------------ | ----------------------------------- | ---------------------------------------------- |
| **名称** | `vm.name` | `"WebServer-01"` |
@@ -30,17 +35,6 @@
## 输出所有可用的属性和方法
```
vm.config.createDate # 虚拟机的创建时间
vm.runtime.bootTime # 虚拟机上次启动的时间
```
>以下这些方法和属性主要用于操作虚拟机VM、快照、存储和其他资源。
>
>vm
@@ -380,3 +374,25 @@ if __name__ == '__main__':
print("无快照")
```
## PY 文件作用描述
``` powershell
PS D:\PycharmProjects\RemoveWeeklyShapshot> tree /F
```
## 所用到的 Python 库
``` shell
pip install pyVmomi ...
```

View File

@@ -1,34 +1,31 @@
# 管理节点配置包含vCenter和ESXi
management_nodes:
# vCenter节点
# - type: vcenter # 标记类型为vcenter
# name: vc1 # 节点名称(用于日志)
# host: 192.168.40.134 # 地址
# user: administrator@lan.com
# password: Root@2025
# max_delete_concurrent: 4 # 该节点最大并发删除数
- type: vcenter # 标记类型为vcenter
name: vc1 # 节点名称(用于日志)
host: 192.168.40.134 # 地址
user: administrator@lan.com
password: Root@2025
max_delete_concurrent: 4 # 该节点最大并发删除数
# ESXi节点未接入 vCenter 的 Esxi 主机)
- type: esxi # 标记类型为esxi
name: esxi1
host: 192.168.40.133
user: root # ESXi默认用root
password: Root@2025
max_delete_concurrent: 2 # ESXi性能较弱并发数可设小些
# - type: esxi # 标记类型为esxi
# name: esxi1
# host: 192.168.40.133
# user: root # ESXi默认用root
# password: Root@2025
# max_delete_concurrent: 2 # ESXi性能较弱并发数可设小些
- type: esxi
name: esxi2
host: 192.168.40.135
user: root
password: Root@2025
max_delete_concurrent: 2
# - type: esxi
# name: esxi2
# host: 192.168.40.135
# user: root
# password: Root@2025
# max_delete_concurrent: 2
# 全局策略配置
global:
snapshot_retention_days: 15
excel_output_path: ./vm_snapshots_report.xlsx
# excel_output_path: /tmp/vm_snapshots_report.xlsx
# log_file_path: /var/log/vm_snapshot_cleanup.log
log_file_path: ./vm_snapshot_cleanup.log
snapshot_retention_days: 15 # 可选,使用默认值 15 天
# excel_output_path: ./vm_snapshots_report.xlsx # 可选,使用默认值,如:/logs/2026-02-20_14-00-21-VMsSnapShots_report.xlsx
# ESXi连接特殊配置禁用SSL验证ESXi默认自签证书
disable_ssl_verify: true

View File

@@ -1,14 +1,10 @@
import yaml
import os
from datetime import datetime, timedelta
# from utils.logger import logger
# 配置文件路径
CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.yaml')
from datetime import datetime
from utils.logger import logger
def load_config():
"""加载YAML配置区分vCenter和ESXi"""
"""加载 YAML 配置文件并解析其内容"""
try:
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
raw_config = yaml.safe_load(f)
@@ -18,18 +14,17 @@ def load_config():
config = {
# vCenter/ESXi节点列表
"MANAGEMENT_NODES": raw_config.get('management_nodes', []),
# vCenter/ESXi节点列表
"SNAPSHOT_RETENTION_DAYS": int(global_config.get('snapshot_retention_days', 15)),
"EXCEL_OUTPUT_PATH": global_config.get('excel_output_path', '/tmp/vm_snapshots_report.xlsx'),
"LOG_FILE_PATH": global_config.get('log_file_path', '/var/log/vm_snapshot_cleanup.log'),
# "EXCEL_OUTPUT_PATH": global_config.get('excel_output_path', f'/logs/{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}-VMsSnapShots_report.xlsx'),
# "LOG_FILE_PATH": global_config.get('log_file_path', f'/logs/{datetime.now().strftime('%Y%m%d_%H%M%S')}-VMsSnapShots_cleanup.logs'), "EXCEL_OUTPUT_PATH": global_config.get('excel_output_path', f'/logs/{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}-VMsSnapShots_report.xlsx'),
"EXCEL_OUTPUT_PATH": global_config.get('excel_output_path', f'D:\\PycharmProjects\\RemoveWeeklyShapshot\\logs\\{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}-VMsSnapShots_report.xlsx'),
"DISABLE_SSL_VERIFY": global_config.get('disable_ssl_verify', True),
# 算出的过期时间点
"EXPIRE_DATE": datetime.now() - timedelta(days=int(global_config.get('snapshot_retention_days', 15)))
}
# 验证配置
if not config["MANAGEMENT_NODES"]:
raise ValueError("未配置任何管理节点vCenter/ESXi")
raise ValueError("未配置任何管理节点vCenterESXi,只少要有一台 management_nodes 节点。")
# 检查每个节点的必填字段
required_fields = ['type', 'name', 'host', 'user', 'password', 'max_delete_concurrent']
@@ -39,25 +34,23 @@ def load_config():
raise ValueError(f"节点 {node.get('name', '未知')} 缺少配置字段: {missing}")
# 验证类型合法性
if node['type'] not in ['vcenter', 'esxi']:
raise ValueError(f"节点 {node['name']} 类型错误仅支持vcenter/esxi")
raise ValueError(f"节点 {node['name']} 类型错误(仅支持 vcenteresxi")
#logger.info(f"✅ 成功加载配置,共 {len(config['MANAGEMENT_NODES'])} 个管理节点")
logger.info(f"✅ 成功加载配置,共 {len(config['MANAGEMENT_NODES'])} 个管理节点")
return config
except Exception as e:
#logger.error(f"❌ 加载配置失败: {str(e)}")
logger.error(f"❌ 加载配置失败: {str(e)}")
raise
# 配置文件路径
CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.yaml')
# 加载配置并导出全局变量
config = load_config() # 模块导入时立即执行
MANAGEMENT_NODES = config["MANAGEMENT_NODES"]
SNAPSHOT_RETENTION_DAYS = config["SNAPSHOT_RETENTION_DAYS"]
EXCEL_OUTPUT_PATH = config["EXCEL_OUTPUT_PATH"]
LOG_FILE_PATH = config["LOG_FILE_PATH"]
#LOG_FILE_PATH = config["LOG_FILE_PATH"]
DISABLE_SSL_VERIFY = config["DISABLE_SSL_VERIFY"]
EXPIRE_DATE = config["EXPIRE_DATE"]
# 验证配置函数
def validate_config():
@@ -68,9 +61,8 @@ if __name__ == "__main__":
# 打印全局配置
print("\n【全局配置】")
print(f" 快照保留天数: {config['SNAPSHOT_RETENTION_DAYS']}")
print(f" 过期日期: {config['EXPIRE_DATE']}")
print(f" Excel输出路径: {config['EXCEL_OUTPUT_PATH']}")
print(f" 日志文件路径: {config['LOG_FILE_PATH']}")
print(f" Excel输出路径: {EXCEL_OUTPUT_PATH}")
#print(f" 日志文件路径: {LOG_FILE_PATH}")
print(f" 禁用SSL验证: {config['DISABLE_SSL_VERIFY']}")
# 打印管理节点

16
core/deleteSnapshots.py Normal file
View File

@@ -0,0 +1,16 @@
from getVmsSnapshots import collect_snapshot_data
def delete_old_snapshots(dele_snapshots):
"""删除旧快照"""
for snapshot in dele_snapshots:
if snapshot['is_old']:
# 在这里执行删除操作,例如调用 API 或者其他逻辑
print(f"Deleting old snapshot: {snapshot['Snapshot Name']} (ID: {snapshot['ID']})")
# 示例:假设存在一个 delete_snapshot 函数
# delete_snapshot(snapshot['ID'])
# 删除旧快照
print(collect_snapshot_data)
# delete_old_snapshots(dele_snapshots)

215
core/getVmsSnapshots.py Normal file
View File

@@ -0,0 +1,215 @@
import pandas as pd
from pyVmomi import vim
from datetime import datetime, timedelta
from pyVim.connect import SmartConnect, Disconnect
from openpyxl.styles import Border, Side, Font, PatternFill
from utils.logger import logger
from config.settings import MANAGEMENT_NODES, SNAPSHOT_RETENTION_DAYS, EXCEL_OUTPUT_PATH
def get_all_vms():
""" 主函数,负责流程控制"""
vm_list = []
for node in MANAGEMENT_NODES:
try:
si = SmartConnect(
host=node['host'],
user=node['user'],
pwd=node['password'],
disableSslCertValidation=True
)
content = si.RetrieveContent()
vm_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
for vm in vm_view.view:
vm_info = build_vm_info(vm, node['host'])
vm_list.append(vm_info)
except vim.fault.InvalidLogin as e:
print(f"登录 {node['host']} 失败,请检查用户名和密码:{e.msg}")
logger.info(f"登录 {node['host']} 失败,请检查用户名和密码:{e.msg}")
except Exception as e:
print(f"无法连接到 {node['host']}{e}")
logger.error(f"无法连接到 {node['host']}{e}")
finally:
if 'si' in locals(): # 确保 si 是定义过的
Disconnect(si) # 确保连接被断开
print(f"获取到 {len(vm_list)} 台VM")
return vm_list
def build_vm_info(vm, node_host):
"""构造虚拟机信息字典"""
vm_info = {
'NodeHost': node_host,
'name': vm.name,
'moId': vm._moId,
'powerState': vm.runtime.powerState,
'system': vm.config.guestFullName,
'ipAddress': vm.guest.ipAddress,
'hostName': vm.guest.hostName,
'diskSpaceGB': get_virtual_disk_size(vm),
'createDate': (vm.config.createDate + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') if vm.runtime.bootTime else None, # 虚拟机的创建时间
'bootTime': (vm.runtime.bootTime + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S') if vm.runtime.bootTime else None, # 虚拟机上次启动的时间 'createDate': (vm.config.createDate + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'), # 虚拟机的创建时间
# 'createDate': vm.config.createDate, # 虚拟机的创建时间
# 'bootTime': vm.runtime.bootTime, # 虚拟机上次启动的时间
'snapshots': [],
'Host': vm.runtime.host.name,
'vmPath': vm.config.files.vmPathName
}
# 判断虚拟机是否有快照如果有就构造快照结构并记录当前快照ID如果没有就设为 None。
if vm.snapshot:
current_snapshot = vm.snapshot.currentSnapshot
root_snapshots = vm.snapshot.rootSnapshotList
for snapshot in root_snapshots:
vm_info['snapshots'].extend(build_snapshot_dict(snapshot))
vm_info['currentSnapshotId'] = (
current_snapshot._moId if current_snapshot else None
)
else:
vm_info['snapshots'] = None
vm_info['currentSnapshotId'] = None
return vm_info
def build_snapshot_dict(snapshot):
"""递归构造快照结构"""
snapshot_info = {
'name': snapshot.name,
'description': snapshot.description,
'createTime': (snapshot.createTime + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'),
'state': str(snapshot.state),
'id': snapshot.id,
'moId': snapshot.snapshot._moId,
'quiesced': getattr(snapshot, 'quiesced', None),
'children': []
}
if snapshot.childSnapshotList:
for child in snapshot.childSnapshotList:
snapshot_info['children'].extend(build_snapshot_dict(child))
return [snapshot_info]
"""设置表格样式"""
def style_sheet(sheet, is_old_data=None):
"""设置工作表样式:列宽、边框、加粗标题、冻结首行和设置背景颜色"""
# 定义边框样式
thin = Side(border_style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
# 设置列宽和边框
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
cell.border = border # 为每个单元格添加边框
except Exception as e:
pass
adjusted_width = (max_length + 2)
sheet.column_dimensions[column_letter].width = adjusted_width
# 加粗标题并冻结首行
for cell in sheet[1]: # 加粗标题
cell.font = Font(bold=True)
sheet.freeze_panes = 'A2' # 冻结首行
# 设置背景颜色
if is_old_data is not None:
for row in range(2, len(is_old_data) + 2):
if is_old_data[row - 2]:
for col in range(1, sheet.max_column + 1): # 使用 sheet.max_column
cell = sheet.cell(row=row, column=col)
cell.fill = PatternFill(start_color='ADD8E6', end_color='ADD8E6', fill_type='solid')
"""获取虚拟机的总磁盘大小(仅是虚拟磁盘的分配空间)"""
def get_virtual_disk_size(vm):
total_size = 0
for device in vm.config.hardware.device:
if isinstance(device, vim.vm.device.VirtualDisk):
# 获取每个虚拟磁盘的容量
total_size += device.capacityInKB / (1024 * 1024) # 转换为GB
return total_size
def collect_snapshot_data(snapshot, vm, snapshot_data):
"""递归扁平化快照数据用于Excel"""
create_time = datetime.strptime(snapshot['createTime'], '%Y-%m-%d %H:%M:%S')
is_old = create_time < (datetime.now() - timedelta(days=SNAPSHOT_RETENTION_DAYS))
snapshot_data.append({
'NodeHost': vm['NodeHost'],
'VMName': vm['name'],
'Snapshot Name': snapshot['name'],
'Description': snapshot['description'],
'CreateTime': snapshot['createTime'],
'State': snapshot['state'],
'ID': snapshot['id'],
'MOID': snapshot['moId'],
'Quiesced': snapshot['quiesced'],
'is_old': is_old
})
for child in snapshot['children']:
collect_snapshot_data(child, vm, snapshot_data)
# 输出数据到 Excel 文件
def create_excel_report(vms):
vm_data = []
snapshot_data = []
for vm in vms:
vm_data.append({
'NodeHost': vm['NodeHost'],
'VMName': vm['name'],
'MOID': vm['moId'],
'PowerState': vm['powerState'],
'System': vm['system'],
'IPAddress': vm['ipAddress'],
'HostName': vm['hostName'],
'CurrentSnapshotID': vm.get('currentSnapshotId'),
'DiskSpace/GB': vm['diskSpaceGB'],
'createDate': vm['createDate'], # 虚拟机的创建时间
'bootTime': vm['bootTime'], # 虚拟机上次启动的时间
'Host': vm['Host'],
'VMPath': vm['vmPath']
})
if vm['snapshots']:
for snapshot in vm['snapshots']:
collect_snapshot_data(snapshot, vm, snapshot_data)
vm_df = pd.DataFrame(vm_data)
snapshot_df = pd.DataFrame(snapshot_data)
with pd.ExcelWriter(EXCEL_OUTPUT_PATH, engine='openpyxl') as writer:
vm_df.to_excel(writer, sheet_name='VMs', index=False)
snapshot_df.to_excel(writer, sheet_name='Snapshots', index=False)
workbook = writer.book
style_sheet(workbook['VMs'])
style_sheet(workbook['Snapshots'], snapshot_df['is_old'].tolist())
print("报告已生成:", EXCEL_OUTPUT_PATH)
if __name__ == '__main__':
vms = get_all_vms() # 主函数入口,获取虚拟机信息
# print(vms)
create_excel_report(vms) # 生成Excel报告

View File

@@ -1,223 +0,0 @@
from pyVmomi import vim
from pyVim.connect import SmartConnect, Disconnect
from config.settings import MANAGEMENT_NODES
import pandas as pd
from openpyxl.styles import Border, Side, Font, PatternFill
from datetime import datetime, timedelta
"""设置表格样式"""
def style_sheet(sheet, is_old_data=None):
"""设置工作表样式:列宽、边框、加粗标题、冻结首行和设置背景颜色"""
# 定义边框样式
thin = Side(border_style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
# 设置列宽和边框
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
cell.border = border # 为每个单元格添加边框
except Exception as e:
pass
adjusted_width = (max_length + 2)
sheet.column_dimensions[column_letter].width = adjusted_width
# 加粗标题并冻结首行
for cell in sheet[1]: # 加粗标题
cell.font = Font(bold=True)
sheet.freeze_panes = 'A2' # 冻结首行
# 设置背景颜色
if is_old_data is not None:
for row in range(2, len(is_old_data) + 2):
if is_old_data[row - 2]:
for col in range(1, sheet.max_column + 1): # 使用 sheet.max_column
cell = sheet.cell(row=row, column=col)
cell.fill = PatternFill(start_color='ADD8E6', end_color='ADD8E6', fill_type='solid')
"""获取虚拟机的总磁盘大小(仅是虚拟磁盘的分配空间)"""
def get_virtual_disk_size(vm):
total_size = 0
for device in vm.config.hardware.device:
if isinstance(device, vim.vm.device.VirtualDisk):
# 获取每个虚拟磁盘的容量
total_size += device.capacityInKB / (1024 * 1024) # 转换为GB
return total_size
"""获取所有虚拟机"""
def get_all_vms():
vm_list = []
for node in MANAGEMENT_NODES:
# 连接vCenter
si = SmartConnect(
host=node['host'],
user=node['user'],
pwd=node['password'],
disableSslCertValidation=True
)
# 获取所有VM
content = si.RetrieveContent()
vm_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
for vm in vm_view.view:
# print(dir(vm)) # 输出所有可用的属性和方法
# print(vm.runtime)
# print(vm.summary)
# print(vm.snapshot)
# 初始化VM信息字典
# print(vars(vm.summary))
# print(vm.summary)
vm_info = {
'NodeHost': node['host'],
'name': vm.name,
'moId': vm._moId,
'powerState': str(vm.runtime.powerState),
'system': vm.config.guestFullName,
'ipAddress': vm.guest.ipAddress,
'hostName': vm.guest.hostName,
'vmPath': vm.config.files.vmPathName,
'Host': vm.runtime.host.name, # 拿到 Host 主机名,如果是从 Esxi 主机直接获取的可能拿不到正确信息
'snapshots': [], # 添加快照信息
'diskSpaceGB': get_virtual_disk_size(vm) # 添加虚拟机占用的磁盘空间
}
# print(vm_info)
# 获取快照信息
if vm.snapshot is not None:
current_snapshot = vm.snapshot.currentSnapshot
root_snapshots = vm.snapshot.rootSnapshotList
# 处理根快照列表
for snapshot in root_snapshots:
snapshot_info = get_snapshot_info(snapshot)
vm_info['snapshots'].extend(snapshot_info)
# 添加当前快照ID如果有
if current_snapshot:
vm_info['currentSnapshotId'] = current_snapshot._moId
else:
vm_info['snapshots'] = None
vm_info['currentSnapshotId'] = None
vm_list.append(vm_info)
# print(vm_info)
vm_view.Destroy()
Disconnect(si)
print(f"获取到 {len(vm_list)} 台VM")
return vm_list
"""递归获取快照信息"""
def get_snapshot_info(snapshot):
snapshot_list = []
# 当前快照信息
snapshot_info = {
'name': snapshot.name,
'description': snapshot.description,
'createTime': (snapshot.createTime + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'), # + timedelta(hours=8) 是时间运算,转换为北京时间
'state': str(snapshot.state),
'id': snapshot.id,
'moId': snapshot.snapshot._moId, # 快照的Managed Object ID
# 'sizeMB': snapshot.diskSizeMB if hasattr(snapshot, 'diskSizeMB') else None, # 快照大小
'quiesced': snapshot.quiesced if hasattr(snapshot, 'quiesced') else None, # 是否静默快照
'children': [] # 子快照
}
# 递归处理子快照
if snapshot.childSnapshotList:
for child in snapshot.childSnapshotList:
snapshot_info['children'].extend(get_snapshot_info(child))
snapshot_list.append(snapshot_info)
return snapshot_list
"""将虚拟机和快照信息写入Excel文件并标记创建时间在15天前的快照"""
def create_excel_report(vms):
vm_data = []
snapshot_data = []
def add_snapshots(snapshot, vm_name):
create_time = datetime.strptime(snapshot['createTime'], '%Y-%m-%d %H:%M:%S')
is_old = create_time < (datetime.now() - timedelta(days=1))
snapshot_data.append({
'NodeHost': vm['NodeHost'],
'VMName': vm_name,
'Snapshot Name': snapshot['name'],
'Description': snapshot['description'],
'CreateTime': snapshot['createTime'],
'State': snapshot['state'],
'ID': snapshot['id'],
'MOID': snapshot['moId'],
# 'Size(MB)': snapshot['sizeMB'],
'Quiesced': snapshot['quiesced'],
'is_old': is_old
})
"""递归将快照信息加入报告"""
for child in snapshot['children']:
add_snapshots(child, vm_name)
for vm in vms:
# print(vm['NodeHost'])
vm_data.append({
'NodeHost': vm['NodeHost'],
'VMName': vm['name'],
'MOID': vm['moId'],
'PowerState': vm['powerState'],
'System': vm['system'],
'IPAddress': vm['ipAddress'],
'HostName': vm['hostName'],
'CurrentSnapshotID': vm.get('currentSnapshotId', None),
'DiskSpace/GB': vm['diskSpaceGB'],
'Host': vm['Host'],
'VMPath': vm['vmPath']
})
if vm['snapshots']:
# print(vm['snapshots'])
for snapshot in vm['snapshots']:
add_snapshots(snapshot, vm['name'])
vm_df = pd.DataFrame(vm_data)
snapshot_df = pd.DataFrame(snapshot_data)
with pd.ExcelWriter('vm_and_snapshots_report.xlsx', engine='openpyxl') as writer:
vm_df.to_excel(writer, sheet_name='VMs', index=False)
snapshot_df.to_excel(writer, sheet_name='Snapshots', index=False)
# 获取写入的工作表
workbook = writer.book
vm_sheet = workbook['VMs']
snapshot_sheet = workbook['Snapshots']
# 调用样式设置函数,传入 is_old_data
style_sheet(vm_sheet) # 设置 VMs 工作表样式
style_sheet(snapshot_sheet, snapshot_df['is_old'].tolist()) # 设置 Snapshots 工作表样式并传入旧数据标识列表
print("报告已生成: vm_and_snapshots_report.xlsx")
if __name__ == '__main__':
vms = get_all_vms()
# print(vms)
# 生成Excel报告
create_excel_report(vms)

View File

@@ -1,105 +0,0 @@
from pyVmomi import vim
from config.settings import MANAGEMENT_NODES
from core.vcenter_connector import VCenterManager # 使用管理器类
from utils.logger import logger
from typing import List, Dict
def get_all_vms_from_node(si, node_name: str) -> List[Dict]:
"""
从单个vCenter/ESXi获取所有虚拟机
:param si: ServiceInstance
:param node_name: 节点名称(用于标识来源)
:return: VM列表
"""
try:
content = si.RetrieveContent()
vm_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
vms = vm_view.view
vm_view.Destroy()
vm_list = []
for vm in vms:
# 安全获取快照数量
num_snapshots = 0
if vm.snapshot and vm.snapshot.rootSnapshotList:
num_snapshots = len(vm.snapshot.rootSnapshotList)
vm_info = {
'vcenter_node': node_name, # 来自哪个vCenter/ESXi
'vcenter_host': si._stub.host, # vCenter主机地址
'vm_name': vm.name,
'vm_moid': vm._moId,
'power_state': str(vm.runtime.powerState), # 转字符串
'num_snapshots': num_snapshots,
'guest_os': vm.config.guestFullName if vm.config else 'Unknown'
}
vm_list.append(vm_info)
logger.info(f"📥 [{node_name}] 获取到 {len(vm_list)} 台VM")
return vm_list
except Exception as e:
logger.error(f"❌ [{node_name}] 获取VM列表失败: {str(e)}")
return []
def get_all_vms_from_all_nodes() -> List[Dict]:
"""
从所有配置的vCenter/ESXi节点获取虚拟机
:return: 合并后的VM列表
"""
all_vms = []
# 使用VCenterManager连接所有节点
with VCenterManager() as manager:
connections = manager.connect_all()
if not connections:
logger.error("没有可用的vCenter/ESXi连接")
return []
# 遍历每个连接获取VM
for conn in connections:
vms = get_all_vms_from_node(conn.si, conn.name)
all_vms.extend(vms) # 合并列表
logger.info(f"📊 总计获取 {len(all_vms)} 台VM来自 {len(connections)} 个节点)")
return all_vms
def print_vm_table(vm_list: List[Dict]):
"""打印VM列表表格调试用"""
if not vm_list:
print("没有获取到VM数据")
return
print(f"\n{'=' * 100}")
print(f"{'节点':<15} {'VM名称':<30} {'状态':<12} {'快照数':<8} {'操作系统':<20}")
print(f"{'-' * 100}")
for vm in vm_list:
print(f"{vm['vcenter_node']:<15} {vm['vm_name']:<30} "
f"{vm['power_state']:<12} {vm['num_snapshots']:<8} "
f"{vm['guest_os'][:20]:<20}")
print(f"{'=' * 100}")
print(f"总计: {len(vm_list)} 台VM")
# ==================== 主程序入口 ====================
if __name__ == '__main__':
# 获取所有VM
vm_list = get_all_vms_from_all_nodes()
# 打印结果
print_vm_table(vm_list)
# 示例筛选有快照的VM
vms_with_snapshots = [vm for vm in vm_list if vm['num_snapshots'] > 0]
print(f"\n有快照的VM: {len(vms_with_snapshots)}")
for vm in vms_with_snapshots:
print(f" - {vm['vm_name']}: {vm['num_snapshots']} 个快照")

View File

@@ -1,184 +0,0 @@
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
from config.settings import MANAGEMENT_NODES
from utils.logger import logger
from dataclasses import dataclass
from typing import List, Optional, Dict
@dataclass
class Connection:
"""包装连接对象包含节点信息和ServiceInstance"""
name: str # 节点名称
host: str # 主机地址
node_type: str # vcenter 或 esxi
si: any # ServiceInstance
max_concurrent: int # 该节点的最大并发数
class VCenterManager:
"""管理多个vCenter/ESXi连接"""
def __init__(self):
# 构造方法,创建对象时自动执行
self.connections: List[Connection] = [] # 属性1成功连接列表
self.failed_nodes: List[Dict] = [] # 属性2失败节点列表
def connect_all(self) -> List[Connection]:
"""
连接所有配置的管理节点
:return: 成功连接的列表
"""
logger.info(f"开始连接 {len(MANAGEMENT_NODES)} 个管理节点...")
for node in MANAGEMENT_NODES:
name = node['name']
host = node['host']
user = node['user']
password = node['password']
node_type = node['type']
max_concurrent = node.get('max_delete_concurrent', 4)
logger.info(f"正在连接节点: {name} ({host}) [{node_type}]")
si = self._connect_single(host, user, password, node_type)
if si:
conn = Connection(
name=name,
host=host,
node_type=node_type,
si=si,
max_concurrent=max_concurrent
)
self.connections.append(conn)
logger.info(f"{name} 连接成功")
else:
self.failed_nodes.append({
'name': name,
'host': host,
'type': node_type
})
logger.error(f"{name} 连接失败")
logger.info(f"连接完成: 成功 {len(self.connections)}/{len(MANAGEMENT_NODES)}")
return self.connections
def _connect_single(self, host: str, user: str, password: str, node_type: str):
"""连接单个节点"""
try:
si = SmartConnect(
host=host,
user=user,
pwd=password,
disableSslCertValidation=True
)
return si
except Exception as e:
logger.error(f"连接 {host} 失败: {str(e)}")
return None
def get_connection(self, name: str = None) -> Optional[Connection]:
"""
获取指定名称的连接,不指定则返回第一个
"""
if not self.connections:
return None
if name:
for conn in self.connections:
if conn.name == name:
return conn
return None
return self.connections[0]
def get_all_connections(self) -> List[Connection]:
"""获取所有成功连接"""
return self.connections
def get_vcenter_connections(self) -> List[Connection]:
"""仅获取 vcenter 类型的连接"""
return [c for c in self.connections if c.node_type == 'vcenter']
def get_esxi_connections(self) -> List[Connection]:
"""仅获取 esxi 类型的连接"""
return [c for c in self.connections if c.node_type == 'esxi']
def disconnect_all(self):
"""关闭所有连接"""
for conn in self.connections:
try:
Disconnect(conn.si)
logger.info(f"已断开 {conn.name}")
except Exception as e:
logger.warning(f"断开 {conn.name} 时出错: {e}")
self.connections = []
def __enter__(self):
"""上下文管理器支持"""
self.connect_all()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""确保退出时断开连接"""
self.disconnect_all()
# ==================== 使用示例 ====================
def main():
# 方式1手动管理连接
manager = VCenterManager()
connections = manager.connect_all()
# 遍历所有连接执行操作
for conn in connections:
print(f"\n处理节点: {conn.name} ({conn.host})")
print(f"最大并发: {conn.max_concurrent}")
# 获取内容视图
content = conn.si.RetrieveContent()
# 获取所有VM
from pyVmomi import vim
obj_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
vms = obj_view.view
print(f"该节点有 {len(vms)} 台虚拟机")
obj_view.Destroy()
# 断开所有连接
manager.disconnect_all()
def main_with_context():
# 方式2使用上下文管理器推荐自动断开
with VCenterManager() as manager:
# 获取所有vcenter连接
vcenters = manager.get_vcenter_connections()
for conn in vcenters:
print(f"处理vCenter: {conn.name}")
# 执行操作...
# 退出时自动断开所有连接
def main_single_operation():
# 方式3只连接特定节点
manager = VCenterManager()
manager.connect_all()
# 获取指定节点
prod_vc = manager.get_connection('vcenter-prod')
if prod_vc:
print(f"连接到生产环境: {prod_vc.host}")
# 执行操作...
manager.disconnect_all()
if __name__ == '__main__':
main()

View File

@@ -1,8 +1,8 @@
import logging
from config.settings import LOG_FILE_PATH
# from config.settings import LOG_FILE_PATH
from datetime import datetime
def setup_logger():
def get_logger():
"""配置日志系统返回logger实例"""
# 创建logger
logger = logging.getLogger('vm_snapshot_cleanup')
@@ -16,8 +16,8 @@ def setup_logger():
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# 文件处理器(写入日志文件)
file_handler = logging.FileHandler(LOG_FILE_PATH, encoding='utf-8')
file_handler.setFormatter(formatter)
file_handler = logging.FileHandler(f'D:\\PycharmProjects\\RemoveWeeklyShapshot\\logs\\{datetime.now().strftime('%Y%m%d')}-VMsSnapShots_cleanup.log', encoding='utf-8')
file_handler.setFormatter(formatter) # 应用格式化器
# 控制台处理器(输出到终端)
console_handler = logging.StreamHandler()
@@ -31,4 +31,4 @@ def setup_logger():
# 全局logger实例
logger = setup_logger()
logger = get_logger()