|
#!/usr/bin/env python3 |
|
# Copyright: (c) 2024, Jordan Borean (@jborean93) <jborean93@gmail.com> |
|
# MIT License (see LICENSE or https://opensource.org/licenses/MIT) |
|
# PYTHON_ARGCOMPLETE_OK |
|
|
|
# Big thanks to pysmb for help with the RPC structures |
|
# https://github.com/miketeo/pysmb |
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import collections.abc |
|
import dataclasses |
|
import enum |
|
import sys |
|
|
|
import smbclient |
|
from smbprotocol.ioctl import CtlCode, IOCTLFlags, SMB2IOCTLRequest, SMB2IOCTLResponse |
|
|
|
HAS_ARGCOMPLETE = True |
|
try: |
|
import argcomplete |
|
except ImportError: |
|
HAS_ARGCOMPLETE = False |
|
|
|
|
|
class ShareType(enum.IntEnum): |
|
STYPE_DISKTREE = 0x00000000 |
|
STYPE_PRINTQ = 0x00000001 |
|
STYPE_DEVICE = 0x00000002 |
|
STYPE_IPC = 0x00000003 |
|
STYPE_CLUSTER_FS = 0x02000000 |
|
STYPE_CLUSTER_SOFS = 0x04000000 |
|
STYPE_CLUSTER_DFS = 0x08000000 |
|
|
|
|
|
class ShareTypeFlags(enum.IntFlag): |
|
NONE = 0x00000000 |
|
STYPE_TEMPORARY = 0x40000000 |
|
STYPE_SPECIAL = 0x80000000 |
|
|
|
|
|
@dataclasses.dataclass(frozen=True) |
|
class ShareInfo: |
|
name: str |
|
share_type: ShareType |
|
share_type_flags: ShareTypeFlags |
|
comment: str |
|
|
|
|
|
def build_dce_rpc_req(call_id: int, packet_type: int, data: bytes) -> bytes: |
|
return b"".join( |
|
[ |
|
# Version |
|
b"\x05" |
|
# Version minor |
|
b"\x00", |
|
# Packet type |
|
packet_type.to_bytes(1, byteorder="little"), |
|
# Packet Flags (First | Last fragment) |
|
b"\x03", |
|
# Data rep (ASCII/Little Endian/IEEE) |
|
b"\x10\x00\x00\x00", |
|
# Frag Length |
|
(len(data) + 16).to_bytes(2, byteorder="little"), |
|
# Auth Length |
|
b"\x00\x00", |
|
# Call Id |
|
call_id.to_bytes(4, byteorder="little"), |
|
data, |
|
] |
|
) |
|
|
|
|
|
def build_dce_rpc_bind(call_id: int) -> bytes: |
|
data = b"".join( |
|
[ |
|
# Max Xmit Frag - 4280 |
|
b"\xb8\x10", |
|
# Max Recv Frag - 4280 |
|
b"\xb8\x10", |
|
# Assoc Group |
|
b"\x00\x00\x00\x00", |
|
# Num Ctx Items |
|
b"\x02\x00\x00\x00", |
|
# Ctx 1 - SRVSVC v3.0 - 32bit NDR |
|
# 8a885d04-1ceb-11c9-9fe8-08002b104860 |
|
b"\x00\x00\x01\x00\xc8\x4f\x32\x4b\x70\x16\xd3\x01\x12\x78\x5a\x47", |
|
b"\xbf\x6e\xe1\x88\x03\x00\x00\x00\x04\x5d\x88\x8a\xeb\x1c\xc9\x11", |
|
b"\x9f\xe8\x08\x00\x2b\x10\x48\x60\x02\x00\x00\x00", |
|
# Ctx 2 - SRVSVC v3.0 - Bind Time Feature Negotiation |
|
# 6cb71c2c-9812-4540-0300-000000000000 |
|
b"\x01\x00\x01\x00\xc8\x4f\x32\x4b\x70\x16\xd3\x01\x12\x78\x5a\x47", |
|
b"\xbf\x6e\xe1\x88\x03\x00\x00\x00\x2c\x1c\xb7\x6c\x12\x98\x40\x45", |
|
b"\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00", |
|
] |
|
) |
|
return build_dce_rpc_req(call_id, 11, data) |
|
|
|
|
|
def build_net_share_enum(call_id: int, server_name: str) -> bytes: |
|
""" |
|
NET_API_STATUS NetrShareEnum( |
|
[in, string, unique] SRVSVC_HANDLE ServerName, |
|
[in, out] LPSHARE_ENUM_STRUCT InfoStruct, |
|
[in] DWORD PreferedMaximumLength, |
|
[out] DWORD* TotalEntries, |
|
[in, out, unique] DWORD* ResumeHandle |
|
); |
|
""" |
|
server_len = len(server_name) + 1 |
|
server_len_padding = b"" |
|
if server_len % 2: |
|
server_len_padding = b"\x00\x00" |
|
b_server_name = (server_name + "\u0000").encode("utf-16-le") |
|
|
|
stub_data = b"".join( |
|
[ |
|
# ServerName - Pointer |
|
b"\x00\x00\x02\x00", # Referent Id |
|
server_len.to_bytes(4, byteorder="little"), # Max Value Len |
|
b"\x00\x00\x00\x00", # Offset |
|
server_len.to_bytes(4, byteorder="little"), # Value Len |
|
b_server_name, |
|
server_len_padding, |
|
# InfoStruct |
|
(1).to_bytes(4, byteorder="little"), # Level - SHARE_INFO_1_CONTAINER |
|
b"\x01\x00\x00\x00\x04\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00", # Unknown still part of InfoStruct |
|
b"\xff\xff\xff\xff", # PreferedMaximumLength |
|
b"\x08\x00\x02\x00\x00\x00\x00\x00", # ResumeHandle |
|
] |
|
) |
|
|
|
rpc_request_data = b"".join( |
|
[ |
|
# Alloc hint |
|
b"\x4c\x00\x00\x00", |
|
# Context ID |
|
b"\x00\x00", |
|
# Opnum (NetShareEnumAll), |
|
(15).to_bytes(2, byteorder="little"), |
|
stub_data, |
|
] |
|
) |
|
return build_dce_rpc_req(call_id, 0, rpc_request_data) |
|
|
|
|
|
def unpack_smb_share_info(data: memoryview) -> list[ShareInfo]: |
|
# Ignore the RPC PDU header (24 bytes) + leading data in the RPC result |
|
# we don't care about (12 bytes). |
|
result_len = int.from_bytes(data[36:40], byteorder="little") |
|
|
|
array_view = data[48:] |
|
str_view = array_view[(12 * result_len) :] |
|
results = [] |
|
for _ in range(result_len): |
|
share_type_raw = int.from_bytes(array_view[4:8], byteorder="little") |
|
share_type = ShareType(share_type_raw & 0x0FFFFFFF) |
|
share_flags = ShareTypeFlags(share_type_raw & 0xF0000000) |
|
array_view = array_view[12:] |
|
|
|
name_len = int.from_bytes(str_view[8:12], byteorder="little") |
|
name = str_view[12 : 12 + (name_len - 1) * 2].tobytes().decode("utf-16-le") |
|
if name_len % 2: |
|
name_len += 1 |
|
str_view = str_view[12 + (name_len * 2) :] |
|
|
|
comment_len = int.from_bytes(str_view[8:12], byteorder="little") |
|
comment = ( |
|
str_view[12 : 12 + (comment_len - 1) * 2].tobytes().decode("utf-16-le") |
|
) |
|
if comment_len % 2: |
|
comment_len += 1 |
|
str_view = str_view[12 + (comment_len * 2) :] |
|
|
|
results.append(ShareInfo(name, share_type, share_flags, comment)) |
|
|
|
return results |
|
|
|
|
|
def get_share_info(server: str) -> list[ShareInfo]: |
|
with smbclient.open_file( |
|
rf"\\{server}\IPC$\srvsvc", |
|
mode="w+b", |
|
buffering=0, |
|
file_type="pipe", |
|
) as srvsvc: |
|
connection = srvsvc.fd.connection |
|
fid = srvsvc.fd.file_id |
|
sid = srvsvc.fd.tree_connect.session.session_id |
|
tid = srvsvc.fd.tree_connect.tree_connect_id |
|
|
|
# Bind to DCE RPC service |
|
bind_req = build_dce_rpc_bind(1) |
|
srvsvc.write(bind_req) |
|
_ = srvsvc.read(1024) # bind_ack - should validate this |
|
|
|
# Send the NetShareEnumAll as part of an IOCTL request |
|
net_share_enum_req = build_net_share_enum(2, rf"\\{server}") |
|
ioctl_req = SMB2IOCTLRequest() |
|
ioctl_req["ctl_code"] = CtlCode.FSCTL_PIPE_TRANSCEIVE |
|
ioctl_req["file_id"] = fid |
|
ioctl_req["flags"] = IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL |
|
ioctl_req["max_output_response"] = 8196 |
|
ioctl_req["buffer"] = net_share_enum_req |
|
|
|
req = connection.send(ioctl_req, sid=sid, tid=tid) |
|
resp = connection.receive(req) |
|
ioctl_resp = SMB2IOCTLResponse() |
|
ioctl_resp.unpack(resp["data"].get_value()) |
|
|
|
# Should validate this is a success response not a failure |
|
net_share_enum_resp = ioctl_resp["buffer"].get_value() |
|
return unpack_smb_share_info(memoryview(net_share_enum_resp)) |
|
|
|
|
|
def parse_args(args: collections.abc.Sequence[str]) -> argparse.Namespace: |
|
"""Parse and return args.""" |
|
parser = argparse.ArgumentParser( |
|
description="List SMB shares for the specific server.", |
|
) |
|
|
|
parser.add_argument( |
|
"server", |
|
nargs=1, |
|
default="", |
|
type=str, |
|
help="The SMB server to list the share for.", |
|
) |
|
|
|
if HAS_ARGCOMPLETE: # pragma: nocover |
|
argcomplete.autocomplete(parser) |
|
|
|
parsed_args = parser.parse_args(args) |
|
|
|
return parsed_args |
|
|
|
|
|
def main(args: collections.abc.Sequence[str]) -> None: |
|
"""Run the main program.""" |
|
parsed_args = parse_args(args) |
|
|
|
shares = get_share_info(parsed_args.server[0]) |
|
for share in shares: |
|
print(share) |
|
|
|
|
|
if __name__ == "__main__": |
|
main(sys.argv[1:]) |
Thanks for putting together this code.
I think that the biggest problem here is that this code is not SMB protocol.... this is a RPC helper.
Not all SMB servers will have this available.
I was not able to use it with Azure Files SMB cloud server.
One thing to mention... on some servers you might need to be authenticated to list the shares
def get_share_info(server: str) -> list[ShareInfo]: + smbclient.ClientConfig(username='YOUR-USER', password='YOUR-PASS') with smbclient.open_file(