2025-08-12 00:31:15 +08:00

205 lines
6.1 KiB
Python

import ipaddress
import socket
from typing import List, Optional

import psutil
from fastapi import (APIRouter, HTTPException, Response)
from pydantic import BaseModel

from ...app.services.ai_services import AIService
from ...app.api.network_config import SwitchConfigurator
from ...config import settings
from ..services.network_scanner import NetworkScanner
# Router that collects every endpoint declared in this module. It adds no URL
# prefix of its own and tags all of its routes as "API".
router = APIRouter(prefix="", tags=["API"])
# Module-level NetworkScanner instance used by the /scan_network and
# /list_devices handlers in this module.
scanner = NetworkScanner()
@router.get("/", include_in_schema=False)
async def root():
    """Return a welcome payload with documentation links and endpoint paths.

    The route is registered with ``include_in_schema=False``.

    Returns:
        dict: ``message`` (welcome text), ``docs`` and ``redoc`` (both built
        from ``settings.API_PREFIX``) and ``endpoints`` (a list of endpoint
        path strings).
    """
    return {
        "message": "欢迎使用AI交换机配置系统",
        "docs": f"{settings.API_PREFIX}/docs",
        "redoc": f"{settings.API_PREFIX}/redoc",
        "endpoints": [
            "/parse_command",
            "/apply_config",
            "/scan_network",
            "/list_devices",
            # Every entry needs its own comma: adjacent string literals are
            # silently concatenated, which previously merged this path with
            # the following one into a single bogus entry.
            "/batch_apply_config",
            "/traffic/switch/current",
            "/traffic/switch/history",
        ],
    }
