Created
June 6, 2013 17:18
-
-
Save rbistolfi/5723210 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/bin/env python | |
| """Media handling. Implements objects providing methods | |
| for mounting, unmounting, formatting and querying media. | |
| """ | |
| __author__ = "rbistolfi" | |
| import re | |
| import os | |
| import parted | |
| from vinstall.backend import sp | |
| from utils import mount, umount, is_mounted, format_partition | |
| from _ped import DiskLabelException | |
# Device node directory (not referenced by the code visible in this file).
DEV_DIR = '/dev'
# Default file read by list_cdroms() to discover CD-ROM drive names.
CD_INFO = '/proc/sys/dev/cdrom/info'
# Path of the kernel partition listing (not referenced by the visible code).
PARTITIONS = '/proc/partitions'
# Presumably a default mountpoint for loop devices; unused in the visible
# code -- TODO confirm against callers elsewhere in the project.
MOUNTPOINT = '/mnt/loop'
class StorageDevice(object):
    """Common interface shared by every storage device.

    The base implementation has no behaviour of its own: each method simply
    returns the ``NotImplemented`` singleton, and concrete devices override
    the methods they support.
    """

    def size(self, unit="GB"):
        """Report the device size in ``unit`` ("GB" by default, or "MB")."""
        return NotImplemented

    def path(self):
        """Report the node path of this device."""
        return NotImplemented

    def is_read_only(self):
        """Report whether the device is read-only."""
        return NotImplemented
class MountMixin(object):
    """Mixin for devices that can be mounted.

    The host class must provide a ``device_path`` attribute holding the
    device node path; every method here reads it.
    """

    @property
    def mountpoint(self):
        """Return the directory where this device is, or should be, mounted.

        If the device is mounted, the mountpoint recorded for ``device_path``
        in ``/etc/mtab`` is returned, or ``None`` when no entry matches.
        Otherwise a mountpoint is derived from the device path by replacing
        its first ``/dev`` with ``/mnt``, and that directory is created when
        it does not exist yet. Wrapped in a property for API compatibility.
        """
        if self.is_mounted():
            # The context manager guarantees the mtab handle is closed, even
            # when we return from the middle of the loop.
            with open("/etc/mtab") as mtab:
                for line in mtab:
                    fields = line.split(" ", 2)
                    if len(fields) < 3:
                        # Blank or truncated entry: nothing to compare.
                        continue
                    if fields[0] == self.device_path:
                        return fields[1]
            # Mounted according to is_mounted(), but not listed in mtab under
            # this exact device path.
            return None

        # Replace only the first "/dev" so that names such as
        # "/dev/devices/x" are not mangled further down the path.
        mntpoint = self.device_path.replace("/dev", "/mnt", 1)
        if not os.path.exists(mntpoint):
            # makedirs also creates missing parents (e.g. /mnt/mapper).
            os.makedirs(mntpoint)
        return mntpoint

    def mount(self, mountpoint=None, filesystem="auto"):
        """Mount the media in ``mountpoint``.

        When ``mountpoint`` is None the value of the ``mountpoint`` property
        is used. Returns whatever the module-level ``mount`` helper returns.
        """
        if mountpoint is None:
            mountpoint = self.mountpoint
        return mount(self.device_path, mountpoint, filesystem=filesystem)

    def umount(self):
        """Unmount the media from its current ``mountpoint``."""
        return umount(self.mountpoint)

    def is_mounted(self):
        """Return the result of the ``is_mounted`` helper for this device."""
        return is_mounted(self.device_path)
class FormatMixin(object):
    """Mixin for devices that can be formatted.

    Relies on the host object's ``device_path`` attribute and, for
    filesystem queries, on its ``_parted_partition`` attribute.
    """

    def query_filesystem(self):
        """Return the filesystem type currently found on the partition.

        Any type containing "swap" is reported as plain "swap"; an empty
        string is returned when no filesystem information is available.
        """
        try:
            fs_type = self._parted_partition.fileSystem.type
        except AttributeError:
            # No partition object, or the partition has no filesystem.
            return ""
        return "swap" if "swap" in fs_type else fs_type

    def format(self, filesystem):
        """Create a ``filesystem`` filesystem on this partition."""
        format_partition(self.device_path, filesystem)
