又改回来了

This commit is contained in:
Jerry 2025-07-18 09:18:11 +08:00
parent 01a2735318
commit 0f856dc1e8
2 changed files with 6067 additions and 8709 deletions

View File

@ -1,56 +1,42 @@
import socket
from pathlib import Path
import nmap
import json
from pathlib import Path
from typing import List, Dict
import os
from scapy.layers.l2 import ARP, Ether
from scapy.sendrecv import srp
from ..utils.logger import logger
class NetworkScanner:
def __init__(self, cache_path: str = "switch_devices.json"):
self.cache_path = Path(cache_path)
self.nm = nmap.PortScanner()
def scan_subnet(self, subnet: str = "192.168.1.0/24", ports: List[int] = [22, 23, 80]) -> List[Dict]:
"""扫描指定子网的交换机设备,获取设备信息和开放端口"""
"""扫描指定子网的设备,获取设备信息和开放端口"""
logger.info(f"Scanning subnet: {subnet}")
arp_request = ARP(pdst=subnet)
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = broadcast/arp_request
answered_list = srp(arp_request_broadcast, timeout=2, verbose=False)[0]
devices = []
for element in answered_list:
ip = element[1].psrc
mac = element[1].hwsrc
try:
self.nm.scan(hosts=subnet, arguments=f'-p {",".join(map(str, ports))}')
for host in self.nm.all_hosts():
ip = host
mac = self.nm[host]['addresses'].get('mac', 'N/A')
open_ports = []
device = {"ip": ip, "mac": mac, "ports": []}
for port in ports:
if self._is_port_open(ip, port):
device["ports"].append(port)
if 'tcp' in self.nm[host]:
for port in self.nm[host]['tcp']:
if self.nm[host]['tcp'][port]['state'] == 'open':
open_ports.append(port)
if device["ports"]:
devices.append(device)
logger.debug(f"Found device: {device}")
if open_ports:
devices.append({"ip": ip, "mac": mac, "ports": open_ports})
logger.debug(f"Found device: {ip} with open ports: {open_ports}")
except Exception as e:
logger.error(f"Error while scanning subnet: {e}")
self._save_to_cache(devices)
return devices
def _is_port_open(self, ip: str, port: int) -> bool:
"""检查指定 IP 和端口是否开放"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, port))
sock.close()
return result == 0
except socket.error:
return False
def _save_to_cache(self, devices: List[Dict]):
"""保存扫描结果到本地文件"""
with open(self.cache_path, "w") as f:

14720
src/frontend/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff