AI-powered-switches/src/backend/app/api/network_config.py

320 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import telnetlib3
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
import aiofiles
import asyncssh
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential
from src.backend.app.utils.logger import logger
from src.backend.config import settings
# ----------------------
# 数据模型
# ----------------------
class SwitchConfig(BaseModel):
    # One configuration change to apply to a switch. Only `type` is required;
    # the remaining fields are read depending on which type is requested.

    # Kind of change; the command generators in this file handle "vlan" and
    # "interface".
    type: str
    # VLAN number to create/enter when type == "vlan".
    vlan_id: Optional[int] = None
    # Interface name (e.g. "GigabitEthernet0/0/1") when type == "interface".
    interface: Optional[str] = None
    # Text used for the VLAN name/description when type == "vlan".
    name: Optional[str] = None
    # Inserted verbatim after "ip address" for an interface; the demo in this
    # file passes "<address> <mask>".
    ip_address: Optional[str] = None
    # Access VLAN to assign to the interface when type == "interface".
    vlan: Optional[int] = None
# ----------------------
# 异常类
# ----------------------
class SwitchConfigException(Exception):
    """Base class for every error raised while configuring a switch."""


class EnspConnectionException(SwitchConfigException):
    """Raised when the Telnet (eNSP) session cannot be opened or used."""


class SSHConnectionException(SwitchConfigException):
    """Raised when the SSH session cannot be opened or a command fails."""
