Source code for mcvirt.test.test_base

# Copyright (c) 2016 - I.T. Dev Ltd
#
# This file is part of MCVirt.
#
# MCVirt is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# MCVirt is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MCVirt.  If not, see <http://www.gnu.org/licenses/>

import unittest
import threading

from mcvirt.client.rpc import Connection
from mcvirt.parser import Parser
from mcvirt.constants import PowerStates, LockStates


[docs]def skip_drbd(required): def wrapper_gen(f): def wrapper(*args): if (bool(args[0].rpc.get_connection('node_drbd').is_enabled()) != bool(wrapper.required)): return args[0].skipTest(('DRBD either required and not available or' ' can\'t be present and is installed.')) wrapper.required = wrapper_gen.required return wrapper wrapper_gen.required = required return wrapper_gen
[docs]class TestBase(unittest.TestCase): """Provide base test case, with constructor/destructor for providing access to the parser and RPC """ # Define RPC credentials, which are the default superuser credentials # that are supplied with MCVirt RPC_USERNAME = 'mjc' RPC_PASSWORD = 'pass'
[docs] def setUp(self): """Obtain connections to the daemon and create various member variables. """ # Create and store RPC connection to daemon. self.rpc = Connection(self.RPC_USERNAME, self.RPC_PASSWORD) # Create and store parser instance self.parser = Parser(verbose=False) # Obtain the session ID from the RPC connection and re-use this, # so that the parser does not need to authenticate with a password # self.parser.parse_arguments('list --username %s --password %s' % (self.RPC_USERNAME, # self.RPC_PASSWORD)) self.parser.USERNAME = self.RPC_USERNAME self.parser.SESSION_ID = self.rpc.session_id # Setup variable for test VM self.test_vms = \ { 'TEST_VM_1': { 'name': 'mcvirt-unittest-vm', 'cpu_count': 1, 'memory_allocation': 100, 'disk_size': [100], 'networks': ['Production'] }, 'TEST_VM_2': { 'name': 'mcvirt-unittest-vm2', 'cpu_count': 2, 'memory_allocation': 120, 'disk_size': [100], 'networks': ['Production'] } } # Ensure any test VM is stopped and removed from the machine self.stop_and_delete(self.test_vms['TEST_VM_2']['name']) self.stop_and_delete(self.test_vms['TEST_VM_1']['name']) self.vm_factory = self.rpc.get_connection('virtual_machine_factory') self.test_network_name = 'testnetwork' self.test_physical_interface = 'vmbr0' self.network_factory = self.rpc.get_connection('network_factory') # Determine if the test network exists. If so, delete it if self.network_factory.check_exists(self.test_network_name): network = self.network_factory.get_network_by_name(self.test_network_name) self.rpc.annotate_object(network) network.delete()
[docs] def tearDown(self): """Destroy stored objects.""" # Ensure any test VM is stopped and removed from the machine self.rpc.ignore_drbd() self.stop_and_delete(self.test_vms['TEST_VM_2']['name']) self.stop_and_delete(self.test_vms['TEST_VM_1']['name']) # Remove the test network, if it exists if self.network_factory.check_exists(self.test_network_name): network = self.network_factory.get_network_by_name(self.test_network_name) self.rpc.annotate_object(network) network.delete() self.network_factory = None self.vm_factory = None self.rpc = None self.parser = None
[docs] def create_vm(self, vm_name, storage_type): """Create a test VM, annotate object and ensure it exists""" vm_object = self.vm_factory.create(self.test_vms[vm_name]['name'], self.test_vms[vm_name]['cpu_count'], self.test_vms[vm_name]['memory_allocation'], self.test_vms[vm_name]['disk_size'], self.test_vms[vm_name]['networks'], storage_type=storage_type) self.rpc.annotate_object(vm_object) self.assertTrue(self.vm_factory.check_exists(self.test_vms[vm_name]['name'])) return vm_object
[docs] def stop_and_delete(self, vm_name): """Stop and remove a virtual machine""" virtual_machine_factory = self.rpc.get_connection('virtual_machine_factory') if virtual_machine_factory.check_exists(vm_name): vm_object = virtual_machine_factory.getVirtualMachineByName(vm_name) self.rpc.annotate_object(vm_object) # Reset sync state for any Drbd disks for disk_object in vm_object.getHardDriveObjects(): self.rpc.annotate_object(disk_object) if disk_object.get_type() == 'Drbd': disk_object.setSyncState(True) if not vm_object.isRegistered(): # Manually register VM on local node vm_object.register() # Stop the VM if it is running if vm_object.getPowerState() == PowerStates.RUNNING.value: vm_object.stop() if vm_object.getLockState() is LockStates.LOCKED.value: vm_object.setLockState(LockStates.UNLOCKED.value) # Delete VM vm_object.delete(True)