# Source code for mcvirt.auth.user_base

"""Provide a base class for user objects."""

# Copyright (c) 2016 - I.T. Dev Ltd
#
# This file is part of MCVirt.
#
# MCVirt is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# MCVirt is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MCVirt.  If not, see <http://www.gnu.org/licenses/>

import os
import random
import string
from binascii import hexlify
from pbkdf2 import crypt
import Pyro4

from mcvirt.mcvirt_config import MCVirtConfig
from mcvirt.exceptions import UserDoesNotExistException
from mcvirt.rpc.pyro_object import PyroObject
from mcvirt.auth.permissions import PERMISSIONS
from mcvirt.rpc.lock import locking_method


class UserBase(PyroObject):
    """Base object for users (both user and automated).

    Every user is stored as an entry under the ``'users'`` key of the MCVirt
    configuration, keyed by username.  This class provides lookup of that
    entry, password hashing/checking/updating, random password generation and
    deletion of the user.
    """

    # Class-level settings; all are plain defaults defined here.
    # NOTE: the class attribute ``PERMISSIONS`` below does not affect the
    # methods in this class - inside a method body the bare name
    # ``PERMISSIONS`` resolves to the module-level import from
    # mcvirt.auth.permissions, not to this attribute.
    USER_PREFIX = None
    CAN_GENERATE = False
    PERMISSIONS = []
    CLUSTER_USER = False
    DISTRIBUTED = True

    @property
    def allow_proxy_user(self):
        """Connection users can proxy for another user."""
        return False

    @staticmethod
    def _check_exists(username):
        """Check the MCVirt config to determine if a given user exists.

        Returns True if ``username`` is a key of the ``'users'`` section of
        the MCVirt configuration, otherwise False.
        """
        return username in MCVirtConfig().get_config()['users']

    @staticmethod
    def _generate_salt():
        """Generate random salt for the user's password.

        Returns the hex encoding of 32 bytes read from ``os.urandom``.
        """
        return hexlify(os.urandom(32))

    def __init__(self, username):
        """Store member variables and ensure that the user exists.

        Raises UserDoesNotExistException (via ``_ensure_exists``) if the
        username is not present in the MCVirt configuration.
        """
        self.username = username
        self._ensure_exists()

    @Pyro4.expose()
    def get_username(self):
        """Return the username of the current user."""
        return self.username

    def _ensure_exists(self):
        """Ensure that the current user exists in the MCVirt configuration.

        Raises UserDoesNotExistException if it does not.
        """
        username = self.get_username()
        if not self.__class__._check_exists(username):
            raise UserDoesNotExistException('User %s does not exist' % username)

    @Pyro4.expose()
    def get_config(self):
        """Return the configuration of the user.

        The registered 'auth' object is asked to assert the MANAGE_USERS
        permission before the configuration is returned.
        """
        self._get_registered_object('auth').assert_permission(
            PERMISSIONS.MANAGE_USERS
        )
        return self._get_config()

    def _get_config(self):
        """Return the config hash for the current user (no permission check)."""
        return MCVirtConfig().get_config()['users'][self.get_username()]

    def get_user_type(self):
        """Return the user type of the user, as stored in its config."""
        return self._get_config()['user_type']

    def _check_password(self, password):
        """Check the given password against the stored password for the user.

        The candidate password is hashed with the user's stored salt and the
        result is compared with the stored hash.  Returns True on a match.
        """
        password_hash = self._hash_password(password)
        config = self._get_config()
        # NOTE(review): '==' is not a constant-time comparison; consider a
        # constant-time compare once the str/unicode types of both operands
        # have been confirmed.
        return password_hash == config['password']

    def _get_password_salt(self):
        """Return the user's salt, as stored in its config."""
        return self._get_config()['salt']

    def _set_password(self, new_password):
        """Set the password for the current user.

        The password is hashed with the user's existing salt and written to
        the local MCVirt configuration.  If the class is DISTRIBUTED and
        ``self._is_cluster_master`` is true, ``set_password`` is then invoked
        with the plain password on the matching user object of each remote
        node, through the registered 'cluster' object.
        """
        password_hash = self._hash_password(new_password)

        def update_config(config):
            config['users'][self.get_username()]['password'] = password_hash

        MCVirtConfig().update_config(
            update_config, 'Updated password for \'%s\'' % self.get_username()
        )

        if self.DISTRIBUTED and self._is_cluster_master:
            def remote_command(node_connection):
                remote_user_factory = node_connection.get_connection('user_factory')
                remote_user = remote_user_factory.get_user_by_username(self.get_username())
                node_connection.annotate_object(remote_user)
                remote_user.set_password(new_password)

            cluster = self._get_registered_object('cluster')
            cluster.run_remote_command(remote_command)

    def _hash_password(self, password):
        """Hash a password, using the current user's salt."""
        return self.__class__._hash_string(password, self._get_password_salt())

    @staticmethod
    def _hash_string(string, salt):
        """Hash string using salt.

        Delegates to ``pbkdf2.crypt`` with 1000 iterations.  The parameter
        name ``string`` shadows the ``string`` module inside this function
        only; it is kept so that keyword callers continue to work.
        """
        return crypt(string, salt, iterations=1000)

    @staticmethod
    def generate_password(length, numeric_only=False):
        """Return a randomly generated password of ``length`` characters.

        Characters are chosen with ``random.SystemRandom``, which draws from
        the operating system's randomness source, rather than with the
        module-level Mersenne Twister generator; this also avoids reseeding
        the process-wide ``random`` state as a side effect.

        NOTE(review): despite its name, ``numeric_only=True`` restricts the
        alphabet to ASCII letters (digits and punctuation are only added when
        it is False).  This behaviour is preserved for existing callers -
        confirm the intended meaning before changing it.
        """
        characters = string.ascii_letters
        if not numeric_only:
            characters += string.digits + '!@#$%^&*()'
        system_random = random.SystemRandom()
        return ''.join(system_random.choice(characters) for _ in range(length))

    @Pyro4.expose()
    @locking_method()
    def delete(self):
        """Delete the current user from MCVirt config.

        Requires the MANAGE_USERS permission (asserted through the registered
        'auth' object).  The user is first removed from the superusers and
        from every permission group, both globally and for each virtual
        machine, then its entry is deleted from the local configuration.  If
        the class is DISTRIBUTED and ``self._is_cluster_master`` is true, the
        deletion is repeated on each remote node through the registered
        'cluster' object.
        """
        auth_object = self._get_registered_object('auth')
        auth_object.assert_permission(
            PERMISSIONS.MANAGE_USERS
        )

        # Remove any global/VM-specific permissions
        if self.get_username() in auth_object.get_superusers():
            auth_object.delete_superuser(self)

        virtual_machine_factory = self._get_registered_object('virtual_machine_factory')
        # The leading None is passed as vm_object for the first pass, ahead of
        # the per-VM passes (presumably selecting the global permission groups).
        for virtual_machine in [None] + virtual_machine_factory.getAllVirtualMachines():
            for permission_group in auth_object.get_permission_groups():
                group_members = auth_object.get_users_in_permission_group(
                    permission_group, vm_object=virtual_machine
                )
                if self.get_username() in group_members:
                    auth_object.delete_user_permission_group(
                        permission_group, self, vm_object=virtual_machine
                    )

        def update_config(config):
            del config['users'][self.get_username()]

        MCVirtConfig().update_config(update_config,
                                     'Deleted user \'%s\'' % self.get_username())

        if self.DISTRIBUTED and self._is_cluster_master:
            def remote_command(node_connection):
                remote_user_factory = node_connection.get_connection('user_factory')
                remote_user = remote_user_factory.get_user_by_username(self.get_username())
                node_connection.annotate_object(remote_user)
                remote_user.delete()

            cluster = self._get_registered_object('cluster')
            cluster.run_remote_command(remote_command)

    @staticmethod
    def get_default_config():
        """Return the default configuration for the user type.

        The returned dict has the keys 'password', 'salt' and 'user_type',
        all set to None.
        """
        return {
            'password': None,
            'salt': None,
            'user_type': None
        }