Source code for mcvirt.test.auth_tests

# Copyright (c) 2014 - I.T. Dev Ltd
#
# This file is part of MCVirt.
#
# MCVirt is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# MCVirt is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MCVirt.  If not, see <http://www.gnu.org/licenses/>

import unittest

from mcvirt.virtual_machine.virtual_machine import VirtualMachine
from mcvirt.auth.auth import Auth
from mcvirt.exceptions import (InsufficientPermissionsException, AuthenticationError,
                               UserDoesNotExistException, DuplicatePermissionException,
                               ArgumentParserException, UserNotPresentInGroup)
from mcvirt.test.test_base import TestBase
from mcvirt.parser import Parser
from mcvirt.client.rpc import Connection


class AuthTests(TestBase):
    """Provides unit tests for the Auth class

    Each test runs against a freshly created test user (see setUp).
    NOTE(review): self.rpc, self.parser, self.create_vm and self.RPC_DAEMON
    are not defined in this class; presumably they are provided by
    TestBase - confirm.
    """

    # Credentials of the unprivileged user created for every test
    TEST_USERNAME = 'test-user'
    TEST_PASSWORD = 'test-password'
    # Name of the user created (and removed) by test_remove_user_account
    TEST_USERNAME_ALTERNATIVE = 'user-to-delete'

    def _delete_user_if_exists(self, username):
        """Delete the user with the given username, ignoring a missing user"""
        try:
            user = self.user_factory.get_user_by_username(username)
            self.rpc.annotate_object(user)
            user.delete()
        except UserDoesNotExistException:
            pass

    def _assert_user_exists_and_delete(self, username):
        """Fail the test if the user does not exist, otherwise delete the user"""
        # Only the lookup is guarded, so that an error raised whilst deleting
        # the user is not misreported as 'User not created'
        try:
            user = self.user_factory.get_user_by_username(username)
        except UserDoesNotExistException:
            self.fail('User not created')
        self.rpc.annotate_object(user)
        user.delete()

    def create_test_user(self, username, password):
        """Create a test user, annotate the user object and return it"""
        # Ensure that user does not already exist
        self._delete_user_if_exists(username)

        self.user_factory.create(username, password)
        new_user = self.user_factory.get_user_by_username(username)
        self.rpc.annotate_object(new_user)
        return new_user

    def parse_command(self, command, username, password):
        """Parse the specified command with the specified credentials"""
        # A new parser is used for each call, with the credentials appended
        # to the command as --username/--password arguments
        Parser(verbose=False).parse_arguments(
            '%s --username %s --password %s' % (command, username, password)
        )

    def setUp(self):
        """Set up a test user"""
        super(AuthTests, self).setUp()

        self.auth = self.rpc.get_connection('auth')
        self.user_factory = self.rpc.get_connection('user_factory')
        self.test_user = self.create_test_user(self.TEST_USERNAME, self.TEST_PASSWORD)

    def tearDown(self):
        """Remove the test user"""
        try:
            try:
                # If test_remove_user_account() failed then a user called
                # 'user-to-delete' may still exist
                self._delete_user_if_exists(self.TEST_USERNAME_ALTERNATIVE)
            finally:
                # Remove the main test user, even if the clean up of the
                # alternative user failed
                self.test_user.delete()
        finally:
            # Always release the references and run the base class tear down,
            # irrespective of whether the user clean up was successful
            self.test_user = None
            self.user_factory = None
            self.auth = None
            super(AuthTests, self).tearDown()

    @staticmethod
    def suite():
        """Returns a test suite of the Auth tests"""
        # The tests are added in the order in which they should be run
        test_names = (
            'test_add_remove_user_permission',
            'test_add_delete_superuser',
            'test_attempt_add_superuser_to_vm',
            'test_add_duplicate_superuser',
            'test_delete_non_existant_superuser',
            'test_change_password',
            'test_add_new_user',
            'test_remove_user_account',
        )
        suite = unittest.TestSuite()
        for test_name in test_names:
            suite.addTest(AuthTests(test_name))
        return suite

    def test_add_remove_user_permission(self):
        """Add a user to a virtual machine, using the argument parser"""
        # Create the test VM
        test_vm_object = self.create_vm('TEST_VM_1', 'Local')

        # Ensure user is not in 'user' group
        self.assertNotIn(
            self.test_user.get_username(),
            self.auth.get_users_in_permission_group('user', test_vm_object)
        )

        # Assert that the test user cannot start the test VM
        with self.assertRaises(InsufficientPermissionsException):
            self.parse_command('start %s' % test_vm_object.get_name(),
                               self.TEST_USERNAME, self.TEST_PASSWORD)

        # Add user to 'user' group using parser
        self.parser.parse_arguments('permission --add-user %s %s' %
                                    (self.test_user.get_username(),
                                     test_vm_object.get_name()))

        # Ensure that the user is now a member of the 'user' group
        self.assertIn(
            self.test_user.get_username(),
            self.auth.get_users_in_permission_group('user', test_vm_object)
        )

        # Ensure that user can now start the VM
        self.parse_command('start %s' % test_vm_object.get_name(),
                           self.TEST_USERNAME, self.TEST_PASSWORD)

        # Ensure that user can now stop the VM
        self.parse_command('stop %s' % test_vm_object.get_name(),
                           self.TEST_USERNAME, self.TEST_PASSWORD)

        # Attempt to re-add user to 'user' group using parser
        with self.assertRaises(DuplicatePermissionException):
            self.parser.parse_arguments('permission --add-user %s %s' %
                                        (self.test_user.get_username(),
                                         test_vm_object.get_name()))

        # Remove user from 'user' group using parser
        self.parser.parse_arguments('permission --delete-user %s %s' %
                                    (self.test_user.get_username(),
                                     test_vm_object.get_name()))

        # Assert that user is no longer part of the group
        self.assertNotIn(
            self.test_user.get_username(),
            self.auth.get_users_in_permission_group('user', test_vm_object)
        )

        # Assert that the test user can no longer start the test VM
        with self.assertRaises(InsufficientPermissionsException):
            self.parse_command('start %s' % test_vm_object.get_name(),
                               username=self.TEST_USERNAME,
                               password=self.TEST_PASSWORD)

    def test_add_delete_superuser(self):
        """Add/delete a user to/from the superuser role"""
        # Assert that the user is not already a superuser
        self.assertNotIn(
            self.TEST_USERNAME,
            self.RPC_DAEMON.DAEMON.registered_factories['auth'].get_superusers()
        )

        # Add the user to the superuser group using the argument parser
        self.parser.parse_arguments('permission --add-superuser %s --global' %
                                    self.TEST_USERNAME)

        # Ensure that the auth object asserts that the user is a superuser
        self.assertIn(
            self.TEST_USERNAME,
            self.RPC_DAEMON.DAEMON.registered_factories['auth'].get_superusers()
        )

        # Connect as the test user and ensure that the connection
        # reports the user as a superuser
        rpc_connection = Connection(username=self.TEST_USERNAME,
                                    password=self.TEST_PASSWORD)
        test_auth = rpc_connection.get_connection('auth')
        self.assertTrue(test_auth.is_superuser())

        # Ensure that user can start a test VM and delete it
        test_vm = self.create_vm('TEST_VM_1', 'Local')
        self.parse_command('start %s' % test_vm.get_name(),
                           username=self.TEST_USERNAME,
                           password=self.TEST_PASSWORD)
        self.parse_command('stop %s' % test_vm.get_name(),
                           username=self.TEST_USERNAME,
                           password=self.TEST_PASSWORD)
        self.parse_command('delete --remove-data %s' % test_vm.get_name(),
                           username=self.TEST_USERNAME,
                           password=self.TEST_PASSWORD)

        # Delete the user from the superuser group using the argument parser
        self.parser.parse_arguments('permission --delete-superuser %s --global' %
                                    self.TEST_USERNAME)

        # Assert that the user is no longer considered a superuser, both by
        # the connection of the test user and by the list of superusers
        self.assertFalse(test_auth.is_superuser())
        self.assertNotIn(
            self.TEST_USERNAME,
            self.RPC_DAEMON.DAEMON.registered_factories['auth'].get_superusers()
        )

    def test_attempt_add_superuser_to_vm(self):
        """Attempts to add a user as a superuser to a VM"""
        test_vm_object = self.create_vm('TEST_VM_1', 'Local')

        # Specifying a VM (rather than --global) whilst adding a superuser
        # is expected to be rejected by the argument parser
        with self.assertRaises(ArgumentParserException):
            self.parser.parse_arguments('permission --add-superuser %s %s' %
                                        (self.TEST_USERNAME,
                                         test_vm_object.get_name()))

    def test_add_duplicate_superuser(self):
        """Attempts to add a superuser twice"""
        # Add the user as a superuser
        self.auth.add_superuser(self.test_user)

        # Attempt to add the user as a superuser for a second time
        with self.assertRaises(DuplicatePermissionException):
            self.parser.parse_arguments('permission --add-superuser %s --global' %
                                        self.TEST_USERNAME)

    def test_delete_non_existant_superuser(self):
        """Attempts to remove a non-existent user from the superuser group"""
        with self.assertRaises(UserNotPresentInGroup):
            self.parser.parse_arguments('permission --delete-superuser %s --global' %
                                        self.TEST_USERNAME)

    def test_change_password(self):
        """Change the password of a user through the parser"""
        # Change the password of the test user
        new_password = 'new-password-here'
        self.parse_command('user change-password --new-password %s' % new_password,
                           self.TEST_USERNAME, self.TEST_PASSWORD)

        # Try to run a command with the old credentials and check an
        # AuthenticationError is raised
        with self.assertRaises(AuthenticationError):
            self.parse_command('list', self.TEST_USERNAME, self.TEST_PASSWORD)

        # Now run a command as the test user with the new password
        try:
            self.parse_command('list', self.TEST_USERNAME, new_password)
        except AuthenticationError:
            self.fail('Password not changed successfully')

    def test_add_new_user(self):
        """Create a new user through the parser"""
        username = 'brand-new-user'
        password = 'pass'
        create_command = 'user create %s --user-password %s' % (username, password)

        # Try to create a new user as the test user and check an
        # InsufficientPermissionsException is raised
        with self.assertRaises(InsufficientPermissionsException):
            self.parse_command(create_command, self.TEST_USERNAME, self.TEST_PASSWORD)

        # Create the new user as a superuser
        self.parser.parse_arguments(create_command)

        # Check the new user exists and delete it
        self._assert_user_exists_and_delete(username)

        # Create the new user again using the --generate-password flag
        self.parser.parse_arguments('user create %s --generate-password' % username)

        # Check the new user exists and delete it
        self._assert_user_exists_and_delete(username)

    def test_remove_user_account(self):
        """Delete a user through the parser"""
        self.user_to_delete = self.create_test_user(self.TEST_USERNAME_ALTERNATIVE, 'pass')
        delete_command = 'user remove %s' % self.TEST_USERNAME_ALTERNATIVE

        # Try to delete as test user and check InsufficientPermissionsException is raised
        with self.assertRaises(InsufficientPermissionsException):
            self.parse_command(delete_command, self.TEST_USERNAME, self.TEST_PASSWORD)

        # Run delete command as superuser
        self.parser.parse_arguments(delete_command)

        # Check that the user no longer exists
        with self.assertRaises(UserDoesNotExistException):
            self.user_factory.get_user_by_username(self.TEST_USERNAME_ALTERNATIVE)

        self.user_to_delete = None