class Partition(StorageDevice, MountMixin, FormatMixin):
    """A single partition of a hard disk, identified by its device node.

    Instances produced by ``all()`` (and by ``Disk.partitions()``) also carry
    a ``_parted_partition`` attribute, which ``size()`` and the filesystem
    query need.
    """

    def __init__(self, device_path):
        """Store the device node path of the partition."""
        self.device_path = device_path

    def __repr__(self):
        return '<Partition object: %s>' % self.device_path

    def __str__(self):
        # An empty filesystem string means nothing was detected.
        filesystem = self.query_filesystem() or "Not formated"
        return '%s (%s %s)' % (self.device_path, self.size("GB"), filesystem)

    def size(self, unit="GB"):
        """Return the partition size as a string such as "12.5 GB".

        The value is rounded to one decimal and followed by ``unit``, which
        defaults to GB.
        """
        try:
            rounded = round(self._parted_partition.getSize(unit), 1)
        except AttributeError:
            # Fall back to getLength() when getSize() is not available.
            rounded = round(self._parted_partition.getLength(unit), 1)
        return "%s %s" % (rounded, unit)

    def path(self):
        """Return the device path (kept for interface parity with Disk)."""
        return self.device_path

    @classmethod
    def all(cls):
        """Generate every normal or logical partition found in the system.

        Only disks that expose a partition table are inspected.
        """
        found = []
        for disk in Disk.all():
            if disk._disk is not None:
                found.extend(disk._disk.partitions)

        for raw in found:
            if raw.type not in (parted.PARTITION_NORMAL,
                                parted.PARTITION_LOGICAL,):
                continue
            partition = cls(raw.path)
            partition._parted_partition = raw
            yield partition
class CDRom(StorageDevice, MountMixin):
    """A CD-ROM drive: a mountable device that is always read-only."""

    def __init__(self, device_path):
        """Store the device node path and set the device-kind flags."""
        super(CDRom, self).__init__()
        self.device_path = device_path
        self.is_disk, self.is_cdrom = False, True

    def is_read_only(self):
        """CD-ROM drives are always reported as read-only."""
        return True

    def path(self):
        """Return the device node path (API compatibility with Disk)."""
        return self.device_path

    def size(self, unit="GB"):
        """Always return 0: the size is irrelevant, nothing is written."""
        return 0

    @classmethod
    def all(cls):
        """Generate one instance per device reported by list_cdroms()."""
        for node in list_cdroms():
            yield cls(node)
class Disk(StorageDevice):
    """A writable hard disk backed by a parted device.

    ``_device`` holds the parted device object and ``_disk`` the
    ``parted.Disk`` built from it, or None when the partition table could
    not be read. Both are filled in by ``all()``.
    """

    def __init__(self):
        self._device = None
        self._disk = None
        self.is_cdrom = False
        self.is_disk = True

    def __repr__(self):
        return "<Disk %s>" % self.path()

    def __str__(self):
        return "%s - %s GB (%s)" % (self.model(), round(self.size(), 2), self.path())

    def model(self):
        """Return the string describing the device model."""
        return self._device.model

    def path(self):
        """Return the node path of the underlying device."""
        return self._device.path

    def size(self, unit="GB"):
        """Return the disk size in ``unit`` (GB by default, or MB)."""
        # Cope with the parted API change: use getSize() when the device
        # has it, getLength() otherwise.
        try:
            return self._device.getSize(unit)
        except AttributeError:
            return self._device.getLength(unit)

    def is_read_only(self):
        """Disks built by ``all()`` are writable, so this is always False."""
        return False

    @property
    def has_partition_table(self):
        """True if a partition table could be read from the disk."""
        return self._disk is not None

    def partitions(self):
        """Generate the normal and logical partitions of this disk.

        Yields nothing when the disk has no readable partition table.
        """
        if self._disk is None:
            # No partition table was read, so there is nothing to list.
            return
        for part in self._disk.partitions:
            if part.type in (parted.PARTITION_NORMAL,
                             parted.PARTITION_LOGICAL,):
                p = Partition(part.path)
                p._parted_partition = part
                yield p

    @classmethod
    def all(cls):
        """Generate a Disk for every device that is not read-only.

        Raises:
            RuntimeError: if no such device exists. Because this is a
                generator, the error surfaces when iteration starts.
        """
        # A real list is required here: a generator expression is always
        # truthy, which would make the emptiness check below dead code.
        devices = [dev for dev in parted.getAllDevices()
                   if dev.readOnly is False]
        if not devices:
            raise RuntimeError("No hard drives found")
        for dev in devices:
            disk = cls()
            disk._device = dev
            try:
                disk._disk = parted.Disk(dev)
            except DiskLabelException:
                # No readable partition table: leave _disk as None so that
                # has_partition_table reports False.
                pass
            yield disk
def list_cdroms(proc_info=CD_INFO):
    """Return a generator of CD-ROM device paths found in ``proc_info``.

    The file is scanned for device names (hdX, sdX, srN, sgN) and each
    match is joined onto "/dev".
    """
    with open(proc_info) as info:
        contents = info.read()
    names = re.findall(r'([hs]d[a-z]|s[rg]\d*)', contents, re.M)
    return (os.path.join("/dev", name) for name in names)
def list_swap():
    """Return the paths of all partitions whose blkid output mentions swap."""
    swap_paths = []
    for partition in Partition.all():
        node = partition.path()
        blkid_output = sp.check_output(['blkid', node])
        if "swap" in blkid_output:
            swap_paths.append(node)
    return swap_paths
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment