Compare commits

..

2 Commits

Author SHA1 Message Date
da5743e27a 又改回来了 2025-07-18 11:12:17 +08:00
0f856dc1e8 又改回来了 2025-07-18 09:18:11 +08:00
3 changed files with 6066 additions and 8711 deletions

View File

@ -13,7 +13,6 @@ class NetworkOptimizer:
G = nx.Graph()
for device in devices:
G.add_node(device['ip'], type=device['type'])
# 添加连接关系(示例)
G.add_edge('192.168.1.1', '192.168.1.2', bandwidth=1000)
return G
@ -25,7 +24,6 @@ class NetworkOptimizer:
"""带宽优化模型"""
def objective(x):
# 最小化最大链路利用率
return max(x)
constraints = (

View File

@ -1,56 +1,41 @@
import socket
from pathlib import Path
import nmap
import json
from pathlib import Path
from typing import List, Dict
import os
from scapy.layers.l2 import ARP, Ether
from scapy.sendrecv import srp
from ..utils.logger import logger
class NetworkScanner:
def __init__(self, cache_path: str = "switch_devices.json"):
self.cache_path = Path(cache_path)
self.nm = nmap.PortScanner()
def scan_subnet(self, subnet: str = "192.168.1.0/24", ports: List[int] = [22, 23, 80]) -> List[Dict]:
"""扫描指定子网的交换机设备,获取设备信息和开放端口"""
"""扫描指定子网的设备,获取设备信息和开放端口"""
logger.info(f"Scanning subnet: {subnet}")
arp_request = ARP(pdst=subnet)
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = broadcast/arp_request
answered_list = srp(arp_request_broadcast, timeout=2, verbose=False)[0]
devices = []
for element in answered_list:
ip = element[1].psrc
mac = element[1].hwsrc
try:
self.nm.scan(hosts=subnet, arguments=f'-p {",".join(map(str, ports))}')
for host in self.nm.all_hosts():
ip = host
mac = self.nm[host]['addresses'].get('mac', 'N/A')
open_ports = []
device = {"ip": ip, "mac": mac, "ports": []}
for port in ports:
if self._is_port_open(ip, port):
device["ports"].append(port)
if 'tcp' in self.nm[host]:
for port in self.nm[host]['tcp']:
if self.nm[host]['tcp'][port]['state'] == 'open':
open_ports.append(port)
if device["ports"]:
devices.append(device)
logger.debug(f"Found device: {device}")
if open_ports:
devices.append({"ip": ip, "mac": mac, "ports": open_ports})
logger.debug(f"Found device: {ip} with open ports: {open_ports}")
except Exception as e:
logger.error(f"Error while scanning subnet: {e}")
self._save_to_cache(devices)
return devices
def _is_port_open(self, ip: str, port: int) -> bool:
"""检查指定 IP 和端口是否开放"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, port))
sock.close()
return result == 0
except socket.error:
return False
def _save_to_cache(self, devices: List[Dict]):
"""保存扫描结果到本地文件"""
with open(self.cache_path, "w") as f:

14720
src/frontend/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff