Skip to content

Instantly share code, notes, and snippets.

@rbistolfi
Created June 6, 2013 17:18
Show Gist options
  • Select an option

  • Save rbistolfi/5723210 to your computer and use it in GitHub Desktop.

Select an option

Save rbistolfi/5723210 to your computer and use it in GitHub Desktop.
#!/bin/env python
"""Media handling. Implements objects providing methods
for mounting, unmounting, formatting and querying media.
"""
__author__ = "rbistolfi"
import re
import os
import parted
from vinstall.backend import sp
from utils import mount, umount, is_mounted, format_partition
from _ped import DiskLabelException
DEV_DIR = '/dev'
CD_INFO = '/proc/sys/dev/cdrom/info'
PARTITIONS = '/proc/partitions'
MOUNTPOINT = '/mnt/loop'
class StorageDevice(object):
    """Common interface for the storage devices handled by this module.

    Every method here is a placeholder that hands back the ``NotImplemented``
    singleton; subclasses override the ones they support.
    """

    def size(self, unit="GB"):
        """Report the device size in ``unit`` ("GB" by default, or "MB")."""
        return NotImplemented

    def path(self):
        """Report the node path of the device."""
        return NotImplemented

    def is_read_only(self):
        """Tell whether the device is read-only."""
        return NotImplemented
class MountMixin(object):
    """Mixin for devices that can be mounted.

    The host class must provide a ``device_path`` attribute holding the
    device node path.
    """

    # File listing the mounted filesystems; read by ``mountpoint``.
    mtab_path = "/etc/mtab"

    @property
    def mountpoint(self):
        """Return the mountpoint for this device.

        If the device is mounted, the mountpoint recorded in ``mtab_path`` for
        ``device_path`` is returned, or ``None`` when no entry matches. If it
        is not mounted, a mountpoint is derived from the device path by
        replacing the first "/dev" with "/mnt", and that directory is created
        when it does not exist yet. Wrapped in a property for API
        compatibility.
        """
        if self.is_mounted():
            # The context manager closes the file even on the early return.
            with open(self.mtab_path) as mtab:
                for line in mtab:
                    fields = line.split(" ", 2)
                    if len(fields) < 3:
                        # Blank or truncated entry: nothing to compare.
                        continue
                    if fields[0] == self.device_path:
                        return fields[1]
            return None

        # Only the first "/dev" is rewritten so later path components that
        # happen to contain it are left alone.
        mntpoint = self.device_path.replace("/dev", "/mnt", 1)
        if not os.path.exists(mntpoint):
            # makedirs also covers nested nodes such as /dev/mapper/<name>.
            os.makedirs(mntpoint)
        return mntpoint

    def mount(self, mountpoint=None, filesystem="auto"):
        """Mount the media in ``mountpoint``.

        When ``mountpoint`` is None, the value of the ``mountpoint`` property
        is used. Returns whatever the module-level ``mount`` helper returns.
        """
        if mountpoint is None:
            mountpoint = self.mountpoint
        return mount(self.device_path, mountpoint, filesystem=filesystem)

    def umount(self):
        """Unmount the media from its current mountpoint."""
        return umount(self.mountpoint)

    def is_mounted(self):
        """Return True if the media is mounted, False otherwise."""
        return is_mounted(self.device_path)
class FormatMixin(object):
    """Mixin for devices that can be formatted.

    Relies on the host object carrying ``_parted_partition`` and
    ``device_path`` attributes.
    """

    def query_filesystem(self):
        """Look up the filesystem type currently present on the partition.

        Returns an empty string when the type cannot be read, "swap" for any
        type whose name contains "swap", and the reported type otherwise.
        """
        try:
            fs_type = self._parted_partition.fileSystem.type
        except AttributeError:
            # partition has no format
            return ""
        return "swap" if "swap" in fs_type else fs_type

    def format(self, filesystem):
        """Create a ``filesystem`` filesystem in this partition."""
        format_partition(self.device_path, filesystem)
