import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union

import aiofiles
import asyncssh
import telnetlib3
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential

from src.backend.app.utils.logger import logger
from src.backend.config import settings


# ----------------------
# Data models
# ----------------------
class SwitchConfig(BaseModel):
    """Declarative description of one configuration change.

    ``type`` selects which command template is generated ("vlan" or
    "interface" are the values handled by the generators in this file);
    the remaining fields are optional inputs for those templates.
    """

    type: str
    vlan_id: Optional[int] = None  # VLAN to create when type == "vlan"
    interface: Optional[str] = None  # interface name when type == "interface"
    name: Optional[str] = None  # VLAN name / description
    ip_address: Optional[str] = None  # passed verbatim after "ip address"
    vlan: Optional[int] = None  # access VLAN assigned to the interface


# ----------------------
# Exceptions
# ----------------------
class SwitchConfigException(Exception):
    """Base class for every error raised by this module."""


class EnspConnectionException(SwitchConfigException):
    """Raised when the Telnet (eNSP) session fails or times out."""


class SSHConnectionException(SwitchConfigException):
    """Raised when the SSH session or a command run over it fails."""


# ----------------------
# Core configurator (Telnet/eNSP and SSH modes)
# ----------------------
class SwitchConfigurator:
    """Apply, validate, back up and roll back switch configuration.

    With ``ensp_mode=True`` commands use the ``system-view`` dialect and are
    sent over Telnet; otherwise generic CLI commands are sent over SSH.
    """

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        timeout: Optional[int] = None,
        max_workers: int = 5,
        ensp_mode: bool = False,
        ensp_port: int = 2000,
        ensp_command_delay: float = 0.5,
        **ssh_options
    ):
        """Store connection settings and make sure the backup directory exists.

        Args:
            username: Login name; falls back to ``settings.SWITCH_USERNAME``.
            password: Login password; falls back to ``settings.SWITCH_PASSWORD``.
            timeout: Timeout in seconds; falls back to ``settings.SWITCH_TIMEOUT``.
            max_workers: Size of the semaphore limiting concurrent SSH sessions.
            ensp_mode: Select the Telnet/eNSP path instead of SSH.
            ensp_port: Stored on the instance.
            ensp_command_delay: Stored on the instance as ``ensp_delay``.
            **ssh_options: Extra keyword arguments forwarded to
                ``asyncssh.connect``.
        """
        self.username = username if username is not None else settings.SWITCH_USERNAME
        self.password = password if password is not None else settings.SWITCH_PASSWORD
        self.timeout = timeout if timeout is not None else settings.SWITCH_TIMEOUT
        self.semaphore = asyncio.Semaphore(max_workers)
        self.backup_dir = Path("config_backups")
        self.backup_dir.mkdir(exist_ok=True)
        self.ensp_mode = ensp_mode
        # NOTE(review): ensp_port and ensp_delay are stored but not read by
        # any method in this file (the Telnet path connects to port 23).
        self.ensp_port = ensp_port
        self.ensp_delay = ensp_command_delay
        self.ssh_options = ssh_options

    async def apply_config(self, ip: str, config: Union[Dict, SwitchConfig]) -> str:
        """Generate the commands for ``config`` and send them to ``ip``.

        Args:
            ip: Address of the target device.
            config: A ``SwitchConfig`` or a dict accepted by its constructor.

        Returns:
            The device output collected while running the commands.
        """
        if isinstance(config, dict):
            config = SwitchConfig(**config)

        commands = (
            self._generate_ensp_commands(config)
            if self.ensp_mode
            else self._generate_standard_commands(config)
        )
        return await self._send_commands(ip, commands)

    async def _send_commands(self, ip: str, commands: List[str]) -> str:
        """Send ``commands`` over the transport matching the current mode.

        Telnet is used in eNSP mode and SSH otherwise, mirroring how
        ``apply_config`` picks the command dialect.
        """
        if self.ensp_mode:
            return await self._send_ensp_commands(ip, commands)
        return await self._send_ssh_commands(ip, commands)

    async def _send_ensp_commands(self, ip: str, commands: List[str]) -> str:
        """Run ``commands`` on an eNSP device over Telnet (port 23).

        Logs in when ``self.username`` is not the literal ``'NONE'``, sends
        each command, and collects whatever the device prints until one
        second passes without data. Commands starting with ``!`` are skipped.

        Returns:
            All per-command output sections concatenated.

        Raises:
            EnspConnectionException: On connect/login timeout or any other
                failure while talking to the device.
        """
        writer = None
        try:
            logger.info(f"连接设备 {ip},端口23")
            # Bound the connect so an unreachable host cannot hang forever.
            reader, writer = await asyncio.wait_for(
                telnetlib3.open_connection(host=ip, port=23),
                timeout=self.timeout,
            )
            logger.debug("连接成功,开始登录流程")

            try:
                if self.username != 'NONE':
                    await asyncio.wait_for(reader.readuntil(b"Username:"), timeout=self.timeout)
                    logger.debug("收到 'Username:' 提示,发送用户名")
                    writer.write(f"{self.username}\n")

                    await asyncio.wait_for(reader.readuntil(b"Password:"), timeout=self.timeout)
                    logger.debug("收到 'Password:' 提示,发送密码")
                    writer.write(f"{self.password}\n")

                # Give the device a moment to finish the login banner.
                await asyncio.sleep(1)
            except asyncio.TimeoutError as exc:
                raise EnspConnectionException("登录超时,未收到用户名或密码提示") from exc

            sections = []
            for cmd in commands:
                if cmd.startswith("!"):
                    logger.debug(f"跳过特殊命令: {cmd}")
                    continue

                logger.info(f"发送命令: {cmd}")
                writer.write(f"{cmd}\n")
                await writer.drain()

                chunks = []
                try:
                    # Read until the device goes quiet for one second.
                    while True:
                        data = await asyncio.wait_for(reader.read(1024), timeout=1)
                        if not data:
                            logger.debug("读取到空数据,结束当前命令读取")
                            break
                        chunks.append(data)
                        logger.debug(f"收到数据: {repr(data)}")
                except asyncio.TimeoutError:
                    logger.debug("命令输出读取超时,继续执行下一条命令")

                command_output = "".join(chunks)
                sections.append(
                    f"\n[命令: {cmd} 输出开始]\n{command_output}\n[命令: {cmd} 输出结束]\n"
                )

            logger.info("所有命令执行完毕,关闭连接")
            return "".join(sections)
        except EnspConnectionException as e:
            # Already a domain error (e.g. login timeout): do not re-wrap it.
            logger.error(f"连接或执行异常: {e}")
            raise
        except asyncio.TimeoutError as e:
            logger.error(f"连接或读取超时: {e}")
            raise EnspConnectionException(f"eNSP连接超时: {e}") from e
        except Exception as e:
            logger.error(f"连接或执行异常: {e}", exc_info=True)
            raise EnspConnectionException(f"eNSP连接失败: {e}") from e
        finally:
            # Close the session on every path, not only on success.
            if writer is not None:
                writer.close()

    @staticmethod
    def _generate_ensp_commands(config: SwitchConfig) -> List[str]:
        """Build the eNSP command sequence for ``config``.

        The sequence is wrapped in ``system-view`` / ``return``; blank
        entries produced by unset optional fields are dropped.
        """
        commands = ["system-view"]
        if config.type == "vlan":
            commands.extend([
                f"vlan {config.vlan_id}",
                f"description {config.name or ''}"
            ])
        elif config.type == "interface":
            commands.extend([
                f"interface {config.interface}",
                "port link-type access",
                f"port default vlan {config.vlan}" if config.vlan else "",
                f"ip address {config.ip_address}" if config.ip_address else ""
            ])
        commands.append("return")
        return [c for c in commands if c.strip()]

    async def _send_ssh_commands(self, ip: str, commands: List[str]) -> str:
        """Run ``commands`` one by one over SSH and join their stdout.

        The instance semaphore caps how many SSH sessions run concurrently.

        Raises:
            SSHConnectionException: On any asyncssh error or other failure.
        """
        async with self.semaphore:
            try:
                async with asyncssh.connect(
                    host=ip,
                    username=self.username,
                    password=self.password,
                    connect_timeout=self.timeout,
                    **self.ssh_options
                ) as conn:
                    results = []
                    for cmd in commands:
                        result = await conn.run(cmd, check=True)
                        results.append(result.stdout)
                    return "\n".join(results)
            except asyncssh.Error as e:
                raise SSHConnectionException(f"SSH操作失败: {str(e)}") from e
            except Exception as e:
                raise SSHConnectionException(f"连接异常: {str(e)}") from e

    async def execute_raw_commands(self, ip: str, commands: List[str]) -> str:
        """Send caller-supplied CLI commands to ``ip`` without any templating."""
        return await self._send_commands(ip, commands)

    @staticmethod
    def _generate_standard_commands(config: SwitchConfig) -> List[str]:
        """Build the generic CLI command sequence for ``config``.

        Blank entries produced by unset optional fields are dropped so no
        empty command is sent to the device.
        """
        commands = []
        if config.type == "vlan":
            commands.extend([
                f"vlan {config.vlan_id}",
                f"name {config.name or ''}"
            ])
        elif config.type == "interface":
            commands.extend([
                f"interface {config.interface}",
                f"switchport access vlan {config.vlan}" if config.vlan else "",
                f"ip address {config.ip_address}" if config.ip_address else ""
            ])
        return [c for c in commands if c.strip()]

    async def _validate_config(self, ip: str, config: SwitchConfig) -> bool:
        """Check that the device's current configuration reflects ``config``.

        The interface check looks for the same VLAN-assignment command that
        the active mode's generator emits. Config types without a check are
        reported as valid.
        """
        current = await self._get_current_config(ip)
        if config.type == "vlan":
            return f"vlan {config.vlan_id}" in current
        if config.type == "interface" and config.vlan:
            expected = (
                f"port default vlan {config.vlan}"
                if self.ensp_mode
                else f"switchport access vlan {config.vlan}"
            )
            return expected in current
        return True

    async def _get_current_config(self, ip: str) -> str:
        """Fetch the running configuration text from the device.

        Raises:
            SwitchConfigException: If the underlying transport fails.
        """
        commands = (
            ["display current-configuration"]
            if self.ensp_mode
            else ["show running-config"]
        )
        try:
            return await self._send_commands(ip, commands)
        except (EnspConnectionException, SSHConnectionException) as e:
            raise SwitchConfigException(f"配置获取失败: {str(e)}") from e

    async def _backup_config(self, ip: str) -> Path:
        """Save the device's current configuration under ``self.backup_dir``.

        Returns:
            Path of the backup file that was written.
        """
        # Colon-free timestamp: ISO format's ':' is invalid in Windows filenames.
        stamp = datetime.now().strftime("%Y%m%dT%H%M%S%f")
        backup_path = self.backup_dir / f"{ip}_{stamp}.cfg"
        config = await self._get_current_config(ip)
        async with aiofiles.open(backup_path, "w", encoding="utf-8") as f:
            await f.write(config)
        return backup_path

    async def _restore_config(self, ip: str, backup_path: Path) -> bool:
        """Push the contents of ``backup_path`` back to the device.

        Best-effort: any failure is logged and reported as ``False`` rather
        than raised, so a failed rollback does not mask the original error.
        """
        try:
            async with aiofiles.open(backup_path, encoding="utf-8") as f:
                config = await f.read()
            commands = (
                ["system-view", config, "return"]
                if self.ensp_mode
                else [f"configure terminal\n{config}\nend"]
            )
            await self._send_commands(ip, commands)
            return True
        except Exception as e:
            logger.error(f"恢复失败: {str(e)}")
            return False

    @retry(
        stop=stop_after_attempt(2),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def safe_apply(
        self,
        ip: str,
        config: Union[Dict, SwitchConfig]
    ) -> Dict[str, Union[str, bool, Path]]:
        """Apply ``config`` with a backup taken first and rollback on failure.

        Failures while applying or validating are caught and reported in the
        returned dict; an exception raised while taking the backup propagates
        and is what the retry decorator acts on.

        Returns:
            A dict with ``status`` ("success" or "failed") and
            ``backup_path``, plus ``output`` on success or ``error`` and
            ``restore_success`` on failure.
        """
        backup_path = await self._backup_config(ip)
        try:
            result = await self.apply_config(ip, config)
            if not await self._validate_config(ip, config):
                raise SwitchConfigException("配置验证失败")
            return {
                "status": "success",
                "output": result,
                "backup_path": str(backup_path)
            }
        except (EnspConnectionException, SSHConnectionException, SwitchConfigException) as e:
            restore_status = await self._restore_config(ip, backup_path)
            return {
                "status": "failed",
                "error": str(e),
                "backup_path": str(backup_path),
                "restore_success": restore_status
            }


# ----------------------
# Usage example
# ----------------------
async def demo():
    """Run one eNSP (Telnet) and one SSH configuration as a usage example."""
    ensp_configurator = SwitchConfigurator(
        ensp_mode=True,
        ensp_port=2000,
        username="admin",
        password="admin",
        timeout=15
    )
    ensp_result = await ensp_configurator.safe_apply("127.0.0.1", {
        "type": "interface",
        "interface": "GigabitEthernet0/0/1",
        "vlan": 100,
        "ip_address": "192.168.1.2 255.255.255.0"
    })
    print("eNSP配置结果:", ensp_result)

    ssh_configurator = SwitchConfigurator(
        username="cisco",
        password="cisco123",
        timeout=15
    )
    ssh_result = await ssh_configurator.safe_apply("192.168.1.1", {
        "type": "vlan",
        "vlan_id": 200,
        "name": "Production"
    })
    print("SSH配置结果:", ssh_result)


if __name__ == "__main__":
    asyncio.run(demo())