# API 使用示例
## 概述
本文档提供了 NetPulse API 在实际业务场景中的完整使用示例,包括设备管理、配置操作、批量处理等常见场景。
## 场景 1:网络设备发现
### 业务需求
- 发现网络中的所有设备
- 收集设备基本信息
- 验证设备连接性
### 实现方案
import requests
import json
from typing import List, Dict
class NetworkDiscovery:
    """Discover network devices through the NetPulse HTTP API.

    Each device is first checked with ``POST /device/test-connection``. When
    that succeeds, a fixed list of ``show`` commands is submitted through
    ``POST /device/execute`` and the resulting jobs are polled with
    ``GET /job`` until they report ``finished`` or ``failed``.
    """

    # Upper bound, in seconds, for a single HTTP round trip. Chosen larger
    # than the 30 s device timeout sent to /device/test-connection.
    REQUEST_TIMEOUT = 60

    def __init__(self, base_url: str, api_key: str):
        """Remember the API base URL and build the ``X-API-KEY`` header."""
        self.base_url = base_url
        self.api_key = api_key
        self.headers = {"X-API-KEY": api_key}

    def discover_devices(self, device_list: List[Dict]) -> Dict:
        """Test and inspect every device in ``device_list``.

        Args:
            device_list: Device dicts with ``host``, ``username``,
                ``password``, ``device_type`` and optionally ``port``.

        Returns:
            A dict with ``discovered`` (device, connection result, collected
            info), ``failed`` (device, error text) and ``total``.
        """
        results = {"discovered": [], "failed": [], "total": len(device_list)}
        for device in device_list:
            try:
                # 1. Test the connection.
                connection_result = self.test_device_connection(device)
                if not connection_result["success"]:
                    results["failed"].append({
                        "device": device,
                        # A response without an error message must not turn
                        # into a confusing KeyError text.
                        "error": connection_result.get("error_message", "未知错误"),
                    })
                    continue
                # 2. Collect device information.
                results["discovered"].append({
                    "device": device,
                    "connection": connection_result,
                    "info": self.collect_device_info(device),
                })
            except Exception as e:
                # One broken device must not abort the whole discovery run.
                results["failed"].append({"device": device, "error": str(e)})
        return results

    def test_device_connection(self, device: Dict) -> Dict:
        """Ask the API to open a test connection and return its ``data`` part."""
        payload = {
            "driver": "netmiko",
            "connection_args": {**self._connection_args(device), "timeout": 30},
        }
        return self._post("/device/test-connection", payload)

    def collect_device_info(self, device: Dict) -> Dict:
        """Run the discovery commands and return ``{command: output}``.

        A command whose job fails is recorded as an ``"Error: ..."`` string
        instead of raising.
        """
        commands = [
            "show version",
            "show ip interface brief",
            "show running-config | include hostname",
        ]
        device_info = {}
        for command in commands:
            payload = {
                "driver": "netmiko",
                # Same connection parameters (including the port) that the
                # connection test used, so a non-default port keeps working.
                "connection_args": self._connection_args(device),
                "command": command,
                "options": {"queue_strategy": "pinned", "ttl": 300},
            }
            job_id = self._post("/device/execute", payload)["id"]
            # Wait for the job to finish.
            job_status = self.wait_for_job_completion(job_id)
            # A job may carry no result at all; treat that as empty.
            result = job_status.get("result") or {}
            if job_status["status"] == "finished":
                device_info[command] = self._job_output(result)
            else:
                device_info[command] = f"Error: {result.get('error')}"
        return device_info

    def wait_for_job_completion(self, job_id: str, timeout: int = 300):
        """Poll ``GET /job`` once per second until the job ends.

        Returns:
            The job dict once its status is ``finished`` or ``failed``.

        Raises:
            TimeoutError: The job did not end within ``timeout`` seconds.
        """
        import time

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            response = requests.get(
                f"{self.base_url}/job",
                params={"id": job_id},
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json().get("data")
            # Accept a list of jobs or a single job object; an empty answer
            # just means "ask again".
            job_status = data if isinstance(data, dict) else (data[0] if data else None)
            if job_status and job_status.get("status") in ("finished", "failed"):
                return job_status
            time.sleep(1)
        raise TimeoutError(f"任务 {job_id} 执行超时")

    def _connection_args(self, device: Dict) -> Dict:
        """Build the ``connection_args`` section shared by all requests."""
        return {
            "device_type": device["device_type"],
            "host": device["host"],
            "username": device["username"],
            "password": device["password"],
            "port": device.get("port", 22),
        }

    def _post(self, path: str, payload: Dict) -> Dict:
        """POST ``payload`` to ``path`` and return the ``data`` member."""
        response = requests.post(
            f"{self.base_url}{path}",
            json=payload,
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Fail loudly on HTTP errors instead of on a later KeyError.
        response.raise_for_status()
        return response.json()["data"]

    @staticmethod
    def _job_output(result: Dict):
        """Return the command output stored in a job ``result`` dict.

        The examples in this document read the output from ``retval`` in
        some places and ``output`` in others, so both keys are accepted.
        NOTE(review): confirm the canonical key against the API schema.
        """
        if "retval" in result:
            return result["retval"]
        return result.get("output")
# Usage example
discovery = NetworkDiscovery("http://localhost:9000", "your_api_key")

# (host, device_type) pairs; every demo device shares the same credentials.
inventory = (
    ("192.168.1.1", "cisco_ios"),
    ("192.168.1.2", "cisco_ios"),
    ("192.168.1.3", "juniper_junos"),
)
devices = [
    {
        "host": host,
        "username": "admin",
        "password": "admin123",
        "device_type": device_type,
    }
    for host, device_type in inventory
]

results = discovery.discover_devices(devices)
print(f"发现设备: {len(results['discovered'])}")
print(f"失败设备: {len(results['failed'])}")
## 场景 2:批量配置备份
### 业务需求
- 定期备份所有设备的配置
- 支持多种设备类型
- 保存配置到文件系统
### 实现方案
import requests
import json
import os
from datetime import datetime
from typing import List, Dict
class ConfigBackup:
    """Back up device configurations through the NetPulse HTTP API.

    For each device a set of ``show`` commands (chosen by device type) is
    submitted through ``POST /device/execute``; the outputs are written to
    one JSON file per device inside ``backup_dir``.
    """

    # Upper bound, in seconds, for a single HTTP round trip.
    REQUEST_TIMEOUT = 60

    def __init__(self, base_url: str, api_key: str, backup_dir: str = "backups"):
        """Store the API settings and create ``backup_dir`` if needed."""
        self.base_url = base_url
        self.api_key = api_key
        self.headers = {"X-API-KEY": api_key}
        self.backup_dir = backup_dir
        os.makedirs(backup_dir, exist_ok=True)

    def backup_all_devices(self, devices: List[Dict]) -> Dict:
        """Back up every device and sort the results into success/failed.

        Returns:
            A dict with ``success`` and ``failed`` lists plus ``total``.
        """
        backup_results = {"success": [], "failed": [], "total": len(devices)}
        for device in devices:
            try:
                backup_result = self.backup_single_device(device)
            except Exception as e:
                # Keep going: one failing device must not stop the batch.
                backup_results["failed"].append({"device": device, "error": str(e)})
                continue
            bucket = "success" if backup_result["success"] else "failed"
            backup_results[bucket].append(backup_result)
        return backup_results

    def backup_single_device(self, device: Dict) -> Dict:
        """Run all backup commands for one device and save them to a file.

        Returns:
            A dict with ``device``, ``timestamp``, ``configs``
            (``{command_name: output or "Error: ..."}``), ``filepath`` and
            ``success``. ``success`` is True only when every command
            finished; otherwise ``error`` names the commands that failed.
        """
        # One clock reading so the timestamp and the filename always agree.
        taken_at = datetime.now()
        backup_data = {
            "device": device,
            "timestamp": taken_at.isoformat(),
            "configs": {},
        }
        failed_commands = []
        # Pick the backup commands for this device type.
        backup_commands = self.get_backup_commands(device["device_type"])
        for command_name, command in backup_commands.items():
            try:
                job_status = self._run_command(device, command)
                result = job_status.get("result") or {}
                if job_status["status"] == "finished":
                    backup_data["configs"][command_name] = self._job_output(result)
                    continue
                message = f"Error: {result.get('error')}"
            except Exception as e:
                message = f"Error: {str(e)}"
            backup_data["configs"][command_name] = message
            failed_commands.append(command_name)

        # Save the backup file.
        filename = (
            f"{self._safe_filename(device['host'])}_"
            f"{taken_at.strftime('%Y%m%d_%H%M%S')}.json"
        )
        filepath = os.path.join(self.backup_dir, filename)
        # Never persist the login password next to the configuration.
        on_disk = dict(backup_data)
        on_disk["device"] = {k: v for k, v in device.items() if k != "password"}
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(on_disk, f, indent=2, ensure_ascii=False)

        backup_data["filepath"] = filepath
        backup_data["success"] = not failed_commands
        if failed_commands:
            backup_data["error"] = f"命令执行失败: {', '.join(failed_commands)}"
        return backup_data

    def get_backup_commands(self, device_type: str) -> Dict[str, str]:
        """Return ``{name: command}`` for ``device_type``.

        Unknown device types fall back to the ``cisco_ios`` command set.
        """
        commands = {
            "cisco_ios": {
                "running_config": "show running-config",
                "startup_config": "show startup-config",
                "version": "show version",
                "interfaces": "show ip interface brief",
            },
            "juniper_junos": {
                "running_config": "show configuration",
                # Junos spells the pipe filter "display set" (two words).
                "startup_config": "show configuration | display set",
                "version": "show version",
                "interfaces": "show interfaces terse",
            },
            "arista_eos": {
                "running_config": "show running-config",
                "startup_config": "show startup-config",
                "version": "show version",
                "interfaces": "show ip interface brief",
            },
        }
        return commands.get(device_type, commands["cisco_ios"])

    def wait_for_job_completion(self, job_id: str, timeout: int = 300):
        """Poll ``GET /job`` once per second until the job ends.

        Returns:
            The job dict once its status is ``finished`` or ``failed``.

        Raises:
            TimeoutError: The job did not end within ``timeout`` seconds.
        """
        import time

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            response = requests.get(
                f"{self.base_url}/job",
                params={"id": job_id},
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json().get("data")
            # Accept a list of jobs or a single job object; an empty answer
            # just means "ask again".
            job_status = data if isinstance(data, dict) else (data[0] if data else None)
            if job_status and job_status.get("status") in ("finished", "failed"):
                return job_status
            time.sleep(1)
        raise TimeoutError(f"任务 {job_id} 执行超时")

    def _run_command(self, device: Dict, command: str) -> Dict:
        """Submit one command for ``device`` and wait for its job."""
        connection_args = {
            "device_type": device["device_type"],
            "host": device["host"],
            "username": device["username"],
            "password": device["password"],
        }
        if "port" in device:
            # Honour a non-default port when the caller supplies one.
            connection_args["port"] = device["port"]
        payload = {
            "driver": "netmiko",
            "connection_args": connection_args,
            "command": command,
            "options": {"queue_strategy": "pinned", "ttl": 300},
        }
        job_id = self._post("/device/execute", payload)["id"]
        return self.wait_for_job_completion(job_id)

    def _post(self, path: str, payload: Dict) -> Dict:
        """POST ``payload`` to ``path`` and return the ``data`` member."""
        response = requests.post(
            f"{self.base_url}{path}",
            json=payload,
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Fail loudly on HTTP errors instead of on a later KeyError.
        response.raise_for_status()
        return response.json()["data"]

    @staticmethod
    def _job_output(result: Dict):
        """Return the command output stored in a job ``result`` dict.

        The examples in this document read the output from ``retval`` in
        some places and ``output`` in others, so both keys are accepted.
        NOTE(review): confirm the canonical key against the API schema.
        """
        if "retval" in result:
            return result["retval"]
        return result.get("output")

    @staticmethod
    def _safe_filename(host: str) -> str:
        """Replace characters that are unsafe in file names with ``_``."""
        return "".join(
            ch if ch.isalnum() or ch in "._-" else "_" for ch in str(host)
        )
# Usage example
backup = ConfigBackup("http://localhost:9000", "your_api_key", "config_backups")

# Both demo devices log in with the same account.
shared_login = {"username": "admin", "password": "admin123"}
devices = [
    {"host": "192.168.1.1", **shared_login, "device_type": "cisco_ios"},
    {"host": "192.168.1.2", **shared_login, "device_type": "juniper_junos"},
]

results = backup.backup_all_devices(devices)
print(f"备份成功: {len(results['success'])}")
print(f"备份失败: {len(results['failed'])}")
## 场景 3:配置变更管理
### 业务需求
- 安全地推送配置变更
- 支持配置回滚
- 变更前备份
### 实现方案
import requests
import json
from typing import Dict, List
class ConfigChangeManager:
    """Push configuration changes through the NetPulse HTTP API.

    A change is preceded by a ``show running-config`` backup. When the push
    fails, the backup is replayed as a rollback.
    """

    # Upper bound, in seconds, for a single HTTP round trip.
    REQUEST_TIMEOUT = 60

    def __init__(self, base_url: str, api_key: str):
        """Remember the API base URL and build the ``X-API-KEY`` header."""
        self.base_url = base_url
        self.api_key = api_key
        self.headers = {"X-API-KEY": api_key}

    def safe_config_change(self, device: Dict, config_commands: List[str],
                           description: str = "") -> Dict:
        """Back up, push ``config_commands`` and roll back on failure.

        Returns:
            A dict with ``device``, ``description``, ``backup``, ``change``,
            ``rollback`` and ``success``. An ``error`` key is added when an
            exception interrupted the procedure.
        """
        change_result = {
            "device": device,
            "description": description,
            "backup": None,
            "change": None,
            "rollback": None,
            "success": False,
        }
        try:
            # 1. Back up the current configuration.
            # NOTE(review): the change is still pushed when this backup
            # fails; consider aborting here if that is not intended.
            change_result["backup"] = self.backup_config(device)
            # 2. Push the configuration change.
            change_result["change"] = self.push_config(device, config_commands)
            if change_result["change"]["success"]:
                change_result["success"] = True
                print(f"✅ 设备 {device['host']} 配置变更成功")
            else:
                # 3. The push failed: attempt a rollback and report what
                # really happened instead of always claiming success.
                rollback = self.rollback_config(device, change_result["backup"])
                change_result["rollback"] = rollback
                if rollback["success"]:
                    print(f"❌ 设备 {device['host']} 配置变更失败,已回滚")
                else:
                    print(f"❌ 设备 {device['host']} 配置变更失败,回滚失败: {rollback.get('error')}")
        except Exception as e:
            change_result["error"] = str(e)
            print(f"❌ 设备 {device['host']} 配置变更异常: {e}")
        return change_result

    def backup_config(self, device: Dict) -> Dict:
        """Fetch the running configuration.

        Returns:
            ``{"success": bool, "config": output or None, "error": ...}``.
        """
        payload = {
            "driver": "netmiko",
            "connection_args": self._connection_args(device),
            "command": "show running-config",
            "options": {"queue_strategy": "pinned", "ttl": 300},
        }
        finished, output, error = self._execute_job(payload)
        return {"success": finished, "config": output, "error": error}

    def push_config(self, device: Dict, config_commands: List[str]) -> Dict:
        """Send ``config_commands`` to the device and save the result.

        Returns:
            ``{"success": bool, "output": output or None, "error": ...}``.
        """
        payload = {
            "driver": "netmiko",
            "connection_args": {**self._connection_args(device), "enable_mode": True},
            "config": config_commands,
            "driver_args": {"save": True, "exit_config_mode": True},
            "options": {"queue_strategy": "pinned", "ttl": 300},
        }
        finished, output, error = self._execute_job(payload)
        return {"success": finished, "output": output, "error": error}

    def rollback_config(self, device: Dict, backup: Dict) -> Dict:
        """Replay ``backup`` onto the device, if a usable backup exists."""
        if not backup or not backup["success"] or not backup["config"]:
            return {"success": False, "error": "备份配置不可用"}
        # Turn the saved configuration text into a list of commands.
        rollback_commands = self.generate_rollback_commands(backup["config"])
        return self.push_config(device, rollback_commands)

    def generate_rollback_commands(self, backup_config: str) -> List[str]:
        """Convert a saved configuration into rollback commands.

        Simplified on purpose: every non-empty line is kept, stripped of
        surrounding whitespace, except comment lines (``!``) and lines
        starting with ``version``. A real implementation should parse the
        configuration properly.
        """
        stripped = (line.strip() for line in backup_config.split('\n'))
        return [
            line for line in stripped
            if line and not line.startswith(('!', 'version'))
        ]

    def wait_for_job_completion(self, job_id: str, timeout: int = 300):
        """Poll ``GET /job`` once per second until the job ends.

        Returns:
            The job dict once its status is ``finished`` or ``failed``.

        Raises:
            TimeoutError: The job did not end within ``timeout`` seconds.
        """
        import time

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            response = requests.get(
                f"{self.base_url}/job",
                params={"id": job_id},
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json().get("data")
            # Accept a list of jobs or a single job object; an empty answer
            # just means "ask again".
            job_status = data if isinstance(data, dict) else (data[0] if data else None)
            if job_status and job_status.get("status") in ("finished", "failed"):
                return job_status
            time.sleep(1)
        raise TimeoutError(f"任务 {job_id} 执行超时")

    def _connection_args(self, device: Dict) -> Dict:
        """Build the ``connection_args`` section shared by all requests."""
        connection_args = {
            "device_type": device["device_type"],
            "host": device["host"],
            "username": device["username"],
            "password": device["password"],
        }
        if "port" in device:
            # Honour a non-default port when the caller supplies one.
            connection_args["port"] = device["port"]
        return connection_args

    def _execute_job(self, payload: Dict):
        """Submit ``payload`` and return ``(finished, output, error)``."""
        job_id = self._post("/device/execute", payload)["id"]
        job_status = self.wait_for_job_completion(job_id)
        # A job may carry no result at all; treat that as empty.
        result = job_status.get("result") or {}
        finished = job_status["status"] == "finished"
        output = self._job_output(result) if finished else None
        error = result.get("error") if job_status["status"] == "failed" else None
        return finished, output, error

    def _post(self, path: str, payload: Dict) -> Dict:
        """POST ``payload`` to ``path`` and return the ``data`` member."""
        response = requests.post(
            f"{self.base_url}{path}",
            json=payload,
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Fail loudly on HTTP errors instead of on a later KeyError.
        response.raise_for_status()
        return response.json()["data"]

    @staticmethod
    def _job_output(result: Dict):
        """Return the command output stored in a job ``result`` dict.

        The examples in this document read the output from ``retval`` in
        some places and ``output`` in others, so both keys are accepted.
        NOTE(review): confirm the canonical key against the API schema.
        """
        if "retval" in result:
            return result["retval"]
        return result.get("output")
# Usage example
config_manager = ConfigChangeManager("http://localhost:9000", "your_api_key")

# Target device for the demo change.
device = dict(
    host="192.168.1.1",
    username="admin",
    password="admin123",
    device_type="cisco_ios",
)

# Interface stanza: the header line followed by its indented sub-commands.
config_commands = [
    "interface GigabitEthernet0/1",
    " description Test Interface",
    " ip address 192.168.1.1 255.255.255.0",
    " no shutdown",
]

result = config_manager.safe_config_change(
    device,
    config_commands,
    "添加测试接口配置",
)
print(f"配置变更结果: {result['success']}")
## 场景 4:批量设备监控
### 业务需求
- 定期监控设备状态
- 收集性能数据
- 生成监控报告
### 实现方案
import requests
import json
import time
from datetime import datetime
from typing import List, Dict
class DeviceMonitor:
    """Monitor device reachability and basic metrics via the NetPulse API.

    Each device is checked with ``POST /device/test-connection``; reachable
    devices are then queried for interface, CPU and memory information
    through ``POST /device/execute``.
    """

    # Upper bound, in seconds, for a single HTTP round trip.
    REQUEST_TIMEOUT = 60

    def __init__(self, base_url: str, api_key: str):
        """Remember the API base URL and build the ``X-API-KEY`` header."""
        self.base_url = base_url
        self.api_key = api_key
        self.headers = {"X-API-KEY": api_key}

    def monitor_devices(self, devices: List[Dict]) -> Dict:
        """Monitor every device and count online/offline/error results.

        Returns:
            A dict with ``devices`` (one status dict per device) and
            ``summary`` (``total``, ``online``, ``offline``, ``error``).
        """
        summary = {"total": len(devices), "online": 0, "offline": 0, "error": 0}
        monitor_results = {"devices": [], "summary": summary}
        for device in devices:
            try:
                device_status = self.monitor_single_device(device)
            except Exception as e:
                device_status = {
                    "device": device,
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                }
            monitor_results["devices"].append(device_status)
            # Anything that is neither online nor offline counts as error.
            status = device_status["status"]
            summary[status if status in ("online", "offline") else "error"] += 1
        return monitor_results

    def monitor_single_device(self, device: Dict) -> Dict:
        """Check one device and, when reachable, collect its metrics.

        Returns:
            A dict with ``device``, ``status`` (``online``, ``offline`` or
            ``error``), ``timestamp``, ``metrics`` and, on problems,
            ``error``.
        """
        device_status = {
            "device": device,
            "status": "unknown",
            "timestamp": datetime.now().isoformat(),
            "metrics": {},
        }
        try:
            # 1. Test the connection.
            connection_result = self.test_connection(device)
            if not connection_result["success"]:
                device_status["status"] = "offline"
                # A response without an error message must not turn an
                # offline device into an "error" through a KeyError.
                device_status["error"] = connection_result.get("error_message", "未知错误")
                return device_status
            device_status["status"] = "online"
            # 2. Collect performance metrics.
            device_status["metrics"] = self.collect_metrics(device)
        except Exception as e:
            device_status["status"] = "error"
            device_status["error"] = str(e)
        return device_status

    def test_connection(self, device: Dict) -> Dict:
        """Ask the API to open a test connection and return its ``data`` part."""
        payload = {
            "driver": "netmiko",
            "connection_args": {**self._connection_args(device), "timeout": 10},
        }
        return self._post("/device/test-connection", payload)

    def collect_metrics(self, device: Dict) -> Dict:
        """Collect interface, CPU and memory metrics for ``device``.

        A metric is simply left out when its command did not succeed.
        """
        metrics = {}
        # Interface status.
        interface_result = self.execute_command(device, "show ip interface brief")
        if interface_result["success"]:
            metrics["interfaces"] = self.parse_interface_status(interface_result["output"])
        # CPU usage.
        cpu_result = self.execute_command(device, "show processes cpu")
        if cpu_result["success"]:
            metrics["cpu"] = self.parse_cpu_usage(cpu_result["output"])
        # Memory usage.
        memory_result = self.execute_command(device, "show memory statistics")
        if memory_result["success"]:
            metrics["memory"] = self.parse_memory_usage(memory_result["output"])
        return metrics

    def execute_command(self, device: Dict, command: str) -> Dict:
        """Run one command on ``device``.

        Returns:
            ``{"success": bool, "output": output or None, "error": ...}``.
        """
        payload = {
            "driver": "netmiko",
            "connection_args": self._connection_args(device),
            "command": command,
            "options": {"queue_strategy": "pinned", "ttl": 60},
        }
        job_id = self._post("/device/execute", payload)["id"]
        job_status = self.wait_for_job_completion(job_id, timeout=60)
        # A job may carry no result at all; treat that as empty.
        result = job_status.get("result") or {}
        finished = job_status["status"] == "finished"
        return {
            "success": finished,
            "output": result.get("retval") if finished else None,
            "error": result.get("error") if job_status["status"] == "failed" else None,
        }

    def parse_interface_status(self, output: str) -> Dict:
        """Parse ``show ip interface brief`` output.

        Returns:
            ``{interface: {"ip_address", "status", "protocol"}}`` where
            ``ip_address`` is None for ``unassigned`` interfaces.
        """
        interfaces = {}
        for line in output.split('\n'):
            if 'Interface' in line and 'IP-Address' in line:
                continue  # header row
            parts = line.split()
            if len(parts) < 4:
                continue
            if len(parts) >= 6 and parts[2].upper() in ("YES", "NO"):
                # Cisco layout: Interface, IP-Address, OK?, Method, Status,
                # Protocol. Status may be two words ("administratively
                # down"), so it is everything between Method and the last
                # column.
                status = " ".join(parts[4:-1])
                protocol = parts[-1]
            else:
                # Layout without the OK?/Method columns.
                status, protocol = parts[2], parts[3]
            interfaces[parts[0]] = {
                "ip_address": parts[1] if parts[1] != "unassigned" else None,
                "status": status,
                "protocol": protocol,
            }
        return interfaces

    def parse_cpu_usage(self, output: str) -> Dict:
        """Return a placeholder CPU metric that carries the raw output."""
        # Simplified: real parsing depends on the device type.
        return {"usage": "N/A", "raw_output": output}

    def parse_memory_usage(self, output: str) -> Dict:
        """Return a placeholder memory metric that carries the raw output."""
        # Simplified: real parsing depends on the device type.
        return {"usage": "N/A", "raw_output": output}

    def wait_for_job_completion(self, job_id: str, timeout: int = 60):
        """Poll ``GET /job`` once per second until the job ends.

        Returns:
            The job dict once its status is ``finished`` or ``failed``.

        Raises:
            TimeoutError: The job did not end within ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            response = requests.get(
                f"{self.base_url}/job",
                params={"id": job_id},
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json().get("data")
            # Accept a list of jobs or a single job object; an empty answer
            # just means "ask again".
            job_status = data if isinstance(data, dict) else (data[0] if data else None)
            if job_status and job_status.get("status") in ("finished", "failed"):
                return job_status
            time.sleep(1)
        raise TimeoutError(f"任务 {job_id} 执行超时")

    def _connection_args(self, device: Dict) -> Dict:
        """Build the ``connection_args`` section shared by all requests."""
        connection_args = {
            "device_type": device["device_type"],
            "host": device["host"],
            "username": device["username"],
            "password": device["password"],
        }
        if "port" in device:
            # Honour a non-default port when the caller supplies one.
            connection_args["port"] = device["port"]
        return connection_args

    def _post(self, path: str, payload: Dict) -> Dict:
        """POST ``payload`` to ``path`` and return the ``data`` member."""
        response = requests.post(
            f"{self.base_url}{path}",
            json=payload,
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Fail loudly on HTTP errors instead of on a later KeyError.
        response.raise_for_status()
        return response.json()["data"]
# Usage example
monitor = DeviceMonitor("http://localhost:9000", "your_api_key")

# Two Cisco IOS demo devices that share the same credentials.
devices = [
    {
        "host": host,
        "username": "admin",
        "password": "admin123",
        "device_type": "cisco_ios",
    }
    for host in ("192.168.1.1", "192.168.1.2")
]

# Periodic monitoring: runs until the process is stopped.
while True:
    results = monitor.monitor_devices(devices)
    print(f"监控报告 - {datetime.now()}")
    print(f"在线设备: {results['summary']['online']}")
    print(f"离线设备: {results['summary']['offline']}")
    print(f"错误设备: {results['summary']['error']}")
    # Wait five minutes before the next round.
    time.sleep(300)
## 最佳实践
### 1. 错误处理
- 始终检查 API 响应状态
- 实现重试机制
- 记录详细的错误日志
### 2. 性能优化
- 使用批量操作减少 API 调用
- 合理设置超时时间
- 避免频繁的状态查询
### 3. 安全考虑
- 使用环境变量存储敏感信息
- 定期轮换 API 密钥
- 限制 API 访问权限
### 4. 监控告警
- 设置任务执行超时告警
- 监控 Worker 健康状态
- 记录 API 调用统计