流量监控

This commit is contained in:
Jerry 2025-10-08 02:19:19 +08:00
parent 9d540c77b7
commit 08a6ac56b8
3 changed files with 340 additions and 5 deletions

View File

@ -1,16 +1,20 @@
import socket
from fastapi import (APIRouter, HTTPException, Response)
from typing import List
from typing import List, Optional
from pydantic import BaseModel
import asyncio
import psutil
import ipaddress
import time
from datetime import datetime
from ..models.requests import CLICommandRequest, ConfigRequest
from ..models.requests import CLICommandRequest, ConfigRequest, TrafficQueryRequest
from ...app.services.ai_services import AIService
from ...app.api.network_config import SwitchConfigurator
from ...config import settings
from ..services.network_scanner import NetworkScanner
from ..services.traffic_monitor import traffic_monitor
from ..utils.logger import logger
router = APIRouter(prefix="", tags=["API"])
scanner = NetworkScanner()
@ -28,8 +32,8 @@ async def root():
"/scan_network",
"/list_devices",
"/batch_apply_config",
"/traffic/switch/current",
"/traffic/switch/history"
"/traffic/realtime",
"/traffic/cache/clear"
]
}
@ -164,4 +168,132 @@ async def get_network_adapters():
networks = await asyncio.to_thread(sync_get_adapters)
return {"networks": networks}
except Exception as e:
return {"error": f"获取网络适配器信息失败: {str(e)}"}
return {"error": f"获取网络适配器信息失败: {str(e)}"}
@router.post("/traffic/realtime", summary="查询交换机接口实时流量")
async def get_realtime_traffic(request: TrafficQueryRequest):
"""
查询交换机接口实时流量速率(Kbps)
- 支持多个接口同时查询
- 首次查询速率返回 0
- 单个接口查询失败不影响其他接口
"""
# 提取认证信息
username = request.username or settings.SWITCH_USERNAME
password = request.password or settings.SWITCH_PASSWORD
# 创建配置器(复用连接池)
configurator = SwitchConfigurator(
username=username,
password=password,
timeout=settings.SWITCH_TIMEOUT
)
results = []
current_time = time.time()
# 遍历所有接口
for interface in request.interfaces:
interface_data = {
"interface": interface,
"status": "unknown",
"in_speed_kbps": 0.0,
"out_speed_kbps": 0.0,
"in_bytes": 0,
"out_bytes": 0,
"error": None
}
try:
# 获取查询命令
command = traffic_monitor.get_query_command(request.vendor, interface)
if not command:
interface_data["error"] = f"不支持的厂商: {request.vendor}"
results.append(interface_data)
continue
# 执行查询命令
try:
output = await configurator.execute_raw_commands(
ip=request.switch_ip,
commands=[command]
)
# 解析输出
stats = traffic_monitor.parse_interface_stats(request.vendor, str(output))
if stats is None:
interface_data["error"] = "解析接口统计失败"
results.append(interface_data)
continue
in_bytes, out_bytes, status = stats
# 计算速率
in_speed_kbps, out_speed_kbps = traffic_monitor.calculate_speed(
request.switch_ip,
interface,
in_bytes,
out_bytes,
current_time
)
# 更新结果
interface_data.update({
"status": status,
"in_speed_kbps": round(in_speed_kbps, 2),
"out_speed_kbps": round(out_speed_kbps, 2),
"in_bytes": in_bytes,
"out_bytes": out_bytes
})
except Exception as e:
interface_data["error"] = f"查询失败: {str(e)}"
logger.error(f"查询接口 {interface} 失败: {e}")
except Exception as e:
interface_data["error"] = f"未知错误: {str(e)}"
logger.error(f"处理接口 {interface} 时发生异常: {e}", exc_info=True)
results.append(interface_data)
return {
"success": True,
"switch_ip": request.switch_ip,
"vendor": request.vendor,
"timestamp": datetime.utcnow().isoformat() + "Z",
"data": results
}
@router.delete("/traffic/cache/clear", summary="清除流量监控缓存")
async def clear_traffic_cache(switch_ip: Optional[str] = None):
"""
清除流量监控缓存
- 不指定 switch_ip: 清除所有缓存
- 指定 switch_ip: 只清除该交换机的缓存
"""
try:
count = traffic_monitor.clear_cache(switch_ip)
return {
"success": True,
"message": f"已清除 {count} 条缓存记录",
"switch_ip": switch_ip or "all"
}
except Exception as e:
raise HTTPException(500, f"清除缓存失败: {str(e)}")
@router.get("/traffic/cache/stats", summary="获取缓存统计信息")
async def get_cache_stats():
"""获取流量监控缓存统计信息"""
try:
stats = traffic_monitor.get_cache_stats()
return {
"success": True,
"stats": stats
}
except Exception as e:
raise HTTPException(500, f"获取缓存统计失败: {str(e)}")

