Linux 运维

内网在线设备扫描

2025-12-31 3 min read 654 chars
import subprocess
import socket
import threading
import ipaddress
import platform
from queue import Queue
from datetime import datetime

# 设置扫描的线程数,线程越多扫描越快,但过高可能导致网络拥堵或报错
THREAD_COUNT = 100
queue = Queue()
active_hosts = []
lock = threading.Lock()

def get_local_ip_info():
    """
    获取本机IP和大致的网段信息
    注意:这是一个简化的网段获取方式,假设是常见的 /24 子网
    """
    try:
        # 创建一个UDP套接字连接到外部以获取本机IP(不会实际发送数据)
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        local_ip = s.getsockname()[0]
        s.close()
        
        # 假设子网掩码是 255.255.255.0 (/24)
        # 将IP的最后一段替换为0,例如 192.168.1.5 -> 192.168.1.0/24
        network_prefix = '.'.join(local_ip.split('.')[:-1]) + '.0/24'
        return local_ip, network_prefix
    except Exception as e:
        print(f"获取本机IP失败: {e}")
        return None, None

def ping_host(ip):
    """
    对单个IP执行Ping操作
    Windows下参数是 -n, Linux/Mac下是 -c
    """
    param = '-n' if platform.system().lower() == 'windows' else '-c'
    # timeout参数在不同系统表现不同,Windows ping 默认等待时间较长
    # -w 500 表示等待500毫秒
    timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
    timeout_val = '500' if platform.system().lower() == 'windows' else '1'

    command = ['ping', param, '1', timeout_param, timeout_val, str(ip)]
    
    # 禁止弹出控制台窗口 (Windows特有优化)
    startupinfo = None
    if platform.system().lower() == 'windows':
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

    try:
        response = subprocess.call(
            command, 
            stdout=subprocess.DEVNULL, 
            stderr=subprocess.DEVNULL,
            startupinfo=startupinfo
        )
        return response == 0
    except Exception:
        return False

def get_hostname(ip):
    """尝试解析主机名"""
    try:
        hostname, _, _ = socket.gethostbyaddr(str(ip))
        return hostname
    except socket.herror:
        return "Unknown"

def worker():
    """线程工作函数"""
    while True:
        ip = queue.get()
        if ip is None:
            break
        
        if ping_host(ip):
            hostname = get_hostname(ip)
            with lock:
                print(f"[+] 发现设备: {ip} ({hostname})")
                active_hosts.append({'ip': str(ip), 'hostname': hostname})
        
        queue.task_done()

def scan_network():
    print("-" * 50)
    print("简单 Python 内网扫描器 (基于 Ping)")
    print("-" * 50)

    local_ip, network_str = get_local_ip_info()
    if not local_ip:
        return

    print(f"本机 IP: {local_ip}")
    print(f"扫描目标网段: {network_str}")
    print(f"开始扫描... (线程数: {THREAD_COUNT})")
    print("-" * 50)

    start_time = datetime.now()

    # 创建网络对象
    try:
        network = ipaddress.ip_network(network_str, strict=False)
    except ValueError:
        print("网段格式错误")
        return

    # 启动线程
    threads = []
    for _ in range(THREAD_COUNT):
        t = threading.Thread(target=worker)
        t.daemon = True # 设置为守护线程
        t.start()
        threads.append(t)

    # 将所有IP放入队列 (跳过网络地址和广播地址)
    for ip in network.hosts():
        queue.put(str(ip))

    # 等待队列清空
    queue.join()

    # 停止线程
    for _ in range(THREAD_COUNT):
        queue.put(None)
    for t in threads:
        t.join()

    end_time = datetime.now()
    duration = end_time - start_time

    print("-" * 50)
    print(f"扫描完成。耗时: {duration}")
    print(f"共发现 {len(active_hosts)} 个在线设备:")
    
    # 简单的排序并输出
    active_hosts.sort(key=lambda x: ipaddress.IPv4Address(x['ip']))
    for host in active_hosts:
        print(f"IP: {host['ip'].ljust(15)} | Hostname: {host['hostname']}")

if __name__ == "__main__":
    scan_network()
    input("\n按回车键退出...")

Comments.