Compare commits

...

7 Commits

Author SHA1 Message Date
842e562b91 1 2025-08-30 15:47:09 +08:00
29dd4ec839 Merge remote-tracking branch 'origin/main' 2025-08-30 15:37:04 +08:00
f80fc9fc56 1 2025-08-30 15:36:38 +08:00
3
c64e983fba 更正 2025-08-30 15:36:22 +08:00
3
6a21cdef9a Merge remote-tracking branch 'origin/main'
# Conflicts:
#	src/backend/app/api/endpoints.py
#	src/backend/app/services/ai_services.py
#	src/backend/config.py
2025-08-30 15:32:12 +08:00
3
2a24a44a96 修复了异步连接 2025-08-30 15:31:27 +08:00
3
31e3baff9a 修复了异步连接 2025-08-12 01:17:39 +08:00
4 changed files with 158 additions and 82 deletions

View File

@ -4,14 +4,13 @@ from fastapi import (APIRouter, HTTPException, Response, WebSocket, WebSocketDis
from typing import List from typing import List
from pydantic import BaseModel from pydantic import BaseModel
import asyncio import asyncio
from fastapi.responses import HTMLResponse, JSONResponse from fastapi.responses import HTMLResponse
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import io import io
import base64 import base64
import psutil import psutil
import ipaddress import ipaddress
from ..models.requests import CLICommandRequest, ConfigRequest
from ..services.switch_traffic_monitor import get_switch_monitor from ..services.switch_traffic_monitor import get_switch_monitor
from ..utils import logger from ..utils import logger
from ...app.services.ai_services import AIService from ...app.services.ai_services import AIService
@ -56,6 +55,22 @@ class BatchConfigRequest(BaseModel):
password: str = None password: str = None
timeout: int = None timeout: int = None
@router.post("/batch_apply_config")
async def batch_apply_config(request: BatchConfigRequest):
results = {}
for ip in request.switch_ips:
try:
configurator = SwitchConfigurator(
username=request.username,
password=request.password,
timeout=request.timeout)
results[ip] = await configurator.apply_config(ip, request.config)
except Exception as e:
results[ip] = str(e)
return {"results": results}
@router.get("/test") @router.get("/test")
async def test_endpoint(): async def test_endpoint():
return {"message": "Hello World"} return {"message": "Hello World"}
@ -81,29 +96,27 @@ async def list_devices():
} }
class DeviceItem(BaseModel):
name: str
ip: str
vendor: str
class CommandRequest(BaseModel): class CommandRequest(BaseModel):
command: str command: str
devices: List[DeviceItem] vendor: str = "huawei"
class ConfigRequest(BaseModel):
config: dict
switch_ip: str
username: str = None
password: str = None
timeout: int = None
vendor: str = "huawei"
@router.post("/parse_command", response_model=dict) @router.post("/parse_command", response_model=dict)
async def parse_command(request: CommandRequest): async def parse_command(request: CommandRequest):
"""解析中文命令并返回每台设备的配置 JSON""" """解析中文命令并返回JSON配置"""
missing_vendor = [d for d in request.devices if not d.vendor or d.vendor.strip() == ""]
if missing_vendor:
names = ", ".join([d.name for d in missing_vendor])
raise HTTPException(
status_code=400,
detail=f"以下设备未配置厂商: {names}"
)
try: try:
ai_service = AIService(settings.SILICONFLOW_API_KEY, settings.SILICONFLOW_API_URL) ai_service = AIService(settings.SILICONFLOW_API_KEY, settings.SILICONFLOW_API_URL)
config = await ai_service.parse_command(request.command, [d.dict() for d in request.devices]) config = await ai_service.parse_command(request.command, request.vendor)
return {"success": True, "config": config.get("results", [])} return {"success": True, "config": config}
except Exception as e: except Exception as e:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
@ -128,16 +141,44 @@ async def apply_config(request: ConfigRequest):
status_code=500, status_code=500,
detail=f"Failed to apply config: {str(e)}" detail=f"Failed to apply config: {str(e)}"
) )
class CLICommandRequest(BaseModel):
switch_ip: str
commands: List[str]
is_ensp: bool = False
def extract_credentials(self) -> tuple:
"""从commands中提取用户名和密码"""
username = None
password = None
for cmd in self.commands:
if cmd.startswith("!username="):
username = cmd.split("=")[1]
elif cmd.startswith("!password="):
password = cmd.split("=")[1]
return username, password
def get_clean_commands(self) -> List[str]:
"""获取去除凭据后的实际命令"""
return [cmd for cmd in self.commands
if not (cmd.startswith("!username=") or cmd.startswith("!password="))]
@router.post("/execute_cli_commands", response_model=dict) @router.post("/execute_cli_commands", response_model=dict)
async def execute_cli_commands(request: CLICommandRequest): async def execute_cli_commands(request: CLICommandRequest):
"""执行前端生成的CLI命令""" """执行前端生成的CLI命令"""
try: try:
username, password = request.extract_credentials() username, password = request.extract_credentials()
clean_commands = request.get_clean_commands()
configurator = SwitchConfigurator( configurator = SwitchConfigurator(
username=username, username=username,
password=password, password=password,
timeout=settings.SWITCH_TIMEOUT, timeout=settings.SWITCH_TIMEOUT,
ensp_mode=request.is_ensp
) )
result = await configurator.execute_raw_commands( result = await configurator.execute_raw_commands(
@ -147,6 +188,7 @@ async def execute_cli_commands(request: CLICommandRequest):
return { return {
"success": True, "success": True,
"output": result, "output": result,
"mode": "eNSP" if request.is_ensp else "SSH"
} }
except Exception as e: except Exception as e:
raise HTTPException(500, detail=str(e)) raise HTTPException(500, detail=str(e))

View File

@ -1,5 +1,4 @@
import asyncio import asyncio
import logging
import telnetlib3 import telnetlib3
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional
@ -41,8 +40,6 @@ class SwitchConfigurator:
password: str = None, password: str = None,
timeout: int = None, timeout: int = None,
max_workers: int = 5, max_workers: int = 5,
ensp_mode: bool = False,
ensp_port: int = 2000,
ensp_command_delay: float = 0.5, ensp_command_delay: float = 0.5,
**ssh_options **ssh_options
): ):
@ -52,34 +49,45 @@ class SwitchConfigurator:
self.semaphore = asyncio.Semaphore(max_workers) self.semaphore = asyncio.Semaphore(max_workers)
self.backup_dir = Path("config_backups") self.backup_dir = Path("config_backups")
self.backup_dir.mkdir(exist_ok=True) self.backup_dir.mkdir(exist_ok=True)
self.ensp_mode = ensp_mode
self.ensp_port = ensp_port
self.ensp_delay = ensp_command_delay self.ensp_delay = ensp_command_delay
self.ssh_options = ssh_options self.ssh_options = ssh_options
async def _get_or_create_connection(self, ip: str): async def _get_or_create_connection(self, ip: str):
"""
从连接池获取连接如果没有则新建 Telnet 连接
"""
if ip in self.connection_pool: if ip in self.connection_pool:
logger.debug(f"复用已有连接: {ip}") logger.debug(f"复用已有连接: {ip}")
return self.connection_pool[ip] return self.connection_pool[ip]
logger.info(f"建立新连接: {ip}") logger.info(f"建立新连接: {ip}")
reader, writer = await telnetlib3.open_connection(host=ip, port=23) reader, writer = await telnetlib3.open_connection(
host=ip,
port=23,
encoding=None,
shell=None
)
try: try:
if self.username != 'NONE' : if self.username and self.username.upper() != "NONE":
await asyncio.wait_for(reader.readuntil(b"Username:"), timeout=self.timeout) try:
writer.write(f"{self.username}\n") logger.debug("等待用户名提示...")
await asyncio.wait_for(reader.readuntil(b"Username:"), timeout=self.timeout)
writer.write(self.username.encode() + b"\n")
await writer.drain()
logger.info("用户名发送完成")
except asyncio.TimeoutError:
logger.warning("未收到用户名提示,可能交换机不要求用户名")
await asyncio.wait_for(reader.readuntil(b"Password:"), timeout=self.timeout) try:
writer.write(f"{self.password}\n") logger.debug("等待密码提示...")
await asyncio.wait_for(reader.readuntil(b"assword:"), timeout=self.timeout)
writer.write(self.password.encode() + b"\n")
await writer.drain()
logger.info("密码发送完成")
except asyncio.TimeoutError:
writer.close()
raise EnspConnectionException("未收到密码提示,登录失败")
await asyncio.sleep(1) await asyncio.sleep(1)
except asyncio.TimeoutError:
writer.close()
raise EnspConnectionException("登录超时,未收到用户名或密码提示")
except Exception as e: except Exception as e:
writer.close() writer.close()
raise EnspConnectionException(f"登录异常: {e}") raise EnspConnectionException(f"登录异常: {e}")
@ -88,9 +96,6 @@ class SwitchConfigurator:
return reader, writer return reader, writer
async def _send_ensp_commands(self, ip: str, commands: List[str]) -> bool: async def _send_ensp_commands(self, ip: str, commands: List[str]) -> bool:
"""
通过 Telnet 协议发送命令
"""
try: try:
reader, writer = await self._get_or_create_connection(ip) reader, writer = await self._get_or_create_connection(ip)
@ -98,10 +103,24 @@ class SwitchConfigurator:
if cmd.startswith("!"): if cmd.startswith("!"):
logger.debug(f"跳过特殊命令: {cmd}") logger.debug(f"跳过特殊命令: {cmd}")
continue continue
logger.info(f"[{ip}] 发送命令: {cmd}") logger.info(f"[{ip}] 发送命令: {cmd}")
writer.write(f"{cmd}\n") writer.write(cmd.encode() + b"\n")
await writer.drain() await writer.drain()
await asyncio.sleep(self.ensp_delay) await asyncio.sleep(self.ensp_delay)
try:
output = b""
while True:
chunk = await asyncio.wait_for(reader.read(1024), timeout=1)
if not chunk:
break
output += chunk
if output:
logger.info(f"[{ip}] 返回结果:\n{output.decode(errors='ignore').strip()}")
else:
logger.warning(f"[{ip}] 返回为空")
except asyncio.TimeoutError:
logger.warning(f"[{ip}] 读取返回超时 (命令: {cmd})")
logger.info(f"[{ip}] 所有命令发送完成") logger.info(f"[{ip}] 所有命令发送完成")
return True return True
@ -114,9 +133,5 @@ class SwitchConfigurator:
return False return False
async def execute_raw_commands(self, ip: str, commands: List[str]) -> bool: async def execute_raw_commands(self, ip: str, commands: List[str]) -> bool:
"""
对外接口单台交换机执行命令
"""
async with self.semaphore: async with self.semaphore:
success = await self._send_ensp_commands(ip, commands) return await self._send_ensp_commands(ip, commands)
return success

