添加了eNSP调试程序以及代码树的程序(便于调试)

This commit is contained in:
3 2025-06-13 12:55:50 +08:00
parent 31a864b0b0
commit b3e6aa7d5c
6 changed files with 318 additions and 208 deletions

View File

@ -1,4 +1,4 @@
from fastapi import APIRouter, FastAPI from fastapi import FastAPI
from .endpoints import router from .endpoints import router
app=FastAPI() app=FastAPI()

View File

@ -1,38 +1,45 @@
import paramiko
import asyncio import asyncio
import aiofiles
from datetime import datetime
from typing import Dict, List, Optional, Union
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pydantic import BaseModel
from paramiko.channel import ChannelFile
import logging import logging
import telnetlib3
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
import aiofiles
import asyncssh
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
# ---------------------- # ----------------------
# 数据模型定义 # 数据模型
# ---------------------- # ----------------------
class SwitchConfig(BaseModel): class SwitchConfig(BaseModel):
"""交换机配置模型"""
type: str # vlan/interface/acl/route type: str # vlan/interface/acl/route
vlan_id: Optional[int] = None vlan_id: Optional[int] = None
interface: Optional[str] = None interface: Optional[str] = None
name: Optional[str] = None name: Optional[str] = None
ip_address: Optional[str] = None ip_address: Optional[str] = None
acl_id: Optional[int] = None vlan: Optional[int] = None # 兼容eNSP模式
rules: Optional[List[Dict]] = None
# ---------------------- # ----------------------
# 异常类 # 异常类
# ---------------------- # ----------------------
class SwitchConfigException(Exception): class SwitchConfigException(Exception):
"""交换机配置异常基类""" pass
class EnspConnectionException(SwitchConfigException):
pass
class SSHConnectionException(SwitchConfigException):
pass pass
# ---------------------- # ----------------------
# 核心配置器 # 核心配置器(完整双模式)
# ---------------------- # ----------------------
class SwitchConfigurator: class SwitchConfigurator:
def __init__( def __init__(
@ -41,236 +48,262 @@ class SwitchConfigurator:
password: str = "admin", password: str = "admin",
timeout: int = 10, timeout: int = 10,
max_workers: int = 5, max_workers: int = 5,
is_emulated: bool = False, ensp_mode: bool = False,
emulated_delay: float = 2.0 ensp_port: int = 2000,
ensp_command_delay: float = 0.5,
**ssh_options
): ):
"""
初始化配置器
:param username: 登录用户名
:param password: 登录密码
:param timeout: SSH超时时间()
:param max_workers: 最大并发数
:param is_emulated: 是否模拟器环境
:param emulated_delay: 模拟器命令间隔延迟()
"""
self.username = username self.username = username
self.password = password self.password = password
self.timeout = timeout self.timeout = timeout
self.is_emulated = is_emulated
self.emulated_delay = emulated_delay
self.semaphore = asyncio.Semaphore(max_workers) self.semaphore = asyncio.Semaphore(max_workers)
self.logger = logging.getLogger(__name__) self.backup_dir = Path("config_backups")
self.backup_dir.mkdir(exist_ok=True)
self.ensp_mode = ensp_mode
self.ensp_port = ensp_port
self.ensp_delay = ensp_command_delay
self.ssh_options = ssh_options
@retry( async def _apply_config(self, ip: str, config: Union[Dict, SwitchConfig]) -> str:
stop=stop_after_attempt(3), """实际配置逻辑"""
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(SwitchConfigException)
)
async def safe_apply(self, ip: str, config: Union[Dict, SwitchConfig]) -> str:
"""安全执行配置(带重试机制)"""
async with self.semaphore:
return await self.apply_config(ip, config)
async def batch_configure(
self,
config: Union[Dict, SwitchConfig],
ips: List[str]
) -> Dict[str, Union[str, Exception]]:
"""批量配置多台设备"""
tasks = [self.safe_apply(ip, config) for ip in ips]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {ip: result for ip, result in zip(ips, results)}
async def apply_config(
self,
switch_ip: str,
config: Union[Dict, SwitchConfig]
) -> str:
"""应用配置到单台设备"""
try:
if isinstance(config, dict): if isinstance(config, dict):
config = SwitchConfig(**config) config = SwitchConfig(**config)
config_type = config.type.lower() commands = (
if config_type == "vlan": self._generate_ensp_commands(config)
return await self._configure_vlan(switch_ip, config) if self.ensp_mode
elif config_type == "interface": else self._generate_standard_commands(config)
return await self._configure_interface(switch_ip, config) )
elif config_type == "acl": return await self._send_commands(ip, commands)
return await self._configure_acl(switch_ip, config)
elif config_type == "route":
return await self._configure_route(switch_ip, config)
else:
raise SwitchConfigException(f"不支持的配置类型: {config_type}")
except Exception as e:
self.logger.error(f"{switch_ip} 配置失败: {str(e)}")
raise SwitchConfigException(str(e))
# ----------------------
# 协议实现
# ----------------------
async def _send_commands(self, ip: str, commands: List[str]) -> str: async def _send_commands(self, ip: str, commands: List[str]) -> str:
"""发送命令到设备(自动适配模拟器)""" """双模式命令发送"""
return (
await self._send_ensp_commands(ip, commands)
if self.ensp_mode
else await self._send_ssh_commands(ip, commands)
)
# --------- eNSP模式专用 ---------
async def _send_ensp_commands(self, ip: str, commands: List[str]) -> str:
"""Telnet协议执行eNSP"""
try: try:
# 自动选择凭证 # 修复点使用正确的timeout参数
username, password = ( reader, writer = await telnetlib3.open_connection(
("admin", "Admin@123") if self.is_emulated host=ip,
else (self.username, self.password) port=self.ensp_port,
connect_minwait=self.timeout, # telnetlib3的实际可用参数
connect_maxwait=self.timeout
) )
# 自动调整超时 # 登录流程(增加超时处理)
timeout = 15 if self.is_emulated else self.timeout try:
await asyncio.wait_for(reader.readuntil(b"Username:"), timeout=self.timeout)
writer.write(f"{self.username}\n")
ssh = paramiko.SSHClient() await asyncio.wait_for(reader.readuntil(b"Password:"), timeout=self.timeout)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) writer.write(f"{self.password}\n")
loop = asyncio.get_event_loop() # 等待登录完成
await loop.run_in_executor( await asyncio.sleep(1)
None, except asyncio.TimeoutError:
lambda: ssh.connect( raise EnspConnectionException("登录超时")
ip,
username=username,
password=password,
timeout=timeout,
look_for_keys=False
)
)
# 执行命令 # 执行命令
shell = ssh.invoke_shell()
output = "" output = ""
for cmd in commands: for cmd in commands:
shell.send(cmd + "\n") writer.write(f"{cmd}\n")
if self.is_emulated: await writer.drain() # 确保命令发送完成
await asyncio.sleep(self.emulated_delay)
while shell.recv_ready(): # 读取响应(增加超时处理)
recv = await loop.run_in_executor(None, shell.recv, 1024) try:
output += recv.decode("gbk" if self.is_emulated else "utf-8") while True:
data = await asyncio.wait_for(reader.read(1024), timeout=1)
if not data:
break
output += data
except asyncio.TimeoutError:
continue # 单次读取超时不视为错误
# 关闭连接
writer.close()
try:
await writer.wait_closed()
except:
logging.debug("连接关闭时出现异常", exc_info=True) # 至少记录异常信息
pass
ssh.close()
return output return output
except Exception as e: except Exception as e:
raise SwitchConfigException(f"SSH连接错误: {str(e)}") raise EnspConnectionException(f"eNSP连接失败: {str(e)}")
async def _configure_vlan(self, ip: str, config: SwitchConfig) -> str: @staticmethod
"""配置VLAN自动适配语法""" def _generate_ensp_commands(config: SwitchConfig) -> List[str]:
commands = [ """生成eNSP命令序列"""
"system-view" if self.is_emulated else "configure terminal", commands = ["system-view"]
if config.type == "vlan":
commands.extend([
f"vlan {config.vlan_id}",
f"description {config.name or ''}"
])
elif config.type == "interface":
commands.extend([
f"interface {config.interface}",
"port link-type access",
f"port default vlan {config.vlan}" if config.vlan else "",
f"ip address {config.ip_address}" if config.ip_address else ""
])
commands.append("return")
return [c for c in commands if c.strip()]
# --------- SSH模式专用使用AsyncSSH ---------
async def _send_ssh_commands(self, ip: str, commands: List[str]) -> str:
"""AsyncSSH执行命令"""
async with self.semaphore:
try:
async with asyncssh.connect(
host=ip,
username=self.username,
password=self.password,
connect_timeout=self.timeout, # AsyncSSH的正确参数名
**self.ssh_options
) as conn:
results = []
for cmd in commands:
result = await conn.run(cmd, check=True)
results.append(result.stdout)
return "\n".join(results)
except asyncssh.Error as e:
raise SSHConnectionException(f"SSH操作失败: {str(e)}")
except Exception as e:
raise SSHConnectionException(f"连接异常: {str(e)}")
@staticmethod
def _generate_standard_commands(config: SwitchConfig) -> List[str]:
"""生成标准CLI命令"""
commands = []
if config.type == "vlan":
commands.extend([
f"vlan {config.vlan_id}", f"vlan {config.vlan_id}",
f"name {config.name or ''}" f"name {config.name or ''}"
]
# 端口加入VLAN
for intf in getattr(config, "interfaces", []):
if self.is_emulated:
commands.extend([
f"interface {intf['interface']}",
"port link-type access",
f"port default vlan {config.vlan_id}",
"quit"
]) ])
else: elif config.type == "interface":
commands.extend([ commands.extend([
f"interface {intf['interface']}",
f"switchport access vlan {config.vlan_id}",
"exit"
])
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(ip, commands)
async def _configure_interface(self, ip: str, config: SwitchConfig) -> str:
"""配置接口"""
commands = [
"system-view" if self.is_emulated else "configure terminal",
f"interface {config.interface}", f"interface {config.interface}",
f"description {config.description or ''}" f"switchport access vlan {config.vlan}" if config.vlan else "",
] f"ip address {config.ip_address}" if config.ip_address else ""
if config.ip_address:
commands.append(f"ip address {config.ip_address}")
if hasattr(config, "vlan"):
if self.is_emulated:
commands.extend([
"port link-type access",
f"port default vlan {config.vlan}"
]) ])
else: return commands
commands.append(f"switchport access vlan {config.vlan}")
state = getattr(config, "state", "up") # --------- 通用功能 ---------
commands.append("undo shutdown" if state == "up" else "shutdown") async def _validate_config(self, ip: str, config: SwitchConfig) -> bool:
commands.append("return" if self.is_emulated else "end") """验证配置是否生效"""
current = await self._get_current_config(ip)
if config.type == "vlan":
return f"vlan {config.vlan_id}" in current
elif config.type == "interface" and config.vlan:
return f"switchport access vlan {config.vlan}" in current
return True
return await self._send_commands(ip, commands) async def _get_current_config(self, ip: str) -> str:
"""获取当前配置"""
async def _configure_acl(self, ip: str, config: SwitchConfig) -> str: commands = (
"""配置ACL""" ["display current-configuration"]
commands = ["system-view" if self.is_emulated else "configure terminal"] if self.ensp_mode
else ["show running-config"]
if self.is_emulated:
commands.append(f"acl number {config.acl_id}")
for rule in config.rules or []:
commands.append(
f"rule {'permit' if rule.get('action') == 'permit' else 'deny'} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}"
) )
else: try:
commands.append(f"access-list {config.acl_id} extended") return await self._send_commands(ip, commands)
for rule in config.rules or []: except (EnspConnectionException, SSHConnectionException) as e:
commands.append( raise SwitchConfigException(f"配置获取失败: {str(e)}")
f"{rule.get('action', 'permit')} {rule.get('protocol', 'ip')} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}" async def _backup_config(self, ip: str) -> Path:
"""备份配置到文件"""
backup_path = self.backup_dir / f"{ip}_{datetime.now().isoformat()}.cfg"
config = await self._get_current_config(ip)
async with aiofiles.open(backup_path, "w") as f:
await f.write(config)
return backup_path
async def _restore_config(self, ip: str, backup_path: Path) -> bool:
"""从备份恢复配置"""
try:
async with aiofiles.open(backup_path) as f:
config = await f.read()
commands = (
["system-view", config, "return"]
if self.ensp_mode
else [f"configure terminal\n{config}\nend"]
) )
await self._send_commands(ip, commands)
return True
except Exception as e:
logging.error(f"恢复失败: {str(e)}")
return False
commands.append("return" if self.is_emulated else "end") @retry(
return await self._send_commands(ip, commands) stop=stop_after_attempt(2),
wait=wait_exponential(multiplier=1, min=4, max=10)
async def _configure_route(self, ip: str, config: SwitchConfig) -> str: )
"""配置路由""" async def safe_apply(
commands = [ self,
"system-view" if self.is_emulated else "configure terminal", ip: str,
f"ip route-static {config.network} {config.mask} {config.next_hop}", config: Union[Dict, SwitchConfig]
"return" if self.is_emulated else "end" ) -> Dict[str, Union[str, bool, Path]]:
] """安全配置应用(自动回滚)"""
return await self._send_commands(ip, commands) backup_path = await self._backup_config(ip)
try:
result = await self._apply_config(ip, config)
if not await self._validate_config(ip, config):
raise SwitchConfigException("配置验证失败")
return {
"status": "success",
"output": result,
"backup_path": str(backup_path)
}
except (EnspConnectionException, SSHConnectionException, SwitchConfigException) as e:
restore_status = await self._restore_config(ip, backup_path)
return {
"status": "failed",
"error": str(e),
"backup_path": str(backup_path),
"restore_success": restore_status
}
# ---------------------- # ----------------------
# 使用示例 # 使用示例
# ---------------------- # ----------------------
async def main(): async def demo():
# eNSP模拟环境配置 # 示例1: eNSP设备配置Telnet模式
ens_configurator = SwitchConfigurator(is_emulated=True) ensp_configurator = SwitchConfigurator(
await ens_configurator.batch_configure( ensp_mode=True,
{ ensp_port=2000,
"type": "vlan", username="admin",
"vlan_id": 100, password="admin",
"name": "TestVLAN", timeout=15
"interfaces": [{"interface": "GigabitEthernet0/0/1"}]
},
["192.168.1.200"] # eNSP设备IP
) )
ensp_result = await ensp_configurator.safe_apply("127.0.0.1", {
# 真实设备配置
real_configurator = SwitchConfigurator(
username="real_admin",
password="SecurePass123!",
is_emulated=False
)
await real_configurator.batch_configure(
{
"type": "interface", "type": "interface",
"interface": "Gi1/0/24", "interface": "GigabitEthernet0/0/1",
"description": "Uplink", "vlan": 100,
"state": "up" "ip_address": "192.168.1.2 255.255.255.0"
}, })
["10.1.1.1"] # 真实设备IP print("eNSP配置结果:", ensp_result)
# 示例2: 真实设备配置SSH模式
ssh_configurator = SwitchConfigurator(
username="cisco",
password="cisco123",
timeout=15
) )
ssh_result = await ssh_configurator.safe_apply("192.168.1.1", {
"type": "vlan",
"vlan_id": 200,
"name": "Production"
})
print("SSH配置结果:", ssh_result)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(demo())

