Get Vms Snapshots and Export Excel has completed.

This commit is contained in:
panjunlan
2026-02-19 15:35:27 +08:00
parent 55719ed953
commit 3332e3b865
10 changed files with 748 additions and 74 deletions

237
core/get_vms.py Normal file
View File

@@ -0,0 +1,237 @@
from pyVmomi import vim
from pyVim.connect import SmartConnect, Disconnect
from config.settings import MANAGEMENT_NODES
from datetime import timedelta
import pandas as pd
import openpyxl
from openpyxl.styles import Border, Side, Font, PatternFill
from datetime import datetime, timedelta
"""计算虚拟机的总磁盘大小(仅是虚拟磁盘的分配空间)"""
def get_virtual_disk_size(vm):
total_size = 0
for device in vm.config.hardware.device:
if isinstance(device, vim.vm.device.VirtualDisk):
# 获取每个虚拟磁盘的容量
total_size += device.capacityInKB / (1024 * 1024) # 转换为GB
return total_size
"""获取所有虚拟机"""
def get_all_vms():
# 取第一个节点
node = MANAGEMENT_NODES[0]
# 连接vCenter
si = SmartConnect(
host=node['host'],
user=node['user'],
pwd=node['password'],
disableSslCertValidation=True
)
# 获取所有VM
content = si.RetrieveContent()
vm_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
vm_list = []
for vm in vm_view.view:
# 初始化VM信息字典
vm_info = {
'name': vm.name,
'moId': vm._moId,
'powerState': str(vm.runtime.powerState),
'system': vm.config.guestFullName,
'ipAddress': vm.guest.ipAddress,
'hostName': vm.guest.hostName,
'vmPath': vm.config.files.vmPathName,
'Host': vm.runtime.host.name, # 拿到 Host 主机名
'snapshots': [], # 添加快照信息
'diskSpaceGB': get_virtual_disk_size(vm) # 添加虚拟机占用的磁盘空间
}
# 获取快照信息
if vm.snapshot is not None:
current_snapshot = vm.snapshot.currentSnapshot
root_snapshots = vm.snapshot.rootSnapshotList
# 处理根快照列表
for snapshot in root_snapshots:
snapshot_info = get_snapshot_info(snapshot)
vm_info['snapshots'].extend(snapshot_info)
# 添加当前快照ID如果有
if current_snapshot:
vm_info['currentSnapshotId'] = current_snapshot._moId
else:
vm_info['snapshots'] = None
vm_info['currentSnapshotId'] = None
vm_list.append(vm_info)
vm_view.Destroy()
Disconnect(si)
print(f"获取到 {len(vm_list)} 台VM")
return vm_list
"""递归获取快照信息"""
def get_snapshot_info(snapshot):
snapshot_list = []
# 当前快照信息
snapshot_info = {
'name': snapshot.name,
'description': snapshot.description,
'createTime': (snapshot.createTime + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'),
'state': str(snapshot.state),
'id': snapshot.id,
'moId': snapshot.snapshot._moId, # 快照的Managed Object ID
'sizeMB': snapshot.diskSizeMB if hasattr(snapshot, 'diskSizeMB') else None, # 快照大小
'quiesced': snapshot.quiesced if hasattr(snapshot, 'quiesced') else None, # 是否静默快照
'children': [] # 子快照
}
# 递归处理子快照
if snapshot.childSnapshotList:
for child in snapshot.childSnapshotList:
snapshot_info['children'].extend(get_snapshot_info(child))
snapshot_list.append(snapshot_info)
return snapshot_list
def print_snapshot_tree(snapshot_info, level=0):
"""递归打印快照树(辅助函数)"""
indent = " " * level
print(f"{indent}├─ {snapshot_info['name']}")
print(f"{indent}│ ├─ 创建时间: {snapshot_info['createTime']}")
print(f"{indent}│ ├─ 描述: {snapshot_info['description']}")
print(f"{indent}│ ├─ 状态: {snapshot_info['state']}")
if snapshot_info['sizeMB']:
print(f"{indent}│ ├─ 大小: {snapshot_info['sizeMB']} MB")
for child in snapshot_info['children']:
print_snapshot_tree(child, level + 1)
"""表格样式"""
def style_sheet(sheet, is_old_data=None):
"""设置工作表样式:列宽、边框、加粗标题、冻结首行和设置背景颜色"""
# 定义边框样式
thin = Side(border_style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
# 设置列宽和边框
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
cell.border = border # 为每个单元格添加边框
except Exception as e:
pass
adjusted_width = (max_length + 2)
sheet.column_dimensions[column_letter].width = adjusted_width
# 加粗标题并冻结首行
for cell in sheet[1]: # 加粗标题
cell.font = Font(bold=True)
sheet.freeze_panes = 'A2' # 冻结首行
# 设置背景颜色
if is_old_data is not None:
for row in range(2, len(is_old_data) + 2):
if is_old_data[row - 2]:
for col in range(1, sheet.max_column + 1): # 使用 sheet.max_column
cell = sheet.cell(row=row, column=col)
cell.fill = PatternFill(start_color='ADD8E6', end_color='ADD8E6', fill_type='solid')
"""将虚拟机和快照信息写入Excel文件并标记创建时间在15天前的快照"""
def create_excel_report(vms):
vm_data = []
snapshot_data = []
def add_snapshots_to_report(snapshot, vm_name):
"""递归将快照信息加入报告"""
create_time = datetime.strptime(snapshot['createTime'], '%Y-%m-%d %H:%M:%S')
is_old = create_time < (datetime.now() - timedelta(days=1))
snapshot_data.append({
'VM Name': vm_name,
'Snapshot Name': snapshot['name'],
'Description': snapshot['description'],
'Create Time': snapshot['createTime'],
'State': snapshot['state'],
'ID': snapshot['id'],
'MO ID': snapshot['moId'],
'Size (MB)': snapshot['sizeMB'],
'Quiesced': snapshot['quiesced'],
'is_old': is_old
})
for child in snapshot['children']:
add_snapshots_to_report(child, vm_name)
for vm in vms:
vm_data.append({
'VM Name': vm['name'],
'MO ID': vm['moId'],
'Power State': vm['powerState'],
'System': vm['system'],
'IP Address': vm['ipAddress'],
'Host Name': vm['hostName'],
'VM Path': vm['vmPath'],
'Host': vm['Host'],
'Current Snapshot ID': vm.get('currentSnapshotId', None),
'DiskSpace/GB': vm['diskSpaceGB']
})
if vm['snapshots']:
for snapshot in vm['snapshots']:
add_snapshots_to_report(snapshot, vm['name'])
vm_df = pd.DataFrame(vm_data)
snapshot_df = pd.DataFrame(snapshot_data)
with pd.ExcelWriter('vm_and_snapshots_report.xlsx', engine='openpyxl') as writer:
vm_df.to_excel(writer, sheet_name='VMs', index=False)
snapshot_df.to_excel(writer, sheet_name='Snapshots', index=False)
# 获取写入的工作表
workbook = writer.book
vm_sheet = workbook['VMs']
snapshot_sheet = workbook['Snapshots']
# 调用样式设置函数,传入 is_old_data
style_sheet(vm_sheet) # 设置 VMs 工作表样式
style_sheet(snapshot_sheet, snapshot_df['is_old'].tolist()) # 设置 Snapshots 工作表样式并传入旧数据标识列表
print("报告已生成: vm_and_snapshots_report.xlsx")
if __name__ == '__main__':
vms = get_all_vms()
# # 打印示例显示每个VM的快照信息
# for vm in vms:
# print(f"\nVM: {vm['name']}")
# if vm['snapshots']:
# print(f"快照数量: {len(vm['snapshots'])}")
# for snapshot in vm['snapshots']:
# print_snapshot_tree(snapshot)
# else:
# print("无快照")
# 生成Excel报告
create_excel_report(vms)

105
core/snapshot_collector.py Normal file
View File

@@ -0,0 +1,105 @@
from pyVmomi import vim
from config.settings import MANAGEMENT_NODES
from core.vcenter_connector import VCenterManager # 使用管理器类
from utils.logger import logger
from typing import List, Dict
def get_all_vms_from_node(si, node_name: str) -> List[Dict]:
"""
从单个vCenter/ESXi获取所有虚拟机
:param si: ServiceInstance
:param node_name: 节点名称(用于标识来源)
:return: VM列表
"""
try:
content = si.RetrieveContent()
vm_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
vms = vm_view.view
vm_view.Destroy()
vm_list = []
for vm in vms:
# 安全获取快照数量
num_snapshots = 0
if vm.snapshot and vm.snapshot.rootSnapshotList:
num_snapshots = len(vm.snapshot.rootSnapshotList)
vm_info = {
'vcenter_node': node_name, # 来自哪个vCenter/ESXi
'vcenter_host': si._stub.host, # vCenter主机地址
'vm_name': vm.name,
'vm_moid': vm._moId,
'power_state': str(vm.runtime.powerState), # 转字符串
'num_snapshots': num_snapshots,
'guest_os': vm.config.guestFullName if vm.config else 'Unknown'
}
vm_list.append(vm_info)
logger.info(f"📥 [{node_name}] 获取到 {len(vm_list)} 台VM")
return vm_list
except Exception as e:
logger.error(f"❌ [{node_name}] 获取VM列表失败: {str(e)}")
return []
def get_all_vms_from_all_nodes() -> List[Dict]:
"""
从所有配置的vCenter/ESXi节点获取虚拟机
:return: 合并后的VM列表
"""
all_vms = []
# 使用VCenterManager连接所有节点
with VCenterManager() as manager:
connections = manager.connect_all()
if not connections:
logger.error("没有可用的vCenter/ESXi连接")
return []
# 遍历每个连接获取VM
for conn in connections:
vms = get_all_vms_from_node(conn.si, conn.name)
all_vms.extend(vms) # 合并列表
logger.info(f"📊 总计获取 {len(all_vms)} 台VM来自 {len(connections)} 个节点)")
return all_vms
def print_vm_table(vm_list: List[Dict]):
"""打印VM列表表格调试用"""
if not vm_list:
print("没有获取到VM数据")
return
print(f"\n{'=' * 100}")
print(f"{'节点':<15} {'VM名称':<30} {'状态':<12} {'快照数':<8} {'操作系统':<20}")
print(f"{'-' * 100}")
for vm in vm_list:
print(f"{vm['vcenter_node']:<15} {vm['vm_name']:<30} "
f"{vm['power_state']:<12} {vm['num_snapshots']:<8} "
f"{vm['guest_os'][:20]:<20}")
print(f"{'=' * 100}")
print(f"总计: {len(vm_list)} 台VM")
# ==================== 主程序入口 ====================
if __name__ == '__main__':
# 获取所有VM
vm_list = get_all_vms_from_all_nodes()
# 打印结果
print_vm_table(vm_list)
# 示例筛选有快照的VM
vms_with_snapshots = [vm for vm in vm_list if vm['num_snapshots'] > 0]
print(f"\n有快照的VM: {len(vms_with_snapshots)}")
for vm in vms_with_snapshots:
print(f" - {vm['vm_name']}: {vm['num_snapshots']} 个快照")

184
core/vcenter_connector.py Normal file
View File

@@ -0,0 +1,184 @@
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
from config.settings import MANAGEMENT_NODES
from utils.logger import logger
from dataclasses import dataclass
from typing import List, Optional, Dict
@dataclass
class Connection:
"""包装连接对象包含节点信息和ServiceInstance"""
name: str # 节点名称
host: str # 主机地址
node_type: str # vcenter 或 esxi
si: any # ServiceInstance
max_concurrent: int # 该节点的最大并发数
class VCenterManager:
"""管理多个vCenter/ESXi连接"""
def __init__(self):
# 构造方法,创建对象时自动执行
self.connections: List[Connection] = [] # 属性1成功连接列表
self.failed_nodes: List[Dict] = [] # 属性2失败节点列表
def connect_all(self) -> List[Connection]:
"""
连接所有配置的管理节点
:return: 成功连接的列表
"""
logger.info(f"开始连接 {len(MANAGEMENT_NODES)} 个管理节点...")
for node in MANAGEMENT_NODES:
name = node['name']
host = node['host']
user = node['user']
password = node['password']
node_type = node['type']
max_concurrent = node.get('max_delete_concurrent', 4)
logger.info(f"正在连接节点: {name} ({host}) [{node_type}]")
si = self._connect_single(host, user, password, node_type)
if si:
conn = Connection(
name=name,
host=host,
node_type=node_type,
si=si,
max_concurrent=max_concurrent
)
self.connections.append(conn)
logger.info(f"{name} 连接成功")
else:
self.failed_nodes.append({
'name': name,
'host': host,
'type': node_type
})
logger.error(f"{name} 连接失败")
logger.info(f"连接完成: 成功 {len(self.connections)}/{len(MANAGEMENT_NODES)}")
return self.connections
def _connect_single(self, host: str, user: str, password: str, node_type: str):
"""连接单个节点"""
try:
si = SmartConnect(
host=host,
user=user,
pwd=password,
disableSslCertValidation=True
)
return si
except Exception as e:
logger.error(f"连接 {host} 失败: {str(e)}")
return None
def get_connection(self, name: str = None) -> Optional[Connection]:
"""
获取指定名称的连接,不指定则返回第一个
"""
if not self.connections:
return None
if name:
for conn in self.connections:
if conn.name == name:
return conn
return None
return self.connections[0]
def get_all_connections(self) -> List[Connection]:
"""获取所有成功连接"""
return self.connections
def get_vcenter_connections(self) -> List[Connection]:
"""仅获取 vcenter 类型的连接"""
return [c for c in self.connections if c.node_type == 'vcenter']
def get_esxi_connections(self) -> List[Connection]:
"""仅获取 esxi 类型的连接"""
return [c for c in self.connections if c.node_type == 'esxi']
def disconnect_all(self):
"""关闭所有连接"""
for conn in self.connections:
try:
Disconnect(conn.si)
logger.info(f"已断开 {conn.name}")
except Exception as e:
logger.warning(f"断开 {conn.name} 时出错: {e}")
self.connections = []
def __enter__(self):
"""上下文管理器支持"""
self.connect_all()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""确保退出时断开连接"""
self.disconnect_all()
# ==================== 使用示例 ====================
def main():
# 方式1手动管理连接
manager = VCenterManager()
connections = manager.connect_all()
# 遍历所有连接执行操作
for conn in connections:
print(f"\n处理节点: {conn.name} ({conn.host})")
print(f"最大并发: {conn.max_concurrent}")
# 获取内容视图
content = conn.si.RetrieveContent()
# 获取所有VM
from pyVmomi import vim
obj_view = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True
)
vms = obj_view.view
print(f"该节点有 {len(vms)} 台虚拟机")
obj_view.Destroy()
# 断开所有连接
manager.disconnect_all()
def main_with_context():
# 方式2使用上下文管理器推荐自动断开
with VCenterManager() as manager:
# 获取所有vcenter连接
vcenters = manager.get_vcenter_connections()
for conn in vcenters:
print(f"处理vCenter: {conn.name}")
# 执行操作...
# 退出时自动断开所有连接
def main_single_operation():
# 方式3只连接特定节点
manager = VCenterManager()
manager.connect_all()
# 获取指定节点
prod_vc = manager.get_connection('vcenter-prod')
if prod_vc:
print(f"连接到生产环境: {prod_vc.host}")
# 执行操作...
manager.disconnect_all()
if __name__ == '__main__':
main()