Skip to content

Instantly share code, notes, and snippets.

@ysfchn
Last active November 1, 2025 18:58
Show Gist options
  • Select an option

  • Save ysfchn/07435aaec8e8c3680b1b61901348f9d2 to your computer and use it in GitHub Desktop.

Select an option

Save ysfchn/07435aaec8e8c3680b1b61901348f9d2 to your computer and use it in GitHub Desktop.
A script to read & unpack firmware files of Sandisk's discontinued wireless flash drive series.
# pyright: basic
#
# Copyright (C) 2025 Yusuf Cihan
# @ysfchn https://ysfchn.com
#
# This program is a free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
A script to read & unpack firmware files of Sandisk's discontinued Wireless Flash
Drive & Wireless Stick series[^1] from 2015. The devices are clones of AirStash devices with
Sandisk's branding & modifications added on top.
Since the device itself is no longer being sold and its firmware format is proprietary,
there is very little information about it available on the internet, so I've tried my
best to research and gather as much info as possible.
If you're looking for copies of the firmware files:
1) The discontinued (and pulled off from the app store) "Sandisk Connect Drive" app
contains firmware files stored in the APK itself, so you can just grab the APK from a
random APK mirror website and unpack it. (in "res/raw" folder)
2) Luckily, Internet Archive has some archived copies. Though the below link also lists
firmware for other Sandisk devices, so you will need to filter for links ending with
".df2" and ".df3"
https://web.archive.org/web/*/http://downloads.sandisk.com/firmware/*
".DF2" files use a different format than ".DF3" files, and they are not compatible with
each other. Some models use ".DF2", and newer ones use ".DF3" from what I know.
I've only managed to find information for ".DF3" files, and thanks to the OP, who explains
some byte structure found in the firmware, I was able to create this script. [^4] There
is also another topic in the same community discussing about the device. [^5]
Sandisk's own knowledge base contains several articles about these devices, so if you're
interested, you can also check those out, specifically the manual firmware upgrade instructions. [^2][^3]
See also "sandisk_wireless_wmd.py" file for Wireless Media drives.
[^1]: https://web.archive.org/web/20221205065124/https://support-en.wd.com/app/answers/detailweb/a_id/44632/
[^2]: https://support-en.sandisk.com/app/answers/detailweb/a_id/41388
(archive 1: https://web.archive.org/web/20250409170927/https://support-en.sandisk.com/app/answers/detailweb/a_id/41388)
(archive 2: https://archive.is/W1bC6)
[^3]: https://web.archive.org/web/20160515082212/http://kb.sandisk.com/app/answers/detail/a_id/17556
[^4]: https://forums.hak5.org/topic/41479-sandisk-wireless-connect-16g-flash-drive/
(archive 1: https://web.archive.org/web/20250409165759/https://forums.hak5.org/topic/41479-sandisk-wireless-connect-16g-flash-drive/)
(archive 2: https://archive.md/NTENm)
[^5]: https://forums.hak5.org/topic/30273-hack-a-sandisk-32g-wifi-enabled-flash-drive/
"""
import json
import struct
from argparse import ArgumentParser
from hashlib import sha256
from sys import stderr
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
from typing import cast, Tuple, Dict, NamedTuple, List
class FirmwareInfo(NamedTuple):
    """Parsed contents of a ".DF3" firmware file, as produced by `parse_df3_firmware`."""
    # Model name taken from sector 1, with NUL padding stripped.
    model: str
    # Numeric version code taken from sector 2.
    version: int
    # Total size of the firmware file, in bytes.
    size: int
    # Payloads of the fixed sectors, followed by one block holding the bytes
    # found between the zero padding and the file table boundary.
    blocks: List["FirmwareBlock"]
    # Files stored after the file table, in table order.
    files: List["FirmwareFile"]
class FirmwareBlock(NamedTuple):
    """Opaque chunk of firmware bytes that is kept verbatim so the file can be rebuilt."""
    # Raw payload of the block (without the sector ID / length prefix).
    data: bytes
class FirmwareFile(NamedTuple):
    """A single file stored inside the firmware's file table."""
    # Raw content of the file.
    data: bytes
    # Numeric file type code from the file table (mapped to an extension while parsing).
    type: int
    # Size of the file content in bytes, as declared in the firmware.
    size: int
    # Hex-encoded SHA-256 digest of `data`.
    digest: str
    # Filename guessed from the digest; empty string when the digest is not known.
    path: str
def parse_df3_firmware(firmware: Path):
    """
    Read a Sandisk Wireless Stick (SDWS4) ".DF3" firmware file and return a FirmwareInfo
    object holding its metadata, its opaque sectors and the files it contains, which can
    then be iterated through to dump the filesystem contained in the firmware.

    The firmware doesn't contain the names of the files in the filesystem (assuming they
    are hardcoded in ROM), so the name of each file is guessed from the SHA-256 checksum
    of its content via `guess_filename`.

    Progress information is printed to stderr while parsing.

    Args:
        firmware: Path of the firmware file to read.

    Returns:
        The parsed FirmwareInfo.

    Raises:
        AssertionError: If the file does not follow the expected layout.
    """
    raw = firmware.read_bytes()
    total_size = len(raw)
    buffer = BytesIO(raw)

    # Each leading sector is laid out as follows:
    #   AA BB BB .. ..
    #   AA    -> ID of the sector
    #   BB BB -> Length of the sector payload (big-endian)
    def get_next_sector(eid: int, elen: int):
        sid = int.from_bytes(buffer.read(1), "big")
        # Check if the obtained byte matches with given value to make sure it is a valid file.
        assert eid == sid, f"current sector id mismatch, got {sid} but expected {eid}, is it a valid firmware?"
        slen = int.from_bytes(buffer.read(2), "big")
        assert slen == elen, f"current sector length mismatch, got {slen} but expected {elen}, is it a valid firmware?"
        return buffer.read(slen)

    # First 3 bytes of the file are equal to [01 00 08], so,
    # taking the above information into account:
    #   01          -> The ID of the sector
    #   00 08 (= 8) -> Sector is 8 bytes long
    model = get_next_sector(1, 8).strip(bytes(1)).decode("ascii")

    # The version code is stored as a 4-byte integer, followed by the same number as an
    # ASCII string padded with NUL bytes to 32 bytes. For example, if the first 4 bytes
    # are 00 00 08 02 (= 2050), the following 32 bytes must be "2050" plus NUL padding.
    version_int, version_ascii = cast(Tuple[int, bytes], struct.unpack_from(">I32s", get_next_sector(2, 36)))
    assert bytes(str(version_int), "ascii").ljust(32, bytes(1)) == version_ascii, f"version names doesn't match, {version_int} != {version_ascii}"

    info = FirmwareInfo(
        model = model,
        version = version_int,
        size = total_size,
        blocks = [],
        files = []
    )
    print(f"model: {info.model}", file = stderr)
    print(f"version: {info.version}", file = stderr)
    print(f"file size: {info.size}", file = stderr)

    # Findings:
    # wfd2050s @ 0x001032FF - WiFi driver and firmware version
    # I couldn't get to know about meaning of these bytes, but as far I've tested with several
    # firmwares, these sectors are always the same, so we can just seek & validate through it.
    sectors = (
        (3, 4), (4, 1), (5, 96), (6, 16), (7, 117), (7, 117), (7, 117), (10, 96)
    )
    for sector_id, sector_length in sectors:
        info.blocks.append(FirmwareBlock(get_next_sector(sector_id, sector_length)))
    print(f"read through {buffer.tell()} bytes", file = stderr)
    assert sum(buffer.read(934)) == 0, "unexpected bytes found when 934 zero bytes were expected"

    # Individual file contents in the firmware start where the [FF FF FF FF 00 00] byte
    # pattern first appears after the part that has been read so far. Searching from the
    # current position (instead of from the start of the file) guarantees that a
    # look-alike pattern inside an earlier sector can't produce a negative read length.
    boundary = bytes((0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00))
    start = raw.find(boundary, buffer.tell())
    assert start != -1, "couldn't find the start of the file entry header"
    # Everything between the zero padding and the boundary is kept as one opaque block.
    info.blocks.append(FirmwareBlock(buffer.read(start - buffer.tell())))
    buffer.read(len(boundary))
    print(f"header found at {hex(buffer.tell())}", file = stderr)

    # Parse header to read the file entries, each file entry takes up 12 bytes,
    # therefore the number of bytes must be multiple of 12.
    header_size = int.from_bytes(buffer.read(2), "big")
    print(f"header size is {header_size} bytes", file = stderr)
    header = buffer.read(header_size)
    assert (len(header) % 12) == 0, f"invalid header size, got {header_size}, which is not a multiple of 12"

    # Extensions of files contained in the firmware, mapped with their byte values.
    extensions : Dict[int, str] = {
        0: "bin", 1: "bin", 14: "swf", 15: "xap", 25: "gif", 27: "png",
        28: "svg", 33: "css", 35: "html", 36: "js", 37: "txt",
    }

    # Offsets stored in the table are relative to the first byte of the table itself.
    start_offset = buffer.tell() - header_size
    entries : List[Tuple[int, int, int]] = []
    # bytes 0-4  -> incremental index (starts from 1)
    # bytes 4-8  -> offset of the file content, relative to the start of the table
    # bytes 8-12 -> file type (see the extension code mapping above)
    for i in range(len(header) // 12):
        f_index, f_offset, f_type = cast(Tuple[int, int, int], struct.unpack(">III", header[i * 12:i * 12 + 12]))
        assert f_index == (i + 1), f"file indexes doesn't match, expected {i + 1} but got {f_index}"
        entries.append((f_index, f_offset, f_type))

    # Each file is stored as a 4-byte index, a 4-byte size and then the content itself.
    for fi, fo, ft in entries:
        index = int.from_bytes(buffer.read(4), "big")
        size = int.from_bytes(buffer.read(4), "big")
        assert start_offset + fo == buffer.tell(), f"file #{fi} reports unexpected offset"
        assert index == fi, f"file indexes doesn't match, expected {fi} but got {index}"
        file_data = buffer.read(size)
        file_ext = extensions.get(ft, None)
        digest = sha256(file_data).digest().hex()
        print(f"file #{fi}, ext: {file_ext or '?'} ({ft}), size: {size}, offset: {hex(start_offset + fo)}", file = stderr)
        info.files.append(FirmwareFile(file_data, ft, size, digest, guess_filename(digest)))

    # The file must end with exactly 8 zero bytes after the last file.
    assert buffer.read() == bytes(8), "invalid bytes found at the end"
    return info
def create_df3_firmware(firmware: FirmwareInfo):
    """
    Serialize a FirmwareInfo object back into the bytes of a ".DF3" firmware file.

    This is the inverse of `parse_df3_firmware`, so re-creating an unmodified
    FirmwareInfo yields the bytes of the file it was parsed from.

    Args:
        firmware: The firmware description to serialize.

    Returns:
        The complete firmware image as bytes.

    Raises:
        AssertionError: If the number of blocks or the length of a fixed sector
            does not match the expected layout.
    """
    out = BytesIO()

    def emit_sector(tag: int, payload: bytes):
        # Sector layout: 1-byte ID, 2-byte big-endian length, then the payload.
        out.write(tag.to_bytes(1, "big"))
        out.write(len(payload).to_bytes(2, "big"))
        out.write(payload)

    # Sector 1: model name, NUL padded to 8 bytes.
    model_field = firmware.model.encode("ascii").ljust(8, bytes(1))
    emit_sector(1, model_field)

    # Sector 2: version as a 4-byte integer plus the same number as NUL padded ASCII.
    version_number = firmware.version.to_bytes(4, "big")
    version_text = str(firmware.version).encode("ascii").ljust(32, bytes(1))
    emit_sector(2, version_number + version_text)

    # (sector ID, payload length) of the fixed sectors, in file order.
    layout = (
        (3, 4), (4, 1), (5, 96), (6, 16), (7, 117), (7, 117), (7, 117), (10, 96)
    )
    assert len(firmware.blocks) == len(layout) + 1, f"expected {len(layout) + 1} sector"
    for position, ((tag, expected_length), block) in enumerate(zip(layout, firmware.blocks)):
        payload = block.data
        assert len(payload) == expected_length, f"unexpected sector length at sector #{position} (expected {expected_length}, got {len(payload)})"
        emit_sector(tag, payload)

    # Zero padding, then the extra block that precedes the file table boundary.
    out.write(bytes(934))
    out.write(firmware.blocks[len(layout)].data)

    marker = bytes((0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00))
    out.write(marker)

    # File table: one 12-byte entry (index, offset, type) per file.
    table_size = 12 * len(firmware.files)
    out.write(table_size.to_bytes(2, "big"))
    data_offset = table_size + len(marker) + 2
    for number, entry in enumerate(firmware.files, start = 1):
        out.write(number.to_bytes(4, "big"))
        out.write(data_offset.to_bytes(4, "big"))
        out.write(entry.type.to_bytes(4, "big"))
        # Every stored file is prefixed with 8 bytes (index + size).
        data_offset += len(entry.data) + 8

    # File contents: 4-byte index, 4-byte size, then the content itself.
    for number, entry in enumerate(firmware.files, start = 1):
        out.write(number.to_bytes(4, "big"))
        out.write(len(entry.data).to_bytes(4, "big"))
        out.write(entry.data)

    # Trailing 8 zero bytes terminate the file.
    out.write(bytes(8))
    return out.getvalue()
def firmware_to_pack(firmware: FirmwareInfo):
    """
    Serialize firmware metadata for storage next to the extracted files.

    Returns:
        A tuple of two items: the UTF-8 encoded JSON manifest (model, version, size,
        and the size/checksum of every block and file), and the payloads of all
        blocks concatenated in order.
    """
    block_summaries = []
    for block in firmware.blocks:
        block_summaries.append({
            "size": len(block.data),
            "checksum": sha256(block.data).hexdigest()
        })

    file_summaries = []
    for entry in firmware.files:
        file_summaries.append({
            "type": entry.type,
            "size": entry.size,
            "path": entry.path,
            "checksum": entry.digest
        })

    manifest = {
        "model": firmware.model,
        "version": firmware.version,
        "size": firmware.size,
        "blocks": block_summaries,
        "files": file_summaries
    }
    encoded = json.dumps(manifest, ensure_ascii = False, indent = 2).encode("utf-8")
    # Block payloads are stored back to back; the manifest keeps their sizes.
    joined = b"".join(block.data for block in firmware.blocks)
    return encoded, joined
def pack_to_firmware(firmware_json: bytes, sectors_data: bytes, file_list: List[bytes]):
    """
    Create a FirmwareInfo object from serialized firmware data.

    Args:
        firmware_json: JSON manifest as produced by `firmware_to_pack`.
        sectors_data: Concatenated block payloads as produced by `firmware_to_pack`.
        file_list: Content of every file, in the same order as the "files" list
            of the manifest.

    Returns:
        The reconstructed FirmwareInfo.

    Raises:
        AssertionError: If the manifest lacks a required key, if the sectors data
            is shorter or longer than the manifest describes, or if the number of
            given files differs from the manifest.
    """
    fdict = json.loads(firmware_json)
    assert all((x in fdict for x in ("model", "version", "size", "blocks", "files"))), "keys missing for firmware.json file"

    # Initialise the buffer with the data so that reading starts at offset 0;
    # writing into an empty buffer would leave the cursor at the end and every
    # block would be read back empty.
    sectors_buffer = BytesIO(sectors_data)
    blocks = []
    for block in fdict["blocks"]:
        block_data = sectors_buffer.read(block["size"])
        assert len(block_data) == block["size"], "sectors file is shorter than expected"
        blocks.append(FirmwareBlock(block_data))
    assert sectors_buffer.read() == bytes(), "sectors file contain extra data"

    assert len(file_list) == len(fdict["files"]), "firmware files must be equal to list of given file data"
    # Manifest entries and file contents are paired by position.
    files = [
        FirmwareFile(file_data, file["type"], file["size"], file["checksum"], file["path"])
        for file, file_data in zip(fdict["files"], file_list)
    ]
    return FirmwareInfo(fdict["model"], fdict["version"], fdict["size"], blocks, files)
def firmware_to_zip(firmware: FirmwareInfo):
    """
    Create an in-memory ZIP file containing the files found inside the firmware.

    Every firmware file is stored under "static/". The JSON manifest and the
    concatenated block payloads returned by `firmware_to_pack` are stored as
    "_firmware.json" and "_sectors".

    Files whose name could not be guessed (empty `path`) are stored as
    "static/_unknown/file_<index>.bin", where the index is the 1-based position
    of the file in the firmware. Without this they would all be written to the
    same "static/" entry. This fallback name is only used inside the archive;
    the manifest still records the empty path.

    Returns:
        A BytesIO positioned at offset 0 holding the ZIP archive.
    """
    dumpobj = BytesIO()
    # The context manager finalises the archive even if writing a member fails.
    with ZipFile(dumpobj, "w") as zipdump:
        for index, file in enumerate(firmware.files, start = 1):
            member = file.path or f"_unknown/file_{index}.bin"
            with zipdump.open(f"static/{member}", "w") as f:
                f.write(file.data)
        firmware_data, sectors_file = firmware_to_pack(firmware)
        with zipdump.open("_firmware.json", "w") as f:
            f.write(firmware_data)
        with zipdump.open("_sectors", "w") as f:
            f.write(sectors_file)
    dumpobj.seek(0)
    return dumpobj
def guess_filename(digest: str):
    """
    Look up the known filename for the SHA256 digest of a firmware file.

    The returned path is relative to the /static/ path served from the device IP.
    For example, "licenses.txt" is located in "http://{IP}/static/licenses.txt" in the device.
    An empty string is returned when the digest is not known.
    """
    try:
        return FILE_SHA256_MAPPINGS[digest]
    except KeyError:
        return ""
# Firmware files don't contain the names of the files they hold, only their contents, so
# the SHA256 checksum of each known file content is hardcoded here and mapped to its path
# to give more context about the purpose of the file. Entries marked "from: wfd2034s" are
# the checksums of the same path as found in that other firmware.
FILE_SHA256_MAPPINGS = {
    "7f9eb2428b942ee7d592f739ffade39987935f9cf87e47a99e172c4dc15ab58e": "licenses.txt",
    "ac0daed20e915525f905dc2d86a3819a71b6702a6ab1649a47fa643a1517a630": "nocard.html",
    "2208bd30ff94535b95fe632c436deca3d123b8a0bbf564e884bfb94e013437bf": "nocard.html", # from: wfd2034s
    "56a23b12e413d4c5057266587e040240e8d8984d4fdeb659ca853a63d183d798": "settings.html",
    "e2841bdd461bf653b601ffcf57cba770399cf6c4b372cc86d835b612f990c065": "settings.html", # from: wfd2034s
    "2a9a52a325d9fda977cea97624faed8be00209a99062198591b5927596be0545": "video.html",
    "d743ebe83b166bf0472b63bf1bf81a624cceec3cce1172bf01ce7e3c8178b0f8": "video.html", # from: wfd2034s
    "6222dae0bdd42d1c2d6ab7369db7c7ddfa9e44b1173a2e2f21dbf70e228a3a39": "battery/charging0.png",
    "1edb3b35dc64a7e6727fa5618594db36c8338f71d83932dfb9a365a853d71532": "battery/charging1.png",
    "03e19edbbc0151bfa6fd3b01fa397a714d99f6a991e9f9e55eeaa9234d1aeb0c": "battery/charging2.png",
    "48b2f07ce76941a2899fd822e7925d0cda8ad0006fb4382a4b0003b515f73baa": "battery/charging3.png",
    "884b820a8f6960653877c72a0eb3845562f644abaef740ad7f1c135a4b83e3c7": "battery/critical0.png",
    "4d0dab46c05c18c0be29574b244b15d9eec359e62c3d1823653e2011deb02ab6": "battery/critical1.png",
    "ca9c09d6a2c6372e1ea979d03ddbc952b1d504fefdea013303577344cc312286": "battery/high.png",
    "a2141fa152c3166a277dd839dd977bad26b6e540e38f9f8da15bb7d7c1a3d311": "battery/low.png",
    "d18b392ded5c02a2551e742da0c8b976c7d4587f30e9ed4d01b9f9ce45ed9e05": "battery/medium.png",
    "2a279be18af1717a2fbedf5b885a95535a008083098895d4945125954232e929": "folder_check_icon.png",
    "0a894478123ff42698ceca7564095cb8be8eb4f939f9f6c5966be3838965ec8d": "folder_nocheck_icon.png",
    "f8a2835f2451e13b4dc95d302b8e2ac28a9c6ec131c5c86fc835b07b1200ca6f": "mejs/silverlightmediaelement.xap",
    "858566cbfd4b3837477b0842bc3971b9633901317880604c475209e7720b9683": "mejs/mediaelement-and-player.min.js",
    "e9e77b96fef09b18ba89467cf7285722ea6d6e1e9e11d1aa35c5abf39770ccff": "mejs/flashmediaelement.swf",
    "05d12432b14d6b810243398927997904668f69f94eacd96001a838d3d70f2143": "mejs/bigplay.svg",
    "1e5b85acb1b0b2d0bd24f4806a1cfa66d7e6dec37110c78d563b84be9951e8f2": "mejs/bigplay.png",
    "7acb5f1cc018169d97b1dd90e2aae94f0b545aa4e7244a0321bb3e1093639a37": "mejs/mediaelementplayer.min.css",
    "016f259972a2aaaf499e93756f6182f73839b1af8c4187fd54976dac723bf853": "mejs/controls.svg",
    "f38cc337d1e8e5c17baf5c3812da8f6e4f49bedccba605b93dc38c338e89f4d5": "mejs/controls.png",
    "3036bee9f749fdca0544a5592ce8da4204fab8f2b68edc6ac3905c90266014d4": "mejs/loading.gif",
    "b294e973896f8f874e90a8eb1a8908ac790980d034c4c4bdf0fc3d37b8abf682": "js/jquery.min.js",
    "23e57cf573e516953a72a685ea9b27bd6095a4e1aaac58c6724050f7557f487c": "js/sort.js",
    "c5c2118428c14c6286d2a57c28a5a4278ef457a34f2afab8e0eb3da66daa7ccd": "js/sort.js", # from: wfd2034s
    "2d40e4435dbbaef6a552220fb7b9206c1872adb3d7af192327dfe7b4b911c0cc": "js/dragdrop.js",
    "1d629a298fb4bbc58301e6f80e180140e7fa91aac8cc5aac5b46eda551aa1636": "js/video.js",
    "2877b0d2b73bed938d38651a90865bac9068da60ba43af671a45ad572ddadb97": "css/jquery-ui-core.min.css",
    "02b9463648c62e92eafc7e7dcbe26b84ccb34b19b68d7c96e72d355cc649b68c": "css/jquery-ui-theme.min.css",
    "4af2c76d0661c920118c82cae980e86214aa39260bffa364b021dd17eaa8697a": "js/jquery-ui.min.js",
    "27685be9560b70f041cc56e235a841172a57359af8221ab0d86d17ceeb16d4cb": "js/settings.js",
    "4bdf71c3c281ec0b3c31c65e1842d20d3b8a6b3b056645772f714fb7d5d316ff": "js/settings.js", # from: wfd2034s
    "be7349ed81dd4690dd8adfdd6dcba8d6c02655f321bc21f33bea567b7b0c6f1e": "css/style.css",
    "6f7ae25f30e55b9d211170a94147a716b7c37c6d0336e36801c9d0498f18247b": "css/style.css", # from: wfd2034s
    "e2d1b1c7c51f8c30431327fe43029d62b6d5dfd2d95bbd6b8b9929c178dba4bf": "css/images/ui-icons_888888_256x240.png",
    "a8d28e2d83a807b2b86ed2a02e31086f6c0718dfa96e0ba6a4577b657f69cc34": "css/images/ui-icons_454545_256x240.png",
    "82886336a384acad75c803bb87720b144e09c444c36ad1082203c29870ccf39e": "mejs/background.png",
    "d10881ce4015c880f5a5bd35d2a17b4205043e0c5a4e2f08b4354571406f246a": "_unknown/unknown.bin",
    "1248a4025de4062583abc9f6b61936d15fe6f518fcaa7836f51e1ed3f63c1439": "_unknown/unknown.bin", # from: wfd2034s
    "5094f6d172f1daa36d1f70fae50d65c564ef2e9ac703ecd3206a57c000396d1d": "_unknown/text.txt"
}
def main():
    """
    Command line entry point: parse the given ".DF3" firmware, write its contents
    into a ZIP archive and print the JSON manifest to stdout.

    Raises:
        ValueError: If the input path is not an existing file, or the output
            path is a directory.
    """
    cli = ArgumentParser(
        description = "A script to read & unpack firmware files of Sandisk's discontinued Wireless Stick series."
    )
    cli.add_argument("type", choices = ["df3"], help = "Type of the input firmware file, must be 'df3'.")
    cli.add_argument("firmware", help = "Path of the input firmware file.", type = Path)
    cli.add_argument("output", help = "Path for the output unpacked archive ZIP file.", type = Path)
    args = cli.parse_args()

    kind = cast(str, args.type)
    assert kind == "df3", "input firmware type must be df3"

    source = cast(Path, args.firmware).absolute()
    target = cast(Path, args.output).absolute()
    if not (source.exists() and source.is_file()):
        raise ValueError(f"input path '{str(source)}' doesn't exists or not a file!")
    if target.is_dir():
        raise ValueError(f"output path '{str(target)}' must be an file, not a directory!")

    parsed = parse_df3_firmware(source)
    with target.open("wb") as handle:
        handle.write(firmware_to_zip(parsed).read())
    print("written to: " + str(target), file = stderr)

    # Only the JSON manifest goes to stdout; progress output goes to stderr.
    manifest, _ = firmware_to_pack(parsed)
    print(manifest.decode("utf-8"))


if __name__ == "__main__":
    main()
# pyright: basic
#
# Copyright (C) 2025 Yusuf Cihan
# @ysfchn https://ysfchn.com
#
# This program is a free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
There is also a Wireless Media Drive, whose telnet & ftp access is left open, making it way
easier to play around with. For the "root" user, the password is "sqn1351". Its firmware file is
a 152-byte custom header followed by an EXT2 filesystem, which can then be mounted on the host system. [^1]
See also "sandisk_wireless_df3.py" file for Wireless Stick drives.
[^1]: https://forums.hak5.org/topic/35884-sandisk-wireless-media-drive-root-crackand-other-useful-info/
(archive 1: https://web.archive.org/web/20250715204033/https://forums.hak5.org/topic/35884-sandisk-wireless-media-drive-root-crackand-other-useful-info/)
"""
# TODO
from typing import NamedTuple, Type, Tuple, cast
from sys import stderr
from io import BytesIO
from pathlib import Path
from datetime import datetime
from zlib import crc32
import struct
# First 4 bytes of a U-Boot image ("uImage") header.
UIMAGE_MAGIC = bytes((0x27, 0x05, 0x19, 0x56))
GZIP_MAGIC = bytes((0x1f, 0x8b, 0x08, 0x00))


# https://formats.kaitai.io/uimage/
# TODO: Unused
class UImageHeader(NamedTuple):
    """Fields of a 64-byte uImage header; all integers are stored big-endian."""
    # CRC32 of the header, calculated with this field set to zero.
    header_crc: int
    # Creation time, converted from a Unix timestamp to a naive local datetime.
    timestamp: datetime
    data_length: int
    load_addr: int
    entry_addr: int
    data_crc: int
    os_type: int
    arch_type: int
    image_type: int
    comp_type: int
    # Image name, without the NUL padding that fills the 32-byte field.
    name: str

    @classmethod
    def from_bytes(cls: "Type[UImageHeader]", data: bytes) -> "UImageHeader":
        """
        Parse and validate a uImage header.

        Args:
            data: Exactly 64 bytes, starting with the uImage magic.

        Returns:
            The parsed header.

        Raises:
            AssertionError: If the length, the magic or the header CRC32 is wrong.
        """
        assert len(data) == 64, f"uimage header must be 64 in length, not {len(data)}"
        assert data[0:4] == UIMAGE_MAGIC, f"no uimage header was detected, got 0x{data[0:4].hex()}"
        # The header CRC32 covers the whole header with the CRC field itself zeroed out.
        calculated = crc32(data[0:4] + bytes(4) + data[8:64])
        result = cls(
            header_crc = int.from_bytes(data[4:8], "big"),
            timestamp = datetime.fromtimestamp(int.from_bytes(data[8:12], "big")),
            data_length = int.from_bytes(data[12:16], "big"),
            load_addr = int.from_bytes(data[16:20], "big"),
            entry_addr = int.from_bytes(data[20:24], "big"),
            data_crc = int.from_bytes(data[24:28], "big"),
            os_type = int.from_bytes(data[28:29], "big"),
            arch_type = int.from_bytes(data[29:30], "big"),
            image_type = int.from_bytes(data[30:31], "big"),
            comp_type = int.from_bytes(data[31:32], "big"),
            # The name is a NUL-terminated string inside a fixed 32-byte field,
            # so everything from the first NUL byte on is padding.
            name = data[32:64].split(b"\x00", 1)[0].decode("utf-8")
        )
        assert calculated == result.header_crc, f"mismatched uimage header crc32, expected {result.header_crc} but got {calculated}"
        return result

    def __repr__(self) -> str:
        return \
            f"time={self.timestamp.isoformat()} length={self.data_length} load=0x{self.load_addr:x} entry=0x{self.entry_addr:x} " + \
            f"os={self.os_type} arch={self.arch_type} comp={self.comp_type} itype={self.image_type} name='{self.name}' crc={self.data_crc}"
# https://wiki.osdev.org/Ext2
class EXTHeader(NamedTuple):
    """Fields of an EXT superblock (base part plus extended part), in on-disk order."""
    inodes_count: int
    blocks_count: int
    r_blocks_count: int
    free_blocks_count: int
    free_inodes_count: int
    first_data_block: int
    log_block_size: int
    log_frag_size: int
    blocks_per_group: int
    frags_per_group: int
    inodes_per_group: int
    mtime: int
    wtime: int
    mnt_count: int
    mnt_max_count: int
    magic: bytes
    state: int
    errors: int
    minor_version: int
    check_previous: int
    check_interval: int
    system_id: int
    major_version: int
    rsb_uid: int
    rsb_gid: int
    # Extended superblock:
    first_ino: int
    inode_size: int
    block_group: int
    feature_compat: int
    feature_incompat: int
    feature_ro_compat: int
    uuid: bytes
    volume_name: bytes
    path_volume: bytes
    compress: int
    alc_files: int
    alc_dirs: int
    unused: int
    journal_id: bytes
    journal_inode: int
    journal_dev: int
    head_inode: int

    @classmethod
    def from_bytes(cls: "Type[EXTHeader]", data: bytes) -> "EXTHeader":
        """
        Unpack a superblock from the start of `data` (little-endian, no padding).

        Raises:
            struct.error: If `data` is too short to hold every field.
            AssertionError: If the magic field is not 53 EF.
        """
        layout = struct.Struct("".join((
            "<",
            "13I",   # inodes_count .. wtime
            "2h",    # mnt_count, mnt_max_count
            "2s",    # magic
            "3H",    # state, errors, minor_version
            "4I",    # check_previous .. major_version
            "2H",    # rsb_uid, rsb_gid
            "I",     # first_ino
            "2H",    # inode_size, block_group
            "3I",    # feature_compat, feature_incompat, feature_ro_compat
            "16s",   # uuid
            "16s",   # volume_name
            "64s",   # path_volume
            "I",     # compress
            "2B",    # alc_files, alc_dirs
            "H",     # unused
            "16s",   # journal_id
            "3I",    # journal_inode, journal_dev, head_inode
        )))
        header = cls._make(layout.unpack_from(data))
        assert header.magic == b"\x53\xEF", "ext magic doesn't match"
        return header
class WMDFirmware(NamedTuple):
    """Information read from a Wireless Media Drive firmware file."""
    # Version formatted as "<major>.<minor>", with the minor part zero-padded to 2 digits.
    version: str
    # Total size of the firmware file, in bytes.
    file_size: int
    # Superblock parsed from the filesystem that follows the custom header.
    ext : EXTHeader
def read_wmd_firmware(firmware: Path) -> Tuple[BytesIO, WMDFirmware]:
    """
    Read a Sandisk Media Drive (SWDS1) firmware file and extract its filesystem image.

    The file starts with a custom 152-byte header, which is followed by an EXT2
    filesystem. Version, size, mount hints and the parsed superblock are printed
    to stderr.

    Args:
        firmware: Path of the firmware file to read.

    Returns:
        A tuple of the filesystem image (everything after the custom header, as a
        BytesIO positioned at offset 0) and the parsed firmware information.

    Raises:
        AssertionError: If the header or the file size doesn't match the expected layout.
    """
    image = firmware.read_bytes()
    total_size = len(image)
    reader = BytesIO(image)

    # Skip these bytes now, we don't know their purpose.
    reader.read(20)

    # The version is stored in 2 bytes, first byte is the major,
    # and the second byte is the minor.
    # 02 5D -> 2.93 // 03 04 -> 3.04
    major, minor = cast(Tuple[str, str], map(str, reader.read(2)))
    version = f"{major}.{minor.rjust(2, '0')}"

    header_length = 152
    assert reader.read(2) == bytes(2), "unexpected bytes at header"
    assert reader.read(9) == b"Qwifi.img", "unexpected bytes at header; name mismatch"
    # The rest of the custom header must be zero filled.
    remaining_header = reader.read(header_length - reader.tell())
    assert sum(remaining_header) == 0, "unexpected bytes at header; must be zero"
    assert (total_size % 1024) == header_length, "file size must be multiple of 1024 with 152 bytes at start"
    # The first 1024 bytes of the filesystem are expected to be empty;
    # the superblock is read from the 1024 bytes after them.
    assert sum(reader.read(1024)) == 0, "unexpected bytes before superblock"
    superblock = EXTHeader.from_bytes(reader.read(1024))

    info = WMDFirmware(
        version = version,
        ext = superblock,
        file_size = total_size
    )
    print(f"version: {info.version}", file = stderr)
    print(f"size: {info.file_size}", file = stderr)
    print("mount image with:\n sudo mount -o loop,ro \"output.img\" -t ext2 sandisk-mount", file = stderr)
    print("unmount image with:\n sudo umount /dev/loop0", file = stderr)
    print(info.ext, file = stderr)

    # The filesystem image is simply the file without the custom header.
    return BytesIO(image[header_length:]), info
# pyright: basic
from cmd import Cmd
from enum import Enum
from http.client import HTTPResponse
from io import BytesIO
import re
from sys import stderr
from time import sleep
from typing import Dict, List, NamedTuple, Optional, Tuple, Union, cast
from urllib.error import HTTPError
from urllib.parse import urlencode, unquote, urlunsplit
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
from urllib.request import Request, urlopen
from hashlib import sha1, md5
from os.path import join
from datetime import datetime
# from socket import IP_MULTICAST_LOOP, IP_MULTICAST_TTL, SO_REUSEADDR, SO_REUSEPORT, SOCK_DGRAM, SOL_IP, SOL_SOCKET, gethostname, socket, AF_INET, gethostbyaddr, getfqdn, getaddrinfo, gethostbyname, gethostbyname_ex
class NullableEnum:
    """
    Mixin for Enum classes that maps values without a matching member to a
    fallback member instead of raising.
    """

    @classmethod
    def _missing_(cls, _):
        # "UNSUPPORTED" takes precedence when the enum defines it; otherwise the
        # enum is expected to define "UNKNOWN".
        fallback = "UNSUPPORTED" if hasattr(cls, "UNSUPPORTED") else "UNKNOWN"
        return getattr(cls, fallback)
class DeviceType(NullableEnum, Enum):
    """Device model codes; values that match no member resolve to UNKNOWN."""
    # Probably AirStash
    A01 = "A01"
    A02 = "A02"
    # Wireless Flash Drive
    FD_128K = "A02S" # 16GB/32GB variant
    FD_256K = "A02E" # 64GB variant
    # Wireless Stick
    WS_V1 = "A03S" # 16GB/32GB variant
    WS_V2 = "A03E" # 64GB/128GB/200GB variant
    UNKNOWN = ""
class BatteryStatus(NullableEnum, Enum):
    """Battery state names; values that match no member resolve to UNKNOWN."""
    CHARGING = "charging"
    FULL = "charged"
    HIGH = "high"
    MEDIUM = "med"
    LOW = "low"
    CRITICAL = "critical"
    UNKNOWN = ""

    def to_level(self):
        """
        Convert the status to a charge level.

        Returns:
            -1 for UNKNOWN and CHARGING, otherwise a fraction between 0.25
            (CRITICAL) and 1.0 (HIGH and FULL) in steps of 0.25.
        """
        if self in (BatteryStatus.UNKNOWN, BatteryStatus.CHARGING):
            return -1
        # Number of quarters for each status; FULL is capped at the same level as HIGH.
        quarters = {
            BatteryStatus.CRITICAL: 1,
            BatteryStatus.LOW: 2,
            BatteryStatus.MEDIUM: 3,
            BatteryStatus.HIGH: 4,
            BatteryStatus.FULL: 4,
        }
        return quarters[self] / 4
class WiFiSecurity(NullableEnum, Enum):
    """Wi-Fi security modes; values that match no member resolve to UNKNOWN."""
    WPA2 = "wpa2"
    WPA = "wpa"
    WEP = "wep"
    # Open network, no security.
    PUBLIC = "none"
    UNKNOWN = ""
class StorageMediumStatus(NullableEnum, Enum):
    """Storage medium states; values that match no member resolve to UNSUPPORTED."""
    MOUNTED = "mounted"
    UNFORMATTED = "unformatted"
    FS_ERROR = "fserror"
    ERROR = "carderror"
    # This enum has no UNKNOWN member; UNSUPPORTED is its fallback.
    UNSUPPORTED = ""
class StorageMediumFileSystem(NullableEnum, Enum):
    """Filesystem names of a storage medium; values that match no member resolve to UNKNOWN."""
    FAT16 = "fat16"
    FAT32 = "fat32"
    EXFAT = "exfat"
    NTFS = "ntfs"
    HFS = "hfs"
    UNKNOWN = ""
class NetworkConnectStatus(NullableEnum, Enum):
    """Connection states of a network; values that match no member resolve to UNKNOWN."""
    CONNECTING = "connecting"
    CONNECTED = "connected"
    FAILED = "failed"
    # Unlike the other enums here, the fallback member has a non-empty value.
    UNKNOWN = "unknown"
class NetworkSecurityLevel(NullableEnum, Enum):
    """Security level values; values that match no member resolve to UNKNOWN."""
    NONE = "none"
    ALL = "all"
    UNKNOWN = ""
class NetworkScanState(NullableEnum, Enum):
    """States of a Wi-Fi scan; values that match no member resolve to UNKNOWN."""
    SCANNING = "scanning"
    LOCKED = "locked"
    NONE = "none"
    UNKNOWN = ""
class SettingsPushState(Enum):
    """
    Results of pushing settings to the device.

    This is a plain Enum, so constructing it from a value that matches no
    member raises ValueError.
    """
    SUCCESS = "ok" # Changes saved successfully.
    # Changes saved, and a restart is pending ("ok" status with restart="pending").
    SUCCESS_PENDING_RESTART = "ok:pending"
    BAD_REQUEST = "bad" # Couldn't register a new network for some reason (?).
    ERROR_ENTRY_FULL = "full" # Couldn't register a new network because maximum limit has been reached.
    ERROR_ENTRY_DUPLICATE = "duplicate" # Couldn't register a new network because it already exists.
    ERROR_ENTRY_NONEXIST = "notfound" # Couldn't remove a saved network because it doesn't exist.
class BatteryInfo(NamedTuple):
    """Battery status together with a voltage reading."""
    status: BatteryStatus
    # NOTE(review): the unit of the voltage is not visible here — confirm.
    voltage: int
class ImplementationInfo(NamedTuple):
    """
    Integer values for individual device features.

    NOTE(review): these look like per-feature implementation versions reported
    by the device — confirm against the code that fills them.
    """
    security: int
    cachent: int
    coex: int
    upgrade: int
    restart: int
    exfat: int
    move_rename: int

    def supports_auto_upgrade(self):
        """Return True when the `upgrade` value is 2 or higher."""
        return self.upgrade >= 2
class BuildInfo(NamedTuple):
    """Name, model and numeric code of a firmware build."""
    name: str
    model: str
    # Numeric build code; compared against 2009 by `Settings.is_usb`.
    code: int
class BitrateThresholdInfo(NamedTuple):
    """Warning and critical bitrate thresholds (units not visible here)."""
    warning: int
    critical: int
class LastErrorInfo(NamedTuple):
    """Details of an error record: message, source and line, plus optional extras."""
    message: str
    source: str
    line: int
    # The remaining fields may be absent.
    version: Optional[int]
    address: Optional[int]
    counter: Optional[int]
    timestamp: Optional[int]
class ExpectedAppVersionInfo(NamedTuple):
    """App version strings for the Android and iOS apps."""
    android: str
    ios: str
class ServerNetworkInfo(NamedTuple):
    """
    Settings of a served Wi-Fi network.

    NOTE(review): presumably the access point hosted by the device itself — confirm.
    """
    enabled: bool
    # Number of clients.
    clients: int
    ssid: str
    channel: int
    security: WiFiSecurity
    key: str
class ClientNetworkInfo(NamedTuple):
    """
    State of a network connection made as a client.

    NOTE(review): presumably the "home" network the device joins — confirm.
    """
    enabled: bool
    ssid: str
    ip: str
    from_home: bool
    status: NetworkConnectStatus
class NetworkInfo(NamedTuple):
    """A Wi-Fi network entry with its security mode and signal reading."""
    ssid: str
    security: WiFiSecurity
    rssi: int
    connected: bool
    saved: bool

    def signal(self, is_alt: bool = False):
        """
        Convert `rssi` to a signal level from 0 to 3.

        Args:
            is_alt: Use the alternative scale, where lower readings give a
                higher level (50 or less is 3, 70 or less is 2). On the default
                scale higher readings give a higher level (32 or more is 3,
                16 or more is 2).
                NOTE(review): which device reports which scale is not visible
                here — confirm.

        Returns:
            0 when `rssi` is -1, otherwise 1, 2 or 3.
        """
        reading = self.rssi
        if reading == -1:
            return 0
        if is_alt:
            return 3 if reading <= 50 else (2 if reading <= 70 else 1)
        return 3 if reading >= 32 else (2 if reading >= 16 else 1)
class StorageMediumInfo(NamedTuple):
    """Description of one storage medium: state, filesystem, identity and capacity."""
    status: StorageMediumStatus
    format: StorageMediumFileSystem
    serial: bytes
    path: str
    label: str
    # NOTE(review): units of free/total (bytes or blocks) are not visible here — confirm.
    free: int
    total: int
    block_size: int
    read_only: bool
class WebDAVFolder(NamedTuple):
    """A folder entry with its creation and modification times."""
    name: str
    created: datetime
    modified: datetime
class WebDAVPartialFolder(NamedTuple):
    """A folder entry for which only the name is known."""
    name: str
class WebDAVFile(NamedTuple):
    """A file entry with its timestamps, MIME type, size and ETag."""
    name: str
    # May be absent.
    created: Optional[datetime]
    modified: datetime
    mimetype: str
    size: int
    etag: str
    # May be absent. NOTE(review): meaning of "cachent" is not visible here — confirm.
    cachent: Optional[str]
class MACAddress(str):
    """
    A MAC address stored as an upper-case, colon separated string.

    Raises:
        ValueError: On construction, if the value doesn't consist of 6 colon
            separated octets of 1 or 2 hexadecimal digits each.
    """

    def __new__(cls, content: str):
        mac = str(content).upper()
        if mac.count(":") != 5:
            raise ValueError(f"doesn't appear to be a valid mac: {mac}")
        # Only plain hexadecimal digits are allowed in an octet. Relying on int()
        # alone would also accept signs, "0X" prefixes, underscores and whitespace.
        for octet in mac.split(":"):
            if len(octet) not in (1, 2) or any(ch not in "0123456789ABCDEF" for ch in octet):
                raise ValueError(f"doesn't appear to be a valid mac: {mac}")
        return str.__new__(cls, mac)

    @property
    def as_home(self):
        """The address with the direct connect prefix swapped for the home connect prefix."""
        if self.startswith(DIRECT_CONNECT_MAC_PREFIX):
            return HOME_CONNECT_MAC_PREFIX + self.removeprefix(DIRECT_CONNECT_MAC_PREFIX)
        return self

    @property
    def as_direct(self):
        """The address with the home connect prefix swapped for the direct connect prefix."""
        if self.startswith(HOME_CONNECT_MAC_PREFIX):
            return DIRECT_CONNECT_MAC_PREFIX + self.removeprefix(HOME_CONNECT_MAC_PREFIX)
        return self

    @property
    def as_int(self):
        """The direct connect form of the address as a 48-bit integer."""
        # Converting octet by octet keeps single-digit octets ("B" instead of "0B") correct.
        return int.from_bytes(bytes(int(octet, 16) for octet in self.as_direct.split(":")), "big")

    def to_model(self):
        """
        Guess the device model from the address range the MAC falls into.

        Returns:
            The matching DeviceType, or None when the address is in none of
            the known ranges.
        """
        # TODO: might be incorrect
        value = self.as_int
        if value < 0xD0_E4_0B_00_0F_00:
            return DeviceType.A01
        elif (value >= 0xD0_E4_0B_00_0F_00) and (value <= 0xD0_E4_0B_03_99_FF):
            return DeviceType.A02
        elif (value >= 0xD0_E4_0B_F5_D6_00) and (value <= 0xD0_E4_0B_FB_9F_FF):
            return DeviceType.FD_128K
        elif (value >= 0xD0_E4_0B_FB_E0_00) and (value <= 0xD0_E4_0B_FE_FF_FF):
            return DeviceType.WS_V1
        # The bounds of this range used to be swapped (lower bound above the upper
        # bound), which made the branch unreachable. The range ends right below
        # the start of the FD_128K range.
        elif (value >= 0xD0_E4_0B_80_50_00) and (value <= 0xD0_E4_0B_F5_D5_FF):
            return DeviceType.WS_V2
        return None
class Settings(NamedTuple):
    """Complete set of device settings and status values."""
    model: DeviceType
    hostname: str
    mac: MACAddress
    network: ServerNetworkInfo
    # May be absent.
    home_network: Optional[ClientNetworkInfo]
    build: BuildInfo
    battery: BatteryInfo
    bitrate: BitrateThresholdInfo
    app_version: ExpectedAppVersionInfo
    auth: NetworkSecurityLevel
    authhash: str
    timeout: int
    # May be absent.
    pending_update: Optional[int]
    implementation: ImplementationInfo
    mediums: List[StorageMediumInfo]
    # May be absent.
    last_error: Optional[LastErrorInfo]

    @property
    def is_usb(self):
        """True when no storage medium is listed and the build code is 2009 or higher."""
        return (len(self.mediums) == 0) and (self.build.code >= 2009)
# Host that every request is sent to; the commented-out lines are alternative addresses.
# SERVER_ADDRESS = "172.25.63.1"
# SERVER_ADDRESS = "sandiskf47e27.local"
SERVER_ADDRESS = "192.168.1.81"
# MAC address prefixes that MACAddress.as_home / MACAddress.as_direct swap with each other.
DIRECT_CONNECT_MAC_PREFIX = "D0:E4:0B:"
HOME_CONNECT_MAC_PREFIX = "D2:E4:0B:"
class RequestResponse(NamedTuple):
    """Result of an HTTP request made with `do_request`."""
    # HTTP status code, or -1 when an HTTP error carried no status.
    status: int
    # Complete response body.
    body: bytes
    # Response headers.
    headers: Dict[str, str]
def do_request(
    path: str,
    method: str,
    query: Optional[Dict[str, str]] = None,
    headers: Optional[Dict[str, str]] = None,
    body: Optional[bytes] = None
) -> RequestResponse:
    """
    Send an HTTP request to the device at SERVER_ADDRESS and read the whole response.

    HTTP error statuses don't raise; they are returned as a regular
    RequestResponse. Other failures (for example connection errors) propagate.

    Args:
        path: Request path; a leading slash is optional.
        method: HTTP method.
        query: Optional query string parameters.
        headers: Optional request headers.
        body: Optional request body.

    Returns:
        Status code, body and headers of the response.
    """
    query_string = urlencode(query) if query else ""
    request_url = urlunsplit((
        "http", SERVER_ADDRESS,
        "/" + path.removeprefix("/"),
        query_string, ""
    ))
    req = Request(
        url = request_url,
        method = method,
        unverifiable = True,
        data = body,
        headers = headers or {}
    )
    try:
        # Close the response (and its connection) as soon as the body has been read.
        with cast(HTTPResponse, urlopen(req)) as resp:
            return RequestResponse(status = resp.status, body = resp.read(), headers = dict(resp.headers))
    except HTTPError as he:
        # An HTTPError wraps the error response, which must be closed as well.
        try:
            return RequestResponse(status = he.status or -1, body = he.read(), headers = dict(he.headers))
        finally:
            he.close()
def is_legal_name(fs : StorageMediumFileSystem, name : str):
    """
    Returns the string of characters that are not allowed in names on the
    given file system, or an empty string for any other file system.

    NOTE(review): despite its name, this returns the character set rather
    than a verdict, and `name` is not looked at - confirm what callers expect.
    """
    if fs == StorageMediumFileSystem.FAT32:
        return "*?.,;:/\\|+=<>[]\""
    if fs == StorageMediumFileSystem.FAT16:
        return "\"*,/:;<=>?[\\]|"
    return ""
def get_service():
    """
    Placeholder that currently does nothing and returns None.
    The disabled socket setup is kept below for reference.
    """
    # host = gethostbyname(gethostname())
    # sock = socket(AF_INET, SOCK_DGRAM)
    # sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    # sock.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
    # sock.setsockopt(SOL_IP, IP_MULTICAST_TTL, 255)
    # sock.setsockopt(SOL_IP, IP_MULTICAST_LOOP, 1)
    # sock.bind(("", 5353))
    return None
def post_settings(query: Union[Dict[str, str], Dict[str, Union[bytes, str]]], post_restart: bool = False):
    """
    Set device settings.

    query: Form fields to send; `bytes` values are percent-encoded as they are.
    post_restart: When True, a "restart" field with the value "allowed" is
    added to the submitted form.

    Returns a SettingsPushState built from the text of the returned status
    element, suffixed with ":pending" when the device reports a pending restart.
    """
    # Work on a copy, so the dictionary owned by the caller is never modified.
    form: Dict[str, Union[bytes, str]] = dict(query)
    if post_restart:
        form["restart"] = "allowed"
    response = do_request(
        "/settings.xml",
        "POST",
        headers = {"Content-Type": "application/x-www-form-urlencoded"},
        body = urlencode(form).encode("utf-8")
    )
    statusxml = ET.XML(response.body)
    assert statusxml.tag == "status"
    # Is there a pending reboot?
    extra = statusxml.attrib.get("restart", "")
    if extra:
        assert extra == "pending"
        extra = f":{extra}"
    return SettingsPushState((statusxml.text or "") + extra)
def get_networks(new_scan: bool = True) -> List[NetworkInfo]:
    """
    Returns a single list that merges the networks saved on the device with
    the networks found by a Wi-Fi scan.

    If `new_scan` is True, a new scan is requested first. If it is False,
    no new scan is requested up front and the cached scan list stored on
    the device is used instead.

    A network that is both saved and scanned is listed once, as the scanned
    entry with `saved` set to True. Saved networks that were not seen in the
    scan are appended as they are.
    """
    def _get_network_list(*, is_scanned: bool):
        # Fetches either the scan results or the saved networks. For scan
        # results, the device may answer with a status element instead of
        # a list; that is returned as a NetworkScanState.
        response = do_request(
            "/settings.xml",
            "GET",
            query = {"group": "scan" if is_scanned else "saved"}
        )
        assert response.status == 200, "failed response"
        xmldata = ET.XML(response.body)
        if is_scanned and (xmldata.tag == "status"):
            return NetworkScanState(xmldata.text)
        assert xmldata.tag == "ssidlist", f"unexpected xml tag '{xmldata.tag}'"
        found : List[NetworkInfo] = []
        for ssid in xmldata.findall("./ssid"):
            network = NetworkInfo(
                ssid = unquote(ssid.attrib["name"]).encode("ISO-8859-15").decode("utf-8"),
                security = WiFiSecurity(ssid.attrib["security"]),
                rssi = int(ssid.attrib.get("rssi", "") or -1),
                # This won't be True for when listing all saved networks even if there is a
                # saved network that the device is currently connected to.
                # So, it only applies for scanned networks.
                connected = ssid.attrib.get("connected", "") == "connected",
                saved = not is_scanned
            )
            # Only include networks that has a SSID.
            if network.ssid:
                found.append(network)
        return found

    def _block_for_scan(interval: int, wait_more: bool = False):
        # Polls the scan results until the device returns an actual list.
        # Written as a loop, so a long-running scan can't exhaust the
        # recursion limit.
        while True:
            sleep((interval / 1000) * (2 if wait_more else 1))
            networks = _get_network_list(is_scanned = True)
            if not isinstance(networks, NetworkScanState):
                return networks
            if networks == NetworkScanState.SCANNING:
                wait_more = False
            elif networks == NetworkScanState.LOCKED:
                wait_more = True
            else:
                # Any other state: request a scan (again) and wait longer.
                post_settings({
                    "group": "scan"
                })
                wait_more = True

    saved_list = cast(List[NetworkInfo], _get_network_list(is_scanned = False))
    if new_scan:
        post_settings({
            "group": "scan"
        })
    with ThreadPoolExecutor(max_workers=1) as executor:
        fut = executor.submit(_block_for_scan, 2000 if new_scan else 0)
        scanned_list = cast(List[NetworkInfo], fut.result())
    result : List[NetworkInfo] = list(scanned_list)
    for old in saved_list:
        exist_in_new = next((new for new in result if new.ssid == old.ssid), None)
        if exist_in_new is None:
            result.append(old)
        else:
            # The SSID was seen in the scan too; keep the scanned entry,
            # flagged as saved, and don't add the same SSID again.
            result.remove(exist_in_new)
            result.append(NetworkInfo(*exist_in_new[:-1], saved = True))
    return result
# ---------------------------------------------------------------------------
# Wi-Fi Security
# ---------------------------------------------------------------------------
class WiFiPasswordValidationStatus(Enum):
    """
    Possible outcomes of `validate_wifi_password()`.
    """
    # A password was given for a network that takes none.
    REQUIRE_NO_PASSWORD = "require_no_password"
    # No password was given for a network that needs one.
    REQUIRE_PASSWORD = "require_password"
    TOO_SHORT = "too_short"
    TOO_LONG = "too_long"
    # The password contains characters outside the accepted set.
    BAD_CHARACTER = "bad_character"
    VALID = "valid"
# Matches a non-empty string made up only of printable ASCII characters (space through "~").
ASCII_REGEX = re.compile("^[ -~]+$")
# Matches a non-empty string made up only of hexadecimal digits (either case).
HEX_REGEX = re.compile("^[0-9A-Fa-f]+$")
def validate_wifi_password(security: WiFiSecurity, password: str) -> Tuple[WiFiPasswordValidationStatus, bytes]:
    """
    Checks whether the given password satisfies the requirements of the specified Wi-Fi
    security model.

    Returns a tuple of the WiFiPasswordValidationStatus reporting the validation
    outcome and the key as bytes. The key is empty unless the status is VALID
    (and is also empty for a valid public network).

    Raises ValueError if a non-empty password is given for a security model
    other than PUBLIC, WEP, WPA or WPA2.
    """
    status = WiFiPasswordValidationStatus
    # Public network
    if security == WiFiSecurity.PUBLIC:
        if password:
            return status.REQUIRE_NO_PASSWORD, b""
        return status.VALID, b""
    length = len(password)
    if not length:
        return status.REQUIRE_PASSWORD, b""
    # WEP security
    if security == WiFiSecurity.WEP:
        # Every hexadecimal digit is a printable ASCII character too, so the
        # hexadecimal form has to be recognized before the ASCII rules are
        # applied; otherwise a 10/26/58 digit key is rejected by them.
        if (length in (10, 26, 58)) and HEX_REGEX.fullmatch(password):
            # The decoded bytes are used as they are; mapping them through
            # ISO-8859-1 -> ISO-8859-15 yields the same bytes, except that it
            # fails for the few byte values the two charsets disagree on.
            return status.VALID, bytes.fromhex(password)
        if ASCII_REGEX.fullmatch(password):
            if length in (5, 13, 29):
                return status.VALID, password.encode("ISO-8859-15")
            if length < 29:
                return status.TOO_SHORT, b""
            return status.TOO_LONG, b""
        return status.BAD_CHARACTER, b""
    # WPA-PSK password must only include printable ASCII characters
    if security in (WiFiSecurity.WPA2, WiFiSecurity.WPA):
        if not ASCII_REGEX.fullmatch(password):
            return status.BAD_CHARACTER, b""
        if length < 8:
            return status.TOO_SHORT, b""
        if length > 63:
            return status.TOO_LONG, b""
        return status.VALID, password.encode("ISO-8859-15")
    raise ValueError("invalid Wi-Fi security to validate password against")
def create_pbkdf2_wpapsk_key(ssid: bytes, password: bytes) -> bytes:
    """
    Creates a PBKDF2 WPA-PSK value from a SSID and password.

    The key is derived with PBKDF2-HMAC-SHA1, using the password as the
    secret and the SSID as the salt, with 4096 iterations.

    Returns the 32-byte pre-shared key.
    """
    # Imported locally, as only this function needs it.
    from hashlib import pbkdf2_hmac
    return pbkdf2_hmac("sha1", password, ssid, 4096, 32)
def save_network(ssid: str, security: WiFiSecurity, password: str):
    """
    Saves a Wi-Fi access point on the device with its SSID and password.
    Requires version >= 669.

    Raises ValueError if the security model is not supported or the password
    is not acceptable for it. Returns the result of `post_settings()`.
    """
    query : Dict[str, Union[bytes, str]] = {
        "group": "saved",
        "action": "save"
    }
    query["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8")
    if security == WiFiSecurity.PUBLIC:
        query["security"] = "none"
    elif security == WiFiSecurity.WEP:
        query["security"] = "wep"
    elif (security == WiFiSecurity.WPA) or (security == WiFiSecurity.WPA2):
        query["security"] = "wpa"
    else:
        raise ValueError("invalid security model")
    wifi_status, wifi_pass = validate_wifi_password(security, password)
    if wifi_status != WiFiPasswordValidationStatus.VALID:
        raise ValueError("password isn't legal for this security model, reason: " + str(wifi_status.name))
    # Keys are passed on as raw bytes and percent-encoded when the form is
    # built. Decoding them as UTF-8 text fails for keys that aren't valid
    # UTF-8, which is practically always the case for a derived WPA key.
    if security == WiFiSecurity.WEP:
        query["password"] = wifi_pass
    elif (security == WiFiSecurity.WPA) or (security == WiFiSecurity.WPA2):
        query["password"] = create_pbkdf2_wpapsk_key(ssid.encode("ISO-8859-15"), wifi_pass)
    return post_settings(query)
def remove_network(ssid: str):
    """
    Deletes a saved network from the device. Requires version >= 669.
    Returns the result of `post_settings()`.
    """
    form = {"group": "saved", "action": "delete"}
    form["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8")
    return post_settings(form)
def connect_network(ssid: str):
    """
    Asks the device to connect to a network it has already saved.
    Requires version >= 669. Returns the result of `post_settings()`.
    """
    form = {"group": "saved", "action": "connect"}
    form["ssid"] = ssid.encode("ISO-8859-15").decode("utf-8")
    return post_settings(form)
def set_sidelink_mode(mode: bool):
    """
    Turns the home network mode on or off. Requires version >= 669.
    Returns the result of `post_settings()`.
    """
    flag = "true" if mode else "false"
    return post_settings({"sidelinken": flag})
def set_ap_password(settings: Settings, password: str, ssid: Optional[str] = None, for_home_network: Optional[bool] = None):
    """
    Set AP password.

    settings: Current device settings; supplies the build code and the current SSID.
    password: New password; an empty value selects security "none".
    ssid: SSID used for deriving the key and the auth hash; the current SSID
    is used when omitted.
    for_home_network: True sends the owner auth fields, False sends
    auth "none", None sends neither.

    Raises ValueError for an empty or too long SSID, or an unacceptable
    password. Returns the result of `post_settings()`; a restart is allowed.

    NOTE(review): `ssid` is validated and used for the key, but is never
    put in the submitted form itself - confirm whether that is intended.
    """
    # Build codes >= 703 are validated against WPA2 rules, older ones against WEP.
    required_model = WiFiSecurity.WPA2 if settings.build.code >= 703 else WiFiSecurity.WEP
    if ssid is not None:
        if not ssid:
            raise ValueError("either provide an non-empty ssid or leave it blank to use default ssid")
        if len(ssid) > 32:
            raise ValueError("ssid can't be longer than 32 characters")
    query = {}
    if not password:
        query["security"] = "none"
    else:
        wifi_status, wifi_pass = validate_wifi_password(required_model, password)
        if wifi_status != WiFiPasswordValidationStatus.VALID:
            raise ValueError("password isn't legal for this security model, reason: " + str(wifi_status.name))
        if required_model == WiFiSecurity.WEP:
            query["security"] = "wep"
            query["password"] = wifi_pass.decode("utf-8")
        elif required_model == WiFiSecurity.WPA2:
            query["security"] = "wpa"
            query["password"] = create_pbkdf2_wpapsk_key((ssid or settings.network.ssid).encode("ISO-8859-15"), wifi_pass)
    if for_home_network is True:
        digest_input = f"owner:{ssid or settings.network.ssid}:{password}".encode("ISO-8859-1")
        query["auth"] = "all"
        query["authowner"] = "owner"
        query["authhash"] = md5(digest_input).hexdigest()
    elif for_home_network is False:
        query["auth"] = "none"
    return post_settings(query, post_restart = True)
def set_ap_mode(mode: bool):
    """
    Turns the AP mode on or off. Requires version >= 657.
    Returns the result of `post_settings()`.
    """
    flag = "true" if mode else "false"
    return post_settings({"ap": flag})
def set_coex_mode(mode: bool):
    """
    TODO: Found in source code, not sure what does it do.

    Submits the "coex" group with action "save" (plus a fixed name) when
    `mode` is True, or with action "delete" otherwise.
    """
    form = {"group": "coex", "action": "save" if mode else "delete"}
    if mode:
        form["name"] = "test device"
    return post_settings(form)
def set_timeout(value: int):
    """
    Changes the power saving timeout. Requires version >= 586.

    Raises ValueError for a negative value. Returns the result of
    `post_settings()`.
    """
    if not value >= 0:
        raise ValueError("timeout value can't be negative")
    # 0, 15, 30, 60
    form = {"timeout": str(value)}
    return post_settings(form)
def change_wifi_channel(value: int):
    """
    Changes the WLAN channel of the device; the channel number is zero-indexed.
    Channels #13 and #14 should be avoided in North America.
    Requires version >= 574. See more about WLAN channels:
    https://en.wikipedia.org/wiki/List_of_WLAN_channels#2.4_GHz_(802.11b/g/n/ax/be)

    Raises ValueError when the value is outside 0..13. Returns the result
    of `post_settings()`.
    """
    if value < 0:
        raise ValueError("channel number must start from 0 (inclusive)")
    if value > 13:
        raise ValueError("channel number must equal or less than 13")
    form = {"channel": str(value)}
    return post_settings(form)
def push_firmware(file: bytes, is_xml: bool, is_wfd_v2: bool):
    """
    Sends a firmware file to the device with a PUT request.
    is_xml: Must be True if Settings.implementation.upgrade == 2
    is_wfd_v2: Must be True if Settings.model == DeviceType.WS_V2, only is in effect when is_xml is False.

    The response body is printed to stderr. Raises ValueError on status 507,
    and fails an assertion on any status other than 200, 201 or 204.
    Returns the response.
    """
    if is_xml:
        # Upload through the settings endpoint.
        path = "/settings.xml"
        query = {"group": "firmware"}
    else:
        # Upload as a regular file; the file name depends on the device.
        path = "/files/" + ("wfd.df3" if is_wfd_v2 else "AIRST.DF2")
        query = None
    assert path, "firmware parameters are invalid!"
    response = do_request(path, "PUT", query = query, body = file)
    print(response.body, file = stderr)
    if response.status == 507:
        raise ValueError("disk full")
    assert response.status in (200, 201, 204), f"status was {response.status}"
    return response
def webdav_list(path: str = "/") -> List[Union[WebDAVFolder, WebDAVFile]]:
    """
    Lists files and folders in a path. Returns a list containing WebDAVFile and WebDAVFolder
    objects depending on the type of the item.

    If the device answers with a redirect, the request is repeated once with
    the trailing slash of the path toggled.

    Raises ValueError when the path (or, for the root, the memory card)
    doesn't exist.
    """
    # https://datatracker.ietf.org/doc/html/rfc4918#section-14.20
    # https://datatracker.ietf.org/doc/html/rfc4918#section-9.1
    propfind = ET.Element("propfind", {"xmlns:D": "DAV:"})
    ET.SubElement(propfind, "{DAV:}propname")
    buffer = BytesIO()
    ET.ElementTree(propfind).write(buffer, xml_declaration=True, encoding="utf-8")
    request_body = buffer.getvalue()
    for attempt in range(2):
        filename = path.removeprefix('/')
        response = do_request(
            f"/files/{filename}",
            "PROPFIND",
            headers = {
                "Accept": "*/*",
                "Depth": "1",
                "Content-Type": "text/xml; charset=UTF-8"
            },
            body = request_body
        )
        if response.status == 404:
            if not filename:
                raise ValueError("no memory card exists on the device")
            raise ValueError("directory couldn't be found")
        # If a redirect is detected, toggle the trailing slash. Only one retry
        # is made, so a device that keeps redirecting can't cause an endless loop.
        if (response.status == 301) and path and (attempt == 0):
            path = path[:-1] if path.endswith("/") else (path + "/")
            continue
        break
    assert response.status == 207, f"status was {response.status}"
    xmldata = ET.XML(response.body)
    assert xmldata.tag == "{DAV:}multistatus", "couldn't parse this xml response"
    result : List[Union[WebDAVFolder, WebDAVFile]] = []
    for element in xmldata.findall("./{DAV:}response"):
        propsxml = element.find("./{DAV:}propstat/{DAV:}prop")
        assert propsxml is not None
        creation_date = cast(str, propsxml.findtext("./{DAV:}creationdate"))
        modified_date = cast(str, propsxml.findtext("./{DAV:}getlastmodified"))
        mimetype = cast(str, propsxml.findtext("./{DAV:}getcontenttype"))
        name = cast(str, element.findtext("./{DAV:}href")).removeprefix(f"http://{SERVER_ADDRESS}/files/{filename}")
        # Skip the collection itself that being listed
        if not name:
            continue
        # Sub-collections (folders) will have a child element inside resourcetype element.
        resource_type = propsxml.find("./{DAV:}resourcetype/*")
        if resource_type is not None:
            resource_type = resource_type.tag
        # DRY: files share the properties of a folder, so the folder object
        # is built first and expanded into the file object when needed.
        webdav_obj = WebDAVFolder(
            name = name,
            created = datetime.strptime(creation_date, "%Y-%m-%dT%H:%M:%SZ"),
            modified = datetime.strptime(modified_date, "%a, %d %b %Y %H:%M:%S %Z")
        )
        if resource_type == "{DAV:}collection":
            assert mimetype == "text/directory", "got a non-expected mimetype for a directory"
            result.append(webdav_obj)
        else:
            result.append(WebDAVFile(
                *webdav_obj,
                mimetype = mimetype,
                size = int(cast(str, propsxml.findtext("./{DAV:}getcontentlength"))),
                etag = cast(str, propsxml.findtext("./{DAV:}getetag")).removeprefix('"').removesuffix('"'),
                cachent = propsxml.findtext("./{AirStash:}cachent")
            ))
    return result
def webdav_get(path: str):
    """
    Retrieves details for a given path. Returns a WebDAVFile object if the path is a
    file, or a WebDAVPartialFolder object if the path is not a file.

    Raises ValueError when the path is empty (or only a slash), or when the
    device reports that it doesn't exist.
    """
    filename = path.removeprefix('/').removesuffix('/')
    # Validate before talking to the device, so no request is made for an empty name.
    if not filename:
        raise ValueError("a filename is required")
    response = do_request(
        f"/files/{filename}",
        "HEAD",
        headers = {"Accept": "*/*"}
    )
    if response.status == 404:
        raise ValueError("file was not found")
    assert response.status == 200, f"status was {response.status}"
    # The requested filename can be a directory; a response without a
    # content length is treated as one.
    if "Content-Length" not in response.headers:
        return WebDAVPartialFolder(
            name = filename
        )
    return WebDAVFile(
        name = filename,
        created = None,
        modified = datetime.strptime(response.headers["Last-Modified"], "%a, %d %b %Y %H:%M:%S %Z"),
        mimetype = response.headers["Content-Type"],
        size = int(response.headers["Content-Length"]),
        etag = response.headers["ETag"].removeprefix('"').removesuffix('"'),
        cachent = None
    )
def webdav_delete(path: str):
    """
    Deletes an object. Returns True on success, otherwise False (the object
    didn't exist in the first place).

    Raises ValueError when the path is empty (or only a slash).
    """
    filename = path.removeprefix('/').removesuffix('/')
    # This check must come before the request: with an empty name the DELETE
    # would be sent for the "/files/" root itself.
    if not filename:
        raise ValueError("a filename is required")
    response = do_request(
        f"/files/{filename}",
        "DELETE",
        headers = {"Accept": "*/*"}
    )
    assert response.status in (204, 404), f"status was {response.status}"
    return response.status == 204
def webdav_mkdir(path: str):
    """
    Creates a directory at the given path with a MKCOL request.

    Raises ValueError on status 403 or 404, and fails an assertion on any
    status other than 201.
    """
    filename = path.removeprefix('/').removesuffix('/')
    response = do_request(f"/files/{filename}/", "MKCOL", headers = {"Accept": "*/*"})
    failures = {
        403: "a file with same name exists or the name contains invalid characters",
        404: "path was not found (try creating missing parent directories first)",
    }
    if response.status in failures:
        raise ValueError(failures[response.status])
    assert response.status == 201, f"status was {response.status}"
def webdav_move(source: str, target: str, overwrite: bool = False):
    """
    Moves (renames) `source` to `target` with a MOVE request.

    Raises ValueError when a name is missing, when only one of the names
    ends with a slash, or on status 403, 404 or 400. Fails an assertion on
    any other status than 201.
    """
    if not (source and target):
        raise ValueError("both filenames are required")
    # While the device itself doesn't care the trailing slashes on renaming,
    # we check anyway as a safeguard.
    source_is_dir = source.endswith("/")
    target_is_dir = target.endswith("/")
    if source_is_dir != target_is_dir:
        raise ValueError("mixed trailing slashes; both sides must be a directory or a file")
    src = source.removeprefix('/').removesuffix('/')
    trg = target.removeprefix('/').removesuffix('/')
    request_headers = {
        "Destination": f"http://{SERVER_ADDRESS}/files/{trg}",
        "Overwrite": "T" if overwrite else "F",
        "Accept": "*/*"
    }
    response = do_request(f"/files/{src}", "MOVE", headers = request_headers)
    failures = {
        403: "a file with same name exists",
        404: "path was not found",
        400: "invalid path (file may be already moved)",
    }
    if response.status in failures:
        raise ValueError(failures[response.status])
    assert response.status == 201, f"status was {response.status}"
def webdav_copy(source: str, target: str, overwrite: bool = False):
    """
    Copies `source` to `target` with a COPY request.
    TODO: Doesn't work?

    Raises ValueError when a name is missing, when only one of the names
    ends with a slash, or on status 403, 404 or 400. Fails an assertion on
    any other status than 201.
    """
    if not (source and target):
        raise ValueError("both filenames are required")
    # While the device itself doesn't care the trailing slashes on renaming,
    # we check anyway as a safeguard.
    source_is_dir = source.endswith("/")
    target_is_dir = target.endswith("/")
    if source_is_dir != target_is_dir:
        raise ValueError("mixed trailing slashes; both sides must be a directory or a file")
    src = source.removeprefix('/').removesuffix('/')
    trg = target.removeprefix('/').removesuffix('/')
    request_headers = {
        "Destination": f"http://{SERVER_ADDRESS}/files/{trg}",
        "Overwrite": "T" if overwrite else "F",
        "Accept": "*/*"
    }
    response = do_request(f"/files/{src}", "COPY", headers = request_headers)
    failures = {
        403: "a file with same name exists",
        404: "path was not found",
        400: "invalid path",
    }
    if response.status in failures:
        raise ValueError(failures[response.status])
    assert response.status == 201, f"status was {response.status}"
def check_connection():
    """
    Meant to return True if connected to the hostname.
    Not implemented yet; currently always returns None.
    """
    # if gethostbyname(getfqdn()) != "172.25.63.2":
    # return False
    # return True
    return None
# sources/com/wearable/sdk/impl/SettingsManager.java
def get_settings():
    """
    Requests "/settings.xml" from the device, parses it and returns a
    Settings structure containing its fields.

    Fails an assertion when the response or a required element is missing,
    and raises ValueError when the device model can't be determined from
    either the reported model or the MAC address.
    """
    response = do_request(
        "/settings.xml",
        "GET"
    )
    assert response.status == 200, "failed response"
    settings = ET.XML(response.body)
    assert settings.tag == "settings"
    build = BuildInfo(
        name = settings.findtext("./version") or "",
        model = settings.findtext("./buildmodel") or "",
        code = int(settings.findtext("./numericversion") or 0)
    )
    model = settings.findtext("./model") or ""
    hostname = settings.findtext("./hostname") or ""
    serial = settings.findtext("./serial") or ""
    assert all((model, hostname, serial, build.name, build.model, build.code > 0))
    featxml = settings.find("./features")
    assert featxml is not None
    implement = ImplementationInfo(
        security = int(featxml.findtext("./security") or 0),
        cachent = int(featxml.findtext("./cachent") or 0),
        coex = int(featxml.findtext("./coex") or 0),
        upgrade = int(featxml.findtext("./firmwareupdate") or (1 if build.code >= 563 else 0)),
        restart = int(featxml.findtext("./restart") or 0),
        exfat = int(featxml.findtext("./exfat") or 0),
        move_rename = 1 if build.code >= 1082 else 0
    )
    bitxml = settings.find("./bitrate")
    assert bitxml is not None
    bitrate = BitrateThresholdInfo(
        warning = int(bitxml.attrib["warn"]) * 1000,
        critical = int(bitxml.attrib["critical"]) * 1000
    )
    appxml = settings.find("./appversion")
    assert appxml is not None
    app_version = ExpectedAppVersionInfo(
        android = appxml.findtext("./android") or "",
        ios = appxml.findtext("./ios") or ""
    )
    # Get battery info.
    battxml = settings.find("./battery")
    assert battxml is not None
    battery = BatteryInfo(
        status = BatteryStatus(battxml.attrib["status"]),
        voltage = int(battxml.attrib["voltage"])
    )
    # Get access point information.
    apxml = settings.find("./ap")
    assert apxml is not None
    access_point = ServerNetworkInfo(
        enabled = apxml.attrib["enabled"] == "true",
        clients = int(apxml.attrib.get("clients", None) or -1),
        ssid = unquote(cast(str, settings.findtext("./ssid"))).encode("ISO-8859-15").decode("utf-8"),
        channel = int(settings.findtext("./channel") or -1),
        security = WiFiSecurity(settings.findtext("./security")),
        key = settings.findtext("./wpapsk") or "",
    )
    # Get connected Wi-Fi client information.
    sidelinkxml = settings.find("./sidelink")
    sidelink_enabled = (sidelinkxml is not None) and (sidelinkxml.attrib.get("enabled") == "true")
    sdcxml = settings.find("./client")
    sidelink = None
    if sdcxml is not None:
        sidelink = ClientNetworkInfo(
            enabled = sidelink_enabled,
            ssid = unquote(sdcxml.attrib["ssid"]).encode("ISO-8859-15").decode("utf-8"),
            ip = sdcxml.attrib["ip"],
            # Are we currently connected to the home network, instead of drive's own AP?
            from_home = sdcxml.attrib.get("method", "") == "sidelink",
            status = NetworkConnectStatus(sdcxml.attrib["status"])
        )
        # Not obtained an IP address yet.
        if sidelink.ip == "0.0.0.0":
            sidelink = ClientNetworkInfo(*sidelink[:-1], status = NetworkConnectStatus.UNKNOWN)
    # Get a list of storage mediums (in fact, microSD cards residing on the drive).
    mediums : List[StorageMediumInfo] = []
    for medium in settings.findall("./cards/card"):
        mount_type = medium.attrib["status"]
        # The microSD card is removed, or the device plugged to a USB port, since
        # the device doesn't support accessing files from both WebDAV and USB at the same time.
        if mount_type.lower() == "none":
            continue
        card_status = StorageMediumStatus(mount_type)
        # From build 914 on, anything but a mounted card counts as read-only;
        # older builds report it with a separate attribute.
        if build.code >= 914:
            read_only = card_status != StorageMediumStatus.MOUNTED
        else:
            read_only = medium.attrib.get("readonly", "") == "protected"
        mediums.append(StorageMediumInfo(
            status = card_status,
            format = StorageMediumFileSystem(medium.attrib["format"]),
            serial = bytes.fromhex(medium.attrib["serial"]), # The UUID is returned as "1234ABCD", without the dash.
            path = medium.attrib["path"],
            label = medium.attrib["label"],
            free = int(medium.attrib["free"]),
            total = int(medium.attrib["total"]),
            block_size = int(medium.attrib["blocksize"]),
            read_only = read_only
        ))
    # Get the last thrown exception stored in the device.
    last_error : Optional[LastErrorInfo] = None
    if build.code >= 914:
        stored_error = settings.find("./storederror")
        if stored_error is not None:
            errfile = stored_error.attrib.get("file", None)
            errdesc = stored_error.attrib.get("description", None)
            if ((errfile is not None) and errdesc) and (((build.code < 2009) and (errdesc.lower() != "none")) or (build.code >= 2009)):
                last_error = LastErrorInfo(
                    message = errdesc,
                    source = errfile,
                    line = int(stored_error.attrib["line"]),
                    version = None if "version" not in stored_error.attrib else int(stored_error.attrib["version"]),
                    address = None if "address" not in stored_error.attrib else int(stored_error.attrib["address"], 16),
                    counter = None if "pc" not in stored_error.attrib else int(stored_error.attrib["pc"], 16),
                    timestamp = None if "timestamp" not in stored_error.attrib else int(stored_error.attrib["timestamp"])
                )
    pending_firm = None
    pendingxml = settings.find("./pendingfirmware")
    if pendingxml is not None:
        pending_firm = int(pendingxml.attrib.get("build", "") or 0)
    # Refine the reported model with the build model. The refined value has
    # to land in `modelenum`, since that is what ends up in the result.
    modelenum = DeviceType(model)
    if (modelenum == DeviceType.FD_128K) and (build.model == DeviceType.FD_256K.value):
        modelenum = DeviceType.FD_256K
    if build.model == DeviceType.WS_V2.value:
        modelenum = DeviceType.WS_V2
    mac = MACAddress(serial)
    timeout = int(settings.findtext("./timeout") or -1)
    auth_level = NetworkSecurityLevel(settings.findtext("./auth"))
    auth_hash = settings.findtext("./authhash") or ""
    # If we can't still determine the device from its model,
    # try with MAC address instead.
    if modelenum == DeviceType.UNKNOWN:
        print("model couldn't be detected, trying mac!", file = stderr)
        modelenum = mac.to_model()
        if not modelenum:
            raise ValueError(f"couldn't determine the model: {serial}")
    return Settings(
        model = modelenum,
        hostname = hostname,
        mac = mac,
        network = access_point,
        home_network = sidelink,
        build = build,
        battery = battery,
        bitrate = bitrate,
        app_version = app_version,
        auth = auth_level,
        authhash = auth_hash,
        timeout = timeout,
        pending_update = pending_firm,
        implementation = implement,
        mediums = mediums,
        last_error = last_error
    )
# ----------------------------------------------------------
# A very tiny command implementation
# ----------------------------------------------------------
class SandiskShell(Cmd):
    """
    A small interactive shell for browsing the files on the device and
    querying or changing some of its settings.
    """
    intro: str = ""
    prompt: str = "sandisk:/> "
    # Directory on the device that `ls` and `cd` operate on.
    currentdir: str = "/"

    def do_pwd(self, *_):
        "Get the current path."
        print(self.currentdir)

    def do_info(self, *_):
        "Print every field of the device settings."
        for k, v in get_settings()._asdict().items():
            print(f"{k}: {v}")

    def do_ls(self, arg: str):
        "List the current directory. Use `ls more` to include details."
        details = arg.strip() == "more"
        try:
            items = webdav_list(self.currentdir)
        except ValueError as e:
            # A missing directory shouldn't terminate the whole shell.
            print("error:", str(e))
            return
        for item in items:
            if isinstance(item, WebDAVFolder):
                if details:
                    print(f"- [{item.name}] | {item.created.isoformat()} | {item.modified.isoformat()}")
                else:
                    print(f"- [{item.name}]")
            elif details:
                assert item.created
                assert item.modified
                print(f"- {item.name} | {item.created.isoformat()} | {item.modified.isoformat()}, {item.size} bytes, {item.etag}, {item.cachent}")
            else:
                print(f"- {item.name}")
        if not items:
            print("(directory is empty)")

    def do_network(self, arg: str):
        "Manage networks: `network scan`, `network list` or `network connect <ssid>`."
        if (arg == "scan") or (arg == "list"):
            is_scan = arg == "scan"
            state = get_settings()
            if is_scan:
                print("Scanning for networks, wait for few seconds...")
            networks = get_networks(is_scan)
            rssi_mode = state.model in (DeviceType.WS_V2, DeviceType.WS_V1, )
            for network in networks:
                level = network.signal(rssi_mode)
                bars = "..."
                if level > 0:
                    bars = (level * "■").ljust(3, "_")
                print(f"* [{bars}] {'♯ ' if network.security.value != 'none' else ' '}{network.ssid}")
        elif arg.startswith("connect"):
            ssid = arg.removeprefix("connect").strip()
            if not ssid:
                print("An SSID is required!")
                return
            network = next((x for x in get_networks(False) if x.ssid == ssid), None)
            if not network:
                print("SSID couldn't be found!")
                return
            if network.security == WiFiSecurity.UNKNOWN:
                print("Unknown security model for this SSID!")
                return
            password = ""
            if (network.security != WiFiSecurity.PUBLIC) and (not network.saved):
                password = input("Input password: ")
            try:
                if not network.saved:
                    save_network(ssid, network.security, password)
                print(connect_network(ssid))
            except ValueError as e:
                # E.g. a password that isn't acceptable for the network.
                print("error:", str(e))

    def do_cd(self, arg: str):
        "Change the current directory (`..` goes one level up, `/` goes to the root)."
        upcoming = None
        if arg == "..":
            upcoming = "/".join(self.currentdir.split("/")[:-1]) or "/"
        elif arg == "/":
            upcoming = "/"
        elif arg:
            upcoming = join(self.currentdir, "/".join((x for x in arg.split("/") if x)))
        if not upcoming:
            return
        if upcoming == "/":
            # The root has no name to look up (webdav_get() refuses an empty
            # name), so switch to it without asking the device.
            self.currentdir = upcoming
            return
        try:
            entry = webdav_get(upcoming)
        except ValueError as e:
            print("error:", str(e))
            return
        if isinstance(entry, WebDAVFile):
            print("error: not a directory")
            return
        self.currentdir = upcoming

    def do_crash(self, _):
        # NOTE(review): judging by the name, this request is expected to
        # crash the device - confirm before using.
        webdav_move("../files/", "../")

    def do_restart(self, _):
        "Submit a restart request to the device."
        print(post_settings({
            "restart": "allowed"
        }))

    def do_battery(self, _):
        "Show the battery status, voltage and the power save timeout."
        settings = get_settings()
        status = settings.battery.status
        bar = {
            BatteryStatus.CHARGING: "[░🗲░]",
            BatteryStatus.CRITICAL: "[░⚠░]",
            BatteryStatus.FULL: "[▊▊▊]",
            BatteryStatus.HIGH: "[▊▊▊]",
            BatteryStatus.MEDIUM: "[▊▊ ]",
            BatteryStatus.LOW: "[▊ ]",
            BatteryStatus.UNKNOWN: "[ ? ]"
        }
        if status == BatteryStatus.CHARGING:
            level = "Charging"
        elif status == BatteryStatus.UNKNOWN:
            level = "Unknown"
        elif status == BatteryStatus.FULL:
            level = "Fully charged"
        else:
            level = f"<= {(status.to_level()) * 100:.2f}%"
        print(f"{bar[status]} {level}, {settings.battery.voltage / 1000:.2f}V")
        print(f"Power save timeout: {settings.timeout} min(s)")

    def emptyline(self):
        # Do nothing on an empty line, instead of repeating the last command.
        return False

    def precmd(self, line: str) -> str:
        self.prompt = f"sandisk:{self.currentdir}> "
        return line

    def postcmd(self, stop: bool, line: str) -> bool:
        # Keep the prompt in sync with the directory after each command.
        self.prompt = f"sandisk:{self.currentdir}> "
        return stop

    def do_exit(self, *_):
        "Exit from the program."
        # Same effect as exit(0), without relying on the `site` module.
        raise SystemExit(0)
if __name__ == '__main__':
    # Run the interactive shell; Ctrl+C leaves it quietly after ending
    # the current line.
    try:
        SandiskShell().cmdloop()
    except KeyboardInterrupt:
        print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment