基础api服务可进行,可扫描网络交换机(已修改相关问题,注意下载Nmap才可扫描交换机网址:https://nmap.org/download.html),添加了配置多台交换机的功能,ensp配置.需用实际设备进一步调试

This commit is contained in:
3 2025-06-04 13:35:12 +08:00
parent 15dcec276f
commit ed09ef95ea

View File

@ -1,35 +1,95 @@
import paramiko
from tenacity import retry, stop_after_attempt
import asyncio
from typing import Dict, Any,List
from ..utils.exceptions import SwitchConfigException
from typing import Dict, List, Optional, Union
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pydantic import BaseModel
import logging
# ----------------------
# 数据模型定义
# ----------------------
class SwitchConfig(BaseModel):
"""交换机配置模型"""
type: str # vlan/interface/acl/route
vlan_id: Optional[int] = None
interface: Optional[str] = None
name: Optional[str] = None
ip_address: Optional[str] = None
acl_id: Optional[int] = None
rules: Optional[List[Dict]] = None
# ----------------------
# 异常类
# ----------------------
class SwitchConfigException(Exception):
"""交换机配置异常基类"""
pass
# ----------------------
# 核心配置器
# ----------------------
class SwitchConfigurator:
def __init__(self, username: str, password: str, timeout: int = 10,max_workers=5):
def __init__(
self,
username: str = "admin",
password: str = "admin",
timeout: int = 10,
max_workers: int = 5,
is_emulated: bool = False,
emulated_delay: float = 2.0
):
"""
初始化配置器
:param username: 登录用户名
:param password: 登录密码
:param timeout: SSH超时时间()
:param max_workers: 最大并发数
:param is_emulated: 是否模拟器环境
:param emulated_delay: 模拟器命令间隔延迟()
"""
self.username = username
self.password = password
self.timeout = timeout
self.is_emulated = is_emulated
self.emulated_delay = emulated_delay
self.semaphore = asyncio.Semaphore(max_workers)
self.logger = logging.getLogger(__name__)
@retry(stop=stop_after_attempt(3))
async def safe_apply(self, ip: str, config: dict):
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(SwitchConfigException)
)
async def safe_apply(self, ip: str, config: Union[Dict, SwitchConfig]) -> str:
"""安全执行配置(带重试机制)"""
async with self.semaphore:
return await self.apply_config(ip, config)
async def batch_configure(self, config: dict, ips: List[str]):
"""并发配置多台设备"""
tasks = [self.apply_config(ip, config) for ip in ips]
return await asyncio.gather(*tasks, return_exceptions=True)
async def batch_configure(
self,
config: Union[Dict, SwitchConfig],
ips: List[str]
) -> Dict[str, Union[str, Exception]]:
"""批量配置多台设备"""
tasks = [self.safe_apply(ip, config) for ip in ips]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {ip: result for ip, result in zip(ips, results)}
async def apply_config(self, switch_ip: str, config: Dict[str, Any]) -> str:
"""
应用配置到交换机
"""
async def apply_config(
self,
switch_ip: str,
config: Union[Dict, SwitchConfig]
) -> str:
"""应用配置到单台设备"""
try:
# 根据配置类型调用不同的方法
config_type = config.get("type", "").lower()
if isinstance(config, dict):
config = SwitchConfig(**config)
config_type = config.type.lower()
if config_type == "vlan":
return await self._configure_vlan(switch_ip, config)
elif config_type == "interface":
@ -39,148 +99,174 @@ class SwitchConfigurator:
elif config_type == "route":
return await self._configure_route(switch_ip, config)
else:
raise SwitchConfigException(f"Unsupported config type: {config_type}")
raise SwitchConfigException(f"不支持的配置类型: {config_type}")
except Exception as e:
self.logger.error(f"{switch_ip} 配置失败: {str(e)}")
raise SwitchConfigException(str(e))
async def _send_commands(self, switch_ip: str, commands: list) -> str:
"""
发送命令到交换机
"""
# ----------------------
# 协议实现
# ----------------------
async def _send_commands(self, ip: str, commands: List[str]) -> str:
"""发送命令到设备(自动适配模拟器)"""
try:
# 使用Paramiko建立SSH连接
# 自动选择凭证
username, password = (
("admin", "Admin@123") if self.is_emulated
else (self.username, self.password)
)
# 自动调整超时
timeout = 15 if self.is_emulated else self.timeout
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 在异步上下文中运行阻塞操作
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: ssh.connect(
switch_ip,
username=self.username,
password=self.password,
timeout=self.timeout
ip,
username=username,
password=password,
timeout=timeout,
look_for_keys=False
)
)
# 获取SSH shell
shell = await loop.run_in_executor(None, ssh.invoke_shell)
# 发送配置命令
# 执行命令
shell = ssh.invoke_shell()
output = ""
for cmd in commands:
await loop.run_in_executor(None, shell.send, cmd + "\n")
await asyncio.sleep(0.5)
shell.send(cmd + "\n")
if self.is_emulated:
await asyncio.sleep(self.emulated_delay)
while shell.recv_ready():
output += (await loop.run_in_executor(None, shell.recv, 1024)).decode("utf-8")
# 关闭连接
await loop.run_in_executor(None, ssh.close)
recv = await loop.run_in_executor(None, shell.recv, 1024)
output += recv.decode("gbk" if self.is_emulated else "utf-8")
ssh.close()
return output
except Exception as e:
raise SwitchConfigException(f"SSH connection failed: {str(e)}")
async def _configure_vlan(self, switch_ip: str, config: Dict[str, Any]) -> str:
"""
配置VLAN
"""
vlan_id = config["vlan_id"]
vlan_name = config.get("name", f"VLAN{vlan_id}")
interfaces = config.get("interfaces", [])
raise SwitchConfigException(f"SSH连接错误: {str(e)}")
async def _configure_vlan(self, ip: str, config: SwitchConfig) -> str:
"""配置VLAN自动适配语法"""
commands = [
"configure terminal",
f"vlan {vlan_id}",
f"name {vlan_name}",
"system-view" if self.is_emulated else "configure terminal",
f"vlan {config.vlan_id}",
f"name {config.name or ''}"
]
# 配置接口
for intf in interfaces:
# 端口加入VLAN
for intf in getattr(config, "interfaces", []):
if self.is_emulated:
commands.extend([
f"interface {intf['interface']}",
f"switchport access vlan {vlan_id}",
"port link-type access",
f"port default vlan {config.vlan_id}",
"quit"
])
else:
commands.extend([
f"interface {intf['interface']}",
f"switchport access vlan {config.vlan_id}",
"exit"
])
commands.append("end")
return await self._send_commands(switch_ip, commands)
async def _configure_interface(self, switch_ip: str, config: Dict[str, Any]) -> str:
"""
配置接口
"""
interface = config["interface"]
ip_address = config.get("ip_address")
description = config.get("description", "")
vlan = config.get("vlan")
state = config.get("state", "up")
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(ip, commands)
async def _configure_interface(self, ip: str, config: SwitchConfig) -> str:
"""配置接口"""
commands = [
"configure terminal",
f"interface {interface}",
f"description {description}",
"system-view" if self.is_emulated else "configure terminal",
f"interface {config.interface}",
f"description {config.description or ''}"
]
if ip_address:
commands.append(f"ip address {ip_address}")
if config.ip_address:
commands.append(f"ip address {config.ip_address}")
if vlan:
commands.append(f"switchport access vlan {vlan}")
if state.lower() == "up":
commands.append("no shutdown")
if hasattr(config, "vlan"):
if self.is_emulated:
commands.extend([
"port link-type access",
f"port default vlan {config.vlan}"
])
else:
commands.append("shutdown")
commands.append(f"switchport access vlan {config.vlan}")
commands.extend(["exit", "end"])
state = getattr(config, "state", "up")
commands.append("undo shutdown" if state == "up" else "shutdown")
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(switch_ip, commands)
return await self._send_commands(ip, commands)
async def _configure_acl(self, switch_ip: str, config: Dict[str, Any]) -> str:
"""
配置ACL
"""
acl_id = config["acl_id"]
acl_type = config.get("type", "standard")
rules = config.get("rules", [])
async def _configure_acl(self, ip: str, config: SwitchConfig) -> str:
"""配置ACL"""
commands = ["system-view" if self.is_emulated else "configure terminal"]
commands = ["configure terminal"]
if acl_type == "standard":
commands.append(f"access-list {acl_id} standard")
if self.is_emulated:
commands.append(f"acl number {config.acl_id}")
for rule in config.rules or []:
commands.append(
f"rule {'permit' if rule.get('action') == 'permit' else 'deny'} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}"
)
else:
commands.append(f"access-list {acl_id} extended")
commands.append(f"access-list {config.acl_id} extended")
for rule in config.rules or []:
commands.append(
f"{rule.get('action', 'permit')} {rule.get('protocol', 'ip')} "
f"{rule.get('source', 'any')} {rule.get('destination', 'any')}"
)
for rule in rules:
action = rule.get("action", "permit")
source = rule.get("source", "any")
destination = rule.get("destination", "any")
protocol = rule.get("protocol", "ip")
if acl_type == "standard":
commands.append(f"{action} {source}")
else:
commands.append(f"{action} {protocol} {source} {destination}")
commands.append("end")
return await self._send_commands(switch_ip, commands)
async def _configure_route(self, switch_ip: str, config: Dict[str, Any]) -> str:
"""
配置路由
"""
network = config["network"]
mask = config["mask"]
next_hop = config["next_hop"]
commands.append("return" if self.is_emulated else "end")
return await self._send_commands(ip, commands)
async def _configure_route(self, ip: str, config: SwitchConfig) -> str:
"""配置路由"""
commands = [
"configure terminal",
f"ip route {network} {mask} {next_hop}",
"end"
"system-view" if self.is_emulated else "configure terminal",
f"ip route-static {config.network} {config.mask} {config.next_hop}",
"return" if self.is_emulated else "end"
]
return await self._send_commands(ip, commands)
return await self._send_commands(switch_ip, commands)
# ----------------------
# 使用示例
# ----------------------
async def main():
# eNSP模拟环境配置
ens_configurator = SwitchConfigurator(is_emulated=True)
await ens_configurator.batch_configure(
{
"type": "vlan",
"vlan_id": 100,
"name": "TestVLAN",
"interfaces": [{"interface": "GigabitEthernet0/0/1"}]
},
["192.168.1.200"] # eNSP设备IP
)
# 真实设备配置
real_configurator = SwitchConfigurator(
username="real_admin",
password="SecurePass123!",
is_emulated=False
)
await real_configurator.batch_configure(
{
"type": "interface",
"interface": "Gi1/0/24",
"description": "Uplink",
"state": "up"
},
["10.1.1.1"] # 真实设备IP
)
if __name__ == "__main__":
asyncio.run(main())