From 08a6ac56b81803fecaf40e2019d3e7b5003f4055 Mon Sep 17 00:00:00 2001 From: Jerryplusy Date: Wed, 8 Oct 2025 02:19:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=81=E9=87=8F=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/app/api/endpoints.py | 142 +++++++++++++- src/backend/app/models/requests.py | 8 + src/backend/app/services/traffic_monitor.py | 195 ++++++++++++++++++++ 3 files changed, 340 insertions(+), 5 deletions(-) create mode 100644 src/backend/app/services/traffic_monitor.py diff --git a/src/backend/app/api/endpoints.py b/src/backend/app/api/endpoints.py index 14ec956..12bc9af 100644 --- a/src/backend/app/api/endpoints.py +++ b/src/backend/app/api/endpoints.py @@ -1,16 +1,20 @@ import socket from fastapi import (APIRouter, HTTPException, Response) -from typing import List +from typing import List, Optional from pydantic import BaseModel import asyncio import psutil import ipaddress +import time +from datetime import datetime -from ..models.requests import CLICommandRequest, ConfigRequest +from ..models.requests import CLICommandRequest, ConfigRequest, TrafficQueryRequest from ...app.services.ai_services import AIService from ...app.api.network_config import SwitchConfigurator from ...config import settings from ..services.network_scanner import NetworkScanner +from ..services.traffic_monitor import traffic_monitor +from ..utils.logger import logger router = APIRouter(prefix="", tags=["API"]) scanner = NetworkScanner() @@ -28,8 +32,8 @@ async def root(): "/scan_network", "/list_devices", "/batch_apply_config", - "/traffic/switch/current", - "/traffic/switch/history" + "/traffic/realtime", + "/traffic/cache/clear" ] } @@ -164,4 +168,132 @@ async def get_network_adapters(): networks = await asyncio.to_thread(sync_get_adapters) return {"networks": networks} except Exception as e: - return {"error": f"获取网络适配器信息失败: {str(e)}"} \ No newline at end of file + return {"error": f"获取网络适配器信息失败: {str(e)}"} + + +@router.post("/traffic/realtime", summary="查询交换机接口实时流量") +async def get_realtime_traffic(request: TrafficQueryRequest): + """ + 查询交换机接口实时流量速率(Kbps) + + - 支持多个接口同时查询 + - 首次查询速率返回 0 + - 单个接口查询失败不影响其他接口 + """ + # 提取认证信息 + username = request.username or settings.SWITCH_USERNAME + password = request.password or settings.SWITCH_PASSWORD + + # 创建配置器(复用连接池) + configurator = SwitchConfigurator( + username=username, + password=password, + timeout=settings.SWITCH_TIMEOUT + ) + + results = [] + current_time = time.time() + + # 遍历所有接口 + for interface in request.interfaces: + interface_data = { + "interface": interface, + "status": "unknown", + "in_speed_kbps": 0.0, + "out_speed_kbps": 0.0, + "in_bytes": 0, + "out_bytes": 0, + "error": None + } + + try: + # 获取查询命令 + command = traffic_monitor.get_query_command(request.vendor, interface) + if not command: + interface_data["error"] = f"不支持的厂商: {request.vendor}" + results.append(interface_data) + continue + + # 执行查询命令 + try: + output = await configurator.execute_raw_commands( + ip=request.switch_ip, + commands=[command] + ) + + # 解析输出 + stats = traffic_monitor.parse_interface_stats(request.vendor, str(output)) + if stats is None: + interface_data["error"] = "解析接口统计失败" + results.append(interface_data) + continue + + in_bytes, out_bytes, status = stats + + # 计算速率 + in_speed_kbps, out_speed_kbps = traffic_monitor.calculate_speed( + request.switch_ip, + interface, + in_bytes, + out_bytes, + current_time + ) + + # 更新结果 + interface_data.update({ + "status": status, + "in_speed_kbps": round(in_speed_kbps, 2), + "out_speed_kbps": round(out_speed_kbps, 2), + "in_bytes": in_bytes, + "out_bytes": out_bytes + }) + + except Exception as e: + interface_data["error"] = f"查询失败: {str(e)}" + logger.error(f"查询接口 {interface} 失败: {e}") + + except Exception as e: + interface_data["error"] = f"未知错误: {str(e)}" + logger.error(f"处理接口 {interface} 时发生异常: {e}", exc_info=True) + + results.append(interface_data) + + return { + "success": True, + "switch_ip": request.switch_ip, + "vendor": request.vendor, + "timestamp": datetime.utcnow().isoformat() + "Z", + "data": results + } + + +@router.delete("/traffic/cache/clear", summary="清除流量监控缓存") +async def clear_traffic_cache(switch_ip: Optional[str] = None): + """ + 清除流量监控缓存 + + - 不指定 switch_ip: 清除所有缓存 + - 指定 switch_ip: 只清除该交换机的缓存 + """ + try: + count = traffic_monitor.clear_cache(switch_ip) + return { + "success": True, + "message": f"已清除 {count} 条缓存记录", + "switch_ip": switch_ip or "all" + } + except Exception as e: + raise HTTPException(500, f"清除缓存失败: {str(e)}") + + +@router.get("/traffic/cache/stats", summary="获取缓存统计信息") +async def get_cache_stats(): + """获取流量监控缓存统计信息""" + try: + stats = traffic_monitor.get_cache_stats() + return { + "success": True, + "stats": stats + } + except Exception as e: + raise HTTPException(500, f"获取缓存统计失败: {str(e)}") \ No newline at end of file diff --git a/src/backend/app/models/requests.py b/src/backend/app/models/requests.py index 4c696f1..5d80e5c 100644 --- a/src/backend/app/models/requests.py +++ b/src/backend/app/models/requests.py @@ -24,3 +24,11 @@ class CLICommandRequest(BaseModel): def extract_credentials(self): return self.username or "NONE", self.password or "NONE" + +class TrafficQueryRequest(BaseModel): + """实时流量查询请求""" + switch_ip: str + vendor: str # huawei/cisco/h3c/ruijie/zte + interfaces: List[str] # 例如: ["GigabitEthernet0/0/1", "GigabitEthernet0/0/2"] + username: Optional[str] = None + password: Optional[str] = None diff --git a/src/backend/app/services/traffic_monitor.py b/src/backend/app/services/traffic_monitor.py new file mode 100644 index 0000000..e21e92e --- /dev/null +++ b/src/backend/app/services/traffic_monitor.py @@ -0,0 +1,195 @@ +import re +import time +import asyncio +from typing import Dict, List, Optional, Tuple +from datetime import datetime + +from src.backend.app.utils.logger import logger + + +class TrafficMonitor: + """ + 交换机流量监控服务 + 通过 Telnet CLI 查询接口流量统计,计算实时速率 + """ + + # 各厂商查询接口流量的CLI命令模板 + VENDOR_COMMANDS = { + "huawei": "display interface {interface}", + "cisco": "show interface {interface}", + "h3c": "display interface {interface}", + "ruijie": "show interface {interface}", + "zte": "show interface {interface}", + } + + def __init__(self): + # 流量计数器缓存: key = "switch_ip:interface", value = {"timestamp": float, "in_bytes": int, "out_bytes": int} + self.traffic_cache: Dict[str, Dict] = {} + + # 缓存TTL(秒), 30分钟未访问则自动清理 + self.cache_ttl = 1800 + + def get_query_command(self, vendor: str, interface: str) -> Optional[str]: + """根据厂商和接口名生成查询命令""" + vendor_lower = vendor.lower() + if vendor_lower not in self.VENDOR_COMMANDS: + logger.warning(f"不支持的厂商: {vendor}") + return None + + return self.VENDOR_COMMANDS[vendor_lower].format(interface=interface) + + def parse_interface_stats(self, vendor: str, output: str) -> Optional[Tuple[int, int, str]]: + """ + 解析CLI输出,提取入/出方向字节数和接口状态 + + 返回: (in_bytes, out_bytes, status) 或 None + """ + vendor_lower = vendor.lower() + + try: + # 提取接口状态 + status = "unknown" + if re.search(r'(current state|line protocol).*?(UP|up)', output, re.IGNORECASE): + status = "up" + elif re.search(r'(current state|line protocol).*?(DOWN|down)', output, re.IGNORECASE): + status = "down" + + # 华为/H3C格式: "Input: 12345 packets, 1048576000 bytes" + if vendor_lower in ["huawei", "h3c"]: + match_in = re.search(r'Input:.*?(\d+)\s+bytes', output, re.IGNORECASE) + match_out = re.search(r'Output:.*?(\d+)\s+bytes', output, re.IGNORECASE) + + if match_in and match_out: + in_bytes = int(match_in.group(1)) + out_bytes = int(match_out.group(1)) + return (in_bytes, out_bytes, status) + + # Cisco/锐捷/中兴格式: "12345 packets input, 1048576000 bytes" + elif vendor_lower in ["cisco", "ruijie", "zte"]: + match_in = re.search(r'(\d+)\s+packets input,\s+(\d+)\s+bytes', output, re.IGNORECASE) + match_out = re.search(r'(\d+)\s+packets output,\s+(\d+)\s+bytes', output, re.IGNORECASE) + + if match_in and match_out: + in_bytes = int(match_in.group(2)) + out_bytes = int(match_out.group(2)) + return (in_bytes, out_bytes, status) + + logger.warning(f"无法解析 {vendor} 厂商的输出") + return None + + except Exception as e: + logger.error(f"解析接口统计失败: {e}", exc_info=True) + return None + + def calculate_speed( + self, + switch_ip: str, + interface: str, + current_in: int, + current_out: int, + current_time: float + ) -> Tuple[float, float]: + """ + 计算接口速率(Kbps) + + 返回: (in_speed_kbps, out_speed_kbps) + """ + cache_key = f"{switch_ip}:{interface}" + + # 检查是否有历史数据 + if cache_key not in self.traffic_cache: + # 首次查询,保存数据但返回0速率 + self.traffic_cache[cache_key] = { + "timestamp": current_time, + "in_bytes": current_in, + "out_bytes": current_out + } + logger.info(f"首次查询 {cache_key}, 速率返回 0") + return (0.0, 0.0) + + # 获取历史数据 + cached = self.traffic_cache[cache_key] + time_diff = current_time - cached["timestamp"] + + # 时间间隔太短,避免除零 + if time_diff < 0.1: + logger.warning(f"{cache_key} 查询间隔过短 ({time_diff}s), 返回上次速率") + return (0.0, 0.0) + + # 计算字节差(处理计数器溢出) + in_diff = self._calculate_diff(current_in, cached["in_bytes"]) + out_diff = self._calculate_diff(current_out, cached["out_bytes"]) + + # 计算速率: (字节差 * 8 bits/byte) / 时间差(秒) / 1000 = Kbps + in_speed_kbps = (in_diff * 8) / time_diff / 1000 + out_speed_kbps = (out_diff * 8) / time_diff / 1000 + + # 更新缓存 + self.traffic_cache[cache_key] = { + "timestamp": current_time, + "in_bytes": current_in, + "out_bytes": current_out + } + + logger.debug(f"{cache_key} 速率: IN={in_speed_kbps:.2f} Kbps, OUT={out_speed_kbps:.2f} Kbps") + return (in_speed_kbps, out_speed_kbps) + + def _calculate_diff(self, current: int, previous: int) -> int: + """ + 计算字节差,处理32位计数器溢出 + + Reason: 交换机的流量计数器通常是32位,超过4GB会回绕到0 + """ + if current >= previous: + return current - previous + else: + # 计数器溢出,假设32位 + return (2**32 - previous) + current + + def cleanup_expired_cache(self): + """清理过期的缓存数据""" + current_time = time.time() + expired_keys = [] + + for key, data in self.traffic_cache.items(): + if current_time - data["timestamp"] > self.cache_ttl: + expired_keys.append(key) + + for key in expired_keys: + del self.traffic_cache[key] + logger.info(f"清理过期缓存: {key}") + + return len(expired_keys) + + def clear_cache(self, switch_ip: Optional[str] = None): + """ + 清除缓存 + + Args: + switch_ip: 如果指定,只清除该交换机的缓存;否则清除所有 + """ + if switch_ip: + # 清除指定交换机的缓存 + keys_to_remove = [k for k in self.traffic_cache.keys() if k.startswith(f"{switch_ip}:")] + for key in keys_to_remove: + del self.traffic_cache[key] + logger.info(f"清除交换机 {switch_ip} 的缓存,共 {len(keys_to_remove)} 条") + return len(keys_to_remove) + else: + # 清除所有缓存 + count = len(self.traffic_cache) + self.traffic_cache.clear() + logger.info(f"清除所有缓存,共 {count} 条") + return count + + def get_cache_stats(self) -> Dict: + """获取缓存统计信息""" + return { + "total_entries": len(self.traffic_cache), + "cache_ttl_seconds": self.cache_ttl, + "entries": list(self.traffic_cache.keys()) + } + + +# 全局单例 +traffic_monitor = TrafficMonitor()