AI-powered-switches/src/backend/app/api/network_config.py
2025-09-12 13:28:01 +08:00

145 lines
5.1 KiB
Python

import asyncio
import telnetlib3
from pathlib import Path
from typing import Dict, List, Optional
from pydantic import BaseModel
from src.backend.app.utils.logger import logger
from src.backend.config import settings
# ----------------------
# 数据模型
# ----------------------
class SwitchConfig(BaseModel):
type: str
vlan_id: Optional[int] = None
interface: Optional[str] = None
name: Optional[str] = None
ip_address: Optional[str] = None
vlan: Optional[int] = None
# ----------------------
# 异常类
# ----------------------
class SwitchConfigException(Exception):
pass
class EnspConnectionException(SwitchConfigException):
pass
# ----------------------
# 核心配置器
# ----------------------
class SwitchConfigurator:
connection_pool: Dict[str, tuple] = {}
def __init__(
self,
username: str = None,
password: str = None,
timeout: int = None,
max_workers: int = 5,
ensp_command_delay: float = 0.5,
**ssh_options
):
self.username = username if username is not None else settings.SWITCH_USERNAME
self.password = password if password is not None else settings.SWITCH_PASSWORD
self.timeout = timeout if timeout is not None else settings.SWITCH_TIMEOUT
self.semaphore = asyncio.Semaphore(max_workers)
self.backup_dir = Path("config_backups")
self.backup_dir.mkdir(exist_ok=True)
self.ensp_delay = ensp_command_delay
self.ssh_options = ssh_options
async def _get_or_create_connection(self, ip: str):
if ip in self.connection_pool:
logger.debug(f"复用已有连接: {ip}")
return self.connection_pool[ip]
logger.info(f"建立新连接: {ip}")
reader, writer = await telnetlib3.open_connection(
host=ip,
port=23,
encoding=None, # 二进制模式
shell=None
)
try:
if self.username and self.username.upper() != "NONE":
try:
logger.debug("等待用户名提示...")
await asyncio.wait_for(reader.readuntil(b"Username:"), timeout=self.timeout)
writer.write(self.username.encode("utf-8") + b"\n")
await writer.drain()
logger.info("用户名发送完成")
except asyncio.TimeoutError:
logger.warning("未收到用户名提示,可能交换机不要求用户名")
try:
logger.debug("等待密码提示...")
await asyncio.wait_for(reader.readuntil(b"Password:"), timeout=self.timeout)
logger.info('密码',self.password)
writer.write(self.password.encode("utf-8") + b"\n")
await writer.drain()
logger.info("密码发送完成")
except asyncio.TimeoutError:
writer.close()
raise EnspConnectionException("未收到密码提示,登录失败")
await asyncio.sleep(1)
except Exception as e:
writer.close()
raise EnspConnectionException(f"登录异常: {e}")
self.connection_pool[ip] = (reader, writer)
return reader, writer
async def _send_ensp_commands(self, ip: str, commands: List[str]) -> bool:
try:
reader, writer = await self._get_or_create_connection(ip)
for cmd in commands:
if cmd.startswith("!"):
logger.debug(f"跳过特殊命令: {cmd}")
continue
logger.info(f"[{ip}] 发送命令: {cmd}")
writer.write((cmd + "\n").encode("utf-8"))
await writer.drain()
await asyncio.sleep(self.ensp_delay)
if cmd.lower() in ["quit", "save", "y"]:
try:
data = await asyncio.wait_for(reader.read(1024), timeout=3)
if data:
logger.info(f"[{ip}] 返回结果:\n{data.strip()}")
except asyncio.TimeoutError:
logger.warning(f"[{ip}] 可能无回显,命令已执行: {cmd}")
continue
try:
output = await asyncio.wait_for(
reader.readuntil(b">"),
timeout=5
)
if output:
logger.info(f"[{ip}] 返回结果:\n{output.strip()}")
except asyncio.TimeoutError:
logger.warning(f"[{ip}] 读取返回超时 (命令: {cmd})")
logger.info(f"[{ip}] 所有命令发送完成")
return True
except asyncio.TimeoutError as e:
logger.error(f"[{ip}] 连接或读取超时: {e}")
return False
except Exception as e:
logger.error(f"[{ip}] 命令发送异常: {e}", exc_info=True)
return False
async def execute_raw_commands(self, ip: str, commands: List[str]) -> bool:
async with self.semaphore:
return await self._send_ensp_commands(ip, commands)