"""Provide factory class for ISO"""
# Copyright (c) 2016 - I.T. Dev Ltd
#
# This file is part of MCVirt.
#
# MCVirt is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# MCVirt is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MCVirt. If not, see <http://www.gnu.org/licenses/>.
import os
import urllib2
import urlparse
import tempfile
import shutil
import binascii
import Pyro4
from mcvirt.iso.iso import Iso
from mcvirt.rpc.pyro_object import PyroObject
from mcvirt.rpc.lock import locking_method
from mcvirt.constants import DirectoryLocation
from mcvirt.exceptions import InvalidISOPathException
class Factory(PyroObject):
    """Class for obtaining ISO objects"""

    ISO_CLASS = Iso

    def get_isos(self):
        """Return a list of ISO names.

        Only regular files located directly in the ISO storage directory
        are returned; anything else in that directory (e.g. sub-directories)
        is skipped.
        """
        storage_dir = DirectoryLocation.ISO_STORAGE_DIR
        return [
            iso_name for iso_name in os.listdir(storage_dir)
            if os.path.isfile(os.path.join(storage_dir, iso_name))
        ]

    @Pyro4.expose()
    def get_iso_by_name(self, iso_name):
        """Create an Iso object for ``iso_name``, register it and return it."""
        iso_object = Iso(iso_name)
        self._register_object(iso_object)
        return iso_object

    @Pyro4.expose()
    def get_iso_list(self):
        """Return a user-readable list of ISOs.

        The ISO names are joined with newlines; a fixed message is returned
        when there are no ISOs.
        """
        iso_list = self.get_isos()
        if not iso_list:
            return 'No ISOs found'
        return "\n".join(iso_list)

    def add_iso(self, path):
        """Copy an ISO to the ISO storage directory.

        Returns the registered Iso object for the copied file.
        Raises InvalidISOPathException if ``path`` is not an existing file.
        """
        # Check that file exists
        if not os.path.isfile(path):
            raise InvalidISOPathException('Error: \'%s\' is not a file or does not exist' % path)

        filename = Iso.get_filename_from_path(path)
        # NOTE(review): presumably rejects an already-existing ISO of the
        # same name - confirm against Iso.overwrite_check
        Iso.overwrite_check(filename, DirectoryLocation.ISO_STORAGE_DIR + '/' + filename)

        shutil.copy(path, DirectoryLocation.ISO_STORAGE_DIR)

        return self.get_iso_by_name(filename)

    @Pyro4.expose()
    @locking_method()
    def add_from_url(self, url, name=None):
        """Download an ISO from given URL and save in ISO directory.

        If ``name`` is not supplied, it is derived from the path part of
        the URL. The download is written to a temporary directory, imported
        using ``add_iso`` and the temporary files are removed afterwards,
        whether or not the download/import succeeded.
        """
        # Work out name from URL if name is not supplied
        if name is None:
            # Parse URL to get path part
            url_parse = urlparse.urlparse(url)
            name = Iso.get_filename_from_path(url_parse.path)

        # Get temporary directory to store ISO
        temp_directory = tempfile.mkdtemp()
        # NOTE(review): ``name`` can be supplied by the caller and is not
        # sanitised here, so a value containing path separators can point
        # outside of the temporary directory - consider validating it.
        output_path = temp_directory + '/' + name

        # Read file in 16KB chunks
        chunk_size = 16 * 1024

        try:
            # Open URL
            iso = urllib2.urlopen(url)
            try:
                # Save ISO
                with open(output_path, 'wb') as output_fh:
                    while True:
                        chunk = iso.read(chunk_size)
                        if not chunk:
                            break
                        output_fh.write(chunk)
            finally:
                # Always release the connection, even if the write failed
                iso.close()

            return self.add_iso(output_path)
        finally:
            # The download may have failed before the file was created
            if os.path.isfile(output_path):
                os.remove(output_path)
            os.rmdir(temp_directory)

    @Pyro4.expose()
    @locking_method()
    def add_iso_from_stream(self, path, name=None):
        """Import ISO, writing binary data to the ISO file.

        Returns a registered IsoWriter object, which writes to a file in a
        newly created temporary directory. If ``name`` is not supplied, it
        is derived from ``path``.
        """
        if name is None:
            name = Iso.get_filename_from_path(path)

        # Get temporary directory to store ISO
        temp_directory = tempfile.mkdtemp()
        output_path = temp_directory + '/' + name

        try:
            iso_writer = IsoWriter(output_path, self, temp_directory, path)
        except Exception:
            # The writer could not open its file, so nothing will ever
            # clean up the temporary directory - remove it before re-raising
            shutil.rmtree(temp_directory)
            raise

        self._register_object(iso_writer)
        return iso_writer
class IsoWriter(PyroObject):
    """Provide an interface for writing ISOs"""

    def __init__(self, temp_file, factory, temp_directory, path):
        """Store the parameters needed to create the ISO from the temp path
        and open the temporary file for binary writing.
        """
        self.temp_file = temp_file
        self.temp_directory = temp_directory
        self.factory = factory
        self.path = path
        self.fh = open(self.temp_file, 'wb')

    def _close_fh(self):
        """Close the file handle, if it is open"""
        # getattr guards against __init__ having failed before fh was set
        fh = getattr(self, 'fh', None)
        if fh:
            fh.close()
        self.fh = None

    def __del__(self):
        """Close FH on object deletion"""
        # This must be __del__ (the finaliser); __delete__ is the descriptor
        # protocol hook and is never invoked when the object is destroyed.
        self._close_fh()

    @Pyro4.expose()
    def write_data(self, data):
        """Write data to the ISO file.

        ``data`` is a hex-encoded string, which is decoded to binary before
        being written.
        """
        self.fh.write(binascii.unhexlify(data))

    @Pyro4.expose()
    def write_end(self):
        """End writing object, close FH and import ISO.

        Returns the object returned by the factory's ``add_iso``. The
        temporary file and directory are removed whether or not the import
        succeeds.
        """
        self._close_fh()
        try:
            return self.factory.add_iso(self.temp_file)
        finally:
            # Clean up even if the import failed (e.g. add_iso raised)
            os.remove(self.temp_file)
            os.rmdir(self.temp_directory)