class Partition(StorageDevice, MountMixin, FormatMixin):
    """A class representing a partition in a hard disk."""

    def __init__(self, device_path):
        """Initialize the partition from its device path.

        ``_parted_partition`` starts out as ``None``; ``Partition.all`` and
        ``Disk.partitions`` attach the parted partition object after
        construction.
        """
        self.device_path = device_path
        # Defined up front so the attribute always exists on the instance.
        self._parted_partition = None

    def __str__(self):
        # Query once and fall back to the placeholder for an empty result.
        filesystem = self.query_filesystem() or "Not formated"
        return '%s (%s %s)' % (self.device_path, self.size("GB"), filesystem)

    def __repr__(self):
        return '<Partition object: %s>' % self.device_path

    def size(self, unit="GB"):
        """Return the partition size in the specified unit as a string.

        Defaults to GB. The value is rounded to one decimal place and
        followed by the unit name.
        """
        try:
            amount = self._parted_partition.getSize(unit)
        except AttributeError:
            # parted API change: use getLength when getSize is unavailable.
            amount = self._parted_partition.getLength(unit)
        return "%s %s" % (round(amount, 1), unit)

    def path(self):
        """Return the device path (only for interface compatibility with
        Disk).
        """
        return self.device_path

    @classmethod
    def all(cls):
        """Yield every normal or logical partition found in the system.

        Disks whose ``_disk`` is ``None`` (no partition table was read) are
        skipped.
        """
        wanted = (parted.PARTITION_NORMAL, parted.PARTITION_LOGICAL)
        for dsk in Disk.all():
            if dsk._disk is None:
                continue
            for part in dsk._disk.partitions:
                if part.type in wanted:
                    partition = cls(part.path)
                    partition._parted_partition = part
                    yield partition
class CDRom(StorageDevice, MountMixin):
    """A CD-ROM drive identified by its device node path."""

    def __init__(self, device_path):
        """Remember ``device_path`` and flag the object as a CD-ROM."""
        super(CDRom, self).__init__()
        self.device_path = device_path
        self.is_disk = False
        self.is_cdrom = True

    def is_read_only(self):
        """Always True for this device type."""
        return True

    def path(self):
        """Return the device node path (kept for API compatibility)."""
        return self.device_path

    def size(self, unit="GB"):
        """Return 0: the size is irrelevant because nothing is written."""
        return 0

    @classmethod
    def all(cls):
        """Yield one instance per device reported by ``list_cdroms``."""
        for node in list_cdroms():
            yield cls(node)
class Disk(StorageDevice):
    """A writable disk device reported by parted.

    Instances are normally obtained from ``Disk.all``, which fills in
    ``_device`` (the parted device) and ``_disk`` (the parted disk, left as
    ``None`` when creating it raised ``DiskLabelException``).
    """

    def __init__(self):
        self._device = None
        self._disk = None
        self.is_cdrom = False
        self.is_disk = True

    def __repr__(self):
        return "<Disk %s>" % self.path()

    def __str__(self):
        return "%s - %s GB (%s)" % (self.model(), round(self.size(), 2), self.path())

    def model(self):
        """Return the string describing the device model."""
        return self._device.model

    def path(self):
        """Return the node path of the underlying device."""
        return self._device.path

    def size(self, unit="GB"):
        """Return the size, by default in GB; unit can be GB or MB."""
        try:
            return self._device.getSize(unit)
        except AttributeError:
            # parted API change: use getLength when getSize is unavailable.
            return self._device.getLength(unit)

    def is_read_only(self):
        """Always False: ``Disk.all`` only yields writable devices."""
        return False

    @property
    def has_partition_table(self):
        """Return True if the disk has a partition table, False otherwise."""
        return self._disk is not None

    def partitions(self):
        """Yield the normal and logical partitions of this disk.

        Yields nothing when the disk has no partition table.
        """
        if self._disk is None:
            return
        wanted = (parted.PARTITION_NORMAL, parted.PARTITION_LOGICAL)
        for part in self._disk.partitions:
            if part.type in wanted:
                partition = Partition(part.path)
                partition._parted_partition = part
                yield partition

    @classmethod
    def all(cls):
        """Yield a ``Disk`` for every device that is not read-only.

        Raises RuntimeError, on first iteration, when no such device exists.
        """
        # A list, not a generator: a generator object is always truthy, which
        # would make the emptiness check below unreachable. Truthiness is
        # used for readOnly so a falsy integer flag is accepted as well.
        devices = [dev for dev in parted.getAllDevices() if not dev.readOnly]
        if not devices:
            raise RuntimeError("No hard drives found")
        for dev in devices:
            disk = cls()
            disk._device = dev
            try:
                disk._disk = parted.Disk(dev)
            except DiskLabelException:
                # No readable partition table: _disk stays None so callers
                # can check has_partition_table.
                pass
            yield disk
def list_cdroms(proc_info=CD_INFO):
    """Discover CD-ROM device nodes from the information in the proc tree.

    Reads ``proc_info``, collects every token matching the drive-name
    pattern and returns a generator of the matching paths under "/dev".
    """
    with open(proc_info) as info:
        contents = info.read()
    names = re.findall(r'([hs]d[a-z]|s[rg]\d*)', contents, re.M)
    return (os.path.join("/dev", name) for name in names)
def list_swap():
    """Return the device paths of the swap partitions as a list.

    Each partition from ``Partition.all`` is probed with ``blkid``; those
    whose output mentions "swap" are kept.
    """
    return [
        part.path()
        for part in Partition.all()
        if "swap" in sp.check_output(['blkid', part.path()])
    ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment