跳转至

API 最佳实践

概述

本文档提供 NetPulse API 使用的最佳实践,帮助您构建高效、可靠的网络自动化解决方案。

认证和安全性

API 密钥管理

最佳实践

# 使用环境变量存储API密钥
export NETPULSE_API_KEY="your-api-key-here"

# 在请求中使用
curl -H "X-API-KEY: $NETPULSE_API_KEY" \
     http://localhost:9000/health

安全建议

  • 定期轮换API密钥
  • 不要在代码中硬编码密钥
  • 使用HTTPS进行传输
  • 限制API密钥的访问权限

请求头设置

# 标准请求头
curl -X POST \
  -H "X-API-KEY: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -d '{"key": "value"}' \
  http://localhost:9000/api/endpoint

连接管理

队列策略选择

FIFO 队列 (推荐用于一般操作)

{
  "driver": "netmiko",
  "connection_args": {
    "host": "192.168.1.1",
    "username": "admin",
    "password": "password",
    "device_type": "cisco_ios"
  },
  "options": {
    "queue_strategy": "fifo"
  }
}

Pinned 队列 (推荐用于频繁操作)

{
  "driver": "netmiko",
  "connection_args": {
    "host": "192.168.1.1",
    "username": "admin",
    "password": "password",
    "device_type": "cisco_ios"
  },
  "options": {
    "queue_strategy": "pinned",
    "ttl": 300
  }
}

连接参数优化

超时设置

{
  "connection_args": {
    "host": "192.168.1.1",
    "username": "admin",
    "password": "password",
    "device_type": "cisco_ios",
    "timeout": 30,
    "read_timeout": 60,
    "delay_factor": 2
  }
}

连接复用

{
  "options": {
    "queue_strategy": "pinned",
    "ttl": 300
  }
}

连接复用说明

使用 queue_strategy: "pinned" 时,系统会自动复用连接,无需额外配置。

批量操作优化

设备分组策略

按厂商分组

def group_devices_by_vendor(devices):
    """按厂商分组设备"""
    groups = {}
    for device in devices:
        vendor = get_vendor_from_device_type(device['device_type'])
        if vendor not in groups:
            groups[vendor] = []
        groups[vendor].append(device)
    return groups

按地理位置分组

def group_devices_by_location(devices):
    """按地理位置分组设备"""
    groups = {}
    for device in devices:
        location = device.get('location', 'unknown')
        if location not in groups:
            groups[location] = []
        groups[location].append(device)
    return groups

批量操作最佳实践

1. 合理的批次大小

# 建议批次大小:10-50台设备
BATCH_SIZE = 20

def execute_batch_commands(devices, command):
    """分批执行命令"""
    results = []
    for i in range(0, len(devices), BATCH_SIZE):
        batch = devices[i:i + BATCH_SIZE]
        batch_result = execute_command_batch(batch, command)
        results.extend(batch_result)
    return results

2. 错误处理和重试

import time
from typing import List, Dict

def execute_with_retry(devices: List[Dict], command: str, max_retries: int = 3):
    """带重试的命令执行"""
    results = []
    failed_devices = []

    for device in devices:
        for attempt in range(max_retries):
            try:
                result = execute_single_command(device, command)
                results.append(result)
                break
            except Exception as e:
                if attempt == max_retries - 1:
                    failed_devices.append({
                        'device': device,
                        'error': str(e)
                    })
                else:
                    time.sleep(2 ** attempt)  # 指数退避

    return results, failed_devices

模板使用

Jinja2 模板最佳实践

1. 模板结构

