Created
April 24, 2019 17:06
-
-
Save rkhapov/fc1d4cfdf5b7c409d1a86e96b9509f81 to your computer and use it in GitHub Desktop.
port scanner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| import select | |
| import argparse | |
| import multiprocessing as mp | |
| import errno | |
| import os | |
| import threading | |
| import struct | |
| import threading | |
| import time | |
| PORTS_PER_PROCESS = 512 | |
| def parse_arguments(): | |
| def f(port): | |
| if port <= 0: | |
| return 1 | |
| if port > 2 ** 16 - 1: | |
| return 2 ** 16 - 1 | |
| return port | |
| args = argparse.ArgumentParser(description='Simple port scanner\n' | |
| 'Scans port to tcp and udp applications\n' | |
| 'Please, note that it is necessary to run me with superuser privileges\n') | |
| args.add_argument('--target', help='address to scan ports', default='127.0.0.1') | |
| args.add_argument('--start', type=int, help='starting port number', default=1) | |
| args.add_argument('--finish', type=int, help='final port number', default=2 ** 16 - 1) | |
| argv = args.parse_args() | |
| argv.start = f(argv.start) | |
| argv.finish = f(argv.finish) | |
| return argv.target, min(argv.start, argv.finish), max(argv.start, argv.finish) | |
| def chunks(iterable, chunksize): | |
| chunk = [] | |
| it = iter(iterable) | |
| try: | |
| while True: | |
| val = next(it) | |
| chunk.append(val) | |
| if len(chunk) == chunksize: | |
| yield list(chunk) | |
| chunk.clear() | |
| except StopIteration: | |
| if len(chunk) != 0: | |
| yield list(chunk) | |
| def open_non_block_tcp(target): | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| sock.setblocking(0) | |
| code = sock.connect_ex(target) | |
| if code != errno.EINPROGRESS and code != 0: | |
| raise ValueError(f"Unexpected errno code: {code}") | |
| return sock | |
| def scan_for_tcp(targets): | |
| sockets = [open_non_block_tcp(target) for target in targets] | |
| fd_to_target = {sock.fileno(): target for sock, target in zip(sockets, targets)} | |
| poller = select.poll() | |
| for sock in sockets: | |
| poller.register(sock) | |
| while True: | |
| ready = poller.poll(500) | |
| if len(ready) == 0: | |
| break | |
| for fd, event in ready: | |
| poller.unregister(fd) | |
| if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL): | |
| continue | |
| target = fd_to_target[fd] | |
| print(f'{target[1]}: TCP') | |
| for sock in sockets: | |
| sock.close() | |
| def listen_icmp(ports_to_write): | |
| icmp_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) | |
| class IcmpListener: | |
| def __init__(self): | |
| self.__ports = [] | |
| self.__icmp_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) | |
| self.__icmp_sock.settimeout(0.5) | |
| self.__stop = False | |
| self.__thread = threading.Thread(target=self.__do) | |
| self.__thread.start() | |
| def stop(self): | |
| self.__stop = True | |
| self.__thread.join() | |
| def ports(self): | |
| return self.__ports | |
| def __do(self): | |
| while not self.__stop: | |
| try: | |
| pack = self.__icmp_sock.recv(1024 * 4) | |
| idx = pack.find(b'privet') | |
| if idx == -1: | |
| continue | |
| _, port = struct.unpack('!6si', pack[idx:]) | |
| self.__ports.append(port) | |
| except socket.timeout: | |
| pass | |
| def scan_for_udp(targets): | |
| udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| icmp_listener = IcmpListener() | |
| for target in targets: | |
| udp.sendto(struct.pack('!6si', b'privet', target[1]), target) | |
| time.sleep(1) | |
| icmp_listener.stop() | |
| for port in set((target[1] for target in targets)).difference(icmp_listener.ports()): | |
| print(f'{port}: UDP') | |
| def main(): | |
| try: | |
| with mp.Pool() as pool: | |
| target, start, finish = parse_arguments() | |
| def do_scan(scanner): | |
| scanning_data = ((target, port) for port in range(start, finish + 1)) | |
| pool.map(scanner, chunks(scanning_data, PORTS_PER_PROCESS)) | |
| do_scan(scan_for_tcp) | |
| do_scan(scan_for_udp) | |
| except KeyboardInterrupt: | |
| print() | |
| print("Stopped") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment