From d450819cd3786788591710d0397ab3fdf67ec06e Mon Sep 17 00:00:00 2001 From: Elijah Date: Wed, 27 Dec 2023 14:32:26 +0100 Subject: [PATCH] Bug fixes and refactored code for better reusability and extensibility --- .../commands/create_example_data.py | 342 +++++++++++------- 1 file changed, 210 insertions(+), 132 deletions(-) diff --git a/idhub/management/commands/create_example_data.py b/idhub/management/commands/create_example_data.py index 4c8b14f..4ee24bb 100644 --- a/idhub/management/commands/create_example_data.py +++ b/idhub/management/commands/create_example_data.py @@ -1,15 +1,18 @@ import random +from typing import Type, Callable, List from django.core.management.base import BaseCommand -from django.db import IntegrityError +from django.db import models, IntegrityError + from idhub.models import Event, Rol, Service, UserRol from idhub_auth.models import User from faker import Faker -DEFAULT_OBJECTS_CREATED = 30 -RANDOM_STRING_LENGTH = 30 -EMAIL_RANDOM_STRING_LENGTH = 10 +DEFAULT_OBJECTS_CREATED: int = 30 +RANDOM_STRING_LENGTH: int = 30 +EMAIL_RANDOM_STRING_LENGTH: int = 10 +EXISTING_EVENTS: int = 30 fake = Faker() @@ -24,54 +27,68 @@ class Command(BaseCommand): """ help = 'Populate the database with dummy values for testing.' - created_users = [] - created_roles = [] - created_services = [] + + def __init__(self, *args, **kwargs): + """ + In the context of a Django management command, + initializing lists like created_users in the constructor ensures + that each run of the command has its own set of users, services, + roles, etc., and does not unintentionally share or retain state + across different invocations of the command. + """ + super().__init__(*args, **kwargs) + self.created_users = [] + self.created_services = [] + self.created_roles = [] def handle(self, *args, **options): - any_option_used = False - - if options["events"]: - self.create_events(options["events"]) - any_option_used = True - if options["users"]: - self.create_users(options["users"]) - any_option_used = True - if options["superusers"]: - self.create_superusers(options["superusers"]) - any_option_used = True - if options["services"]: - self.create_services(options["services"]) - any_option_used = True - if options["roles"]: - self.create_roles(options["roles"]) - any_option_used = True - if options["userroles"]: - self.create_user_roles(options["userroles"]) - any_option_used = True - if options["userrole"]: - user = options["userrole"][0] - service = options["userrole"][1] - self.create_user_roles(1, user, service) - any_option_used = True - if options["register"]: - email = options["register"][0] - password = options["register"][1] - self.create_user(email, password) - any_option_used = True - if options["register_superuser"]: - email = options["register_superuser"][0] - password = options["register_superuser"][1] - self.create_superuser(email, password) - any_option_used = True - - if options["amount"]: - self.create_all(options["amount"]) - any_option_used = True + any_option_used = self.process_options(options) if not any_option_used: self.create_all() + def process_options(self, input_options): + option_methods = { + 'events': self.create_events, + 'users': self.create_users, + 'superusers': self.create_superusers, + 'services': self.create_services, + 'roles': self.create_roles, + 'userroles': self.create_user_roles, + 'userrole': self.create_specific_user_role, + 'register': self.register_user, + 'register_superuser': self.register_superuser, + 'amount': self.create_all + } + + any_option_used = self.match_input_to_options(input_options, + option_methods) + + return any_option_used + + def match_input_to_options(self, input_options, option_methods): + any_option_used = False + + for option, method in option_methods.items(): + is_valid_option = option in input_options and input_options[ + option] is not None + if is_valid_option: + self.call_selected_method(input_options, method, option) + any_option_used = True + + return any_option_used + + def call_selected_method(self, input_options, method, option): + args = self.create_argument_list(input_options, option) + method(*args) + + def create_argument_list(self, input_options, option): + if isinstance(input_options[option], list): + args = input_options[option] + else: + args = [input_options[option]] + return args + def add_arguments(self, parser): parser.add_argument( '--amount', type=int, action='store', @@ -95,11 +112,32 @@ class Command(BaseCommand): metavar=('U', 'S'), ) parser.add_argument('--register', nargs=2, type=str, - help='Create a user with email E and password P', - metavar=('E', 'P')) + help='Create a user with email EMAIL and password PW', + metavar=('EMAIL', 'PW')) parser.add_argument('--register-superuser', nargs=2, type=str, - help='Create a superuser with email E and password P', - metavar=('E', 'P')) + help='Create a superuser with email EMAIL and ' + 'password PW', + metavar=('EMAIL', 'PW')) + + def register_user(self, email: str, password: str): + """ + Register a new user with the given email and password from the command line. + """ + try: + self.create_user(email, password) + self.stdout.write(f"Successfully registered user: {email}") + except IntegrityError: + self.stdout.write(f"Failed to register user: {email}") + + def register_superuser(self, email: str, password: str): + """ + Register a new superuser with the given email and password from the command line. + """ + try: + self.create_superuser(email, password) + self.stdout.write(f"Successfully registered superuser: {email}") + except IntegrityError: + self.stdout.write(f"Failed to register superuser: {email}") def create_all(self, amount=DEFAULT_OBJECTS_CREATED): self.create_events(amount) @@ -108,36 +146,49 @@ class Command(BaseCommand): self.create_services(amount) self.create_user_roles(amount) - def create_events(self, amount, user=None): - created_event_amount = 0 - for value in range(0, amount): - try: - Event.objects.create( - type=random.randint(1, 30), - message=fake.paragraph(nb_sentences=3), - user=user - ) - created_event_amount += 1 - except IntegrityError: - self.stdout.write("Couldn't create event") - self.stdout.write(f"Created {created_event_amount} events") + def create_objects(self, model: Type[models.Model], + data_generator: Callable, amount: int) -> List[ + models.Model]: + """ + Generic method to create objects for a given model and keep track of them. - def create_users(self, amount): - created_user_amount = 0 - for value in range(0, amount): - email = fake.email() - try: - User.objects.create( - email=email, - first_name=fake.first_name(), - last_name=fake.last_name() - ) - self.created_users.append(email) - created_user_amount += 1 - except IntegrityError: - self.stdout.write("Couldn't create user " + email) + Args: + model: The Django model class for which objects are to be created. + data_generator: A function that returns a dictionary of attributes for creating a model instance. + amount: The number of objects to create. - self.stdout.write(f"Created {created_user_amount} users") + Returns: + A list of successfully created model instances. + """ + created_objects = [] + for _ in range(amount): + try: + model_instance = self.create_from_data(data_generator, model) + created_objects.append(model_instance) + except IntegrityError: + self.print_failure_message(model) + + self.print_amount_created(created_objects, model) + return created_objects + + def create_users(self, amount: int): + def user_data(): + return { + 'email': fake.email(), + 'first_name': fake.first_name(), + 'last_name': fake.last_name() + } + created_users = self.create_objects(User, user_data, amount) + self.created_users.extend(created_users) + + def create_events(self, amount: int, user=None): + def event_data(): + return { + 'type': random.randint(1, EXISTING_EVENTS), + 'message': fake.paragraph(nb_sentences=3), + 'user': user + } + self.create_objects(Event, event_data, amount) def create_superusers(self, amount=0): """Superusers can only be created from the specific command""" @@ -151,65 +202,92 @@ class Command(BaseCommand): self.stdout.write("Couldn't create superuser " + email) self.stdout.write(f"Created {created_superuser_amount} users") - def create_services(self, amount): - created_service_amount = 0 - for value in range(0, amount): + def create_services(self, amount: int): + def service_data(): domain = fake.text(max_nb_chars=200) - try: - service = Service.objects.create( - domain=domain, - description=fake.text(max_nb_chars=250) - ) - self.created_services.append(domain) - try: - associated_rol = Rol.objects.get(name=random.choice( - self.created_roles)) - service.rol.add(associated_rol.id) - except IntegrityError: - self.stdout.write( - f"Couldn't associate role with service {domain}") - created_service_amount += 1 - except IntegrityError: - self.stdout.write("Couldn't create service " + domain) - self.stdout.write(f"Created {created_service_amount} services") + description = fake.text(max_nb_chars=250) + return { + 'domain': domain, + 'description': description, + } - def create_roles(self, amount): - created_role_amount = 0 - for value in range(0, amount): + created_services = self.create_objects(Service, service_data, amount) + self.associate_random_roles(created_services) + self.created_services.extend(created_services) + + def create_roles(self, amount: int): + def role_data(): name = fake.job() - try: - Rol.objects.create(name=name, - description=fake.text(max_nb_chars=250) - ) - created_role_amount += 1 - except IntegrityError: - self.stdout.write("Couldn't create role " + name) - self.created_roles.append(name) - self.stdout.write(f"Created {created_role_amount} roles") + description = fake.text(max_nb_chars=250) + return {'name': name, 'description': description} - def create_user_roles(self, amount, user_id=None, service_id=None): - created_user_role_amount = 0 - user_id = user_id if user_id is not None else random.choice( - self.created_users) - service_id = service_id if service_id is not None else random.choice( - self.created_services) - for value in range(0, amount): - try: - user = User.objects.get(email=user_id) - service = Service.objects.get(domain=service_id) - UserRol.objects.create( - user=user, - service=service - ) - created_user_role_amount += 1 - except IntegrityError: - self.stdout.write("Couldn't create user role for user " + user.email) - user_id = random.choice(self.created_users) - service_id = random.choice(self.created_services) - self.stdout.write(f"Created {created_user_role_amount} user roles") + created_roles = self.create_objects(Rol, role_data, amount) + self.created_roles.extend(created_roles) - def create_user(self, email, password): + def create_user_roles(self, amount: int, user_id=None, service_id=None): + def user_role_data(): + user = self.get_or_create_user(user_id) + service = self.get_or_create_service(service_id) + return {"user": user, "service": service} + + self.create_objects(UserRol, user_role_data, amount) + + def create_specific_user_role(self, user, service): + self.create_user_roles(1, user, service) + + def create_user(self, email: str, password: str): User.objects.create_user(email, password) - def create_superuser(self, email, password): + def create_superuser(self, email: str, password: str): User.objects.create_superuser(email, password) + + def print_failure_message(self, model): + self.stdout.write(f"Couldn't create {model.__name__} object.") + + def print_amount_created(self, created_objects, model): + self.stdout.write(f"Created {len(created_objects)} " + f"{model.__name__} objects") + + def create_from_data(self, data_generator, model): + model_instance = model.objects.create(**data_generator()) + return model_instance + + def associate_random_roles(self, created_services): + for service in created_services: + self.associate_service_to_random_roles(service) + + def associate_service_to_random_roles(self, service): + associated_roles = [self.get_or_create_role() for _ in range( + random.randint(0, 2))] + service.rol.set(associated_roles) + + def get_or_create_user(self, user_id): + if user_id is not None: + user = User.objects.get(user_id=user_id) + elif len(self.created_users) != 0: + user = random.choice(self.created_users) + else: + self.create_users(1) + user = self.created_users[0] + + return user + + def get_or_create_service(self, service_id): + if service_id is not None: + service = Service.objects.get(service_id=service_id) + elif len(self.created_services) != 0: + service = random.choice(self.created_services) + else: + self.create_services(1) + service = self.created_services[0] + + return service + + def get_or_create_role(self): + if len(self.created_roles) != 0: + role = random.choice(self.created_roles) + else: + self.create_roles(1) + role = self.created_roles[0] + + return role \ No newline at end of file