{# 配置模板示例 #}
{% for interface in interfaces %}
interface {{ interface.name }}
 description {{ interface.description }}
 ip address {{ interface.ip_address }} {{ interface.subnet_mask }}
 no shutdown
{% endfor %}

2. 变量验证

def validate_template_variables(template_vars):
    """验证模板变量"""
    required_vars = ['interfaces', 'hostname', 'domain_name']
    missing_vars = [var for var in required_vars if var not in template_vars]

    if missing_vars:
        raise ValueError(f"Missing required variables: {missing_vars}")

    return True

TextFSM 解析最佳实践

1. 模板设计

Value HOSTNAME (\S+)
Value UPTIME (.+)
Value VERSION (.+)

Start
  ^\s*${HOSTNAME}\s+uptime\s+is\s+${UPTIME}
  ^\s*.*Version\s+${VERSION}
  ^\s*$$
  ^.* -> Error

2. 解析结果处理

def parse_command_output(output: str, template_name: str):
    """解析命令输出"""
    try:
        parsed_result = parse_with_textfsm(output, template_name)
        return {
            'success': True,
            'data': parsed_result
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'raw_output': output
        }

错误处理

常见错误码处理

def handle_api_response(response):
    """处理API响应"""
    if response.status_code == 200:
        data = response.json()
        if data['code'] == 200:
            return {'success': True, 'data': data['data']}
        else:
            return {'success': False, 'error': data['message']}
    elif response.status_code == 401:
        return {'success': False, 'error': '认证失败,请检查API密钥'}
    elif response.status_code == 404:
        return {'success': False, 'error': '资源不存在'}
    elif response.status_code == 500:
        return {'success': False, 'error': '服务器内部错误'}
    else:
        return {'success': False, 'error': f'未知错误: {response.status_code}'}

重试机制

import requests
import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    time.sleep(delay * (2 ** attempt))
            return None
        return wrapper
    return decorator

@retry_on_failure(max_retries=3, delay=2)
def api_request(url, headers, data):
    """带重试的API请求"""
    response = requests.post(url, headers=headers, json=data)
    response.raise_for_status()
    return response.json()

性能优化

1. 连接池管理

class ConnectionPool:
    """连接池管理"""
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.active_connections = {}

    def get_connection(self, device_id):
        """获取连接"""
        if device_id in self.active_connections:
            return self.active_connections[device_id]
        return None

    def add_connection(self, device_id, connection):
        """添加连接"""
        if len(self.active_connections) < self.max_connections:
            self.active_connections[device_id] = connection
            return True
        return False

2. 异步处理

import asyncio
import aiohttp

async def execute_commands_async(devices, command):
    """异步执行命令"""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for device in devices:
            task = execute_single_command_async(session, device, command)
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

async def execute_single_command_async(session, device, command):
    """异步执行单个命令"""
    url = "http://localhost:9000/device/execute"
    headers = {"X-API-KEY": f"{API_KEY}"}
    data = {
        "driver": "netmiko",
        "connection_args": device,
        "command": command
    }

    async with session.post(url, headers=headers, json=data) as response:
        return await response.json()

3. 缓存策略

import redis
import json
import hashlib

class ResultCache:
    """结果缓存"""
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # 1小时

    def get_cache_key(self, device_id, command):
        """生成缓存键"""
        content = f"{device_id}:{command}"
        return hashlib.md5(content.encode()).hexdigest()

    def get_cached_result(self, device_id, command):
        """获取缓存结果"""
        key = self.get_cache_key(device_id, command)
        result = self.redis.get(key)
        return json.loads(result) if result else None

    def cache_result(self, device_id, command, result):
        """缓存结果"""
        key = self.get_cache_key(device_id, command)
        self.redis.setex(key, self.ttl, json.dumps(result))

监控和日志

1. 任务监控

def monitor_job_status(job_id):
    """监控任务状态"""
    while True:
        response = requests.get(
            f"http://localhost:9000/job?id={job_id}",
            headers={"X-API-KEY": f"{API_KEY}"}
        )
        data = response.json()

        if data['data']['status'] in ['completed', 'failed']:
            return data['data']

        time.sleep(5)  # 每5秒检查一次

2. 性能指标

import time
from dataclasses import dataclass

@dataclass
class PerformanceMetrics:
    """性能指标"""
    total_devices: int
    successful_operations: int
    failed_operations: int
    total_time: float
    average_time_per_device: float

def calculate_performance_metrics(results):
    """计算性能指标"""
    total_devices = len(results)
    successful = sum(1 for r in results if r.get('success', False))
    failed = total_devices - successful

    return PerformanceMetrics(
        total_devices=total_devices,
        successful_operations=successful,
        failed_operations=failed,
        total_time=0,  # 需要实际计算
        average_time_per_device=0  # 需要实际计算
    )

Webhook 集成

1. Webhook 配置

{
  "options": {
    "webhook": {
      "url": "https://your-webhook-url.com/callback",
      "method": "POST",
      "headers": {
        "Content-Type": "application/json",
        "X-Custom-Header": "custom-value"
      },
      "timeout": 30
    }
  }
}

2. Webhook 处理

from flask import Flask, request
import json

app = Flask(__name__)

@app.route('/webhook/callback', methods=['POST'])
def webhook_callback():
    """处理webhook回调"""
    data = request.json

    # 处理不同类型的回调
    if data['type'] == 'job_completed':
        handle_job_completed(data)
    elif data['type'] == 'job_failed':
        handle_job_failed(data)
    elif data['type'] == 'device_connected':
        handle_device_connected(data)

    return {'status': 'success'}

def handle_job_completed(data):
    """处理任务完成回调"""
    job_id = data['job_id']
    result = data['result']

    # 记录日志
    print(f"Job {job_id} completed successfully")

    # 发送通知
    send_notification(f"任务 {job_id} 执行完成")

最佳实践总结

1. 开发阶段

  • 使用开发环境的API密钥
  • 启用详细日志记录
  • 使用小规模测试数据
  • 实现完整的错误处理

2. 生产阶段

  • 使用强密码的API密钥
  • 启用HTTPS传输
  • 实现监控和告警
  • 定期备份配置

3. 维护阶段

  • 定期检查系统状态
  • 监控性能指标
  • 更新安全补丁
  • 优化配置参数

相关文档