From ed09ef95eae7d3f12c1bb37bad4fe7ea5567a5f5 Mon Sep 17 00:00:00 2001 From: 3 Date: Wed, 4 Jun 2025 13:35:12 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E7=A1=80api=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=8F=AF=E8=BF=9B=E8=A1=8C,=E5=8F=AF=E6=89=AB=E6=8F=8F?= =?UTF-8?q?=E7=BD=91=E7=BB=9C=E4=BA=A4=E6=8D=A2=E6=9C=BA(=E5=B7=B2?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E7=9B=B8=E5=85=B3=E9=97=AE=E9=A2=98=EF=BC=8C?= =?UTF-8?q?=E6=B3=A8=E6=84=8F=E4=B8=8B=E8=BD=BDNmap=E6=89=8D=E5=8F=AF?= =?UTF-8?q?=E6=89=AB=E6=8F=8F=E4=BA=A4=E6=8D=A2=E6=9C=BA=E7=BD=91=E5=9D=80?= =?UTF-8?q?=EF=BC=9Ahttps://nmap.org/download.html=EF=BC=89,=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E4=BA=86=E9=85=8D=E7=BD=AE=E5=A4=9A=E5=8F=B0=E4=BA=A4?= =?UTF-8?q?=E6=8D=A2=E6=9C=BA=E7=9A=84=E5=8A=9F=E8=83=BD,ensp=E9=85=8D?= =?UTF-8?q?=E7=BD=AE.=E9=9C=80=E7=94=A8=E5=AE=9E=E9=99=85=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E8=BF=9B=E4=B8=80=E6=AD=A5=E8=B0=83=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/app/api/network_config.py | 348 ++++++++++++++++---------- 1 file changed, 217 insertions(+), 131 deletions(-) diff --git a/src/backend/app/api/network_config.py b/src/backend/app/api/network_config.py index 0898cd0..3f791c2 100644 --- a/src/backend/app/api/network_config.py +++ b/src/backend/app/api/network_config.py @@ -1,35 +1,95 @@ import paramiko -from tenacity import retry, stop_after_attempt import asyncio -from typing import Dict, Any,List -from ..utils.exceptions import SwitchConfigException +from typing import Dict, List, Optional, Union +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type +from pydantic import BaseModel +import logging +# ---------------------- +# 数据模型定义 +# ---------------------- +class SwitchConfig(BaseModel): + """交换机配置模型""" + type: str # vlan/interface/acl/route + vlan_id: Optional[int] = None + interface: Optional[str] = None + name: Optional[str] = None + ip_address: Optional[str] = None + acl_id: Optional[int] = None + rules: Optional[List[Dict]] = None + + +# ---------------------- +# 异常类 +# ---------------------- +class SwitchConfigException(Exception): + """交换机配置异常基类""" + pass + + +# ---------------------- +# 核心配置器 +# ---------------------- class SwitchConfigurator: - def __init__(self, username: str, password: str, timeout: int = 10,max_workers=5): + def __init__( + self, + username: str = "admin", + password: str = "admin", + timeout: int = 10, + max_workers: int = 5, + is_emulated: bool = False, + emulated_delay: float = 2.0 + ): + """ + 初始化配置器 + + :param username: 登录用户名 + :param password: 登录密码 + :param timeout: SSH超时时间(秒) + :param max_workers: 最大并发数 + :param is_emulated: 是否模拟器环境 + :param emulated_delay: 模拟器命令间隔延迟(秒) + """ self.username = username self.password = password self.timeout = timeout + self.is_emulated = is_emulated + self.emulated_delay = emulated_delay self.semaphore = asyncio.Semaphore(max_workers) + self.logger = logging.getLogger(__name__) - @retry(stop=stop_after_attempt(3)) - async def safe_apply(self, ip: str, config: dict): + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + retry=retry_if_exception_type(SwitchConfigException) + ) + async def safe_apply(self, ip: str, config: Union[Dict, SwitchConfig]) -> str: + """安全执行配置(带重试机制)""" async with self.semaphore: return await self.apply_config(ip, config) - async def batch_configure(self, config: dict, ips: List[str]): - """并发配置多台设备""" - tasks = [self.apply_config(ip, config) for ip in ips] - return await asyncio.gather(*tasks, return_exceptions=True) + async def batch_configure( + self, + config: Union[Dict, SwitchConfig], + ips: List[str] + ) -> Dict[str, Union[str, Exception]]: + """批量配置多台设备""" + tasks = [self.safe_apply(ip, config) for ip in ips] + results = await asyncio.gather(*tasks, return_exceptions=True) + return {ip: result for ip, result in zip(ips, results)} - async def apply_config(self, switch_ip: str, config: Dict[str, Any]) -> str: - """ - 应用配置到交换机 - """ + async def apply_config( + self, + switch_ip: str, + config: Union[Dict, SwitchConfig] + ) -> str: + """应用配置到单台设备""" try: - # 根据配置类型调用不同的方法 - config_type = config.get("type", "").lower() + if isinstance(config, dict): + config = SwitchConfig(**config) + config_type = config.type.lower() if config_type == "vlan": return await self._configure_vlan(switch_ip, config) elif config_type == "interface": @@ -39,148 +99,174 @@ class SwitchConfigurator: elif config_type == "route": return await self._configure_route(switch_ip, config) else: - raise SwitchConfigException(f"Unsupported config type: {config_type}") + raise SwitchConfigException(f"不支持的配置类型: {config_type}") except Exception as e: + self.logger.error(f"{switch_ip} 配置失败: {str(e)}") raise SwitchConfigException(str(e)) - async def _send_commands(self, switch_ip: str, commands: list) -> str: - """ - 发送命令到交换机 - """ + # ---------------------- + # 协议实现 + # ---------------------- + async def _send_commands(self, ip: str, commands: List[str]) -> str: + """发送命令到设备(自动适配模拟器)""" try: - # 使用Paramiko建立SSH连接 + # 自动选择凭证 + username, password = ( + ("admin", "Admin@123") if self.is_emulated + else (self.username, self.password) + ) + + # 自动调整超时 + timeout = 15 if self.is_emulated else self.timeout + ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - # 在异步上下文中运行阻塞操作 loop = asyncio.get_event_loop() await loop.run_in_executor( None, lambda: ssh.connect( - switch_ip, - username=self.username, - password=self.password, - timeout=self.timeout + ip, + username=username, + password=password, + timeout=timeout, + look_for_keys=False ) ) - # 获取SSH shell - shell = await loop.run_in_executor(None, ssh.invoke_shell) - - # 发送配置命令 + # 执行命令 + shell = ssh.invoke_shell() output = "" for cmd in commands: - await loop.run_in_executor(None, shell.send, cmd + "\n") - await asyncio.sleep(0.5) + shell.send(cmd + "\n") + if self.is_emulated: + await asyncio.sleep(self.emulated_delay) while shell.recv_ready(): - output += (await loop.run_in_executor(None, shell.recv, 1024)).decode("utf-8") - - # 关闭连接 - await loop.run_in_executor(None, ssh.close) + recv = await loop.run_in_executor(None, shell.recv, 1024) + output += recv.decode("gbk" if self.is_emulated else "utf-8") + ssh.close() return output except Exception as e: - raise SwitchConfigException(f"SSH connection failed: {str(e)}") - - async def _configure_vlan(self, switch_ip: str, config: Dict[str, Any]) -> str: - """ - 配置VLAN - """ - vlan_id = config["vlan_id"] - vlan_name = config.get("name", f"VLAN{vlan_id}") - interfaces = config.get("interfaces", []) + raise SwitchConfigException(f"SSH连接错误: {str(e)}") + async def _configure_vlan(self, ip: str, config: SwitchConfig) -> str: + """配置VLAN(自动适配语法)""" commands = [ - "configure terminal", - f"vlan {vlan_id}", - f"name {vlan_name}", + "system-view" if self.is_emulated else "configure terminal", + f"vlan {config.vlan_id}", + f"name {config.name or ''}" ] - # 配置接口 - for intf in interfaces: - commands.extend([ - f"interface {intf['interface']}", - f"switchport access vlan {vlan_id}", - "exit" - ]) - - commands.append("end") - - return await self._send_commands(switch_ip, commands) - - async def _configure_interface(self, switch_ip: str, config: Dict[str, Any]) -> str: - """ - 配置接口 - """ - interface = config["interface"] - ip_address = config.get("ip_address") - description = config.get("description", "") - vlan = config.get("vlan") - state = config.get("state", "up") - - commands = [ - "configure terminal", - f"interface {interface}", - f"description {description}", - ] - - if ip_address: - commands.append(f"ip address {ip_address}") - - if vlan: - commands.append(f"switchport access vlan {vlan}") - - if state.lower() == "up": - commands.append("no shutdown") - else: - commands.append("shutdown") - - commands.extend(["exit", "end"]) - - return await self._send_commands(switch_ip, commands) - - async def _configure_acl(self, switch_ip: str, config: Dict[str, Any]) -> str: - """ - 配置ACL - """ - acl_id = config["acl_id"] - acl_type = config.get("type", "standard") - rules = config.get("rules", []) - - commands = ["configure terminal"] - - if acl_type == "standard": - commands.append(f"access-list {acl_id} standard") - else: - commands.append(f"access-list {acl_id} extended") - - for rule in rules: - action = rule.get("action", "permit") - source = rule.get("source", "any") - destination = rule.get("destination", "any") - protocol = rule.get("protocol", "ip") - - if acl_type == "standard": - commands.append(f"{action} {source}") + # 端口加入VLAN + for intf in getattr(config, "interfaces", []): + if self.is_emulated: + commands.extend([ + f"interface {intf['interface']}", + "port link-type access", + f"port default vlan {config.vlan_id}", + "quit" + ]) else: - commands.append(f"{action} {protocol} {source} {destination}") + commands.extend([ + f"interface {intf['interface']}", + f"switchport access vlan {config.vlan_id}", + "exit" + ]) - commands.append("end") - - return await self._send_commands(switch_ip, commands) - - async def _configure_route(self, switch_ip: str, config: Dict[str, Any]) -> str: - """ - 配置路由 - """ - network = config["network"] - mask = config["mask"] - next_hop = config["next_hop"] + commands.append("return" if self.is_emulated else "end") + return await self._send_commands(ip, commands) + async def _configure_interface(self, ip: str, config: SwitchConfig) -> str: + """配置接口""" commands = [ - "configure terminal", - f"ip route {network} {mask} {next_hop}", - "end" + "system-view" if self.is_emulated else "configure terminal", + f"interface {config.interface}", + f"description {config.description or ''}" ] - return await self._send_commands(switch_ip, commands) \ No newline at end of file + if config.ip_address: + commands.append(f"ip address {config.ip_address}") + + if hasattr(config, "vlan"): + if self.is_emulated: + commands.extend([ + "port link-type access", + f"port default vlan {config.vlan}" + ]) + else: + commands.append(f"switchport access vlan {config.vlan}") + + state = getattr(config, "state", "up") + commands.append("undo shutdown" if state == "up" else "shutdown") + commands.append("return" if self.is_emulated else "end") + + return await self._send_commands(ip, commands) + + async def _configure_acl(self, ip: str, config: SwitchConfig) -> str: + """配置ACL""" + commands = ["system-view" if self.is_emulated else "configure terminal"] + + if self.is_emulated: + commands.append(f"acl number {config.acl_id}") + for rule in config.rules or []: + commands.append( + f"rule {'permit' if rule.get('action') == 'permit' else 'deny'} " + f"{rule.get('source', 'any')} {rule.get('destination', 'any')}" + ) + else: + commands.append(f"access-list {config.acl_id} extended") + for rule in config.rules or []: + commands.append( + f"{rule.get('action', 'permit')} {rule.get('protocol', 'ip')} " + f"{rule.get('source', 'any')} {rule.get('destination', 'any')}" + ) + + commands.append("return" if self.is_emulated else "end") + return await self._send_commands(ip, commands) + + async def _configure_route(self, ip: str, config: SwitchConfig) -> str: + """配置路由""" + commands = [ + "system-view" if self.is_emulated else "configure terminal", + f"ip route-static {config.network} {config.mask} {config.next_hop}", + "return" if self.is_emulated else "end" + ] + return await self._send_commands(ip, commands) + + +# ---------------------- +# 使用示例 +# ---------------------- +async def main(): + # eNSP模拟环境配置 + ens_configurator = SwitchConfigurator(is_emulated=True) + await ens_configurator.batch_configure( + { + "type": "vlan", + "vlan_id": 100, + "name": "TestVLAN", + "interfaces": [{"interface": "GigabitEthernet0/0/1"}] + }, + ["192.168.1.200"] # eNSP设备IP + ) + + # 真实设备配置 + real_configurator = SwitchConfigurator( + username="real_admin", + password="SecurePass123!", + is_emulated=False + ) + await real_configurator.batch_configure( + { + "type": "interface", + "interface": "Gi1/0/24", + "description": "Uplink", + "state": "up" + }, + ["10.1.1.1"] # 真实设备IP + ) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file