# ----------------------
# 核心配置器(完整双模式)
# ----------------------
class SwitchConfigurator:
    """Generate switch CLI commands and push them to a device.

    The class can build two command dialects (selected by ``ensp_mode``):
    Huawei-style commands for eNSP devices and "standard" Cisco-style
    commands. It also offers ``safe_apply``, which backs up the current
    configuration, applies a change, validates it and rolls back on failure.
    """

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        timeout: Optional[int] = None,
        max_workers: int = 5,
        ensp_mode: bool = False,
        ensp_port: int = 2000,
        ensp_command_delay: float = 0.5,
        **ssh_options
    ):
        """Store connection settings and make sure the backup directory exists.

        Args:
            username: Login name; falls back to ``settings.SWITCH_USERNAME``.
                The literal value ``'NONE'`` skips the Telnet login dialogue.
            password: Login password; falls back to ``settings.SWITCH_PASSWORD``.
            timeout: Seconds to wait for login prompts / SSH connect; falls
                back to ``settings.SWITCH_TIMEOUT``.
            max_workers: Size of the semaphore that bounds concurrent SSH
                sessions.
            ensp_mode: When true, generate Huawei/eNSP command syntax.
            ensp_port: Stored on the instance; not read by any method of this
                class.
            ensp_command_delay: Stored on the instance; not read by any method
                of this class.
            **ssh_options: Extra keyword arguments forwarded to
                ``asyncssh.connect``.
        """
        self.username = username if username is not None else settings.SWITCH_USERNAME
        self.password = password if password is not None else settings.SWITCH_PASSWORD
        self.timeout = timeout if timeout is not None else settings.SWITCH_TIMEOUT
        self.semaphore = asyncio.Semaphore(max_workers)
        self.backup_dir = Path("config_backups")
        self.backup_dir.mkdir(exist_ok=True)
        self.ensp_mode = ensp_mode
        self.ensp_port = ensp_port
        self.ensp_delay = ensp_command_delay
        self.ssh_options = ssh_options

    async def apply_config(self, ip: str, config: Union[Dict, SwitchConfig]) -> str:
        """Translate ``config`` into CLI commands and send them to ``ip``.

        Args:
            ip: Address of the target device.
            config: A ``SwitchConfig`` or a dict with the same fields.

        Returns:
            The collected device output for the commands that were sent.
        """
        if isinstance(config, dict):
            config = SwitchConfig(**config)
        if self.ensp_mode:
            commands = self._generate_ensp_commands(config)
        else:
            commands = self._generate_standard_commands(config)
        return await self._send_commands(ip, commands)

    async def _send_commands(self, ip: str, commands: List[str]) -> str:
        """Send ``commands`` to the device and return the collected output.

        NOTE(review): every call currently goes through the Telnet path,
        regardless of ``ensp_mode``; ``_send_ssh_commands`` is never called
        from this class. Confirm whether SSH dispatch should be restored for
        non-eNSP mode.
        """
        return await self._send_ensp_commands(ip, commands)

    async def _send_ensp_commands(self, ip: str, commands: List[str]) -> str:
        """Run ``commands`` on a device over Telnet and return their output.

        Commands starting with ``!`` are skipped. The output of each executed
        command is wrapped in begin/end marker lines.

        Raises:
            EnspConnectionException: On connection failure, login timeout or
                any other error during the session.
        """
        try:
            # NOTE(review): the port is fixed at 23; ``self.ensp_port`` is not
            # consulted here.
            logger.info(f"连接设备 {ip},端口23")
            reader, writer = await telnetlib3.open_connection(host=ip, port=23)
            try:
                logger.debug("连接成功,开始登录流程")
                await self._ensp_login(reader, writer)

                output = ""
                for cmd in commands:
                    if cmd.startswith("!"):
                        logger.debug(f"跳过特殊命令: {cmd}")
                        continue
                    logger.info(f"发送命令: {cmd}")
                    writer.write(f"{cmd}\n")
                    await writer.drain()
                    command_output = await self._read_until_idle(reader)
                    output += f"\n[命令: {cmd} 输出开始]\n{command_output}\n[命令: {cmd} 输出结束]\n"
                logger.info("所有命令执行完毕,关闭连接")
                return output
            finally:
                # Close on every path so a failed login or a mid-session error
                # does not leave the Telnet connection open.
                writer.close()
        except asyncio.TimeoutError as e:
            logger.error(f"连接或读取超时: {e}")
            raise EnspConnectionException(f"eNSP连接超时: {e}") from e
        except Exception as e:
            logger.error(f"连接或执行异常: {e}", exc_info=True)
            raise EnspConnectionException(f"eNSP连接失败: {e}") from e

    async def _ensp_login(self, reader, writer) -> None:
        """Answer the Username/Password prompts unless the username is 'NONE'."""
        try:
            if self.username != 'NONE':
                await asyncio.wait_for(reader.readuntil(b"Username:"), timeout=self.timeout)
                logger.debug("收到 'Username:' 提示,发送用户名")
                writer.write(f"{self.username}\n")
                await asyncio.wait_for(reader.readuntil(b"Password:"), timeout=self.timeout)
                logger.debug("收到 'Password:' 提示,发送密码")
                writer.write(f"{self.password}\n")
            # NOTE(review): settle delay before the first command. Applied
            # unconditionally here — confirm it was not meant to run only
            # after a login.
            await asyncio.sleep(1)
        except asyncio.TimeoutError as err:
            raise EnspConnectionException("登录超时,未收到用户名或密码提示") from err

    @staticmethod
    async def _read_until_idle(reader) -> str:
        """Read from ``reader`` until it returns no data or stays silent for 1 s."""
        chunks = []
        try:
            while True:
                data = await asyncio.wait_for(reader.read(1024), timeout=1)
                if not data:
                    logger.debug("读取到空数据,结束当前命令读取")
                    break
                chunks.append(data)
                logger.debug(f"收到数据: {repr(data)}")
        except asyncio.TimeoutError:
            # A quiet second is treated as "command finished", not as an error.
            logger.debug("命令输出读取超时,继续执行下一条命令")
        return "".join(chunks)

    @staticmethod
    def _generate_ensp_commands(config: SwitchConfig) -> List[str]:
        """Build the Huawei/eNSP command sequence for ``config``.

        The sequence always starts with ``system-view`` and ends with
        ``return``. Optional parts (VLAN description, access VLAN, IP address)
        are emitted only when the corresponding field is set.
        """
        commands = ["system-view"]
        if config.type == "vlan":
            commands.append(f"vlan {config.vlan_id}")
            # Skip the description when no name was given instead of sending
            # an argument-less "description" command.
            vlan_name = (config.name or "").strip()
            if vlan_name:
                commands.append(f"description {vlan_name}")
        elif config.type == "interface":
            commands.append(f"interface {config.interface}")
            commands.append("port link-type access")
            if config.vlan:
                commands.append(f"port default vlan {config.vlan}")
            if config.ip_address:
                commands.append(f"ip address {config.ip_address}")
        commands.append("return")
        return [c for c in commands if c.strip()]

    async def _send_ssh_commands(self, ip: str, commands: List[str]) -> str:
        """Run ``commands`` one by one over SSH and return their joined stdout.

        The number of concurrent sessions is bounded by ``self.semaphore``.

        Raises:
            SSHConnectionException: On any asyncssh error (including a
                non-zero exit status, since ``check=True``) or other failure.
        """
        async with self.semaphore:
            try:
                async with asyncssh.connect(
                    host=ip,
                    username=self.username,
                    password=self.password,
                    connect_timeout=self.timeout,
                    **self.ssh_options
                ) as conn:
                    results = []
                    for cmd in commands:
                        result = await conn.run(cmd, check=True)
                        results.append(result.stdout)
                    return "\n".join(results)
            except asyncssh.Error as e:
                raise SSHConnectionException(f"SSH操作失败: {str(e)}") from e
            except Exception as e:
                raise SSHConnectionException(f"连接异常: {str(e)}") from e

    async def execute_raw_commands(self, ip: str, commands: List[str]) -> str:
        """Send caller-supplied CLI commands to ``ip`` without any translation."""
        return await self._send_commands(ip, commands)

    @staticmethod
    def _generate_standard_commands(config: SwitchConfig) -> List[str]:
        """Build the Cisco-style command sequence for ``config``.

        Optional parts (VLAN name, access VLAN, IP address) are emitted only
        when the corresponding field is set, so the result never contains
        blank commands.
        """
        commands = []
        if config.type == "vlan":
            commands.append(f"vlan {config.vlan_id}")
            vlan_name = (config.name or "").strip()
            if vlan_name:
                commands.append(f"name {vlan_name}")
        elif config.type == "interface":
            commands.append(f"interface {config.interface}")
            if config.vlan:
                commands.append(f"switchport access vlan {config.vlan}")
            if config.ip_address:
                commands.append(f"ip address {config.ip_address}")
        # Same blank filtering as the eNSP generator.
        return [c for c in commands if c.strip()]

    async def _validate_config(self, ip: str, config: SwitchConfig) -> bool:
        """Check that the applied change shows up in the device configuration.

        The check is a substring search in the current configuration. For an
        interface change the expected line depends on the dialect that was
        used to apply it.
        """
        current = await self._get_current_config(ip)
        if config.type == "vlan":
            return f"vlan {config.vlan_id}" in current
        if config.type == "interface" and config.vlan:
            # Must match what the corresponding generator actually sent.
            expected = (
                f"port default vlan {config.vlan}"
                if self.ensp_mode
                else f"switchport access vlan {config.vlan}"
            )
            return expected in current
        return True

    async def _get_current_config(self, ip: str) -> str:
        """Fetch the device's current configuration as text.

        Raises:
            SwitchConfigException: If the transport layer reports a failure.
        """
        commands = (
            ["display current-configuration"]
            if self.ensp_mode
            else ["show running-config"]
        )
        try:
            return await self._send_commands(ip, commands)
        except (EnspConnectionException, SSHConnectionException) as e:
            raise SwitchConfigException(f"配置获取失败: {str(e)}") from e

    async def _backup_config(self, ip: str) -> Path:
        """Write the device's current configuration to a timestamped file.

        Returns:
            Path of the backup file inside ``self.backup_dir``.
        """
        # Avoid ':' in the file name (ISO timestamps and IPv6 addresses contain
        # it) because it is not a valid file-name character on Windows.
        stamp = datetime.now().strftime("%Y%m%dT%H%M%S_%f")
        safe_ip = ip.replace(":", "-")
        backup_path = self.backup_dir / f"{safe_ip}_{stamp}.cfg"
        config = await self._get_current_config(ip)
        # Explicit encoding: the captured output contains non-ASCII marker text.
        async with aiofiles.open(backup_path, "w", encoding="utf-8") as f:
            await f.write(config)
        return backup_path

    async def _restore_config(self, ip: str, backup_path: Path) -> bool:
        """Push a previously saved configuration back to the device.

        Best effort: any failure is logged and reported as ``False`` rather
        than raised.
        """
        try:
            async with aiofiles.open(backup_path, encoding="utf-8") as f:
                config = await f.read()
            commands = (
                ["system-view", config, "return"]
                if self.ensp_mode
                else [f"configure terminal\n{config}\nend"]
            )
            await self._send_commands(ip, commands)
            return True
        except Exception as e:
            logger.error(f"恢复失败: {str(e)}")
            return False

    @retry(
        stop=stop_after_attempt(2),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def safe_apply(
        self,
        ip: str,
        config: Union[Dict, SwitchConfig]
    ) -> Dict[str, Union[str, bool, Path]]:
        """Apply ``config`` with backup, validation and automatic rollback.

        Returns:
            On success: ``{"status": "success", "output", "backup_path"}``.
            On a configuration/connection failure: ``{"status": "failed",
            "error", "backup_path", "restore_success"}``.

        Failures inside the apply/validate step are reported through the
        returned dict, so the ``retry`` decorator only re-runs the call when
        the backup step or an unexpected error raises.
        """
        backup_path = await self._backup_config(ip)
        try:
            result = await self.apply_config(ip, config)
            if not await self._validate_config(ip, config):
                raise SwitchConfigException("配置验证失败")
            return {
                "status": "success",
                "output": result,
                "backup_path": str(backup_path)
            }
        except SwitchConfigException as e:
            # Covers the Telnet/SSH connection exceptions too: both derive
            # from SwitchConfigException.
            restore_status = await self._restore_config(ip, backup_path)
            return {
                "status": "failed",
                "error": str(e),
                "backup_path": str(backup_path),
                "restore_success": restore_status
            }
# ----------------------
# 使用示例
# ----------------------
async def demo():
    """Run one eNSP-mode and one default-mode ``safe_apply`` call and print both results."""
    # eNSP device: assign an interface to VLAN 100 and give it an address.
    interface_change = {
        "type": "interface",
        "interface": "GigabitEthernet0/0/1",
        "vlan": 100,
        "ip_address": "192.168.1.2 255.255.255.0",
    }
    simulator = SwitchConfigurator(
        username="admin",
        password="admin",
        timeout=15,
        ensp_mode=True,
        ensp_port=2000,
    )
    print("eNSP配置结果:", await simulator.safe_apply("127.0.0.1", interface_change))

    # Default (non-eNSP) mode: create a named VLAN.
    vlan_change = {
        "type": "vlan",
        "vlan_id": 200,
        "name": "Production",
    }
    hardware = SwitchConfigurator(username="cisco", password="cisco123", timeout=15)
    print("SSH配置结果:", await hardware.safe_apply("192.168.1.1", vlan_change))
# Run the demo only when this file is executed directly as a script.
if __name__ == "__main__":
    asyncio.run(demo())