View File

@ -0,0 +1,18 @@
import os
#本文用来读取代码
output_file = "all_code.txt" # 输出文件名
skip_dirs = ["venv", "__pycache__"] # 跳过目录
extensions = [".py"] # 要合并的扩展名
with open(output_file, "w", encoding="utf-8") as outfile:
for root, dirs, files in os.walk(os.getcwd()):
# 跳过指定目录
dirs[:] = [d for d in dirs if d not in skip_dirs]
for file in files:
if any(file.endswith(ext) for ext in extensions):
file_path = os.path.join(root, file)
# 添加文件名作为分隔标记
outfile.write(f"\n\n{'=' * 50}\n# File: {file_path}\n{'=' * 50}\n\n")
with open(file_path, "r", encoding="utf-8") as infile:
outfile.write(infile.read())

View File

@ -0,0 +1,17 @@
import os
#本文件用来生成项目树
def generate_directory_tree(startpath, output_file):
with open(output_file, 'w', encoding='utf-8') as f:
for root, dirs, files in os.walk(startpath):
level = root.replace(startpath, '').count(os.sep)
indent = ' ' * 4 * level
f.write(f"{indent}{os.path.basename(root)}/\n")
subindent = ' ' * 4 * (level + 1)
for file in files:
f.write(f"{subindent}{file}\n")
# 使用当前项目目录
project_path = os.getcwd()
output_file = 'project_structure.txt'
generate_directory_tree(project_path, output_file)
print(f"目录结构已生成到 {output_file}")

View File

@ -2,10 +2,13 @@ fastapi>=0.95.2
uvicorn>=0.22.0 uvicorn>=0.22.0
python-dotenv>=1.0.0 python-dotenv>=1.0.0
requests>=2.28.2 requests>=2.28.2
paramiko>=3.1.0 paramiko>=3.3.0
pydantic>=1.10.7 pydantic>=1.10.7
loguru>=0.7.0 loguru>=0.7.0
python-nmap>=0.7.1 python-nmap>=0.7.1
tenacity>=9.1.2 tenacity>=9.1.2
typing-extensions>=4.0.0 typing-extensions>=4.0.0
aiofiles>=24.1.0 aiofiles>=24.1.0
telnetlib3>=2.0.4
asyncssh>=2.14.0
aiofiles>=24.1.0

39
src/backend/test_ensp.py Normal file
View File

@ -0,0 +1,39 @@
import asyncio
import logging
from src.backend.app.api.network_config import SwitchConfigurator # 导入你的核心类
#该文件用于测试
# 设置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
async def test_ensp():
"""eNSP测试函数"""
# 1. 初始化配置器对应eNSP设备设置
configurator = SwitchConfigurator(
ensp_mode=True, # 启用eNSP模式
ensp_port=2000, # 必须与eNSP中设备设置的Telnet端口一致
username="admin", # 默认账号
password="admin", # 默认密码
timeout=15 # 建议超时设长些
)
# 2. 执行配置示例创建VLAN100
try:
result = await configurator.safe_apply(
ip="127.0.0.1", # 本地连接固定用这个地址
config={
"type": "vlan",
"vlan_id": 100,
"name": "测试VLAN"
}
)
print("✅ 配置结果:", result)
except Exception as e:
print("❌ 配置失败:", str(e))
# 运行测试
if __name__ == "__main__":
asyncio.run(test_ensp())