@router.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Answer favicon requests with a body-less 204 response."""
    no_content = Response(status_code=204)
    return no_content
class BatchConfigRequest(BaseModel):
    """Request body carrying one configuration and a list of switch addresses.

    ``username``, ``password`` and ``timeout`` default to ``None``; they are
    declared ``Optional`` so the annotation agrees with that default and an
    explicit ``null`` in the request body is a valid value for them.
    """

    # Configuration payload (free-form dictionary).
    config: dict
    # Addresses of the switches the request refers to.
    switch_ips: List[str]
    # Login credentials; None when the caller does not supply them.
    username: Optional[str] = None
    password: Optional[str] = None
    # Timeout value; None when not supplied. The unit is not defined here.
    timeout: Optional[int] = None
@router.get("/test")
async def test_endpoint():
    """Return a fixed greeting payload."""
    greeting = {"message": "Hello World"}
    return greeting
@router.get("/scan_network", summary="扫描网络中的交换机")
async def scan_network(subnet: str = "192.168.1.0/24"):
    """Scan a subnet with the module-level scanner and report the result.

    Args:
        subnet: Subnet to scan; handed unchanged to ``scanner.scan_subnet``.

    Returns:
        dict: ``success`` (always ``True``), ``devices`` (the scanner's
        result) and ``count`` (``len`` of that result).

    Raises:
        HTTPException: Status 500 carrying the error text when the scan or
            the construction of the response raises.
    """
    try:
        found = await scanner.scan_subnet(subnet)
        total = len(found)
        return {"success": True, "devices": found, "count": total}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"扫描失败: {exc!s}")
@router.get("/list_devices", summary="列出已发现的交换机")
async def list_devices():
    """Return the devices reported by the scanner's cache.

    Returns:
        dict: A single ``devices`` key holding the result of
        ``scanner.load_cached_devices()``.
    """
    cached = await scanner.load_cached_devices()
    return {"devices": cached}
class CommandRequest(BaseModel):
    """Request body for the ``/parse_command`` endpoint."""

    # Command text; the endpoint forwards it to ``AIService.parse_command``.
    command: str
class ConfigRequest(BaseModel):
    """Request body for the ``/apply_config`` endpoint.

    ``username``, ``password`` and ``timeout`` default to ``None``; they are
    declared ``Optional`` so the annotation agrees with that default and an
    explicit ``null`` in the request body is a valid value for them.
    """

    # Configuration payload passed to ``SwitchConfigurator.safe_apply``.
    config: dict
    # Address of the target switch.
    switch_ip: str
    # Values forwarded to the ``SwitchConfigurator`` constructor; None when
    # the caller does not supply them.
    username: Optional[str] = None
    password: Optional[str] = None
    timeout: Optional[int] = None
@router.post("/parse_command", response_model=dict)
async def parse_command(request: CommandRequest):
    """
    Parse a Chinese-language command and return it as a JSON configuration.

    An ``AIService`` is created for each call from
    ``settings.SILICONFLOW_API_KEY`` and ``settings.SILICONFLOW_API_URL``.

    Returns:
        dict: ``success`` (always ``True``) and ``config`` (the value returned
        by ``AIService.parse_command``).

    Raises:
        HTTPException: Status 400 with the error text when anything in the
            parsing step raises.
    """
    try:
        service = AIService(
            settings.SILICONFLOW_API_KEY,
            settings.SILICONFLOW_API_URL,
        )
        parsed = await service.parse_command(request.command)
        return {"success": True, "config": parsed}
    except Exception as exc:
        message = f"Failed to parse command: {exc!s}"
        raise HTTPException(status_code=400, detail=message)
@router.post("/apply_config", response_model=dict)
async def apply_config(request: ConfigRequest):
    """
    Apply a configuration to a switch (deprecated).

    Builds a ``SwitchConfigurator`` from the request's username, password and
    timeout, then calls its ``safe_apply`` with the switch address and the
    configuration.

    Returns:
        dict: ``success`` (always ``True``) and ``result`` (the value returned
        by ``safe_apply``).

    Raises:
        HTTPException: Status 500 with the error text when building the
            configurator or applying the configuration raises.
    """
    try:
        connection_options = {
            "username": request.username,
            "password": request.password,
            "timeout": request.timeout,
        }
        configurator = SwitchConfigurator(**connection_options)
        outcome = await configurator.safe_apply(request.switch_ip, request.config)
        return {"success": True, "result": outcome}
    except Exception as exc:
        message = f"Failed to apply config: {exc!s}"
        raise HTTPException(status_code=500, detail=message)
class CLICommandRequest(BaseModel):
    """Request body for the ``/execute_cli_commands`` endpoint.

    Credentials may be embedded in ``commands`` as ``!username=<value>`` and
    ``!password=<value>`` entries; the helper methods below separate those
    entries from the real CLI commands.
    """

    # Address of the target switch.
    switch_ip: str
    # CLI commands, optionally mixed with embedded credential entries.
    commands: List[str]
    # Selects eNSP mode instead of SSH mode in the endpoint.
    is_ensp: bool = False

    def extract_credentials(self) -> tuple:
        """Extract the username and password embedded in ``commands``.

        Returns:
            tuple: ``(username, password)``. Each element is ``None`` when no
            matching entry exists; when an entry is repeated, the last
            occurrence wins.
        """
        username = None
        password = None
        for cmd in self.commands:
            # Split on the first "=" only, so a value that itself contains
            # "=" (common in passwords) is kept whole instead of truncated.
            if cmd.startswith("!username="):
                username = cmd.split("=", 1)[1]
            elif cmd.startswith("!password="):
                password = cmd.split("=", 1)[1]
        return username, password

    def get_clean_commands(self) -> List[str]:
        """Return the commands with the embedded credential entries removed."""
        credential_prefixes = ("!username=", "!password=")
        return [
            cmd for cmd in self.commands
            if not cmd.startswith(credential_prefixes)
        ]
@router.post("/execute_cli_commands", response_model=dict)
async def execute_cli_commands(request: CLICommandRequest):
    """
    Execute CLI commands generated by the front end.

    Credentials may be embedded in ``commands``:
        !username=admin
        !password=cisco123

    The embedded credential entries are used to build the
    ``SwitchConfigurator`` and are removed from the command list before it is
    sent to the switch.

    Returns:
        dict: ``success`` (always ``True``), ``output`` (the value returned by
        ``execute_raw_commands``) and ``mode`` (``"eNSP"`` when
        ``request.is_ensp`` is true, otherwise ``"SSH"``).

    Raises:
        HTTPException: Status 500 with the error text when any step raises.
    """
    try:
        username, password = request.extract_credentials()
        clean_commands = request.get_clean_commands()
        configurator = SwitchConfigurator(
            username=username,
            password=password,
            timeout=settings.SWITCH_TIMEOUT,
            ensp_mode=request.is_ensp,
        )
        # Send only the real commands. The "!username=" / "!password="
        # entries are credentials, not CLI input, and must not reach the
        # device (the original passed the unfiltered list by mistake).
        result = await configurator.execute_raw_commands(
            ip=request.switch_ip,
            commands=clean_commands,
        )
        return {
            "success": True,
            "output": result,
            "mode": "eNSP" if request.is_ensp else "SSH",
        }
    except Exception as e:
        # Chain the cause so the original traceback stays available in logs.
        raise HTTPException(500, detail=str(e)) from e
@router.get("/", include_in_schema=False)
async def root():
    """Return a welcome payload with documentation links and endpoint paths.

    NOTE(review): a handler with the same name and the same "/" route is also
    declared earlier in this file; confirm which of the two registrations is
    meant to be kept.

    Returns:
        dict: ``message``, ``docs``, ``redoc`` and ``endpoints``.
    """
    known_endpoints = [
        "/parse_command",
        "/apply_config",
        "/scan_network",
        "/list_devices",
        "/batch_apply_config",
        "/traffic/switch/current",
        "/traffic/switch/history",
    ]
    payload = {
        "message": "欢迎使用AI交换机配置系统",
        "docs": f"{settings.API_PREFIX}/docs",
        "redoc": f"{settings.API_PREFIX}/redoc",
        "endpoints": known_endpoints,
    }
    return payload
@router.get("/network_adapters", summary="获取网络适配器网段")
async def get_network_adapters():
    """List the IPv4 network of each address on the local network adapters.

    Only ``AF_INET`` addresses reported by ``psutil.net_if_addrs()`` are
    considered. An address without a usable netmask is skipped rather than
    failing the whole listing.

    Returns:
        dict: ``{"networks": [...]}`` on success, where each item holds
        ``adapter``, ``network``, ``ip`` and ``subnet_mask``; or
        ``{"error": <message>}`` when adapter information cannot be read.
    """
    try:
        networks = []
        for interface, addrs in psutil.net_if_addrs().items():
            for addr in addrs:
                if addr.family != socket.AF_INET:
                    continue
                ip = addr.address
                netmask = addr.netmask
                # psutil may report no netmask for an address; building a
                # network from "ip/None" would raise and discard every other
                # adapter's result, so skip just this entry.
                if not netmask:
                    continue
                try:
                    network = ipaddress.IPv4Network(f"{ip}/{netmask}", strict=False)
                except ValueError:
                    # Malformed address or mask: ignore this entry only.
                    continue
                networks.append({
                    "adapter": interface,
                    "network": str(network),
                    "ip": ip,
                    "subnet_mask": netmask,
                })
        return {"networks": networks}
    except Exception as e:
        return {"error": f"获取网络适配器信息失败: {str(e)}"}