Compare commits

..

No commits in common. "c5b6a9b8f396c5a5c87f83e6db837d4110c95aad" and "bd85dff23427f9c964734758d129cd597d641b4e" have entirely different histories.

16 changed files with 300 additions and 594 deletions

View File

@ -1,13 +1,11 @@
# 交换机认证配置
SWITCH_USERNAME=admin
SWITCH_PASSWORD=your_secure_password
SWITCH_TIMEOUT=15
# 硅基流动API配置
SILICONFLOW_API_KEY=sk-114514
SILICONFLOW_API_URL=https://api.siliconflow.ai/v1
# FastAPI 配置
UVICORN_HOST=0.0.0.0
UVICORN_PORT=8000
UVICORN_RELOAD=false
# 交换机登录凭证
SWITCH_USERNAME=admin
SWITCH_PASSWORD=your_switch_password
SWITCH_TIMEOUT=10
# 应用设置
DEBUG=True

View File

@ -1,31 +1,22 @@
# 使用官方 Python 基础镜像
FROM python:3.13-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖(包含 nmap 和 SSH 客户端)
RUN apt-get update && \
apt-get install -y \
nmap \
telnet \
openssh-client && \
rm -rf /var/lib/apt/lists/*
# 1. 先复制依赖文件并安装
COPY ./requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /app/requirements.txt
# 复制项目文件
COPY ./src/backend/requirements.txt .
COPY ./src/backend /app
# 2. 复制项目代码(排除 .env 和缓存文件)
COPY . /app
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt && \
pip install asyncssh telnetlib3 aiofiles
# 3. 环境变量配置
ENV PYTHONPATH=/app \
PORT=8000 \
HOST=0.0.0.0
# 创建配置备份目录
RUN mkdir -p /app/config_backups && \
chmod 777 /app/config_backups
# 4. 安全设置
RUN find /app -name "*.pyc" -delete && \
find /app -name "__pycache__" -exec rm -rf {} +
# 暴露 FastAPI 端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# 5. 启动命令(修正路径)
CMD ["uvicorn", "src.backend.app:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@ -1,7 +1,7 @@
# AI-powered-switches Backend
这是 AI-powered-switches 的后端服务,基于 `Flask` 构建,提供 `REST API` 接口,用于解析自然语言生成网络交换机配置并下发到设备
注意下载Nmap才可扫描交换机网址https://nmap.org/download.html
### 项目结构
```bash

View File

@ -1,4 +1,4 @@
from fastapi import FastAPI
from fastapi import APIRouter, FastAPI
from .endpoints import router
app=FastAPI()

View File

@ -1,97 +1,34 @@
from fastapi import APIRouter, HTTPException
from typing import List, Dict
from fastapi import APIRouter, Depends, HTTPException
from typing import List
from pydantic import BaseModel
from ...app.services.ai_services import AIService
from ...app.api.network_config import SwitchConfigurator
from ...config import settings
from ..services.network_scanner import NetworkScanner
from ..api.network_config import SwitchConfigurator, SwitchConfig
router = APIRouter(prefix="/api", tags=["API"])
scanner = NetworkScanner()
# ====================
# 请求模型
# ====================
class BatchConfigRequest(BaseModel):
config: Dict
switch_ips: List[str]
config: dict
switch_ips: List[str] # 支持多个IP
class CommandRequest(BaseModel):
command: str
class ConfigRequest(BaseModel):
config: Dict
switch_ip: str
# ====================
# API端点
# ====================
@router.post("/batch_apply_config")
async def batch_apply_config(request: BatchConfigRequest):
"""
批量配置交换机
- 支持同时配置多台设备
- 自动处理连接池
- 返回每个设备的详细结果
"""
configurator = SwitchConfigurator(
username=settings.SWITCH_USERNAME,
password=settings.SWITCH_PASSWORD,
timeout=settings.SWITCH_TIMEOUT
)
results = {}
try:
for ip in request.switch_ips:
try:
# 使用公开的apply_config方法
results[ip] = await configurator.apply_config(ip, request.config)
except Exception as e:
results[ip] = {
"status": "failed",
"error": str(e)
}
return {"results": results}
finally:
await configurator.close()
for ip in request.switch_ips:
try:
configurator = SwitchConfigurator()
results[ip] = await configurator.apply_config(ip, request.config)
except Exception as e:
results[ip] = str(e)
return {"results": results}
@router.post("/apply_config", response_model=Dict)
async def apply_config(request: ConfigRequest):
"""
单设备配置
- 更详细的错误处理
- 自动备份和回滚
"""
configurator = SwitchConfigurator(
username=settings.SWITCH_USERNAME,
password=settings.SWITCH_PASSWORD,
timeout=settings.SWITCH_TIMEOUT
)
try:
result = await configurator.apply_config(request.switch_ip, request.config)
if result["status"] != "success":
raise HTTPException(
status_code=500,
detail=result.get("error", "配置失败")
)
return result
finally:
await configurator.close()
# ====================
# 其他原有端点(保持不动)
# ====================
@router.get("/test")
async def test_endpoint():
return {"message": "Hello World"}
@router.get("/scan_network", summary="扫描网络中的交换机")
async def scan_network(subnet: str = "192.168.1.0/24"):
try:
@ -104,23 +41,25 @@ async def scan_network(subnet: str = "192.168.1.0/24"):
except Exception as e:
raise HTTPException(500, f"扫描失败: {str(e)}")
@router.get("/list_devices", summary="列出已发现的交换机")
async def list_devices():
return {
"devices": scanner.load_cached_devices()
}
class CommandRequest(BaseModel):
command: str
@router.post("/parse_command", response_model=Dict)
class ConfigRequest(BaseModel):
config: dict
switch_ip: str
@router.post("/parse_command", response_model=dict)
async def parse_command(request: CommandRequest):
"""
解析中文命令并返回JSON配置
- 依赖AI服务
- 返回标准化配置
"""
try:
from ..services.ai_services import AIService # 延迟导入避免循环依赖
ai_service = AIService(settings.SILICONFLOW_API_KEY, settings.SILICONFLOW_API_URL)
config = await ai_service.parse_command(request.command)
return {"success": True, "config": config}
@ -129,3 +68,22 @@ async def parse_command(request: CommandRequest):
status_code=400,
detail=f"Failed to parse command: {str(e)}"
)
@router.post("/apply_config", response_model=dict)
async def apply_config(request: ConfigRequest):
"""
应用配置到交换机
"""
try:
configurator = SwitchConfigurator(
username=settings.SWITCH_USERNAME,
password=settings.SWITCH_PASSWORD,
timeout=settings.SWITCH_TIMEOUT
)
result = await configurator.apply_config(request.switch_ip, request.config)
return {"success": True, "result": result}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to apply config: {str(e)}"
)

View File

@ -1,39 +1,30 @@
import paramiko
import asyncio
import logging
import telnetlib3
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
import aiofiles
import asyncssh
import logging
# ----------------------
# 数据模型
# 数据模型定义
# ----------------------
class SwitchConfig(BaseModel):
"""交换机配置模型"""
type: str # vlan/interface/acl/route
vlan_id: Optional[int] = None
interface: Optional[str] = None
name: Optional[str] = None
ip_address: Optional[str] = None
vlan: Optional[int] = None # 兼容eNSP模式
acl_id: Optional[int] = None
rules: Optional[List[Dict]] = None
# ----------------------
# 异常类
# ----------------------
class SwitchConfigException(Exception):
pass
class EnspConnectionException(SwitchConfigException):
pass
class SSHConnectionException(SwitchConfigException):
"""交换机配置异常基类"""
pass
@ -47,237 +38,235 @@ class SwitchConfigurator:
password: str = "admin",
timeout: int = 10,
max_workers: int = 5,
ensp_mode: bool = False,
ensp_port: int = 2000,
ensp_command_delay: float = 0.5,
**ssh_options
is_emulated: bool = False,
emulated_delay: float = 2.0
):
"""
初始化配置器
:param username: 登录用户名
:param password: 登录密码
:param timeout: SSH超时时间()
:param max_workers: 最大并发数
:param is_emulated: 是否模拟器环境
:param emulated_delay: 模拟器命令间隔延迟()
"""
self.username = username
self.password = password
self.timeout = timeout
self.is_emulated = is_emulated
self.emulated_delay = emulated_delay
self.semaphore = asyncio.Semaphore(max_workers)
self.backup_dir = Path("config_backups")
self.backup_dir.mkdir(exist_ok=True)
self.ensp_mode = ensp_mode
self.ensp_port = ensp_port
self.ensp_delay = ensp_command_delay
self.ssh_options = ssh_options
self._connection_pool = {} # SSH连接池
# ====================
# 公开API方法
# ====================
async def apply_config(self, ip: str, config: Union[Dict, SwitchConfig]) -> Dict:
"""
应用配置到交换机主入口
返回格式:
{
"status": "success"|"failed",
"output": str,
"backup_path": str,
"error": Optional[str],
"timestamp": str
}
"""
if isinstance(config, dict):
config = SwitchConfig(**config)
result = await self.safe_apply(ip, config)
result["timestamp"] = datetime.now().isoformat()
return result
# ====================
# 内部实现方法
# ====================
async def _apply_config(self, ip: str, config: SwitchConfig) -> str:
"""实际配置逻辑"""
commands = (
self._generate_ensp_commands(config)
if self.ensp_mode
else self._generate_standard_commands(config)
)
return await self._send_commands(ip, commands)
async def _send_commands(self, ip: str, commands: List[str]) -> str:
"""双模式命令发送"""
return (
await self._send_ensp_commands(ip, commands)
if self.ensp_mode
else await self._send_ssh_commands(ip, commands)
)
async def _send_ensp_commands(self, ip: str, commands: List[str]) -> str:
"""Telnet协议执行eNSP"""
try:
reader, writer = await telnetlib3.open_connection(
host=ip,
port=self.ensp_port,
connect_minwait=self.timeout,
connect_maxwait=self.timeout
)
# 登录流程
await reader.readuntil(b"Username:")
writer.write(f"{self.username}\n")
await reader.readuntil(b"Password:")
writer.write(f"{self.password}\n")
await asyncio.sleep(1)
# 执行命令
output = ""
for cmd in commands:
writer.write(f"{cmd}\n")
await asyncio.sleep(self.ensp_delay)
while True:
try:
data = await asyncio.wait_for(reader.read(1024), timeout=1)
if not data:
break
output += data
except asyncio.TimeoutError:
break
writer.close()
return output
except Exception as e:
raise EnspConnectionException(f"eNSP连接失败: {str(e)}")
async def _send_ssh_commands(self, ip: str, commands: List[str]) -> str:
"""SSH协议执行"""
async with self.semaphore:
try:
if ip not in self._connection_pool:
self._connection_pool[ip] = await asyncssh.connect(
host=ip,
username=self.username,
password=self.password,
connect_timeout=self.timeout,
**self.ssh_options
)
results = []
for cmd in commands:
result = await self._connection_pool[ip].run(cmd)
results.append(result.stdout)
return "\n".join(results)
except asyncssh.Error as e:
if ip in self._connection_pool:
self._connection_pool[ip].close()
del self._connection_pool[ip]
raise SSHConnectionException(f"SSH操作失败: {str(e)}")
@staticmethod
def _generate_ensp_commands(config: SwitchConfig) -> List[str]:
"""生成eNSP命令序列"""
commands = ["system-view"]
if config.type == "vlan":
commands.extend([
f"vlan {config.vlan_id}",
f"description {config.name or ''}"
])
elif config.type == "interface":
commands.extend([
f"interface {config.interface}",
"port link-type access",
f"port default vlan {config.vlan}" if config.vlan else "",
f"ip address {config.ip_address}" if config.ip_address else ""
])
commands.append("return")
return [c for c in commands if c.strip()]
@staticmethod
def _generate_standard_commands(config: SwitchConfig) -> List[str]:
"""生成标准CLI命令"""
commands = []
if config.type == "vlan":
commands.extend([
f"vlan {config.vlan_id}",
f"name {config.name or ''}"
])
elif config.type == "interface":
commands.extend([
f"interface {config.interface}",
f"switchport access vlan {config.vlan}" if config.vlan else "",
f"ip address {config.ip_address}" if config.ip_address else ""
])
return commands
async def _get_current_config(self, ip: str) -> str:
"""获取当前配置"""
commands = (
["display current-configuration"]
if self.ensp_mode
else ["show running-config"]
)
try:
return await self._send_commands(ip, commands)
except (EnspConnectionException, SSHConnectionException) as e:
raise SwitchConfigException(f"配置获取失败: {str(e)}")
async def _backup_config(self, ip: str) -> Path:
"""备份配置到文件"""
backup_path = self.backup_dir / f"{ip}_{datetime.now().isoformat()}.cfg"
config = await self._get_current_config(ip)
async with aiofiles.open(backup_path, "w") as f:
await f.write(config)
return backup_path
async def _restore_config(self, ip: str, backup_path: Path) -> bool:
"""从备份恢复配置"""
try:
async with aiofiles.open(backup_path) as f:
config = await f.read()
commands = (
["system-view", config, "return"]
if self.ensp_mode
else [f"configure terminal\n{config}\nend"]
)
await self._send_commands(ip, commands)
return True
except Exception as e:
logging.error(f"恢复失败: {str(e)}")
return False
self.logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=4, max=10)
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(SwitchConfigException)
)
async def safe_apply(
async def safe_apply(self, ip: str, config: Union[Dict, SwitchConfig]) -> str:
"""安全执行配置(带重试机制)"""
async with self.semaphore:
return await self.apply_config(ip, config)
async def batch_configure(
self,
ip: str,
config: Union[Dict, SwitchConfig],
ips: List[str]
) -> Dict[str, Union[str, Exception]]:
"""批量配置多台设备"""
tasks = [self.safe_apply(ip, config) for ip in ips]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {ip: result for ip, result in zip(ips, results)}
async def apply_config(
self,
switch_ip: str,
config: Union[Dict, SwitchConfig]
) -> Dict[str, Union[str, bool, Path]]:
"""安全配置应用(自动回滚)"""
backup_path = await self._backup_config(ip)
) -> str:
"""应用配置到单台设备"""
try:
result = await self._apply_config(ip, config)
if not await self._validate_config(ip, config):
raise SwitchConfigException("配置验证失败")
return {
"status": "success",
"output": result,
"backup_path": str(backup_path)
}
except (EnspConnectionException, SSHConnectionException, SwitchConfigException) as e:
restore_status = await self._restore_config(ip, backup_path)
return {
"status": "failed",
"error": str(e),
"backup_path": str(backup_path),
"restore_success": restore_status
}
if isinstance(config, dict):
config = SwitchConfig(**config)
async def _validate_config(self, ip: str, config: SwitchConfig) -> bool:
"""验证配置是否生效"""
current = await self._get_current_config(ip)
if config.type == "vlan":
return f"vlan {config.vlan_id}" in current
elif config.type == "interface" and config.vlan:
return f"switchport access vlan {config.vlan}" in current
return True
config_type = config.type.lower()
if config_type == "vlan":
return await self._configure_vlan(switch_ip, config)
elif config_type == "interface":
return await self._configure_interface(switch_ip, config)
elif config_type == "acl":
return await self._configure_acl(switch_ip, config)
elif config_type == "route":
return await self._configure_route(switch_ip, config)
else:
raise SwitchConfigException(f"不支持的配置类型: {config_type}")
except Exception as e:
self.logger.error(f"{switch_ip} 配置失败: {str(e)}")
raise SwitchConfigException(str(e))
async def close(self):
"""清理所有连接"""
for conn in self._connection_pool.values():
conn.close()
self._connection_pool.clear()
# ----------------------
# 协议实现
# ----------------------
async def _send_commands(self, ip: str, commands: List[str]) -> str:
"""发送命令到设备(自动适配模拟器)"""
try:
# 自动选择凭证
username, password = (
("admin", "Admin@123") if self.is_emulated
else (self.username, self.password)
)
# 自动调整超时
timeout = 15 if self.is_emulated else self.timeout
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: ssh.connect(
ip,
username=username,
password=password,
timeout=timeout,
look_for_keys=False
)
)
# 执行命令
shell = ssh.invoke_shell()
output = ""
for cmd in commands:
shell.send(cmd + "\n")
if self.is_emulated:
await asyncio.sleep(self.emulated_delay)
while shell.recv_ready():
recv = await loop.run_in_executor(None, shell.recv, 1024)
output += recv.decode("gbk" if self.is_emulated else "utf-8")
ssh.close()
return output
except Exception as e:
raise SwitchConfigException(f"SSH连接错误: {str(e)}")
async def _configure_vlan(self, ip: str, config: SwitchConfig) -> str:
"""配置VLAN自动适配语法"""
commands = [
"system-view" if self.is_emulated else "configure terminal",
f"vlan {config.vlan_id}",
f"name {config.name or ''}"
]
# 端口加入VLAN
for intf in getattr(config, "interfaces", []):
if self.is_emulated:
commands.extend([
f"interface {intf['interface']}",
"port link-type access",
f"port default vlan {config.vlan_id}",
"quit"
])
else:
commands.extend([
f"interface {intf['interface']}",
f"switchport access vlan {config.vlan_id}",
"exit"
])
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(ip, commands)
async def _configure_interface(self, ip: str, config: SwitchConfig) -> str:
"""配置接口"""
commands = [
"system-view" if self.is_emulated else "configure terminal",
f"interface {config.interface}",
f"description {config.description or ''}"
]
if config.ip_address:
commands.append(f"ip address {config.ip_address}")
if hasattr(config, "vlan"):
if self.is_emulated:
commands.extend([
"port link-type access",
f"port default vlan {config.vlan}"
])
else:
commands.append(f"switchport access vlan {config.vlan}")
state = getattr(config, "state", "up")
commands.append("undo shutdown" if state == "up" else "shutdown")
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(ip, commands)
async def _configure_acl(self, ip: str, config: SwitchConfig) -> str:
"""配置ACL"""
commands = ["system-view" if self.is_emulated else "configure terminal"]
if self.is_emulated:
commands.append(f"acl number {config.acl_id}")
for rule in config.rules or []:
commands.append(
f"rule {'permit' if rule.get('action') == 'permit' else 'deny'} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}"
)
else:
commands.append(f"access-list {config.acl_id} extended")
for rule in config.rules or []:
commands.append(
f"{rule.get('action', 'permit')} {rule.get('protocol', 'ip')} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}"
)
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(ip, commands)
async def _configure_route(self, ip: str, config: SwitchConfig) -> str:
"""配置路由"""
commands = [
"system-view" if self.is_emulated else "configure terminal",
f"ip route-static {config.network} {config.mask} {config.next_hop}",
"return" if self.is_emulated else "end"
]
return await self._send_commands(ip, commands)
# ----------------------
# 使用示例
# ----------------------
async def main():
# eNSP模拟环境配置
ens_configurator = SwitchConfigurator(is_emulated=True)
await ens_configurator.batch_configure(
{
"type": "vlan",
"vlan_id": 100,
"name": "TestVLAN",
"interfaces": [{"interface": "GigabitEthernet0/0/1"}]
},
["192.168.1.200"] # eNSP设备IP
)
# 真实设备配置
real_configurator = SwitchConfigurator(
username="real_admin",
password="SecurePass123!",
is_emulated=False
)
await real_configurator.batch_configure(
{
"type": "interface",
"interface": "Gi1/0/24",
"description": "Uplink",
"state": "up"
},
["10.1.1.1"] # 真实设备IP
)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,5 +1,4 @@
from fastapi import HTTPException, status
from typing import Optional
class AICommandParseException(HTTPException):
def __init__(self, detail: str):
@ -9,28 +8,15 @@ class AICommandParseException(HTTPException):
)
class SwitchConfigException(HTTPException):
def __init__(
self,
detail: str,
status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR
):
def __init__(self, detail: str):
super().__init__(
status_code=status_code,
detail=f"Switch error: {detail}"
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Switch configuration error: {detail}"
)
class ConfigBackupException(SwitchConfigException):
"""配置备份失败异常"""
def __init__(self, ip: str):
class SiliconFlowAPIException(HTTPException):
def __init__(self, detail: str):
super().__init__(
detail=f"无法备份设备 {ip} 的配置",
recovery_guide="检查设备存储空间或权限"
)
class ConfigRollbackException(SwitchConfigException):
"""回滚失败异常"""
def __init__(self, ip: str, original_error: str):
super().__init__(
detail=f"设备 {ip} 回滚失败(原始错误:{original_error}",
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"SiliconFlow API error: {detail}"
)

View File

@ -1,4 +0,0 @@
from .bulk_config import BulkConfigurator, BulkSwitchConfig
from .connection_pool import SwitchConnectionPool
__all__ = ['BulkConfigurator', 'BulkSwitchConfig', 'SwitchConnectionPool']

View File

@ -1,46 +0,0 @@
import asyncio
from typing import List, Dict
from dataclasses import dataclass
from .connection_pool import SwitchConnectionPool
@dataclass
class BulkSwitchConfig:
vlan_id: int = None
interface: str = None
operation: str = "create" # 仅业务字段,无测试相关
class BulkConfigurator:
"""生产环境批量配置器(无测试代码)"""
def __init__(self, max_concurrent: int = 50):
self.pool = SwitchConnectionPool()
self.semaphore = asyncio.Semaphore(max_concurrent)
async def _configure_device(self, ip: str, config: BulkSwitchConfig) -> str:
"""核心配置方法"""
conn = await self.pool.get_connection(ip, "admin", "admin")
try:
commands = self._generate_commands(config)
results = [await conn.run(cmd) for cmd in commands]
return "\n".join(r.stdout for r in results)
finally:
await self.pool.release_connection(ip, conn)
def _generate_commands(self, config: BulkSwitchConfig) -> List[str]:
"""命令生成(纯业务逻辑)"""
commands = []
if config.vlan_id:
commands.append(f"vlan {config.vlan_id}")
if config.operation == "create":
commands.extend([
f"name VLAN_{config.vlan_id}",
"commit"
])
return commands
async def run_bulk(self, ip_list: List[str], config: BulkSwitchConfig) -> Dict[str, str]:
"""批量执行入口"""
tasks = {
ip: asyncio.create_task(self._configure_device(ip, config))
for ip in ip_list
}
return {ip: await task for ip, task in tasks.items()}

View File

@ -1,49 +0,0 @@
import asyncio
import time
import asyncssh
from typing import Dict
class SwitchConnectionPool:
"""
交换机连接池支持自动重连和负载均衡
功能
- 每个IP维护动态连接池
- 自动剔除失效连接
- 支持空闲连接回收
"""
def __init__(self, max_connections_per_ip: int = 3):
self._pools: Dict[str, asyncio.Queue] = {}
self._max_conn = max_connections_per_ip
self._lock = asyncio.Lock()
async def get_connection(self, ip: str, username: str, password: str) -> asyncssh.SSHClientConnection:
async with self._lock:
if ip not in self._pools:
self._pools[ip] = asyncio.Queue(self._max_conn)
if not self._pools[ip].empty():
return await self._pools[ip].get()
return await asyncssh.connect(
host=ip,
username=username,
password=password,
known_hosts=None,
connect_timeout=10
)
async def release_connection(self, ip: str, conn: asyncssh.SSHClientConnection):
async with self._lock:
if conn.is_connected() and self._pools[ip].qsize() < self._max_conn:
await self._pools[ip].put(conn)
else:
conn.close()
async def close_all(self):
async with self._lock:
for q in self._pools.values():
while not q.empty():
conn = await q.get()
conn.close()
self._pools.clear()

View File

@ -1,18 +0,0 @@
import os
#本文用来读取代码
output_file = "all_code.txt" # 输出文件名
skip_dirs = ["venv", "__pycache__"] # 跳过目录
extensions = [".py"] # 要合并的扩展名
with open(output_file, "w", encoding="utf-8") as outfile:
for root, dirs, files in os.walk(os.getcwd()):
# 跳过指定目录
dirs[:] = [d for d in dirs if d not in skip_dirs]
for file in files:
if any(file.endswith(ext) for ext in extensions):
file_path = os.path.join(root, file)
# 添加文件名作为分隔标记
outfile.write(f"\n\n{'=' * 50}\n# File: {file_path}\n{'=' * 50}\n\n")
with open(file_path, "r", encoding="utf-8") as infile:
outfile.write(infile.read())

View File

@ -1,17 +0,0 @@
import os
#本文件用来生成项目树
def generate_directory_tree(startpath, output_file):
with open(output_file, 'w', encoding='utf-8') as f:
for root, dirs, files in os.walk(startpath):
level = root.replace(startpath, '').count(os.sep)
indent = ' ' * 4 * level
f.write(f"{indent}{os.path.basename(root)}/\n")
subindent = ' ' * 4 * (level + 1)
for file in files:
f.write(f"{subindent}{file}\n")
# 使用当前项目目录
project_path = os.getcwd()
output_file = 'project_structure.txt'
generate_directory_tree(project_path, output_file)
print(f"目录结构已生成到 {output_file}")

View File

@ -1,38 +0,0 @@
version: '3.13'
services:
app:
build: .
container_name: switch_configurator
ports:
- "8000:8000"
volumes:
- ./src/backend:/app
- switch_backups:/app/config_backups
environment:
- SWITCH_USERNAME=${SWITCH_USERNAME:-admin}
- SWITCH_PASSWORD=${SWITCH_PASSWORD:-admin}
- SWITCH_TIMEOUT=${SWITCH_TIMEOUT:-10}
- SILICONFLOW_API_KEY=${SILICONFLOW_API_KEY}
- SILICONFLOW_API_URL=${SILICONFLOW_API_URL}
restart: unless-stopped
networks:
- backend
# 可选:添加 Redis 用于缓存设备扫描结果
redis:
image: redis:alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- backend
volumes:
switch_backups:
redis_data:
networks:
backend:
driver: bridge

View File

@ -2,13 +2,9 @@ fastapi>=0.95.2
uvicorn>=0.22.0
python-dotenv>=1.0.0
requests>=2.28.2
paramiko>=3.3.0
paramiko>=3.1.0
pydantic>=1.10.7
loguru>=0.7.0
python-nmap>=0.7.1
tenacity>=9.1.2
typing-extensions>=4.0.0
aiofiles>=24.1.0
telnetlib3>=2.0.4
asyncssh>=2.14.0
aiofiles>=24.1.0

View File

@ -1,39 +0,0 @@
import asyncio
import logging
from src.backend.app.api.network_config import SwitchConfigurator
#该文件用于测试
# 设置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
async def test_ensp():
"""eNSP测试函数"""
# 1. 初始化配置器对应eNSP设备设置
configurator = SwitchConfigurator(
ensp_mode=True, # 启用eNSP模式
ensp_port=2000, # 必须与eNSP中设备设置的Telnet端口一致
username="admin", # 默认账号
password="admin", # 默认密码
timeout=15 # 建议超时设长些
)
# 2. 执行配置示例创建VLAN100
try:
result = await configurator.safe_apply(
ip="127.0.0.1", # 本地连接固定用这个地址
config={
"type": "vlan",
"vlan_id": 100,
"name": "测试VLAN"
}
)
print("✅ 配置结果:", result)
except Exception as e:
print("❌ 配置失败:", str(e))
# 运行测试
if __name__ == "__main__":
asyncio.run(test_ensp())

View File

@ -1 +0,0 @@
[]