View File

@ -1,47 +1,56 @@
from typing import Any, List, Dict from typing import Dict, Any, Coroutine
import httpx
from openai import AsyncOpenAI from openai import AsyncOpenAI
import json import json
from src.backend.app.utils.exceptions import SiliconFlowAPIException from src.backend.app.utils.exceptions import SiliconFlowAPIException
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
from src.backend.app.utils.logger import logger
class AIService: class AIService:
def __init__(self, api_key: str, api_url: str): def __init__(self, api_key: str, api_url: str):
self.client = AsyncOpenAI(api_key=api_key, base_url=api_url) self.api_key = api_key
self.api_url = api_url
self.client = AsyncOpenAI(
api_key=self.api_key,
base_url=self.api_url,
timeout=httpx.Timeout(30.0)
)
async def parse_command(self, command: str, devices: List[Dict]) -> Dict[str, Any]: async def parse_command(self, command: str, vendor: str = "huawei") -> Any | None:
""" """
针对一组设备和一条自然语言命令生成每台设备的配置 JSON 调用硅基流动API解析中文命令
""" """
devices_str = json.dumps(devices, ensure_ascii=False, indent=2) vendor_prompts = {
"huawei": "华为交换机配置命令",
example = """[{"device": {"name": "sw1","ip": "192.168.1.10","vendor": "huawei","username": "NONE", "password": "Huawei"},"config": {"type": "vlan","vlan_id": 300,"name": "Sales","commands": ["system-view","vlan 300","name Sales","quit","quit","save","Y"]}}]""" "cisco": "思科交换机配置命令",
"h3c": "H3C交换机配置命令",
"ruijie": "锐捷交换机配置命令",
"zte": "中兴交换机配置命令"
}
prompt = f""" prompt = f"""
你是一个网络设备配置专家现在有以下设备 你是一个网络设备配置专家精通各种类型的路由器的配置请将以下用户的中文命令转换为{vendor_prompts.get(vendor, '网络设备')}配置JSON
{devices_str} 但是请注意由于贪婪的人们追求极高的效率所以你必须严格按照 JSON 格式返回数据不要包含任何额外文本或 Markdown 代码块
返回格式要求
1. 必须包含'type'字段指明配置类型(vlan/interface/acl/route等)
2. 必须包含'commands'字段包含可直接执行的命令列表
3. 其他参数根据配置类型动态添加
4. 不要包含解释性文本步骤说明或注释
5. 要包含使用ssh连接交换机后的完整命令包括但不完全包括system-view退出保存等完整操作注意保存还需要输入Y
用户输入了一条命令{command} 根据厂商{vendor}的不同命令格式如下
- 华为: system-view quit save Y
- 思科: enable configure terminal exit write memory
- H3C: system-view quit save
- 锐捷: enable configure terminal exit write
- 中兴: enable configure terminal exit write memory
你的任务 示例命令'创建VLAN 100名称为TEST'
- 为每台设备分别生成配置 华为示例返回{{"type": "vlan", "vlan_id": 100, "name": "TEST", "commands": ["system-view","vlan 100", "name TEST","quit","quit","save","Y"]}}
- 输出一个 JSON 数组每个元素对应一台设备 思科示例返回{{"type": "vlan", "vlan_id": 100, "name": "TEST", "commands": ["enable","configure terminal","vlan 100", "name TEST","exit","exit","write memory"]}}
- 每个对象必须包含: """
- device: 原始设备信息 (name, ip, vendor,username,password)
- config: 配置详情
- type: 配置类型 ( vlan/interface/acl/route)
- commands: 可直接执行的命令数组 (必须包含进入配置退出保存命令)
- 其他字段: 根据配置类型动态添加
- 严格返回 JSON不要包含解释说明或 markdown
各厂商保存命令规则
- 华为: system-view quit save Y
- 思科: enable configure terminal exit write memory
- H3C: system-view quit save
- 锐捷: enable configure terminal exit write
- 中兴: enable configure terminal exit write memory
返回示例仅作为格式参考不要照抄 VLAN ID 和命令内容请根据实际命令生成{example}
"""
messages = [ messages = [
ChatCompletionSystemMessageParam(role="system", content=prompt), ChatCompletionSystemMessageParam(role="system", content=prompt),
@ -52,18 +61,29 @@ class AIService:
response = await self.client.chat.completions.create( response = await self.client.chat.completions.create(
model="deepseek-ai/DeepSeek-V3", model="deepseek-ai/DeepSeek-V3",
messages=messages, messages=messages,
temperature=0.2, temperature=0.3,
max_tokens=1500, max_tokens=1000,
response_format={"type": "json_object"} response_format={"type": "json_object"}
) )
config_str = response.choices[0].message.content.strip() logger.debug(response)
configs = json.loads(config_str)
return {"success": True, "results": configs} config_str = response.choices[0].message.content.strip()
try:
config = json.loads(config_str)
return config
except json.JSONDecodeError:
if config_str.startswith("```json"):
config_str = config_str[7:-3].strip()
return json.loads(config_str)
raise SiliconFlowAPIException("Invalid JSON format returned from AI")
except KeyError:
logger.error(KeyError)
raise SiliconFlowAPIException("errrrrrrro")
except Exception as e: except Exception as e:
raise SiliconFlowAPIException( raise SiliconFlowAPIException(
detail=f"AI 解析配置失败: {str(e)}", detail=f"API请求失败: {str(e)}",
status_code=getattr(e, "status_code", 500) status_code=getattr(e, "status_code", 500)
) )

View File

@ -27,7 +27,6 @@ ENSP_DEFAULT_PORT=2000
print(f"已生成默认配置文件 {ENV_FILE} ,请修改后重新运行程序。") print(f"已生成默认配置文件 {ENV_FILE} ,请修改后重新运行程序。")
sys.exit(1) sys.exit(1)
# 加载 .env 文件
load_dotenv(ENV_FILE) load_dotenv(ENV_FILE)