mirror of
https://github.com/Jerryplusy/AI-powered-switches.git
synced 2025-10-14 09:49:19 +00:00
Compare commits
2 Commits
01a2735318
...
da5743e27a
Author | SHA1 | Date | |
---|---|---|---|
da5743e27a | |||
0f856dc1e8 |
@ -13,7 +13,6 @@ class NetworkOptimizer:
|
||||
G = nx.Graph()
|
||||
for device in devices:
|
||||
G.add_node(device['ip'], type=device['type'])
|
||||
# 添加连接关系(示例)
|
||||
G.add_edge('192.168.1.1', '192.168.1.2', bandwidth=1000)
|
||||
return G
|
||||
|
||||
@ -25,7 +24,6 @@ class NetworkOptimizer:
|
||||
"""带宽优化模型"""
|
||||
|
||||
def objective(x):
|
||||
# 最小化最大链路利用率
|
||||
return max(x)
|
||||
|
||||
constraints = (
|
||||
|
@ -1,56 +1,41 @@
|
||||
import socket
|
||||
from pathlib import Path
|
||||
import nmap
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import List, Dict
|
||||
import os
|
||||
|
||||
from scapy.layers.l2 import ARP, Ether
|
||||
from scapy.sendrecv import srp
|
||||
|
||||
from ..utils.logger import logger
|
||||
|
||||
class NetworkScanner:
|
||||
def __init__(self, cache_path: str = "switch_devices.json"):
|
||||
self.cache_path = Path(cache_path)
|
||||
self.nm = nmap.PortScanner()
|
||||
|
||||
def scan_subnet(self, subnet: str = "192.168.1.0/24", ports: List[int] = [22, 23, 80]) -> List[Dict]:
|
||||
"""扫描指定子网的交换机设备,获取设备信息和开放端口"""
|
||||
"""扫描指定子网的设备,获取设备信息和开放端口"""
|
||||
logger.info(f"Scanning subnet: {subnet}")
|
||||
|
||||
arp_request = ARP(pdst=subnet)
|
||||
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
|
||||
arp_request_broadcast = broadcast/arp_request
|
||||
|
||||
answered_list = srp(arp_request_broadcast, timeout=2, verbose=False)[0]
|
||||
|
||||
devices = []
|
||||
for element in answered_list:
|
||||
ip = element[1].psrc
|
||||
mac = element[1].hwsrc
|
||||
try:
|
||||
self.nm.scan(hosts=subnet, arguments=f'-p {",".join(map(str, ports))}')
|
||||
for host in self.nm.all_hosts():
|
||||
ip = host
|
||||
mac = self.nm[host]['addresses'].get('mac', 'N/A')
|
||||
open_ports = []
|
||||
|
||||
device = {"ip": ip, "mac": mac, "ports": []}
|
||||
for port in ports:
|
||||
if self._is_port_open(ip, port):
|
||||
device["ports"].append(port)
|
||||
if 'tcp' in self.nm[host]:
|
||||
for port in self.nm[host]['tcp']:
|
||||
if self.nm[host]['tcp'][port]['state'] == 'open':
|
||||
open_ports.append(port)
|
||||
|
||||
if device["ports"]:
|
||||
devices.append(device)
|
||||
logger.debug(f"Found device: {device}")
|
||||
if open_ports:
|
||||
devices.append({"ip": ip, "mac": mac, "ports": open_ports})
|
||||
logger.debug(f"Found device: {ip} with open ports: {open_ports}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error while scanning subnet: {e}")
|
||||
|
||||
self._save_to_cache(devices)
|
||||
return devices
|
||||
|
||||
def _is_port_open(self, ip: str, port: int) -> bool:
|
||||
"""检查指定 IP 和端口是否开放"""
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(1)
|
||||
result = sock.connect_ex((ip, port))
|
||||
sock.close()
|
||||
return result == 0
|
||||
except socket.error:
|
||||
return False
|
||||
|
||||
def _save_to_cache(self, devices: List[Dict]):
|
||||
"""保存扫描结果到本地文件"""
|
||||
with open(self.cache_path, "w") as f:
|
||||
|
14720
src/frontend/pnpm-lock.yaml
generated
14720
src/frontend/pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user