Skip to content

Instantly share code, notes, and snippets.

@rkhapov
Created April 24, 2019 17:06
Show Gist options
  • Select an option

  • Save rkhapov/fc1d4cfdf5b7c409d1a86e96b9509f81 to your computer and use it in GitHub Desktop.

Select an option

Save rkhapov/fc1d4cfdf5b7c409d1a86e96b9509f81 to your computer and use it in GitHub Desktop.
port scanner
import socket
import select
import argparse
import multiprocessing as mp
import errno
import os
import threading
import struct
import threading
import time
# Number of (host, port) targets handed to a worker in one task. Each task
# opens one TCP socket per target, so this also bounds how many sockets a
# single task holds open at the same time.
PORTS_PER_PROCESS = 512
def parse_arguments(argv=None):
    """Parse the command line and return the scan target and port range.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) lets argparse read the process command line.

    Returns:
        A ``(target, first_port, last_port)`` tuple. Both ports are clamped
        into 1..65535 and ordered so that ``first_port <= last_port`` even
        when ``--start`` is greater than ``--finish``.
    """
    max_port = 2 ** 16 - 1

    def clamp_port(port):
        # Out-of-range values are pulled to the nearest valid port instead
        # of being rejected.
        return max(1, min(port, max_port))

    parser = argparse.ArgumentParser(
        description='Simple port scanner\n'
                    'Scans port to tcp and udp applications\n'
                    'Please, note that it is necessary to run me with superuser privileges\n',
        # The description carries explicit line breaks; the default formatter
        # would re-wrap them into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('--target', help='address to scan ports', default='127.0.0.1')
    parser.add_argument('--start', type=int, help='starting port number', default=1)
    parser.add_argument('--finish', type=int, help='final port number', default=max_port)

    options = parser.parse_args(argv)
    first = clamp_port(options.start)
    last = clamp_port(options.finish)
    return options.target, min(first, last), max(first, last)
def chunks(iterable, chunksize):
    """Lazily split *iterable* into lists of at most *chunksize* items.

    Every yielded list holds exactly *chunksize* items except possibly the
    last one, which carries whatever is left over. Nothing is yielded for an
    empty iterable.
    """
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == chunksize:
            yield batch
            # Start a fresh list so the one just handed out is never mutated.
            batch = []
    if batch:
        yield batch
def open_non_block_tcp(target):
    """Start a non-blocking TCP connection attempt to *target*.

    Args:
        target: ``(host, port)`` address passed to ``connect_ex``.

    Returns:
        The non-blocking socket. The connection is either already
        established (``connect_ex`` returned 0) or still in progress
        (``EINPROGRESS``); the caller owns the socket and must close it.

    Raises:
        ValueError: ``connect_ex`` reported any other error code.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setblocking(False)
        code = sock.connect_ex(target)
        if code not in (0, errno.EINPROGRESS):
            raise ValueError(f"Unexpected errno code: {code}")
    except BaseException:
        # Nobody else holds a reference yet, so release the descriptor here
        # instead of leaking it on any failure path.
        sock.close()
        raise
    return sock
def scan_for_tcp(targets):
    """Print ``"<port>: TCP"`` for every target that accepts a TCP connection.

    A non-blocking connect is started for each target and all sockets are
    then polled together. A descriptor that becomes ready without an
    error/hang-up flag is reported as open. Polling stops once every socket
    has answered or when 500 ms pass without any new event.

    Args:
        targets: Sequence of ``(host, port)`` tuples.
    """
    error_mask = select.POLLHUP | select.POLLERR | select.POLLNVAL
    sockets = []
    fd_to_target = {}
    poller = select.poll()
    try:
        for target in targets:
            sock = open_non_block_tcp(target)
            sockets.append(sock)
            fd_to_target[sock.fileno()] = target
            poller.register(sock)

        pending = len(sockets)
        # Stop as soon as every descriptor has reported, instead of paying
        # for one more 500 ms poll on an empty descriptor set.
        while pending:
            ready = poller.poll(500)
            if not ready:
                break
            for fd, event in ready:
                poller.unregister(fd)
                pending -= 1
                if event & error_mask:
                    continue
                # Flush right away: this runs inside a pool worker, which may
                # be terminated before a buffered stdout is written out.
                print(f'{fd_to_target[fd][1]}: TCP', flush=True)
    finally:
        # Release every descriptor even if opening or polling failed midway.
        for sock in sockets:
            sock.close()
def listen_icmp(ports_to_write):
    """Open a raw ICMP socket and release it again.

    NOTE(review): this looks like an unfinished stub. ``ports_to_write`` is
    never used and nothing in the visible source calls this function;
    ``IcmpListener`` appears to be what is actually used. Confirm before
    relying on it.

    Args:
        ports_to_write: Unused.
    """
    # The context manager closes the descriptor explicitly instead of
    # leaving it to be reclaimed whenever the socket object is collected.
    with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP):
        pass
class IcmpListener:
    """Background thread that collects ports quoted in incoming ICMP packets.

    Constructing the listener opens a raw ICMP socket and immediately starts
    a thread that reads packets from it. Each packet is searched for the
    probe marker; when found, the port number stored right after the marker
    is recorded. Call ``stop()`` to end the thread, then ``ports()`` to read
    what was collected.
    """

    # Probe layout: 6-byte marker followed by the port as a network-order
    # 32-bit signed int. Must match the payload built in scan_for_udp.
    _MARKER = b'privet'
    _PROBE = struct.Struct('!6si')

    def __init__(self):
        self.__ports = []
        self.__icmp_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        try:
            # The timeout lets the reader loop re-check the stop flag
            # regularly instead of blocking in recv() forever.
            self.__icmp_sock.settimeout(0.5)
            self.__stop = False
            self.__thread = threading.Thread(target=self.__do)
            self.__thread.start()
        except BaseException:
            # The reader thread never took ownership, so close the socket here.
            self.__icmp_sock.close()
            raise

    def stop(self):
        """Ask the reader thread to finish and wait until it has exited."""
        self.__stop = True
        self.__thread.join()

    def ports(self):
        """Return the list of port numbers collected so far."""
        return self.__ports

    @classmethod
    def _parse_port(cls, packet):
        """Return the port stored after the marker in *packet*, or ``None``.

        ``None`` is returned when the marker is absent or when fewer bytes
        than a full probe follow it. Bytes after the probe are ignored.
        """
        idx = packet.find(cls._MARKER)
        if idx == -1 or len(packet) - idx < cls._PROBE.size:
            return None
        _, port = cls._PROBE.unpack_from(packet, idx)
        return port

    def __do(self):
        """Reader loop: receive packets until ``stop()`` is requested."""
        try:
            while not self.__stop:
                try:
                    packet = self.__icmp_sock.recv(1024 * 4)
                except socket.timeout:
                    continue
                port = self._parse_port(packet)
                if port is not None:
                    self.__ports.append(port)
        finally:
            # The thread is the only user of the socket; release it on exit.
            self.__icmp_sock.close()
def scan_for_udp(targets):
    """Print ``"<port>: UDP"`` for targets whose probe drew no ICMP message.

    One marker datagram carrying the port number is sent to every target
    while an ``IcmpListener`` collects the ports quoted in incoming ICMP
    packets. After a one-second wait, every probed port that the listener
    did not record is printed, in ascending order.

    Args:
        targets: Sequence of ``(host, port)`` tuples.
    """
    icmp_listener = IcmpListener()
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp:
            for target in targets:
                udp.sendto(struct.pack('!6si', b'privet', target[1]), target)
            # Give ICMP messages time to arrive before collecting results.
            time.sleep(1)
    finally:
        # Always stop the listener: its thread is not a daemon and would
        # otherwise keep the worker alive forever if a send failed.
        icmp_listener.stop()

    probed_ports = {target[1] for target in targets}
    for port in sorted(probed_ports.difference(icmp_listener.ports())):
        # Flush right away: this runs inside a pool worker, which may be
        # terminated before a buffered stdout is written out.
        print(f'{port}: UDP', flush=True)
def main():
    """Run the TCP scan, then the UDP scan, over the requested port range.

    The range is cut into chunks of ``PORTS_PER_PROCESS`` targets and the
    chunks are distributed over a process pool. Ctrl+C prints "Stopped"
    instead of a traceback.
    """
    try:
        # Parse before creating the pool so that --help or invalid arguments
        # exit without first spawning worker processes.
        target, start, finish = parse_arguments()
        with mp.Pool() as pool:
            def do_scan(scanner):
                scanning_data = ((target, port) for port in range(start, finish + 1))
                pool.map(scanner, chunks(scanning_data, PORTS_PER_PROCESS))

            do_scan(scan_for_tcp)
            do_scan(scan_for_udp)
    except KeyboardInterrupt:
        print()
        print("Stopped")
# Start the scan only when this file is executed as a script. The guard also
# keeps main() from running again if the module is imported, e.g. by
# multiprocessing workers on start methods that re-import the main module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment