Source code for mcvirt.cluster.cluster

"""Provide cluster classes"""

# Copyright (c) 2014 - I.T. Dev Ltd
#
# This file is part of MCVirt.
#
# MCVirt is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# MCVirt is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MCVirt.  If not, see <http://www.gnu.org/licenses/>

import json
import base64
import Pyro4
import socket
from texttable import Texttable

from mcvirt.utils import get_hostname
from mcvirt.exceptions import (NodeAlreadyPresent, NodeDoesNotExistException,
                               RemoteObjectConflict, ClusterNotInitialisedException,
                               InvalidConnectionString, DrbdNotInstalledException,
                               CouldNotConnectToNodeException, InaccessibleNodeException,
                               MissingConfigurationException, NodeVersionMismatch)
from mcvirt.mcvirt_config import MCVirtConfig
from mcvirt.auth.connection_user import ConnectionUser
from mcvirt.auth.permissions import PERMISSIONS
from mcvirt.client.rpc import Connection
from mcvirt.rpc.lock import locking_method
from mcvirt.cluster.remote import Node
from mcvirt.rpc.pyro_object import PyroObject
from mcvirt.syslogger import Syslogger


[docs]class Cluster(PyroObject): """Class to perform node management within the MCVirt cluster""" @Pyro4.expose()
[docs] def generate_connection_info(self): """Generate required information to connect to this node from a remote node""" # Ensure user has required permissions self._get_registered_object('auth').assert_permission( PERMISSIONS.MANAGE_CLUSTER ) # Ensure that the IP address configurations has been made correctly self.check_ip_configuration() # Create connection user user_factory = self._get_registered_object('user_factory') connection_username, connection_password = user_factory.generate_user(ConnectionUser) ssl_object = self._get_registered_object( 'certificate_generator_factory').get_cert_generator(get_hostname()) return [get_hostname(), self.get_cluster_ip_address(), connection_username, connection_password, ssl_object.get_ca_contents()]
@Pyro4.expose()
[docs] def get_connection_string(self): """Generate a string to connect to this node from a remote cluster""" # Only superusers can generate a connection string self._get_registered_object('auth').assert_permission( PERMISSIONS.MANAGE_CLUSTER ) # Generate dict with connection information. Convert to JSON and base64 encode connection_info = self.generate_connection_info() connection_info_dict = { 'hostname': connection_info[0], 'ip_address': connection_info[1], 'username': connection_info[2], 'password': connection_info[3], 'ca_cert': connection_info[4] } connection_info_json = json.dumps(connection_info_dict) return base64.b64encode(connection_info_json)
@Pyro4.expose()
[docs] def print_info(self): """Print information about the nodes in the cluster""" table = Texttable() table.set_deco(Texttable.HEADER | Texttable.VLINES) table.header(('Node', 'IP Address', 'Status')) # Add this node to the table table.add_row((get_hostname(), self.get_cluster_ip_address(), 'Local')) # Add remote nodes for node in self.get_nodes(return_all=True): node_config = self.get_node_config(node) node_status = 'Unreachable' try: self.get_remote_node(node) node_status = 'Connected' except CouldNotConnectToNodeException: pass table.add_row((node, node_config['ip_address'], node_status)) return table.draw()
[docs] def check_node_versions(self): """Ensure that all nodes in the cluster are connected and checks the node Status """ def check_version(connection): node = connection.get_connection('node') return node.get_version() node_versions = self.run_remote_command(check_version) local_version = self._get_registered_object('node').get_version() for node in node_versions: if node_versions[node] != local_version: raise NodeVersionMismatch('Node %s is running MCVirt %s. Local version: %s' % (node, node_versions[node], local_version))
@Pyro4.expose() @locking_method() def add_node_configuration(self, node_name, ip_address, connection_user, connection_password, ca_key): """Add MCVirt node to configuration, generates a cluster user on the remote node and stores credentials against node in the MCVirt configuration. """ self._get_registered_object('auth').assert_permission(PERMISSIONS.MANAGE_CLUSTER) # Create CA file ssl_object = self._get_registered_object( 'certificate_generator_factory').get_cert_generator(node_name) ssl_object.ca_pub_file = ca_key # Connect to node and obtain cluster user remote = Connection(username=connection_user, password=connection_password, host=node_name) remote_user_factory = remote.get_connection('user_factory') connection_user = remote_user_factory.get_user_by_username(connection_user) remote.annotate_object(connection_user) username, password = connection_user.create_cluster_user(host=get_hostname()) # Add node to configuration file def add_node_config(mcvirt_config): mcvirt_config['cluster']['nodes'][node_name] = { 'ip_address': ip_address, 'username': username, 'password': password } MCVirtConfig().update_config(add_node_config)
[docs] def check_ip_configuration(self): """Perform various checks to ensure that the IP configuration is such that is suitable to be part of a cluster """ # Ensure that the cluster IP address has been defined cluster_ip = self.get_cluster_ip_address() if not cluster_ip: raise MissingConfigurationException('IP address has not yet been configured') # Ensure that the hostname of the local machine does not resolve # to 127.0.0.1 if socket.gethostbyname(get_hostname()).startswith('127.'): raise MissingConfigurationException(('Node hostname %s resolves to the localhost.' ' Instead it should resolve to the cluster' ' IP address')) resolve_ip = socket.gethostbyname(get_hostname()) if resolve_ip != cluster_ip: raise MissingConfigurationException(('The local hostname (%s) should resolve the' ' cluster IP address (%s). Instead it resolves' ' to \'%s\'. Please correct this issue before' ' continuing.') % (get_hostname(), cluster_ip, resolve_ip))
@Pyro4.expose() @locking_method() def add_node(self, node_connection_string): """Connect to a remote MCVirt machine, setup shared authentication and clusters the machines. """ # Ensure the user has privileges to manage the cluster self._get_registered_object('auth').assert_permission(PERMISSIONS.MANAGE_CLUSTER) # Ensure that the IP address configurations has been made correctly self.check_ip_configuration() try: config_json = base64.b64decode(node_connection_string) node_config = json.loads(config_json) assert 'username' in node_config and node_config['username'] assert 'password' in node_config and node_config['password'] assert 'ip_address' in node_config and node_config['ip_address'] assert 'hostname' in node_config and node_config['hostname'] assert 'ca_cert' in node_config and node_config['ca_cert'] except (TypeError, ValueError, AssertionError): raise InvalidConnectionString('Connection string is invalid') # Determine if node is already connected to cluster if self.check_node_exists(node_config['hostname']): raise NodeAlreadyPresent( 'Node %s is already connected to the cluster' % node_config['hostname']) # Create CA public key for machine ssl_object = self._get_registered_object( 'certificate_generator_factory' ).get_cert_generator(node_config['hostname']) ssl_object.ca_pub_file = node_config['ca_cert'] # Check remote machine, to ensure it can be synced without any # conflicts remote = Connection(username=node_config['username'], password=node_config['password'], host=node_config['hostname']) self.check_remote_machine(remote) remote = None original_cluster_nodes = self.get_nodes() # Add remote node self.add_node_configuration(node_name=node_config['hostname'], ip_address=node_config['ip_address'], connection_user=node_config['username'], connection_password=node_config['password'], ca_key=node_config['ca_cert']) # Obtain node connection to new node remote_node = self.get_remote_node(node_config['hostname']) # Generate local connection user for new remote node local_connection_info = self.generate_connection_info() # Add the local node to the new remote node remote_cluster_instance = remote_node.get_connection('cluster') remote_cluster_instance.add_node_configuration( node_name=local_connection_info[0], ip_address=local_connection_info[1], connection_user=local_connection_info[2], connection_password=local_connection_info[3], ca_key=local_connection_info[4] ) new_node_cert_gen_factory = remote_node.get_connection('certificate_generator_factory') # Create client certificates for libvirt for the new node to connect to the # current cluster node new_node_cert_gen = new_node_cert_gen_factory.get_cert_generator(get_hostname()) remote_node.annotate_object(new_node_cert_gen) # Generate CSR csr = new_node_cert_gen.generate_csr() # Sign CSR cert_gen_factory = self._get_registered_object('certificate_generator_factory') cert_gen = cert_gen_factory.get_cert_generator(node_config['hostname'], remote=True) pub_key = cert_gen.sign_csr(csr) # Add public key to new node new_node_cert_gen.add_public_key(pub_key) # Create client certificate for libvirt for the current cluster node to connect # to the new node cert_gen = cert_gen_factory.get_cert_generator(node_config['hostname']) # Generate CSR csr = cert_gen.generate_csr() # Sign CSR new_node_cert_gen = new_node_cert_gen_factory.get_cert_generator( get_hostname(), remote=True) remote_node.annotate_object(new_node_cert_gen) pub_key = new_node_cert_gen.sign_csr(csr) # Add public key to local node cert_gen.add_public_key(pub_key) # Sync credentials to/from old nodes in the cluster for original_node in original_cluster_nodes: # Share connection information between cluster node and new node original_node_remote = self.get_remote_node(original_node) original_cluster = original_node_remote.get_connection('cluster') original_node_con_info = original_cluster.generate_connection_info() remote_cluster_instance.add_node_configuration( node_name=original_node_con_info[0], ip_address=original_node_con_info[1], connection_user=original_node_con_info[2], connection_password=original_node_con_info[3], ca_key=original_node_con_info[4] ) new_node_con_info = remote_cluster_instance.generate_connection_info() original_cluster.add_node_configuration(node_name=new_node_con_info[0], ip_address=new_node_con_info[1], connection_user=new_node_con_info[2], connection_password=new_node_con_info[3], ca_key=new_node_con_info[4]) # Create client certificates for libvirt for the new node to connect to the # current cluster node new_node_cert_gen = new_node_cert_gen_factory.get_cert_generator(original_node) remote_node.annotate_object(new_node_cert_gen) csr = new_node_cert_gen.generate_csr() original_node_cert_gen_factory = original_node_remote.get_connection( 'certificate_generator_factory') original_node_cert_gen = original_node_cert_gen_factory.get_cert_generator( node_config['hostname'], remote=True ) original_node_remote.annotate_object(original_node_cert_gen) pub_key = original_node_cert_gen.sign_csr(csr) new_node_cert_gen.add_public_key(pub_key) # Create client certificate for libvirt for the current cluster node to connect # to the new node original_node_cert_gen = original_node_cert_gen_factory.get_cert_generator(node_config[ 'hostname']) original_node_remote.annotate_object(original_node_cert_gen) # Generate CSR csr = original_node_cert_gen.generate_csr() # Sign CSR new_node_cert_gen = new_node_cert_gen_factory.get_cert_generator( original_node, remote=True) remote_node.annotate_object(new_node_cert_gen) pub_key = new_node_cert_gen.sign_csr(csr) # Add public key to original node original_node_cert_gen.add_public_key(pub_key) # If Drbd is enabled on the local node, configure/enable it on the remote node if self._get_registered_object('node_drbd').is_enabled(): remote_drbd = remote_node.get_connection('node_drbd') remote_drbd.enable(secret=MCVirtConfig().get_config()['drbd']['secret']) # Sync users self.sync_users(remote_node) # Sync networks self.sync_networks(remote_node) # Sync global permissions self.sync_permissions(remote_node) # Sync VMs self.sync_virtual_machines(remote_node)
[docs] def sync_users(self, remote_node): """Synchronise the local users with the remote node""" # Remove all users on the remote node remote_user_factory = remote_node.get_connection('user_factory') for remote_user in remote_user_factory.get_all_users(): remote_node.annotate_object(remote_user) remote_user.delete() user_factory = self._get_registered_object('user_factory') for user in user_factory.get_all_users(): remote_user_factory.add_config(user.get_username(), user.get_config())
[docs] def sync_networks(self, remote_object): """Add the local networks to the remote node""" network_factory = self._get_registered_object('network_factory') # Remove all networks from remote node remote_network_factory = remote_object.get_connection('network_factory') for remote_network in remote_network_factory.get_all_network_objects(): remote_object.annotate_object(remote_network) remote_network.delete() for network in network_factory.get_all_network_objects(): remote_network_factory.create(name=network.get_name(), physical_interface=network.get_adapter())
[docs] def sync_permissions(self, remote_object): """Duplicate the global permissions on the local node onto the remote node""" auth_instance = self._get_registered_object('auth') remote_auth_instance = remote_object.get_connection('auth') remote_user_factory = remote_object.get_connection('user_factory') # Sync superusers for superuser in auth_instance.get_superusers(): remote_user_object = remote_user_factory.get_user_by_username(superuser) remote_object.annotate_object(remote_user_object) remote_auth_instance.add_superuser(remote_user_object) # Iterate over the permission groups, adding all of the members to the group # on the remote node for group in auth_instance.get_permission_groups(): users = auth_instance.get_users_in_permission_group(group) for user in users: user_object = remote_user_factory.get_user_by_username(user) remote_object.annotate_object(user_object) remote_auth_instance.add_user_permission_group(group, user_object)
[docs] def sync_virtual_machines(self, remote_object): """Duplicate the VM configurations on the local node onto the remote node""" virtual_machine_factory = self._get_registered_object('virtual_machine_factory') network_adapter_factory = self._get_registered_object('network_adapter_factory') remote_virtual_machine_factory = remote_object.get_connection('virtual_machine_factory') # Obtain list of local VMs for vm_object in virtual_machine_factory.getAllVirtualMachines(): remote_virtual_machine_object = remote_virtual_machine_factory.create( name=vm_object.get_name(), cpu_cores=vm_object.getCPU(), memory_allocation=vm_object.getRAM(), hard_drives=[], node=vm_object.getNode(), available_nodes=vm_object.getAvailableNodes() ) remote_object.annotate_object(remote_virtual_machine_object) # Add each of the disks to the VM for hard_disk in vm_object.getHardDriveObjects(): remote_hard_drive_object = hard_disk.get_remote_object(remote_node=remote_object, registered=False) remote_hard_drive_object.addToVirtualMachine() remote_network_factory = remote_object.get_connection('network_factory') remote_network_adapter_factory = remote_object.get_connection( 'network_adapter_factory' ) network_adapters = network_adapter_factory.getNetworkAdaptersByVirtualMachine( vm_object ) for network_adapter in network_adapters: # Add network adapters to VM remote_network = remote_network_factory.get_network_by_name( network_adapter.getConnectedNetwork()) remote_network_adapter_factory.create(remote_virtual_machine_object, remote_network, mac_address=network_adapter.getMacAddress()) # Sync permissions to VM on remote node auth_instance = self._get_registered_object('auth') remote_auth_instance = remote_object.get_connection('auth') remote_user_factory = remote_object.get_connection('user_factory') for group in auth_instance.get_permission_groups(): users = auth_instance.get_users_in_permission_group(group, vm_object) for user in users: user_object = remote_user_factory.get_user_by_username(user) remote_object.annotate_object(user_object) remote_auth_instance.add_user_permission_group(group, user_object, remote_virtual_machine_object) # Set the VM node remote_virtual_machine_object.setNodeRemote(vm_object.getNode())
[docs] def check_remote_machine(self, remote_connection): """Perform checks on the remote node to ensure that there will be no object conflicts when syncing the Network and VM configurations """ # Ensure that the remote node has no cluster nodes remote_cluster = remote_connection.get_connection('cluster') if len(remote_cluster.get_nodes(return_all=True)): raise RemoteObjectConflict('Remote node already has nodes attached') # Determine if any of the local networks/VMs exist on the remote node remote_network_factory = remote_connection.get_connection('network_factory') # Check that each of the interfaces, used for the networks, is present on the # remote node network_factory = self._get_registered_object('network_factory') for local_network in network_factory.get_all_network_objects(): if not remote_network_factory.interface_exists(local_network.get_adapter()): raise RemoteObjectConflict('Network interface %s does not exist on remote node' % local_network.get_adapter()) # Determine if there are any VMs on the remote node remote_virtual_machine_factory = remote_connection.get_connection( 'virtual_machine_factory') if len(remote_virtual_machine_factory.getAllVirtualMachines()): raise RemoteObjectConflict(('Target node contains VMs.' ' These must be removed before adding to a cluster')) # If Drbd is enabled on the local machine, ensure it is installed on the remote machine # and is not already enabled remote_node_drbd = remote_connection.get_connection('node_drbd') if self._get_registered_object('node_drbd').is_enabled(): if not remote_node_drbd.is_installed(): raise DrbdNotInstalledException('Drbd is not installed on the remote node') if remote_node_drbd.is_enabled(): raise DrbdNotInstalledException('Drbd is already enabled on the remote node')
@Pyro4.expose() @locking_method() def remove_node(self, node_name_to_remove): """Remove a node from the MCVirt cluster""" # Ensure the user has privileges to manage the cluster self._get_registered_object('auth').assert_permission(PERMISSIONS.MANAGE_CLUSTER) # Ensure node exists self.ensure_node_exists(node_name_to_remove) # Check for any VMs that the node, to be removed, is available to vm_factory = self._get_registered_object('virtual_machine_factory') all_vm_objects = vm_factory.getAllVirtualMachines() for vm_object in all_vm_objects: vm_available_nodes = vm_object.getAvailableNodes() if len(vm_available_nodes) > 1 and node_name_to_remove in vm_available_nodes: raise RemoteObjectConflict('The remote node is available to VM: %s' % vm_object.get_name()) # Get a list of remote cluster nodes that will remain in the cluster. all_nodes = self.get_nodes(return_all=True) all_nodes.remove(node_name_to_remove) def remove_vm(remote_connection, vm_name): remote_vm_factory = remote_connection.get_connection('virtual_machine_factory') remote_vm = remote_vm_factory.getVirtualMachineByName(vm_name) remote_connection.annotate_object(remote_vm) remote_vm.delete(remove_data=True, local_only=True) # Remove any VMs that are only present on the remote node node_to_remove_con = self.get_remote_node(node_name_to_remove) for vm_object in all_vm_objects: if vm_object.getAvailableNodes() == [node_name_to_remove]: vm_object.delete(remove_data=True, local_only=True) self.run_remote_command(callback_method=remove_vm, nodes=all_nodes, kwargs={'vm_name': vm_object.get_name()}) else: remove_vm(node_to_remove_con, vm_object.get_name()) # Remove the SSL certificates from the other nodes self._remove_node_ssl_certificates(node_name_to_remove) @Pyro4.expose()
[docs] def remove_node_ssl_certificates(self, remote_node): """Exposed method for _remove_node_ssl_certificates""" self._get_registered_object('auth').check_user_type('ClusterUser') self._remove_node_ssl_certificates(remote_node)
def _remove_node_ssl_certificates(self, remote_node): """Remove the SSL certificates relating to a node that is being removed from the cluster """ if self._is_cluster_master: def remove_auth(node_connection, remove_nodes): # Removes the SSL certificates for the remote node remote_cluster = node_connection.get_connection('cluster') for remove_node in remove_nodes: remote_cluster.remove_node_ssl_certificates(remove_node) # For all remaining nodes in the cluster, remove all SSL certificates # and cluster user for node being removed. other_nodes = self.get_nodes() other_nodes.remove(remote_node) self.run_remote_command(callback_method=remove_auth, nodes=other_nodes, kwargs={'remove_nodes': [remote_node]}) # Remove Credentials for all nodes in cluster from node being removed other_nodes.append(get_hostname()) self.run_remote_command(callback_method=remove_auth, nodes=[remote_node], kwargs={'remove_nodes': other_nodes}) # Remove authentication from the local node to the node to be removed cert_generator = self._get_registered_object( 'certificate_generator_factory' ).get_cert_generator(remote_node) cert_generator.remove_certificates() # Remove local cluster user user_factory = self._get_registered_object('user_factory') user = user_factory.get_cluster_user_by_node(remote_node) user.delete() # Remove configuration for remote node from local config self.remove_node_configuration(remote_node)
[docs] def get_cluster_ip_address(self): """Return the cluster IP address of the local node""" cluster_config = self.get_cluster_config() return cluster_config['cluster_ip']
[docs] def get_remote_node(self, node, ignore_cluster_master=False): """Obtain a Remote object for a node, caching the object""" if not self._is_cluster_master and not ignore_cluster_master: raise ClusterNotInitialisedException('Cannot get remote node %s' % node + ' as the cluster is not initialised') node_config = self.get_node_config(node) try: node_object = Node(node, node_config) except: if not self._cluster_disabled: raise InaccessibleNodeException('Cannot connect to node \'%s\'' % node) else: Syslogger.logger().error('Cannot connect to node: %s (Ignored)' % node) node_object = None return node_object
[docs] def get_cluster_config(self): """Get the MCVirt cluster configuration""" return MCVirtConfig().get_config()['cluster']
[docs] def get_node_config(self, node): """Return the configuration for a node""" self.ensure_node_exists(node) return self.get_cluster_config()['nodes'][node]
@Pyro4.expose()
[docs] def get_nodes(self, return_all=False): """Return an array of node configurations""" cluster_config = self.get_cluster_config() nodes = cluster_config['nodes'].keys() if self._cluster_disabled and not return_all: for node in nodes: if not node: nodes.remove(node) return nodes
[docs] def run_remote_command(self, callback_method, nodes=None, args=[], kwargs={}): """Run a remote command on all (or a given list of) remote nodes""" return_data = {} # If the user has not specified a list of nodes, obtain all remote nodes if nodes is None: nodes = self.get_nodes() for node in nodes: node_object = self.get_remote_node(node) if node_object is not None: return_data[node] = callback_method(node_object, *args, **kwargs) return return_data
[docs] def check_node_exists(self, node_name): """Determine if a node is already present in the cluster""" return (node_name in self.get_nodes(return_all=True))
[docs] def ensure_node_exists(self, node): """Check if node exists and throws exception if it does not""" if not self.check_node_exists(node): raise NodeDoesNotExistException('Node %s does not exist' % node)
[docs] def remove_node_configuration(self, node_name): """Remove an MCVirt node from the configuration and regenerates authorized_keys file """ def remove_node_config(mcvirt_config): del(mcvirt_config['cluster']['nodes'][node_name]) MCVirtConfig().update_config(remove_node_config)