Source code for mcvirt.test.virtual_machine.virtual_machine_tests

# Copyright (c) 2014 - I.T. Dev Ltd
#
# This file is part of MCVirt.
#
# MCVirt is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# MCVirt is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MCVirt.  If not, see <http://www.gnu.org/licenses/>

import unittest
import os
import shutil
import xml.etree.ElementTree as ET
import tempfile
import time

from mcvirt.virtual_machine.virtual_machine import VirtualMachine
from mcvirt.constants import PowerStates, LockStates, DirectoryLocation
from mcvirt.exceptions import (InvalidVirtualMachineNameException,
                               VmAlreadyExistsException,
                               VmDirectoryAlreadyExistsException,
                               VmAlreadyStartedException,
                               VmAlreadyStoppedException,
                               CannotStartClonedVmException,
                               CannotDeleteClonedVmException,
                               CannotCloneDrbdBasedVmsException,
                               VirtualMachineLockException,
                               NetworkDoesNotExistException,
                               DrbdStateException,
                               DrbdNotEnabledOnNode,
                               UnknownStorageTypeException)
from mcvirt.test.test_base import TestBase, skip_drbd
from mcvirt.virtual_machine.hard_drive.drbd import DrbdDiskState


[docs]class VirtualMachineTests(TestBase): """Provide unit tests for the VirtualMachine class""" @staticmethod
[docs] def suite(): """Return a test suite of the Virtual Machine tests""" suite = unittest.TestSuite() suite.addTest(VirtualMachineTests('test_create_local')) suite.addTest(VirtualMachineTests('test_delete_local')) suite.addTest(VirtualMachineTests('test_invalid_name')) suite.addTest(VirtualMachineTests('test_create_duplicate')) suite.addTest(VirtualMachineTests('test_vm_directory_already_exists')) suite.addTest(VirtualMachineTests('test_start_local')) suite.addTest(VirtualMachineTests('test_start_running_vm')) suite.addTest(VirtualMachineTests('test_reset')) suite.addTest(VirtualMachineTests('test_reset_stopped_vm')) suite.addTest(VirtualMachineTests('test_lock')) suite.addTest(VirtualMachineTests('test_stop_local')) suite.addTest(VirtualMachineTests('test_stop_stopped_vm')) suite.addTest(VirtualMachineTests('test_clone_local')) suite.addTest(VirtualMachineTests('test_duplicate_local')) suite.addTest(VirtualMachineTests('test_unspecified_storage_type_local')) suite.addTest(VirtualMachineTests('test_invalid_network_name')) suite.addTest(VirtualMachineTests('test_create_alternative_driver')) suite.addTest(VirtualMachineTests('test_live_iso_change')) # # Add tests for Drbd suite.addTest(VirtualMachineTests('test_create_drbd')) suite.addTest(VirtualMachineTests('test_delete_drbd')) suite.addTest(VirtualMachineTests('test_start_drbd')) suite.addTest(VirtualMachineTests('test_stop_drbd')) suite.addTest(VirtualMachineTests('test_create_drbd_not_enabled')) suite.addTest(VirtualMachineTests('test_offline_migrate')) suite.addTest(VirtualMachineTests('test_clone_drbd')) suite.addTest(VirtualMachineTests('test_duplicate_drbd')) suite.addTest(VirtualMachineTests('test_unspecified_storage_type_drbd')) return suite
[docs] def test_create_local(self): """Perform the test_create test with Local storage""" self.test_create('Local')
@skip_drbd(True) def test_create_drbd(self): """Perform the test_create test with Drbd storage""" self.test_create('Drbd')
[docs] def test_create(self, storage_type): """Test the creation of VMs through the argument parser""" # Ensure VM does not exist self.assertFalse(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name'])) # Create virtual machine using parser self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network %s --storage-type %s' % (self.test_vms['TEST_VM_1']['networks'][0], storage_type)) # Ensure VM exists self.assertTrue(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name'])) # Obtain VM object vm_object = self.vm_factory.getVirtualMachineByName(self.test_vms['TEST_VM_1']['name']) self.rpc.annotate_object(vm_object) # Check each of the attributes for VM self.assertEqual(int(vm_object.getRAM()), self.test_vms['TEST_VM_1']['memory_allocation'] * 1024) self.assertEqual(vm_object.getCPU(), str(self.test_vms['TEST_VM_1']['cpu_count'])) # Ensure second VM does not exist self.assertFalse(self.vm_factory.check_exists(self.test_vms['TEST_VM_2']['name'])) # Create second VM self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_2']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_2']['cpu_count'], self.test_vms['TEST_VM_2']['disk_size'][0], self.test_vms['TEST_VM_2']['memory_allocation']) + ' --network %s --storage-type %s' % (self.test_vms['TEST_VM_2']['networks'][0], storage_type)) # Ensure VM exists self.assertTrue(self.vm_factory.check_exists(self.test_vms['TEST_VM_2']['name'])) # Obtain VM object vm_object_2 = self.vm_factory.getVirtualMachineByName(self.test_vms['TEST_VM_2']['name']) self.rpc.annotate_object(vm_object_2) vm_object_2.delete(True)
@skip_drbd(False) def test_create_drbd_not_enabled(self): """Attempt to create a VM with Drbd storage on a node that doesn't have Drbd enabled""" # Attempt to create VM and ensure exception is thrown with self.assertRaises(DrbdNotEnabledOnNode): self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network %s --storage-type %s' % (self.test_vms['TEST_VM_1']['networks'][0], 'Drbd'))
[docs] def test_delete_local(self): """Perform the test_delete test with Local storage""" self.test_delete('Local')
@skip_drbd(True) def test_delete_drbd(self): """Perform the test_delete test with Drbd storage""" self.test_delete('Drbd')
[docs] def test_delete(self, storage_type): """Test the deletion of a VM through the argument parser""" # Create Virtual machine self.create_vm('TEST_VM_1', storage_type) # Remove VM using parser self.parser.parse_arguments('delete %s --remove-data' % self.test_vms['TEST_VM_1']['name']) # Ensure VM has been deleted self.assertFalse(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name'])) # Ensure that VM directory does not exist self.assertFalse(os.path.exists( VirtualMachine._get_vm_dir(self.test_vms['TEST_VM_1']['name']) ))
[docs] def test_clone_local(self): """Test the VM cloning in MCVirt using the argument parser""" # Create Virtual machine test_vm_parent = self.create_vm('TEST_VM_1', 'Local') test_data = os.urandom(8) # Obtain the disk path for the VM and write random data to it for disk_object in test_vm_parent.getHardDriveObjects(): self.rpc.annotate_object(disk_object) fh = open(disk_object.getDiskPath(), 'w') fh.write(test_data) fh.close() # Clone VM self.parser.parse_arguments( 'clone --template %s %s' % (self.test_vms['TEST_VM_1']['name'], self.test_vms['TEST_VM_2']['name'])) test_vm_clone = self.vm_factory.getVirtualMachineByName( self.test_vms['TEST_VM_2']['name'] ) self.rpc.annotate_object(test_vm_clone) # Check data is present on target VM for disk_object in test_vm_clone.getHardDriveObjects(): self.rpc.annotate_object(disk_object) fh = open(disk_object.getDiskPath(), 'r') self.assertEqual(fh.read(8), test_data) fh.close() # Attempt to start clone test_vm_clone.start() # Attempt to stop clone test_vm_clone.stop() # Attempt to start parent with self.assertRaises(CannotStartClonedVmException): test_vm_parent.start() # Attempt to delete parent with self.assertRaises(CannotDeleteClonedVmException): test_vm_parent.delete(True) # Remove clone test_vm_clone.delete(True) # Remove parent test_vm_parent.delete(True)
@skip_drbd(True) def test_clone_drbd(self): """Attempt to clone a Drbd-based VM""" # Create parent VM test_vm_parent = self.create_vm('TEST_VM_1', 'Drbd') # Attempt to clone VM with self.assertRaises(CannotCloneDrbdBasedVmsException): self.parser.parse_arguments( 'clone --template %s %s' % (self.test_vms['TEST_VM_1']['name'], self.test_vms['TEST_VM_2']['name'])) test_vm_parent.delete(True)
[docs] def test_duplicate_local(self): """Perform test_duplicate test with Local storage""" self.test_duplicate('Local')
@skip_drbd(True) def test_duplicate_drbd(self): """Perform the test_duplicate test with Drbd storage""" self.test_duplicate('Drbd')
[docs] def test_duplicate(self, storage_type): """Attempt to duplicate a VM using the argument parser and perform tests on the parent and duplicate VM """ # Create Virtual machine test_vm_parent = self.create_vm('TEST_VM_1', 'Local') test_data = os.urandom(8) # Obtain the disk path for the VM and write random data to it for disk_object in test_vm_parent.getHardDriveObjects(): self.rpc.annotate_object(disk_object) fh = open(disk_object.getDiskPath(), 'w') fh.write(test_data) fh.close() # Clone VM self.parser.parse_arguments( 'duplicate --template %s %s' % (self.test_vms['TEST_VM_1']['name'], self.test_vms['TEST_VM_2']['name'])) test_vm_duplicate = self.vm_factory.getVirtualMachineByName( self.test_vms['TEST_VM_2']['name'] ) self.rpc.annotate_object(test_vm_duplicate) # Check data is present on target VM for disk_object in test_vm_duplicate.getHardDriveObjects(): self.rpc.annotate_object(disk_object) fh = open(disk_object.getDiskPath(), 'r') self.assertEqual(fh.read(8), test_data) fh.close() # Attempt to start clone test_vm_duplicate.start() # Attempt to stop clone test_vm_duplicate.stop() # Start parent test_vm_parent.start() # Stop parent test_vm_parent.stop() # Remove parent test_vm_parent.delete(True) # Remove duplicate test_vm_duplicate.delete(True)
[docs] def test_invalid_name(self): """Attempt to create a virtual machine with an invalid name""" invalid_vm_name = 'invalid.name+' # Ensure VM does not exist self.assertFalse(self.vm_factory.check_exists(invalid_vm_name)) # Attempt to create VM and ensure exception is thrown with self.assertRaises(InvalidVirtualMachineNameException): self.parser.parse_arguments( 'create "%s"' % invalid_vm_name + ' --cpu-count %s --disk-size %s --memory %s --network %s --storage-type %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation'], self.test_vms['TEST_VM_1']['networks'][0], 'Local')) # Ensure VM has not been created self.assertFalse(self.vm_factory.check_exists(invalid_vm_name))
[docs] def test_create_duplicate(self): """Attempt to create two VMs with the same name""" # Create Virtual machine original_memory_allocation = 200 test_vm_object = self.vm_factory.create( self.test_vms['TEST_VM_1']['name'], 1, original_memory_allocation, [100], ['Production'], storage_type='Local') self.rpc.annotate_object(test_vm_object) self.assertTrue(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name'])) # Attempt to create VM with duplicate name, ensuring that an exception is thrown with self.assertRaises(VmAlreadyExistsException): self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network %s --storage-type %s' % (self.test_vms['TEST_VM_1']['networks'][0], 'Local')) # Ensure original VM already exists self.assertTrue(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name'])) # Check memory amount of VM matches original VM self.assertEqual(int(test_vm_object.getRAM()), int(original_memory_allocation)) # Remove test VM test_vm_object.delete(True)
[docs] def test_vm_directory_already_exists(self): """Attempt to create a VM whilst the directory for the VM already exists""" # Create the directory for the VM os.makedirs(VirtualMachine._get_vm_dir(self.test_vms['TEST_VM_1']['name'])) # Attempt to create VM, expecting an exception for the directory already existing with self.assertRaises(VmDirectoryAlreadyExistsException): self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network %s --storage-type %s' % (self.test_vms['TEST_VM_1']['networks'][0], 'Local')) # Ensure the VM has not been created self.assertFalse(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name'])) # Remove directory shutil.rmtree(VirtualMachine._get_vm_dir(self.test_vms['TEST_VM_1']['name']))
[docs] def test_start_local(self): """Perform the test_start test with Local storage""" self.test_start('Local')
@skip_drbd(True) def test_start_drbd(self): """Perform the test_start test with Drbd storage""" self.test_start('DRDB')
[docs] def test_start(self, storage_type): """Test starting VMs through the argument parser""" # Create Virtual machine test_vm_object = self.create_vm('TEST_VM_1', storage_type) # Use argument parser to start the VM self.parser.parse_arguments('start %s' % self.test_vms['TEST_VM_1']['name']) # Ensure that it is running self.assertTrue(test_vm_object.getPowerState() is PowerStates.RUNNING.value)
[docs] def test_start_running_vm(self): """Attempt to start a running VM""" # Create Virtual machine and start it test_vm_object = self.create_vm('TEST_VM_1', 'Local') test_vm_object.start() # Use argument parser to start the VM with self.assertRaises(VmAlreadyStartedException): self.parser.parse_arguments( 'start %s' % self.test_vms['TEST_VM_1']['name'])
[docs] def test_reset(self): """Reset a running VM""" # Create Virtual machine and start it test_vm_object = self.create_vm('TEST_VM_1', 'Local') test_vm_object.start() # Use argument parser to reset the VM self.parser.parse_arguments( 'reset %s' % self.test_vms['TEST_VM_1']['name'])
[docs] def test_reset_stopped_vm(self): """Attempt to reset a stopped VM""" # Create Virtual machine and start it self.create_vm('TEST_VM_1', 'Local') # Use argument parser to reset the VM with self.assertRaises(VmAlreadyStoppedException): self.parser.parse_arguments( 'reset %s' % self.test_vms['TEST_VM_1']['name'])
[docs] def test_stop_local(self): """Perform the test_stop test with Local storage""" self.test_stop('Local')
@skip_drbd(True) def test_stop_drbd(self): """Perform the test_stop test with Drbd storage""" self.test_stop('Drbd')
[docs] def test_stop(self, storage_type): """Test stopping VMs through the argument parser""" # Create virtual machine for testing test_vm_object = self.create_vm('TEST_VM_1', storage_type) # Start VM and ensure it is running test_vm_object.start() self.assertTrue(test_vm_object.getPowerState() is PowerStates.RUNNING.value) # Use the argument parser to stop the VM self.parser.parse_arguments( 'stop %s' % self.test_vms['TEST_VM_1']['name']) # Ensure the VM is stopped self.assertTrue(test_vm_object.getPowerState() is PowerStates.STOPPED.value)
[docs] def test_stop_stopped_vm(self): """Attempt to stop an already stopped VM""" # Create Virtual machine self.create_vm('TEST_VM_1', 'Local') # Use argument parser to start the VM with self.assertRaises(VmAlreadyStoppedException): self.parser.parse_arguments( 'stop %s' % self.test_vms['TEST_VM_1']['name'])
@skip_drbd(True) def test_offline_migrate(self): """Test the offline migration of a VM""" test_vm_object = self.create_vm('TEST_VM_1', 'Drbd') # Get the first available remote node for the VM node_name = test_vm_object.get_remote_nodes()[0] # Assert that the VM is registered locally self.assertTrue(test_vm_object.isRegisteredLocally()) # Monitor the hard drives until they are synced for disk_object in test_vm_object.getHardDriveObjects(): self.rpc.annotate_object(disk_object) while ( disk_object.drbdGetDiskState() != ( DrbdDiskState.UP_TO_DATE.value, DrbdDiskState.UP_TO_DATE.value)): time.sleep(5) # Migrate VM to remote node try: self.parser.parse_arguments( 'migrate --node %s %s' % (node_name, test_vm_object.get_name())) except DrbdStateException: # If the migration fails, attempt to manually register locally before failing test_vm_object.register() raise # Ensure the VM node matches the destination node self.assertEqual(test_vm_object.getNode(), node_name) # Attempt to start the VM on the remote node self.parser.parse_arguments('start %s' % self.test_vms['TEST_VM_1']['name']) # Ensure VM is running self.assertTrue(test_vm_object.getPowerState() is PowerStates.RUNNING.value) # Attempt to stop the VM on the remote node self.parser.parse_arguments('stop %s' % self.test_vms['TEST_VM_1']['name']) # Ensure VM is stopped self.assertTrue(test_vm_object.getPowerState() is PowerStates.STOPPED.value) # Delete VM test_vm_object.delete(True) # Ensure VM no longer exists self.assertFalse(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name']))
[docs] def test_lock(self): """Exercise VM locking""" # Create a test VM test_vm_object = self.create_vm('TEST_VM_1', 'Local') # Ensure the VM is initially unlocked self.assertEqual(test_vm_object.getLockState(), LockStates.UNLOCKED.value) # Lock the VM, using the argument parser self.parser.parse_arguments('lock --lock %s' % test_vm_object.get_name()) # Ensure the VM is reported as locked self.assertEqual(test_vm_object.getLockState(), LockStates.LOCKED.value) # Attempt to start the VM with self.assertRaises(VirtualMachineLockException): test_vm_object.start() # Attempt to unlock using the argument parser self.parser.parse_arguments('lock --unlock %s' % test_vm_object.get_name()) # Ensure the VM can be started test_vm_object.start() test_vm_object.stop() # Attempt to unlock the VM again, ensuring an exception is thrown with self.assertRaises(VirtualMachineLockException): self.parser.parse_arguments('lock --unlock %s' % test_vm_object.get_name())
@skip_drbd(False) def test_unspecified_storage_type_local(self): """Create a VM without specifying the storage type""" # Create virtual machine using parser, without specifying the storage_type self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network %s' % self.test_vms['TEST_VM_1']['networks'][0]) # Ensure that the VM exists self.assertTrue(self.vm_factory.check_exists(self.test_vms['TEST_VM_1']['name'])) @skip_drbd(True) def test_unspecified_storage_type_drbd(self): """Create a VM without specifying the storage type""" # Create virtual machine using parser, without specifying the storage_type. # Assert that an exception is thrown as the storage_type has not been specified with self.assertRaises(UnknownStorageTypeException): self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network %s' % self.test_vms['TEST_VM_1']['networks'][0])
[docs] def test_invalid_network_name(self): """Attempt to create a VM using a network that does not exist""" with self.assertRaises(NetworkDoesNotExistException): self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network nonexistentnetwork' + ' --storage-type Local')
[docs] def test_create_alternative_driver(self): """Create VMs using alternative hard drive drivers""" for disk_driver in [['IDE', 'ide'], ['VIRTIO', 'virtio'], ['SCSI', 'scsi']]: self.parser.parse_arguments('create %s' % self.test_vms['TEST_VM_1']['name'] + ' --cpu-count %s --disk-size %s --memory %s' % (self.test_vms['TEST_VM_1']['cpu_count'], self.test_vms['TEST_VM_1']['disk_size'][0], self.test_vms['TEST_VM_1']['memory_allocation']) + ' --network %s --storage-type %s' % (self.test_vms['TEST_VM_1']['networks'][0], 'Local') + ' --driver %s' % disk_driver[0]) vm_object = self.vm_factory.getVirtualMachineByName(self.test_vms['TEST_VM_1']['name']) self.rpc.annotate_object(vm_object) domain_xml_string = vm_object.get_libvirt_xml() domain_config = ET.fromstring(domain_xml_string) self.assertEqual( domain_config.find('./devices/disk[@type="block"]/target').get('bus'), disk_driver[1] ) vm_object.delete(True)
[docs] def test_live_iso_change(self): """Change the ISO attached to a running VM""" # Create a test VM and start test_vm_object = self.create_vm('TEST_VM_1', 'Local') test_vm_object.start() # Create temp file, for use as fake ISO temp_file = tempfile.NamedTemporaryFile(dir=DirectoryLocation.ISO_STORAGE_DIR, suffix='.iso') iso_name = temp_file.name.split('/')[-1] iso_path = temp_file.name temp_file.close() fhandle = open(iso_path, 'a') try: fhandle.write('test') os.utime(iso_path, None) finally: fhandle.close() self.parser.parse_arguments('update %s --attach-iso %s' % (self.test_vms['TEST_VM_1']['name'], iso_name)) domain_xml_string = test_vm_object.get_libvirt_xml() domain_config = ET.fromstring(domain_xml_string) self.assertEqual( domain_config.find('./devices/disk[@device="cdrom"]/source').get('file'), iso_path ) test_vm_object.stop() test_vm_object.delete(True)