使用socket和scapy代替nmap

This commit is contained in:
Jerry 2025-07-17 20:10:22 +08:00
parent e2f1e3eaa6
commit 01a2735318
5 changed files with 41 additions and 28 deletions

View File

@ -9,7 +9,7 @@
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.11" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

2
.idea/misc.xml generated
View File

@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="Python 3.13" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" />
</project>

View File

@ -33,6 +33,7 @@ class AIService:
5.要包含使用ssh连接交换机后的完整命令包括但不完全包括system-view退出保存等完整操作
示例命令'创建VLAN 100名称为TEST'
示例返回{"type": "vlan", "vlan_id": 100, "name": "TEST", "commands": ["vlan 100", "name TEST"]}
注意这里生成的commands中需包含登录交换机和保存等所有操作命令我们使ssh连接交换机你不需要给出连接ssh的命令你只需要给出使用ssh连接到交换机后所输入的全部命令
"""
messages = [

View File

@ -1,43 +1,56 @@
import os
os.environ["PATH"] += ";C:\\Program Files (x86)\\Nmap"
import nmap
import json
import socket
from pathlib import Path
import json
from typing import List, Dict
from ..utils.logger import logger
import os
from scapy.layers.l2 import ARP, Ether
from scapy.sendrecv import srp
from ..utils.logger import logger
class NetworkScanner:
def __init__(self, cache_path: str = "switch_devices.json"):
self.cache_path = Path(cache_path)
os.environ["PATH"] += r";D:\Program Files\Nmap"
self.nm = nmap.PortScanner()
def scan_subnet(self, subnet: str = "192.168.1.0/24") -> List[Dict]:
"""扫描指定子网的交换机设备"""
def scan_subnet(self, subnet: str = "192.168.1.0/24", ports: List[int] = [22, 23, 80]) -> List[Dict]:
"""扫描指定子网的交换机设备,获取设备信息和开放端口"""
logger.info(f"Scanning subnet: {subnet}")
# 扫描开放22(SSH)或23(Telnet)端口的设备
self.nm.scan(
hosts=subnet,
arguments="-p 22,23,80,161 --min-rate 1000 --max-retries 1"
)
arp_request = ARP(pdst=subnet)
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = broadcast/arp_request
answered_list = srp(arp_request_broadcast, timeout=2, verbose=False)[0]
devices = []
for host in self.nm.all_hosts():
if self.nm[host].state() == "up":
device = {
"ip": host,
"ports": list(self.nm[host]["tcp"].keys()),
"mac": self.nm[host].get("addresses", {}).get("mac", "unknown")
}
for element in answered_list:
ip = element[1].psrc
mac = element[1].hwsrc
device = {"ip": ip, "mac": mac, "ports": []}
for port in ports:
if self._is_port_open(ip, port):
device["ports"].append(port)
if device["ports"]:
devices.append(device)
logger.debug(f"Found device: {device}")
self._save_to_cache(devices)
return devices
def _is_port_open(self, ip: str, port: int) -> bool:
"""检查指定 IP 和端口是否开放"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((ip, port))
sock.close()
return result == 0
except socket.error:
return False
def _save_to_cache(self, devices: List[Dict]):
"""保存扫描结果到本地文件"""
with open(self.cache_path, "w") as f:
@ -50,4 +63,4 @@ class NetworkScanner:
return []
with open(self.cache_path) as f:
return json.load(f)
return json.load(f)

View File

@ -17,9 +17,8 @@ tenacity==8.2.3
asyncio==3.4.3
typing_extensions==4.10.0
scapy
psutil==5.9.8
matplotlib==3.8.3
sqlalchemy==2.0.28
sqlalchemy
openai