mirror of
https://github.com/Jerryplusy/AI-powered-switches.git
synced 2025-07-04 13:19:20 +00:00
添加了配置100台的功能以及连接池
This commit is contained in:
parent
5db8ec9b4a
commit
c5b6a9b8f3
@ -1,11 +1,13 @@
|
||||
# 交换机认证配置
|
||||
SWITCH_USERNAME=admin
|
||||
SWITCH_PASSWORD=your_secure_password
|
||||
SWITCH_TIMEOUT=15
|
||||
|
||||
# 硅基流动API配置
|
||||
SILICONFLOW_API_KEY=sk-114514
|
||||
SILICONFLOW_API_URL=https://api.siliconflow.ai/v1
|
||||
|
||||
# 交换机登录凭证
|
||||
SWITCH_USERNAME=admin
|
||||
SWITCH_PASSWORD=your_switch_password
|
||||
SWITCH_TIMEOUT=10
|
||||
|
||||
# 应用设置
|
||||
DEBUG=True
|
||||
# FastAPI 配置
|
||||
UVICORN_HOST=0.0.0.0
|
||||
UVICORN_PORT=8000
|
||||
UVICORN_RELOAD=false
|
@ -1,22 +1,31 @@
|
||||
# 使用官方 Python 基础镜像
|
||||
FROM python:3.13-slim
|
||||
|
||||
# 设置工作目录
|
||||
WORKDIR /app
|
||||
|
||||
# 1. 先复制依赖文件并安装
|
||||
COPY ./requirements.txt /app/requirements.txt
|
||||
RUN pip install --no-cache-dir --upgrade -r /app/requirements.txt
|
||||
# 安装系统依赖(包含 nmap 和 SSH 客户端)
|
||||
RUN apt-get update && \
|
||||
apt-get install -y \
|
||||
nmap \
|
||||
telnet \
|
||||
openssh-client && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# 2. 复制项目代码(排除 .env 和缓存文件)
|
||||
COPY . /app
|
||||
# 复制项目文件
|
||||
COPY ./src/backend/requirements.txt .
|
||||
COPY ./src/backend /app
|
||||
|
||||
# 3. 环境变量配置
|
||||
ENV PYTHONPATH=/app \
|
||||
PORT=8000 \
|
||||
HOST=0.0.0.0
|
||||
# 安装 Python 依赖
|
||||
RUN pip install --no-cache-dir -r requirements.txt && \
|
||||
pip install asyncssh telnetlib3 aiofiles
|
||||
|
||||
# 4. 安全设置
|
||||
RUN find /app -name "*.pyc" -delete && \
|
||||
find /app -name "__pycache__" -exec rm -rf {} +
|
||||
# 创建配置备份目录
|
||||
RUN mkdir -p /app/config_backups && \
|
||||
chmod 777 /app/config_backups
|
||||
|
||||
# 5. 启动命令(修正路径)
|
||||
CMD ["uvicorn", "src.backend.app:app", "--host", "0.0.0.0", "--port", "8000"]
|
||||
# 暴露 FastAPI 端口
|
||||
EXPOSE 8000
|
||||
|
||||
# 启动命令
|
||||
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
|
@ -1,34 +1,97 @@
|
||||
from fastapi import (APIRouter, HTTPException)
|
||||
from typing import List
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from typing import List, Dict
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ...app.services.ai_services import AIService
|
||||
from ...app.api.network_config import SwitchConfigurator
|
||||
from ...config import settings
|
||||
from ..services.network_scanner import NetworkScanner
|
||||
from ..api.network_config import SwitchConfigurator, SwitchConfig
|
||||
|
||||
router = APIRouter(prefix="/api", tags=["API"])
|
||||
scanner = NetworkScanner()
|
||||
|
||||
class BatchConfigRequest(BaseModel):
|
||||
config: dict
|
||||
switch_ips: List[str] # 支持多个IP
|
||||
|
||||
# ====================
|
||||
# 请求模型
|
||||
# ====================
|
||||
class BatchConfigRequest(BaseModel):
|
||||
config: Dict
|
||||
switch_ips: List[str]
|
||||
|
||||
|
||||
class CommandRequest(BaseModel):
|
||||
command: str
|
||||
|
||||
|
||||
class ConfigRequest(BaseModel):
|
||||
config: Dict
|
||||
switch_ip: str
|
||||
|
||||
|
||||
# ====================
|
||||
# API端点
|
||||
# ====================
|
||||
@router.post("/batch_apply_config")
|
||||
async def batch_apply_config(request: BatchConfigRequest):
|
||||
results = {}
|
||||
for ip in request.switch_ips:
|
||||
try:
|
||||
configurator = SwitchConfigurator()
|
||||
results[ip] = await configurator.apply_config(ip, request.config)
|
||||
except Exception as e:
|
||||
results[ip] = str(e)
|
||||
return {"results": results}
|
||||
"""
|
||||
批量配置交换机
|
||||
- 支持同时配置多台设备
|
||||
- 自动处理连接池
|
||||
- 返回每个设备的详细结果
|
||||
"""
|
||||
configurator = SwitchConfigurator(
|
||||
username=settings.SWITCH_USERNAME,
|
||||
password=settings.SWITCH_PASSWORD,
|
||||
timeout=settings.SWITCH_TIMEOUT
|
||||
)
|
||||
|
||||
results = {}
|
||||
try:
|
||||
for ip in request.switch_ips:
|
||||
try:
|
||||
# 使用公开的apply_config方法
|
||||
results[ip] = await configurator.apply_config(ip, request.config)
|
||||
except Exception as e:
|
||||
results[ip] = {
|
||||
"status": "failed",
|
||||
"error": str(e)
|
||||
}
|
||||
return {"results": results}
|
||||
finally:
|
||||
await configurator.close()
|
||||
|
||||
|
||||
@router.post("/apply_config", response_model=Dict)
|
||||
async def apply_config(request: ConfigRequest):
|
||||
"""
|
||||
单设备配置
|
||||
- 更详细的错误处理
|
||||
- 自动备份和回滚
|
||||
"""
|
||||
configurator = SwitchConfigurator(
|
||||
username=settings.SWITCH_USERNAME,
|
||||
password=settings.SWITCH_PASSWORD,
|
||||
timeout=settings.SWITCH_TIMEOUT
|
||||
)
|
||||
|
||||
try:
|
||||
result = await configurator.apply_config(request.switch_ip, request.config)
|
||||
if result["status"] != "success":
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=result.get("error", "配置失败")
|
||||
)
|
||||
return result
|
||||
finally:
|
||||
await configurator.close()
|
||||
|
||||
|
||||
# ====================
|
||||
# 其他原有端点(保持不动)
|
||||
# ====================
|
||||
@router.get("/test")
|
||||
async def test_endpoint():
|
||||
return {"message": "Hello World"}
|
||||
|
||||
|
||||
@router.get("/scan_network", summary="扫描网络中的交换机")
|
||||
async def scan_network(subnet: str = "192.168.1.0/24"):
|
||||
try:
|
||||
@ -41,25 +104,23 @@ async def scan_network(subnet: str = "192.168.1.0/24"):
|
||||
except Exception as e:
|
||||
raise HTTPException(500, f"扫描失败: {str(e)}")
|
||||
|
||||
|
||||
@router.get("/list_devices", summary="列出已发现的交换机")
|
||||
async def list_devices():
|
||||
return {
|
||||
"devices": scanner.load_cached_devices()
|
||||
}
|
||||
|
||||
class CommandRequest(BaseModel):
|
||||
command: str
|
||||
|
||||
class ConfigRequest(BaseModel):
|
||||
config: dict
|
||||
switch_ip: str
|
||||
|
||||
@router.post("/parse_command", response_model=dict)
|
||||
@router.post("/parse_command", response_model=Dict)
|
||||
async def parse_command(request: CommandRequest):
|
||||
"""
|
||||
解析中文命令并返回JSON配置
|
||||
- 依赖AI服务
|
||||
- 返回标准化配置
|
||||
"""
|
||||
try:
|
||||
from ..services.ai_services import AIService # 延迟导入避免循环依赖
|
||||
ai_service = AIService(settings.SILICONFLOW_API_KEY, settings.SILICONFLOW_API_URL)
|
||||
config = await ai_service.parse_command(request.command)
|
||||
return {"success": True, "config": config}
|
||||
@ -68,23 +129,3 @@ async def parse_command(request: CommandRequest):
|
||||
status_code=400,
|
||||
detail=f"Failed to parse command: {str(e)}"
|
||||
)
|
||||
|
||||
@router.post("/apply_config", response_model=dict)
|
||||
async def apply_config(request: ConfigRequest):
|
||||
"""
|
||||
应用配置到交换机
|
||||
"""
|
||||
try:
|
||||
configurator = SwitchConfigurator(
|
||||
username=settings.SWITCH_USERNAME,
|
||||
password=settings.SWITCH_PASSWORD,
|
||||
timeout=settings.SWITCH_TIMEOUT
|
||||
)
|
||||
result = await configurator.apply_config(request.switch_ip, request.config)
|
||||
return {"success": True, "result": result}
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to apply config: {str(e)}"
|
||||
)
|
||||
|
||||
|
@ -4,11 +4,10 @@ import telnetlib3
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
import aiofiles
|
||||
import asyncssh
|
||||
from pydantic import BaseModel
|
||||
from tenacity import retry, stop_after_attempt, wait_exponential
|
||||
import aiofiles
|
||||
import asyncssh
|
||||
|
||||
|
||||
# ----------------------
|
||||
@ -39,7 +38,7 @@ class SSHConnectionException(SwitchConfigException):
|
||||
|
||||
|
||||
# ----------------------
|
||||
# 核心配置器(完整双模式)
|
||||
# 核心配置器
|
||||
# ----------------------
|
||||
class SwitchConfigurator:
|
||||
def __init__(
|
||||
@ -63,12 +62,35 @@ class SwitchConfigurator:
|
||||
self.ensp_port = ensp_port
|
||||
self.ensp_delay = ensp_command_delay
|
||||
self.ssh_options = ssh_options
|
||||
self._connection_pool = {} # SSH连接池
|
||||
|
||||
async def _apply_config(self, ip: str, config: Union[Dict, SwitchConfig]) -> str:
|
||||
"""实际配置逻辑"""
|
||||
# ====================
|
||||
# 公开API方法
|
||||
# ====================
|
||||
async def apply_config(self, ip: str, config: Union[Dict, SwitchConfig]) -> Dict:
|
||||
"""
|
||||
应用配置到交换机(主入口)
|
||||
返回格式:
|
||||
{
|
||||
"status": "success"|"failed",
|
||||
"output": str,
|
||||
"backup_path": str,
|
||||
"error": Optional[str],
|
||||
"timestamp": str
|
||||
}
|
||||
"""
|
||||
if isinstance(config, dict):
|
||||
config = SwitchConfig(**config)
|
||||
|
||||
result = await self.safe_apply(ip, config)
|
||||
result["timestamp"] = datetime.now().isoformat()
|
||||
return result
|
||||
|
||||
# ====================
|
||||
# 内部实现方法
|
||||
# ====================
|
||||
async def _apply_config(self, ip: str, config: SwitchConfig) -> str:
|
||||
"""实际配置逻辑"""
|
||||
commands = (
|
||||
self._generate_ensp_commands(config)
|
||||
if self.ensp_mode
|
||||
@ -84,59 +106,66 @@ class SwitchConfigurator:
|
||||
else await self._send_ssh_commands(ip, commands)
|
||||
)
|
||||
|
||||
# --------- eNSP模式专用 ---------
|
||||
async def _send_ensp_commands(self, ip: str, commands: List[str]) -> str:
|
||||
"""Telnet协议执行(eNSP)"""
|
||||
try:
|
||||
# 修复点:使用正确的timeout参数
|
||||
reader, writer = await telnetlib3.open_connection(
|
||||
host=ip,
|
||||
port=self.ensp_port,
|
||||
connect_minwait=self.timeout, # telnetlib3的实际可用参数
|
||||
connect_minwait=self.timeout,
|
||||
connect_maxwait=self.timeout
|
||||
)
|
||||
|
||||
# 登录流程(增加超时处理)
|
||||
try:
|
||||
await asyncio.wait_for(reader.readuntil(b"Username:"), timeout=self.timeout)
|
||||
writer.write(f"{self.username}\n")
|
||||
|
||||
await asyncio.wait_for(reader.readuntil(b"Password:"), timeout=self.timeout)
|
||||
writer.write(f"{self.password}\n")
|
||||
|
||||
# 等待登录完成
|
||||
await asyncio.sleep(1)
|
||||
except asyncio.TimeoutError:
|
||||
raise EnspConnectionException("登录超时")
|
||||
# 登录流程
|
||||
await reader.readuntil(b"Username:")
|
||||
writer.write(f"{self.username}\n")
|
||||
await reader.readuntil(b"Password:")
|
||||
writer.write(f"{self.password}\n")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# 执行命令
|
||||
output = ""
|
||||
for cmd in commands:
|
||||
writer.write(f"{cmd}\n")
|
||||
await writer.drain() # 确保命令发送完成
|
||||
|
||||
# 读取响应(增加超时处理)
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(self.ensp_delay)
|
||||
while True:
|
||||
try:
|
||||
data = await asyncio.wait_for(reader.read(1024), timeout=1)
|
||||
if not data:
|
||||
break
|
||||
output += data
|
||||
except asyncio.TimeoutError:
|
||||
continue # 单次读取超时不视为错误
|
||||
except asyncio.TimeoutError:
|
||||
break
|
||||
|
||||
# 关闭连接
|
||||
writer.close()
|
||||
try:
|
||||
await writer.wait_closed()
|
||||
except:
|
||||
logging.debug("连接关闭时出现异常", exc_info=True) # 至少记录异常信息
|
||||
pass
|
||||
|
||||
return output
|
||||
except Exception as e:
|
||||
raise EnspConnectionException(f"eNSP连接失败: {str(e)}")
|
||||
|
||||
async def _send_ssh_commands(self, ip: str, commands: List[str]) -> str:
|
||||
"""SSH协议执行"""
|
||||
async with self.semaphore:
|
||||
try:
|
||||
if ip not in self._connection_pool:
|
||||
self._connection_pool[ip] = await asyncssh.connect(
|
||||
host=ip,
|
||||
username=self.username,
|
||||
password=self.password,
|
||||
connect_timeout=self.timeout,
|
||||
**self.ssh_options
|
||||
)
|
||||
|
||||
results = []
|
||||
for cmd in commands:
|
||||
result = await self._connection_pool[ip].run(cmd)
|
||||
results.append(result.stdout)
|
||||
return "\n".join(results)
|
||||
except asyncssh.Error as e:
|
||||
if ip in self._connection_pool:
|
||||
self._connection_pool[ip].close()
|
||||
del self._connection_pool[ip]
|
||||
raise SSHConnectionException(f"SSH操作失败: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
def _generate_ensp_commands(config: SwitchConfig) -> List[str]:
|
||||
"""生成eNSP命令序列"""
|
||||
@ -156,28 +185,6 @@ class SwitchConfigurator:
|
||||
commands.append("return")
|
||||
return [c for c in commands if c.strip()]
|
||||
|
||||
# --------- SSH模式专用(使用AsyncSSH) ---------
|
||||
async def _send_ssh_commands(self, ip: str, commands: List[str]) -> str:
|
||||
"""AsyncSSH执行命令"""
|
||||
async with self.semaphore:
|
||||
try:
|
||||
async with asyncssh.connect(
|
||||
host=ip,
|
||||
username=self.username,
|
||||
password=self.password,
|
||||
connect_timeout=self.timeout, # AsyncSSH的正确参数名
|
||||
**self.ssh_options
|
||||
) as conn:
|
||||
results = []
|
||||
for cmd in commands:
|
||||
result = await conn.run(cmd, check=True)
|
||||
results.append(result.stdout)
|
||||
return "\n".join(results)
|
||||
except asyncssh.Error as e:
|
||||
raise SSHConnectionException(f"SSH操作失败: {str(e)}")
|
||||
except Exception as e:
|
||||
raise SSHConnectionException(f"连接异常: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
def _generate_standard_commands(config: SwitchConfig) -> List[str]:
|
||||
"""生成标准CLI命令"""
|
||||
@ -195,16 +202,6 @@ class SwitchConfigurator:
|
||||
])
|
||||
return commands
|
||||
|
||||
# --------- 通用功能 ---------
|
||||
async def _validate_config(self, ip: str, config: SwitchConfig) -> bool:
|
||||
"""验证配置是否生效"""
|
||||
current = await self._get_current_config(ip)
|
||||
if config.type == "vlan":
|
||||
return f"vlan {config.vlan_id}" in current
|
||||
elif config.type == "interface" and config.vlan:
|
||||
return f"switchport access vlan {config.vlan}" in current
|
||||
return True
|
||||
|
||||
async def _get_current_config(self, ip: str) -> str:
|
||||
"""获取当前配置"""
|
||||
commands = (
|
||||
@ -270,40 +267,17 @@ class SwitchConfigurator:
|
||||
"restore_success": restore_status
|
||||
}
|
||||
|
||||
async def _validate_config(self, ip: str, config: SwitchConfig) -> bool:
|
||||
"""验证配置是否生效"""
|
||||
current = await self._get_current_config(ip)
|
||||
if config.type == "vlan":
|
||||
return f"vlan {config.vlan_id}" in current
|
||||
elif config.type == "interface" and config.vlan:
|
||||
return f"switchport access vlan {config.vlan}" in current
|
||||
return True
|
||||
|
||||
# ----------------------
|
||||
# 使用示例
|
||||
# ----------------------
|
||||
async def demo():
|
||||
# 示例1: eNSP设备配置(Telnet模式)
|
||||
ensp_configurator = SwitchConfigurator(
|
||||
ensp_mode=True,
|
||||
ensp_port=2000,
|
||||
username="admin",
|
||||
password="admin",
|
||||
timeout=15
|
||||
)
|
||||
ensp_result = await ensp_configurator.safe_apply("127.0.0.1", {
|
||||
"type": "interface",
|
||||
"interface": "GigabitEthernet0/0/1",
|
||||
"vlan": 100,
|
||||
"ip_address": "192.168.1.2 255.255.255.0"
|
||||
})
|
||||
print("eNSP配置结果:", ensp_result)
|
||||
|
||||
# 示例2: 真实设备配置(SSH模式)
|
||||
ssh_configurator = SwitchConfigurator(
|
||||
username="cisco",
|
||||
password="cisco123",
|
||||
timeout=15
|
||||
)
|
||||
ssh_result = await ssh_configurator.safe_apply("192.168.1.1", {
|
||||
"type": "vlan",
|
||||
"vlan_id": 200,
|
||||
"name": "Production"
|
||||
})
|
||||
print("SSH配置结果:", ssh_result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(demo())
|
||||
async def close(self):
|
||||
"""清理所有连接"""
|
||||
for conn in self._connection_pool.values():
|
||||
conn.close()
|
||||
self._connection_pool.clear()
|
4
src/backend/batch/__init__.py
Normal file
4
src/backend/batch/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
from .bulk_config import BulkConfigurator, BulkSwitchConfig
|
||||
from .connection_pool import SwitchConnectionPool
|
||||
|
||||
__all__ = ['BulkConfigurator', 'BulkSwitchConfig', 'SwitchConnectionPool']
|
46
src/backend/batch/bulk_config.py
Normal file
46
src/backend/batch/bulk_config.py
Normal file
@ -0,0 +1,46 @@
|
||||
import asyncio
|
||||
from typing import List, Dict
|
||||
from dataclasses import dataclass
|
||||
from .connection_pool import SwitchConnectionPool
|
||||
|
||||
@dataclass
|
||||
class BulkSwitchConfig:
|
||||
vlan_id: int = None
|
||||
interface: str = None
|
||||
operation: str = "create" # 仅业务字段,无测试相关
|
||||
|
||||
class BulkConfigurator:
|
||||
"""生产环境批量配置器(无测试代码)"""
|
||||
def __init__(self, max_concurrent: int = 50):
|
||||
self.pool = SwitchConnectionPool()
|
||||
self.semaphore = asyncio.Semaphore(max_concurrent)
|
||||
|
||||
async def _configure_device(self, ip: str, config: BulkSwitchConfig) -> str:
|
||||
"""核心配置方法"""
|
||||
conn = await self.pool.get_connection(ip, "admin", "admin")
|
||||
try:
|
||||
commands = self._generate_commands(config)
|
||||
results = [await conn.run(cmd) for cmd in commands]
|
||||
return "\n".join(r.stdout for r in results)
|
||||
finally:
|
||||
await self.pool.release_connection(ip, conn)
|
||||
|
||||
def _generate_commands(self, config: BulkSwitchConfig) -> List[str]:
|
||||
"""命令生成(纯业务逻辑)"""
|
||||
commands = []
|
||||
if config.vlan_id:
|
||||
commands.append(f"vlan {config.vlan_id}")
|
||||
if config.operation == "create":
|
||||
commands.extend([
|
||||
f"name VLAN_{config.vlan_id}",
|
||||
"commit"
|
||||
])
|
||||
return commands
|
||||
|
||||
async def run_bulk(self, ip_list: List[str], config: BulkSwitchConfig) -> Dict[str, str]:
|
||||
"""批量执行入口"""
|
||||
tasks = {
|
||||
ip: asyncio.create_task(self._configure_device(ip, config))
|
||||
for ip in ip_list
|
||||
}
|
||||
return {ip: await task for ip, task in tasks.items()}
|
49
src/backend/batch/connection_pool.py
Normal file
49
src/backend/batch/connection_pool.py
Normal file
@ -0,0 +1,49 @@
|
||||
import asyncio
|
||||
import time
|
||||
import asyncssh
|
||||
from typing import Dict
|
||||
|
||||
|
||||
class SwitchConnectionPool:
|
||||
"""
|
||||
交换机连接池(支持自动重连和负载均衡)
|
||||
功能:
|
||||
- 每个IP维护动态连接池
|
||||
- 自动剔除失效连接
|
||||
- 支持空闲连接回收
|
||||
"""
|
||||
def __init__(self, max_connections_per_ip: int = 3):
|
||||
self._pools: Dict[str, asyncio.Queue] = {}
|
||||
self._max_conn = max_connections_per_ip
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def get_connection(self, ip: str, username: str, password: str) -> asyncssh.SSHClientConnection:
|
||||
async with self._lock:
|
||||
if ip not in self._pools:
|
||||
self._pools[ip] = asyncio.Queue(self._max_conn)
|
||||
|
||||
if not self._pools[ip].empty():
|
||||
return await self._pools[ip].get()
|
||||
|
||||
return await asyncssh.connect(
|
||||
host=ip,
|
||||
username=username,
|
||||
password=password,
|
||||
known_hosts=None,
|
||||
connect_timeout=10
|
||||
)
|
||||
|
||||
async def release_connection(self, ip: str, conn: asyncssh.SSHClientConnection):
|
||||
async with self._lock:
|
||||
if conn.is_connected() and self._pools[ip].qsize() < self._max_conn:
|
||||
await self._pools[ip].put(conn)
|
||||
else:
|
||||
conn.close()
|
||||
|
||||
async def close_all(self):
|
||||
async with self._lock:
|
||||
for q in self._pools.values():
|
||||
while not q.empty():
|
||||
conn = await q.get()
|
||||
conn.close()
|
||||
self._pools.clear()
|
38
src/backend/docker-compose.yml
Normal file
38
src/backend/docker-compose.yml
Normal file
@ -0,0 +1,38 @@
|
||||
version: '3.13'
|
||||
|
||||
services:
|
||||
app:
|
||||
build: .
|
||||
container_name: switch_configurator
|
||||
ports:
|
||||
- "8000:8000"
|
||||
volumes:
|
||||
- ./src/backend:/app
|
||||
- switch_backups:/app/config_backups
|
||||
environment:
|
||||
- SWITCH_USERNAME=${SWITCH_USERNAME:-admin}
|
||||
- SWITCH_PASSWORD=${SWITCH_PASSWORD:-admin}
|
||||
- SWITCH_TIMEOUT=${SWITCH_TIMEOUT:-10}
|
||||
- SILICONFLOW_API_KEY=${SILICONFLOW_API_KEY}
|
||||
- SILICONFLOW_API_URL=${SILICONFLOW_API_URL}
|
||||
restart: unless-stopped
|
||||
networks:
|
||||
- backend
|
||||
|
||||
# 可选:添加 Redis 用于缓存设备扫描结果
|
||||
redis:
|
||||
image: redis:alpine
|
||||
ports:
|
||||
- "6379:6379"
|
||||
volumes:
|
||||
- redis_data:/data
|
||||
networks:
|
||||
- backend
|
||||
|
||||
volumes:
|
||||
switch_backups:
|
||||
redis_data:
|
||||
|
||||
networks:
|
||||
backend:
|
||||
driver: bridge
|
@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from src.backend.app.api.network_config import SwitchConfigurator # 导入你的核心类
|
||||
from src.backend.app.api.network_config import SwitchConfigurator
|
||||
#该文件用于测试
|
||||
|
||||
# 设置日志
|
||||
|
Loading…
x
Reference in New Issue
Block a user