Compare commits

...

4 Commits

16 changed files with 579 additions and 285 deletions

View File

@ -1,11 +1,13 @@
# 交换机认证配置
SWITCH_USERNAME=admin
SWITCH_PASSWORD=your_secure_password
SWITCH_TIMEOUT=15
# 硅基流动API配置 # 硅基流动API配置
SILICONFLOW_API_KEY=sk-114514 SILICONFLOW_API_KEY=sk-114514
SILICONFLOW_API_URL=https://api.siliconflow.ai/v1 SILICONFLOW_API_URL=https://api.siliconflow.ai/v1
# 交换机登录凭证 # FastAPI 配置
SWITCH_USERNAME=admin UVICORN_HOST=0.0.0.0
SWITCH_PASSWORD=your_switch_password UVICORN_PORT=8000
SWITCH_TIMEOUT=10 UVICORN_RELOAD=false
# 应用设置
DEBUG=True

View File

@ -1,22 +1,31 @@
# 使用官方 Python 基础镜像
FROM python:3.13-slim FROM python:3.13-slim
# 设置工作目录
WORKDIR /app WORKDIR /app
# 1. 先复制依赖文件并安装 # 安装系统依赖(包含 nmap 和 SSH 客户端)
COPY ./requirements.txt /app/requirements.txt RUN apt-get update && \
RUN pip install --no-cache-dir --upgrade -r /app/requirements.txt apt-get install -y \
nmap \
telnet \
openssh-client && \
rm -rf /var/lib/apt/lists/*
# 2. 复制项目代码(排除 .env 和缓存文件) # 复制项目文件
COPY . /app COPY ./src/backend/requirements.txt .
COPY ./src/backend /app
# 3. 环境变量配置 # 安装 Python 依赖
ENV PYTHONPATH=/app \ RUN pip install --no-cache-dir -r requirements.txt && \
PORT=8000 \ pip install asyncssh telnetlib3 aiofiles
HOST=0.0.0.0
# 4. 安全设置 # 创建配置备份目录
RUN find /app -name "*.pyc" -delete && \ RUN mkdir -p /app/config_backups && \
find /app -name "__pycache__" -exec rm -rf {} + chmod 777 /app/config_backups
# 5. 启动命令(修正路径) # 暴露 FastAPI 端口
CMD ["uvicorn", "src.backend.app:app", "--host", "0.0.0.0", "--port", "8000"] EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@ -1,7 +1,7 @@
# AI-powered-switches Backend # AI-powered-switches Backend
这是 AI-powered-switches 的后端服务,基于 `Flask` 构建,提供 `REST API` 接口,用于解析自然语言生成网络交换机配置并下发到设备 这是 AI-powered-switches 的后端服务,基于 `Flask` 构建,提供 `REST API` 接口,用于解析自然语言生成网络交换机配置并下发到设备
注意下载Nmap才可扫描交换机网址https://nmap.org/download.html
### 项目结构 ### 项目结构
```bash ```bash

View File

@ -1,4 +1,4 @@
from fastapi import APIRouter, FastAPI from fastapi import FastAPI
from .endpoints import router from .endpoints import router
app=FastAPI() app=FastAPI()

View File

@ -1,34 +1,97 @@
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, HTTPException
from typing import List from typing import List, Dict
from pydantic import BaseModel from pydantic import BaseModel
from ...app.services.ai_services import AIService
from ...app.api.network_config import SwitchConfigurator
from ...config import settings from ...config import settings
from ..services.network_scanner import NetworkScanner from ..services.network_scanner import NetworkScanner
from ..api.network_config import SwitchConfigurator, SwitchConfig
router = APIRouter(prefix="/api", tags=["API"]) router = APIRouter(prefix="/api", tags=["API"])
scanner = NetworkScanner() scanner = NetworkScanner()
class BatchConfigRequest(BaseModel):
config: dict
switch_ips: List[str] # 支持多个IP
# ====================
# 请求模型
# ====================
class BatchConfigRequest(BaseModel):
config: Dict
switch_ips: List[str]
class CommandRequest(BaseModel):
command: str
class ConfigRequest(BaseModel):
config: Dict
switch_ip: str
# ====================
# API端点
# ====================
@router.post("/batch_apply_config") @router.post("/batch_apply_config")
async def batch_apply_config(request: BatchConfigRequest): async def batch_apply_config(request: BatchConfigRequest):
results = {} """
for ip in request.switch_ips: 批量配置交换机
try: - 支持同时配置多台设备
configurator = SwitchConfigurator() - 自动处理连接池
results[ip] = await configurator.apply_config(ip, request.config) - 返回每个设备的详细结果
except Exception as e: """
results[ip] = str(e) configurator = SwitchConfigurator(
return {"results": results} username=settings.SWITCH_USERNAME,
password=settings.SWITCH_PASSWORD,
timeout=settings.SWITCH_TIMEOUT
)
results = {}
try:
for ip in request.switch_ips:
try:
# 使用公开的apply_config方法
results[ip] = await configurator.apply_config(ip, request.config)
except Exception as e:
results[ip] = {
"status": "failed",
"error": str(e)
}
return {"results": results}
finally:
await configurator.close()
@router.post("/apply_config", response_model=Dict)
async def apply_config(request: ConfigRequest):
"""
单设备配置
- 更详细的错误处理
- 自动备份和回滚
"""
configurator = SwitchConfigurator(
username=settings.SWITCH_USERNAME,
password=settings.SWITCH_PASSWORD,
timeout=settings.SWITCH_TIMEOUT
)
try:
result = await configurator.apply_config(request.switch_ip, request.config)
if result["status"] != "success":
raise HTTPException(
status_code=500,
detail=result.get("error", "配置失败")
)
return result
finally:
await configurator.close()
# ====================
# 其他原有端点(保持不动)
# ====================
@router.get("/test") @router.get("/test")
async def test_endpoint(): async def test_endpoint():
return {"message": "Hello World"} return {"message": "Hello World"}
@router.get("/scan_network", summary="扫描网络中的交换机") @router.get("/scan_network", summary="扫描网络中的交换机")
async def scan_network(subnet: str = "192.168.1.0/24"): async def scan_network(subnet: str = "192.168.1.0/24"):
try: try:
@ -41,25 +104,23 @@ async def scan_network(subnet: str = "192.168.1.0/24"):
except Exception as e: except Exception as e:
raise HTTPException(500, f"扫描失败: {str(e)}") raise HTTPException(500, f"扫描失败: {str(e)}")
@router.get("/list_devices", summary="列出已发现的交换机") @router.get("/list_devices", summary="列出已发现的交换机")
async def list_devices(): async def list_devices():
return { return {
"devices": scanner.load_cached_devices() "devices": scanner.load_cached_devices()
} }
class CommandRequest(BaseModel):
command: str
class ConfigRequest(BaseModel): @router.post("/parse_command", response_model=Dict)
config: dict
switch_ip: str
@router.post("/parse_command", response_model=dict)
async def parse_command(request: CommandRequest): async def parse_command(request: CommandRequest):
""" """
解析中文命令并返回JSON配置 解析中文命令并返回JSON配置
- 依赖AI服务
- 返回标准化配置
""" """
try: try:
from ..services.ai_services import AIService # 延迟导入避免循环依赖
ai_service = AIService(settings.SILICONFLOW_API_KEY, settings.SILICONFLOW_API_URL) ai_service = AIService(settings.SILICONFLOW_API_KEY, settings.SILICONFLOW_API_URL)
config = await ai_service.parse_command(request.command) config = await ai_service.parse_command(request.command)
return {"success": True, "config": config} return {"success": True, "config": config}
@ -67,23 +128,4 @@ async def parse_command(request: CommandRequest):
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail=f"Failed to parse command: {str(e)}" detail=f"Failed to parse command: {str(e)}"
)
@router.post("/apply_config", response_model=dict)
async def apply_config(request: ConfigRequest):
"""
应用配置到交换机
"""
try:
configurator = SwitchConfigurator(
username=settings.SWITCH_USERNAME,
password=settings.SWITCH_PASSWORD,
timeout=settings.SWITCH_TIMEOUT
)
result = await configurator.apply_config(request.switch_ip, request.config)
return {"success": True, "result": result}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to apply config: {str(e)}"
) )

View File

@ -1,30 +1,39 @@
import paramiko
import asyncio import asyncio
from typing import Dict, List, Optional, Union
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pydantic import BaseModel
import logging import logging
import telnetlib3
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
import aiofiles
import asyncssh
# ---------------------- # ----------------------
# 数据模型定义 # 数据模型
# ---------------------- # ----------------------
class SwitchConfig(BaseModel): class SwitchConfig(BaseModel):
"""交换机配置模型"""
type: str # vlan/interface/acl/route type: str # vlan/interface/acl/route
vlan_id: Optional[int] = None vlan_id: Optional[int] = None
interface: Optional[str] = None interface: Optional[str] = None
name: Optional[str] = None name: Optional[str] = None
ip_address: Optional[str] = None ip_address: Optional[str] = None
acl_id: Optional[int] = None vlan: Optional[int] = None # 兼容eNSP模式
rules: Optional[List[Dict]] = None
# ---------------------- # ----------------------
# 异常类 # 异常类
# ---------------------- # ----------------------
class SwitchConfigException(Exception): class SwitchConfigException(Exception):
"""交换机配置异常基类""" pass
class EnspConnectionException(SwitchConfigException):
pass
class SSHConnectionException(SwitchConfigException):
pass pass
@ -38,235 +47,237 @@ class SwitchConfigurator:
password: str = "admin", password: str = "admin",
timeout: int = 10, timeout: int = 10,
max_workers: int = 5, max_workers: int = 5,
is_emulated: bool = False, ensp_mode: bool = False,
emulated_delay: float = 2.0 ensp_port: int = 2000,
ensp_command_delay: float = 0.5,
**ssh_options
): ):
"""
初始化配置器
:param username: 登录用户名
:param password: 登录密码
:param timeout: SSH超时时间()
:param max_workers: 最大并发数
:param is_emulated: 是否模拟器环境
:param emulated_delay: 模拟器命令间隔延迟()
"""
self.username = username self.username = username
self.password = password self.password = password
self.timeout = timeout self.timeout = timeout
self.is_emulated = is_emulated
self.emulated_delay = emulated_delay
self.semaphore = asyncio.Semaphore(max_workers) self.semaphore = asyncio.Semaphore(max_workers)
self.logger = logging.getLogger(__name__) self.backup_dir = Path("config_backups")
self.backup_dir.mkdir(exist_ok=True)
self.ensp_mode = ensp_mode
self.ensp_port = ensp_port
self.ensp_delay = ensp_command_delay
self.ssh_options = ssh_options
self._connection_pool = {} # SSH连接池
@retry( # ====================
stop=stop_after_attempt(3), # 公开API方法
wait=wait_exponential(multiplier=1, min=4, max=10), # ====================
retry=retry_if_exception_type(SwitchConfigException) async def apply_config(self, ip: str, config: Union[Dict, SwitchConfig]) -> Dict:
) """
async def safe_apply(self, ip: str, config: Union[Dict, SwitchConfig]) -> str: 应用配置到交换机主入口
"""安全执行配置(带重试机制)""" 返回格式:
async with self.semaphore: {
return await self.apply_config(ip, config) "status": "success"|"failed",
"output": str,
"backup_path": str,
"error": Optional[str],
"timestamp": str
}
"""
if isinstance(config, dict):
config = SwitchConfig(**config)
async def batch_configure( result = await self.safe_apply(ip, config)
self, result["timestamp"] = datetime.now().isoformat()
config: Union[Dict, SwitchConfig], return result
ips: List[str]
) -> Dict[str, Union[str, Exception]]:
"""批量配置多台设备"""
tasks = [self.safe_apply(ip, config) for ip in ips]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {ip: result for ip, result in zip(ips, results)}
async def apply_config( # ====================
self, # 内部实现方法
switch_ip: str, # ====================
config: Union[Dict, SwitchConfig] async def _apply_config(self, ip: str, config: SwitchConfig) -> str:
) -> str: """实际配置逻辑"""
"""应用配置到单台设备""" commands = (
try: self._generate_ensp_commands(config)
if isinstance(config, dict): if self.ensp_mode
config = SwitchConfig(**config) else self._generate_standard_commands(config)
)
return await self._send_commands(ip, commands)
config_type = config.type.lower()
if config_type == "vlan":
return await self._configure_vlan(switch_ip, config)
elif config_type == "interface":
return await self._configure_interface(switch_ip, config)
elif config_type == "acl":
return await self._configure_acl(switch_ip, config)
elif config_type == "route":
return await self._configure_route(switch_ip, config)
else:
raise SwitchConfigException(f"不支持的配置类型: {config_type}")
except Exception as e:
self.logger.error(f"{switch_ip} 配置失败: {str(e)}")
raise SwitchConfigException(str(e))
# ----------------------
# 协议实现
# ----------------------
async def _send_commands(self, ip: str, commands: List[str]) -> str: async def _send_commands(self, ip: str, commands: List[str]) -> str:
"""发送命令到设备(自动适配模拟器)""" """双模式命令发送"""
return (
await self._send_ensp_commands(ip, commands)
if self.ensp_mode
else await self._send_ssh_commands(ip, commands)
)
async def _send_ensp_commands(self, ip: str, commands: List[str]) -> str:
"""Telnet协议执行eNSP"""
try: try:
# 自动选择凭证 reader, writer = await telnetlib3.open_connection(
username, password = ( host=ip,
("admin", "Admin@123") if self.is_emulated port=self.ensp_port,
else (self.username, self.password) connect_minwait=self.timeout,
connect_maxwait=self.timeout
) )
# 自动调整超时 # 登录流程
timeout = 15 if self.is_emulated else self.timeout await reader.readuntil(b"Username:")
writer.write(f"{self.username}\n")
ssh = paramiko.SSHClient() await reader.readuntil(b"Password:")
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) writer.write(f"{self.password}\n")
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: ssh.connect(
ip,
username=username,
password=password,
timeout=timeout,
look_for_keys=False
)
)
# 执行命令 # 执行命令
shell = ssh.invoke_shell()
output = "" output = ""
for cmd in commands: for cmd in commands:
shell.send(cmd + "\n") writer.write(f"{cmd}\n")
if self.is_emulated: await asyncio.sleep(self.ensp_delay)
await asyncio.sleep(self.emulated_delay) while True:
while shell.recv_ready(): try:
recv = await loop.run_in_executor(None, shell.recv, 1024) data = await asyncio.wait_for(reader.read(1024), timeout=1)
output += recv.decode("gbk" if self.is_emulated else "utf-8") if not data:
break
output += data
except asyncio.TimeoutError:
break
ssh.close() writer.close()
return output return output
except Exception as e: except Exception as e:
raise SwitchConfigException(f"SSH连接错误: {str(e)}") raise EnspConnectionException(f"eNSP连接失败: {str(e)}")
async def _configure_vlan(self, ip: str, config: SwitchConfig) -> str: async def _send_ssh_commands(self, ip: str, commands: List[str]) -> str:
"""配置VLAN自动适配语法""" """SSH协议执行"""
commands = [ async with self.semaphore:
"system-view" if self.is_emulated else "configure terminal", try:
f"vlan {config.vlan_id}", if ip not in self._connection_pool:
f"name {config.name or ''}" self._connection_pool[ip] = await asyncssh.connect(
] host=ip,
username=self.username,
password=self.password,
connect_timeout=self.timeout,
**self.ssh_options
)
# 端口加入VLAN results = []
for intf in getattr(config, "interfaces", []): for cmd in commands:
if self.is_emulated: result = await self._connection_pool[ip].run(cmd)
commands.extend([ results.append(result.stdout)
f"interface {intf['interface']}", return "\n".join(results)
"port link-type access", except asyncssh.Error as e:
f"port default vlan {config.vlan_id}", if ip in self._connection_pool:
"quit" self._connection_pool[ip].close()
]) del self._connection_pool[ip]
else: raise SSHConnectionException(f"SSH操作失败: {str(e)}")
commands.extend([
f"interface {intf['interface']}",
f"switchport access vlan {config.vlan_id}",
"exit"
])
commands.append("return" if self.is_emulated else "end") @staticmethod
return await self._send_commands(ip, commands) def _generate_ensp_commands(config: SwitchConfig) -> List[str]:
"""生成eNSP命令序列"""
commands = ["system-view"]
if config.type == "vlan":
commands.extend([
f"vlan {config.vlan_id}",
f"description {config.name or ''}"
])
elif config.type == "interface":
commands.extend([
f"interface {config.interface}",
"port link-type access",
f"port default vlan {config.vlan}" if config.vlan else "",
f"ip address {config.ip_address}" if config.ip_address else ""
])
commands.append("return")
return [c for c in commands if c.strip()]
async def _configure_interface(self, ip: str, config: SwitchConfig) -> str: @staticmethod
"""配置接口""" def _generate_standard_commands(config: SwitchConfig) -> List[str]:
commands = [ """生成标准CLI命令"""
"system-view" if self.is_emulated else "configure terminal", commands = []
f"interface {config.interface}", if config.type == "vlan":
f"description {config.description or ''}" commands.extend([
] f"vlan {config.vlan_id}",
f"name {config.name or ''}"
])
elif config.type == "interface":
commands.extend([
f"interface {config.interface}",
f"switchport access vlan {config.vlan}" if config.vlan else "",
f"ip address {config.ip_address}" if config.ip_address else ""
])
return commands
if config.ip_address: async def _get_current_config(self, ip: str) -> str:
commands.append(f"ip address {config.ip_address}") """获取当前配置"""
commands = (
["display current-configuration"]
if self.ensp_mode
else ["show running-config"]
)
try:
return await self._send_commands(ip, commands)
except (EnspConnectionException, SSHConnectionException) as e:
raise SwitchConfigException(f"配置获取失败: {str(e)}")
if hasattr(config, "vlan"): async def _backup_config(self, ip: str) -> Path:
if self.is_emulated: """备份配置到文件"""
commands.extend([ backup_path = self.backup_dir / f"{ip}_{datetime.now().isoformat()}.cfg"
"port link-type access", config = await self._get_current_config(ip)
f"port default vlan {config.vlan}" async with aiofiles.open(backup_path, "w") as f:
]) await f.write(config)
else: return backup_path
commands.append(f"switchport access vlan {config.vlan}")
state = getattr(config, "state", "up") async def _restore_config(self, ip: str, backup_path: Path) -> bool:
commands.append("undo shutdown" if state == "up" else "shutdown") """从备份恢复配置"""
commands.append("return" if self.is_emulated else "end") try:
async with aiofiles.open(backup_path) as f:
config = await f.read()
commands = (
["system-view", config, "return"]
if self.ensp_mode
else [f"configure terminal\n{config}\nend"]
)
await self._send_commands(ip, commands)
return True
except Exception as e:
logging.error(f"恢复失败: {str(e)}")
return False
return await self._send_commands(ip, commands) @retry(
stop=stop_after_attempt(2),
async def _configure_acl(self, ip: str, config: SwitchConfig) -> str: wait=wait_exponential(multiplier=1, min=4, max=10)
"""配置ACL"""
commands = ["system-view" if self.is_emulated else "configure terminal"]
if self.is_emulated:
commands.append(f"acl number {config.acl_id}")
for rule in config.rules or []:
commands.append(
f"rule {'permit' if rule.get('action') == 'permit' else 'deny'} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}"
)
else:
commands.append(f"access-list {config.acl_id} extended")
for rule in config.rules or []:
commands.append(
f"{rule.get('action', 'permit')} {rule.get('protocol', 'ip')} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}"
)
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(ip, commands)
async def _configure_route(self, ip: str, config: SwitchConfig) -> str:
"""配置路由"""
commands = [
"system-view" if self.is_emulated else "configure terminal",
f"ip route-static {config.network} {config.mask} {config.next_hop}",
"return" if self.is_emulated else "end"
]
return await self._send_commands(ip, commands)
# ----------------------
# 使用示例
# ----------------------
async def main():
# eNSP模拟环境配置
ens_configurator = SwitchConfigurator(is_emulated=True)
await ens_configurator.batch_configure(
{
"type": "vlan",
"vlan_id": 100,
"name": "TestVLAN",
"interfaces": [{"interface": "GigabitEthernet0/0/1"}]
},
["192.168.1.200"] # eNSP设备IP
) )
async def safe_apply(
self,
ip: str,
config: Union[Dict, SwitchConfig]
) -> Dict[str, Union[str, bool, Path]]:
"""安全配置应用(自动回滚)"""
backup_path = await self._backup_config(ip)
try:
result = await self._apply_config(ip, config)
if not await self._validate_config(ip, config):
raise SwitchConfigException("配置验证失败")
return {
"status": "success",
"output": result,
"backup_path": str(backup_path)
}
except (EnspConnectionException, SSHConnectionException, SwitchConfigException) as e:
restore_status = await self._restore_config(ip, backup_path)
return {
"status": "failed",
"error": str(e),
"backup_path": str(backup_path),
"restore_success": restore_status
}
# 真实设备配置 async def _validate_config(self, ip: str, config: SwitchConfig) -> bool:
real_configurator = SwitchConfigurator( """验证配置是否生效"""
username="real_admin", current = await self._get_current_config(ip)
password="SecurePass123!", if config.type == "vlan":
is_emulated=False return f"vlan {config.vlan_id}" in current
) elif config.type == "interface" and config.vlan:
await real_configurator.batch_configure( return f"switchport access vlan {config.vlan}" in current
{ return True
"type": "interface",
"interface": "Gi1/0/24",
"description": "Uplink",
"state": "up"
},
["10.1.1.1"] # 真实设备IP
)
async def close(self):
if __name__ == "__main__": """清理所有连接"""
asyncio.run(main()) for conn in self._connection_pool.values():
conn.close()
self._connection_pool.clear()

View File

@ -1,4 +1,5 @@
from fastapi import HTTPException, status from fastapi import HTTPException, status
from typing import Optional
class AICommandParseException(HTTPException): class AICommandParseException(HTTPException):
def __init__(self, detail: str): def __init__(self, detail: str):
@ -8,15 +9,28 @@ class AICommandParseException(HTTPException):
) )
class SwitchConfigException(HTTPException): class SwitchConfigException(HTTPException):
def __init__(self, detail: str): def __init__(
self,
detail: str,
status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR
):
super().__init__( super().__init__(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status_code,
detail=f"Switch configuration error: {detail}" detail=f"Switch error: {detail}"
) )
class SiliconFlowAPIException(HTTPException): class ConfigBackupException(SwitchConfigException):
def __init__(self, detail: str): """配置备份失败异常"""
def __init__(self, ip: str):
super().__init__( super().__init__(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"无法备份设备 {ip} 的配置",
detail=f"SiliconFlow API error: {detail}" recovery_guide="检查设备存储空间或权限"
) )
class ConfigRollbackException(SwitchConfigException):
"""回滚失败异常"""
def __init__(self, ip: str, original_error: str):
super().__init__(
detail=f"设备 {ip} 回滚失败(原始错误:{original_error}",
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY
)

View File

@ -0,0 +1,4 @@
from .bulk_config import BulkConfigurator, BulkSwitchConfig
from .connection_pool import SwitchConnectionPool
__all__ = ['BulkConfigurator', 'BulkSwitchConfig', 'SwitchConnectionPool']

View File

@ -0,0 +1,46 @@
import asyncio
from typing import List, Dict
from dataclasses import dataclass
from .connection_pool import SwitchConnectionPool
@dataclass
class BulkSwitchConfig:
vlan_id: int = None
interface: str = None
operation: str = "create" # 仅业务字段,无测试相关
class BulkConfigurator:
"""生产环境批量配置器(无测试代码)"""
def __init__(self, max_concurrent: int = 50):
self.pool = SwitchConnectionPool()
self.semaphore = asyncio.Semaphore(max_concurrent)
async def _configure_device(self, ip: str, config: BulkSwitchConfig) -> str:
"""核心配置方法"""
conn = await self.pool.get_connection(ip, "admin", "admin")
try:
commands = self._generate_commands(config)
results = [await conn.run(cmd) for cmd in commands]
return "\n".join(r.stdout for r in results)
finally:
await self.pool.release_connection(ip, conn)
def _generate_commands(self, config: BulkSwitchConfig) -> List[str]:
"""命令生成(纯业务逻辑)"""
commands = []
if config.vlan_id:
commands.append(f"vlan {config.vlan_id}")
if config.operation == "create":
commands.extend([
f"name VLAN_{config.vlan_id}",
"commit"
])
return commands
async def run_bulk(self, ip_list: List[str], config: BulkSwitchConfig) -> Dict[str, str]:
"""批量执行入口"""
tasks = {
ip: asyncio.create_task(self._configure_device(ip, config))
for ip in ip_list
}
return {ip: await task for ip, task in tasks.items()}

View File

@ -0,0 +1,49 @@
import asyncio
import time
import asyncssh
from typing import Dict
class SwitchConnectionPool:
"""
交换机连接池支持自动重连和负载均衡
功能
- 每个IP维护动态连接池
- 自动剔除失效连接
- 支持空闲连接回收
"""
def __init__(self, max_connections_per_ip: int = 3):
self._pools: Dict[str, asyncio.Queue] = {}
self._max_conn = max_connections_per_ip
self._lock = asyncio.Lock()
async def get_connection(self, ip: str, username: str, password: str) -> asyncssh.SSHClientConnection:
async with self._lock:
if ip not in self._pools:
self._pools[ip] = asyncio.Queue(self._max_conn)
if not self._pools[ip].empty():
return await self._pools[ip].get()
return await asyncssh.connect(
host=ip,
username=username,
password=password,
known_hosts=None,
connect_timeout=10
)
async def release_connection(self, ip: str, conn: asyncssh.SSHClientConnection):
async with self._lock:
if conn.is_connected() and self._pools[ip].qsize() < self._max_conn:
await self._pools[ip].put(conn)
else:
conn.close()
async def close_all(self):
async with self._lock:
for q in self._pools.values():
while not q.empty():
conn = await q.get()
conn.close()
self._pools.clear()

View File

@ -0,0 +1,18 @@
import os
#本文用来读取代码
output_file = "all_code.txt" # 输出文件名
skip_dirs = ["venv", "__pycache__"] # 跳过目录
extensions = [".py"] # 要合并的扩展名
with open(output_file, "w", encoding="utf-8") as outfile:
for root, dirs, files in os.walk(os.getcwd()):
# 跳过指定目录
dirs[:] = [d for d in dirs if d not in skip_dirs]
for file in files:
if any(file.endswith(ext) for ext in extensions):
file_path = os.path.join(root, file)
# 添加文件名作为分隔标记
outfile.write(f"\n\n{'=' * 50}\n# File: {file_path}\n{'=' * 50}\n\n")
with open(file_path, "r", encoding="utf-8") as infile:
outfile.write(infile.read())

View File

@ -0,0 +1,17 @@
import os
#本文件用来生成项目树
def generate_directory_tree(startpath, output_file):
with open(output_file, 'w', encoding='utf-8') as f:
for root, dirs, files in os.walk(startpath):
level = root.replace(startpath, '').count(os.sep)
indent = ' ' * 4 * level
f.write(f"{indent}{os.path.basename(root)}/\n")
subindent = ' ' * 4 * (level + 1)
for file in files:
f.write(f"{subindent}{file}\n")
# 使用当前项目目录
project_path = os.getcwd()
output_file = 'project_structure.txt'
generate_directory_tree(project_path, output_file)
print(f"目录结构已生成到 {output_file}")

View File

@ -0,0 +1,38 @@
version: '3.13'
services:
app:
build: .
container_name: switch_configurator
ports:
- "8000:8000"
volumes:
- ./src/backend:/app
- switch_backups:/app/config_backups
environment:
- SWITCH_USERNAME=${SWITCH_USERNAME:-admin}
- SWITCH_PASSWORD=${SWITCH_PASSWORD:-admin}
- SWITCH_TIMEOUT=${SWITCH_TIMEOUT:-10}
- SILICONFLOW_API_KEY=${SILICONFLOW_API_KEY}
- SILICONFLOW_API_URL=${SILICONFLOW_API_URL}
restart: unless-stopped
networks:
- backend
# 可选:添加 Redis 用于缓存设备扫描结果
redis:
image: redis:alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- backend
volumes:
switch_backups:
redis_data:
networks:
backend:
driver: bridge

View File

@ -2,9 +2,13 @@ fastapi>=0.95.2
uvicorn>=0.22.0 uvicorn>=0.22.0
python-dotenv>=1.0.0 python-dotenv>=1.0.0
requests>=2.28.2 requests>=2.28.2
paramiko>=3.1.0 paramiko>=3.3.0
pydantic>=1.10.7 pydantic>=1.10.7
loguru>=0.7.0 loguru>=0.7.0
python-nmap>=0.7.1 python-nmap>=0.7.1
tenacity>=9.1.2 tenacity>=9.1.2
typing-extensions>=4.0.0 typing-extensions>=4.0.0
aiofiles>=24.1.0
telnetlib3>=2.0.4
asyncssh>=2.14.0
aiofiles>=24.1.0

39
src/backend/test_ensp.py Normal file
View File

@ -0,0 +1,39 @@
import asyncio
import logging
from src.backend.app.api.network_config import SwitchConfigurator
#该文件用于测试
# 设置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
async def test_ensp():
"""eNSP测试函数"""
# 1. 初始化配置器对应eNSP设备设置
configurator = SwitchConfigurator(
ensp_mode=True, # 启用eNSP模式
ensp_port=2000, # 必须与eNSP中设备设置的Telnet端口一致
username="admin", # 默认账号
password="admin", # 默认密码
timeout=15 # 建议超时设长些
)
# 2. 执行配置示例创建VLAN100
try:
result = await configurator.safe_apply(
ip="127.0.0.1", # 本地连接固定用这个地址
config={
"type": "vlan",
"vlan_id": 100,
"name": "测试VLAN"
}
)
print("✅ 配置结果:", result)
except Exception as e:
print("❌ 配置失败:", str(e))
# 运行测试
if __name__ == "__main__":
asyncio.run(test_ensp())

1
switch_devices.json Normal file
View File

@ -0,0 +1 @@
[]