View File

@ -24,3 +24,11 @@ class CLICommandRequest(BaseModel):
def extract_credentials(self):
return self.username or "NONE", self.password or "NONE"
class TrafficQueryRequest(BaseModel):
"""实时流量查询请求"""
switch_ip: str
vendor: str # huawei/cisco/h3c/ruijie/zte
interfaces: List[str] # 例如: ["GigabitEthernet0/0/1", "GigabitEthernet0/0/2"]
username: Optional[str] = None
password: Optional[str] = None

View File

@ -0,0 +1,195 @@
import re
import time
import asyncio
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from src.backend.app.utils.logger import logger
class TrafficMonitor:
"""
交换机流量监控服务
通过 Telnet CLI 查询接口流量统计,计算实时速率
"""
# 各厂商查询接口流量的CLI命令模板
VENDOR_COMMANDS = {
"huawei": "display interface {interface}",
"cisco": "show interface {interface}",
"h3c": "display interface {interface}",
"ruijie": "show interface {interface}",
"zte": "show interface {interface}",
}
def __init__(self):
# 流量计数器缓存: key = "switch_ip:interface", value = {"timestamp": float, "in_bytes": int, "out_bytes": int}
self.traffic_cache: Dict[str, Dict] = {}
# 缓存TTL(秒), 30分钟未访问则自动清理
self.cache_ttl = 1800
def get_query_command(self, vendor: str, interface: str) -> Optional[str]:
"""根据厂商和接口名生成查询命令"""
vendor_lower = vendor.lower()
if vendor_lower not in self.VENDOR_COMMANDS:
logger.warning(f"不支持的厂商: {vendor}")
return None
return self.VENDOR_COMMANDS[vendor_lower].format(interface=interface)
def parse_interface_stats(self, vendor: str, output: str) -> Optional[Tuple[int, int, str]]:
"""
解析CLI输出,提取入/出方向字节数和接口状态
返回: (in_bytes, out_bytes, status) None
"""
vendor_lower = vendor.lower()
try:
# 提取接口状态
status = "unknown"
if re.search(r'(current state|line protocol).*?(UP|up)', output, re.IGNORECASE):
status = "up"
elif re.search(r'(current state|line protocol).*?(DOWN|down)', output, re.IGNORECASE):
status = "down"
# 华为/H3C格式: "Input: 12345 packets, 1048576000 bytes"
if vendor_lower in ["huawei", "h3c"]:
match_in = re.search(r'Input:.*?(\d+)\s+bytes', output, re.IGNORECASE)
match_out = re.search(r'Output:.*?(\d+)\s+bytes', output, re.IGNORECASE)
if match_in and match_out:
in_bytes = int(match_in.group(1))
out_bytes = int(match_out.group(1))
return (in_bytes, out_bytes, status)
# Cisco/锐捷/中兴格式: "12345 packets input, 1048576000 bytes"
elif vendor_lower in ["cisco", "ruijie", "zte"]:
match_in = re.search(r'(\d+)\s+packets input,\s+(\d+)\s+bytes', output, re.IGNORECASE)
match_out = re.search(r'(\d+)\s+packets output,\s+(\d+)\s+bytes', output, re.IGNORECASE)
if match_in and match_out:
in_bytes = int(match_in.group(2))
out_bytes = int(match_out.group(2))
return (in_bytes, out_bytes, status)
logger.warning(f"无法解析 {vendor} 厂商的输出")
return None
except Exception as e:
logger.error(f"解析接口统计失败: {e}", exc_info=True)
return None
def calculate_speed(
self,
switch_ip: str,
interface: str,
current_in: int,
current_out: int,
current_time: float
) -> Tuple[float, float]:
"""
计算接口速率(Kbps)
返回: (in_speed_kbps, out_speed_kbps)
"""
cache_key = f"{switch_ip}:{interface}"
# 检查是否有历史数据
if cache_key not in self.traffic_cache:
# 首次查询,保存数据但返回0速率
self.traffic_cache[cache_key] = {
"timestamp": current_time,
"in_bytes": current_in,
"out_bytes": current_out
}
logger.info(f"首次查询 {cache_key}, 速率返回 0")
return (0.0, 0.0)
# 获取历史数据
cached = self.traffic_cache[cache_key]
time_diff = current_time - cached["timestamp"]
# 时间间隔太短,避免除零
if time_diff < 0.1:
logger.warning(f"{cache_key} 查询间隔过短 ({time_diff}s), 返回上次速率")
return (0.0, 0.0)
# 计算字节差(处理计数器溢出)
in_diff = self._calculate_diff(current_in, cached["in_bytes"])
out_diff = self._calculate_diff(current_out, cached["out_bytes"])
# 计算速率: (字节差 * 8 bits/byte) / 时间差(秒) / 1000 = Kbps
in_speed_kbps = (in_diff * 8) / time_diff / 1000
out_speed_kbps = (out_diff * 8) / time_diff / 1000
# 更新缓存
self.traffic_cache[cache_key] = {
"timestamp": current_time,
"in_bytes": current_in,
"out_bytes": current_out
}
logger.debug(f"{cache_key} 速率: IN={in_speed_kbps:.2f} Kbps, OUT={out_speed_kbps:.2f} Kbps")
return (in_speed_kbps, out_speed_kbps)
def _calculate_diff(self, current: int, previous: int) -> int:
"""
计算字节差,处理32位计数器溢出
Reason: 交换机的流量计数器通常是32位,超过4GB会回绕到0
"""
if current >= previous:
return current - previous
else:
# 计数器溢出,假设32位
return (2**32 - previous) + current
def cleanup_expired_cache(self):
"""清理过期的缓存数据"""
current_time = time.time()
expired_keys = []
for key, data in self.traffic_cache.items():
if current_time - data["timestamp"] > self.cache_ttl:
expired_keys.append(key)
for key in expired_keys:
del self.traffic_cache[key]
logger.info(f"清理过期缓存: {key}")
return len(expired_keys)
def clear_cache(self, switch_ip: Optional[str] = None):
"""
清除缓存
Args:
switch_ip: 如果指定,只清除该交换机的缓存;否则清除所有
"""
if switch_ip:
# 清除指定交换机的缓存
keys_to_remove = [k for k in self.traffic_cache.keys() if k.startswith(f"{switch_ip}:")]
for key in keys_to_remove:
del self.traffic_cache[key]
logger.info(f"清除交换机 {switch_ip} 的缓存,共 {len(keys_to_remove)}")
return len(keys_to_remove)
else:
# 清除所有缓存
count = len(self.traffic_cache)
self.traffic_cache.clear()
logger.info(f"清除所有缓存,共 {count}")
return count
def get_cache_stats(self) -> Dict:
"""获取缓存统计信息"""
return {
"total_entries": len(self.traffic_cache),
"cache_ttl_seconds": self.cache_ttl,
"entries": list(self.traffic_cache.keys())
}
# 全局单例
traffic_monitor = TrafficMonitor()