This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/flows/migrations/0018_oob_flows.py
Jens Langhammer ca35204e0c flows: ask for email address in oob flow
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-04-08 22:50:58 +02:00

151 lines
5.3 KiB
Python

# Generated by Django 3.1.7 on 2021-04-06 13:25
from django.apps.registry import Apps
from django.contrib.auth.hashers import is_password_usable
from django.db import migrations
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from authentik.flows.models import FlowDesignation
# Source text of the expression policy saved as "default-oob-password-usable".
# It is bound to the setup flow itself and passes only while the akadmin
# user has no usable password.
PW_USABLE_POLICY_EXPRESSION = "\n".join(
    (
        "# This policy ensures that the setup flow can only be",
        "# executed when the admin user doesn't have a password set",
        'akadmin = ak_user_by(username="akadmin")',
        "return not akadmin.has_usable_password()",
    )
)

# Source text of the expression policy saved as "default-oob-prefill-user".
# It places akadmin (and an authentication backend path) into the policy
# context and always passes.
PREFILL_POLICY_EXPRESSION = "\n".join(
    (
        "# This policy sets the user for the currently running flow",
        '# by injecting "pending_user"',
        'akadmin = ak_user_by(username="akadmin")',
        'context["pending_user"] = akadmin',
        "# We're also setting the backend for the user, so we can",
        "# directly login without having to identify again",
        'context["user_backend"] = "django.contrib.auth.backends.ModelBackend"',
        "return True",
    )
)
def create_default_oob_flow(apps: Apps, schema_editor: BaseDatabaseSchemaEditor):
    """Seed the default out-of-box ("initial-setup") flow for the akadmin user.

    Nothing is created unless a user named ``akadmin`` exists and its stored
    password is not usable. When that holds, the function creates or updates:

    * two expression policies (prefill-user and password-usable),
    * a static header prompt and an email prompt,
    * a prompt stage combining those with the existing ``password`` and
      ``password_repeat`` prompts,
    * a user-write stage and a user-login stage,
    * the ``initial-setup`` flow with its stage and policy bindings.

    Args:
        apps: Historical app registry supplied by the migration runner.
        schema_editor: Schema editor whose connection alias selects the
            database every query in this function is routed to.

    Raises:
        Prompt.DoesNotExist / Prompt.MultipleObjectsReturned: propagated from
            the ``.get()`` lookups when there is not exactly one prompt with
            field key ``password`` or ``password_repeat``. These prompts are
            presumably created by an earlier migration -- TODO confirm.
    """
    # Imported locally, as in the original migration; only the enum of prompt
    # field types is needed from the concrete models module.
    from authentik.stages.prompt.models import FieldTypes

    User = apps.get_model("authentik_core", "User")

    PolicyBinding = apps.get_model("authentik_policies", "PolicyBinding")

    Flow = apps.get_model("authentik_flows", "Flow")
    FlowStageBinding = apps.get_model("authentik_flows", "FlowStageBinding")

    UserLoginStage = apps.get_model("authentik_stages_user_login", "UserLoginStage")
    UserWriteStage = apps.get_model("authentik_stages_user_write", "UserWriteStage")
    PromptStage = apps.get_model("authentik_stages_prompt", "PromptStage")
    Prompt = apps.get_model("authentik_stages_prompt", "Prompt")

    ExpressionPolicy = apps.get_model(
        "authentik_policies_expression", "ExpressionPolicy"
    )

    db_alias = schema_editor.connection.alias

    # Only create the flow if the akadmin user exists and has an un-usable
    # password. The lookup is routed through db_alias like every other query
    # here, and a single first() replaces the exists()/first() pair.
    akadmin = User.objects.using(db_alias).filter(username="akadmin").first()
    if akadmin is None or is_password_usable(akadmin.password):
        return

    # Policy that sets the flow's user (bound to the user-write binding below).
    prefill_policy, _ = ExpressionPolicy.objects.using(db_alias).update_or_create(
        name="default-oob-prefill-user",
        defaults={"expression": PREFILL_POLICY_EXPRESSION},
    )
    # Policy that gates the whole flow on akadmin lacking a usable password.
    password_usable_policy, _ = ExpressionPolicy.objects.using(
        db_alias
    ).update_or_create(
        name="default-oob-password-usable",
        defaults={"expression": PW_USABLE_POLICY_EXPRESSION},
    )

    # Prompts shown by the setup stage: a static welcome text and an email field.
    prompt_header, _ = Prompt.objects.using(db_alias).update_or_create(
        field_key="oob-header-text",
        defaults={
            "label": "oob-header-text",
            "type": FieldTypes.STATIC,
            "placeholder": "Welcome to authentik! Please set a password for the default admin user, akadmin.",
            "order": 100,
        },
    )
    prompt_email, _ = Prompt.objects.using(db_alias).update_or_create(
        field_key="email",
        defaults={
            "label": "Email",
            "type": FieldTypes.EMAIL,
            "placeholder": "Admin email",
            "order": 101,
        },
    )

    # The password prompts are looked up, not created; .get() raises if they
    # are missing or ambiguous (see Raises in the docstring).
    password_first = Prompt.objects.using(db_alias).get(field_key="password")
    password_second = Prompt.objects.using(db_alias).get(field_key="password_repeat")

    prompt_stage, _ = PromptStage.objects.using(db_alias).update_or_create(
        name="default-oob-password",
    )
    prompt_stage.fields.set(
        [prompt_header, prompt_email, password_first, password_second]
    )
    prompt_stage.save()

    user_write, _ = UserWriteStage.objects.using(db_alias).update_or_create(
        name="default-password-change-write"
    )
    login_stage, _ = UserLoginStage.objects.using(db_alias).update_or_create(
        name="default-authentication-login"
    )

    flow, _ = Flow.objects.using(db_alias).update_or_create(
        slug="initial-setup",
        designation=FlowDesignation.STAGE_CONFIGURATION,
        defaults={
            "name": "default-oob-setup",
            "title": "Welcome to authentik!",
        },
    )
    # Gate the flow itself with the password-usable policy.
    PolicyBinding.objects.using(db_alias).update_or_create(
        policy=password_usable_policy, target=flow, defaults={"order": 0}
    )

    # Stage order within the flow: prompt (10) -> user write (20) -> login (100).
    FlowStageBinding.objects.using(db_alias).update_or_create(
        target=flow,
        stage=prompt_stage,
        defaults={
            "order": 10,
        },
    )
    # NOTE(review): evaluate_on_plan=False with re_evaluate_policies=True
    # presumably defers the bound policy until the stage is reached rather
    # than at planning time -- confirm against the FlowStageBinding model.
    user_write_binding, _ = FlowStageBinding.objects.using(db_alias).update_or_create(
        target=flow,
        stage=user_write,
        defaults={"order": 20, "evaluate_on_plan": False, "re_evaluate_policies": True},
    )
    # The prefill policy targets the user-write *binding*, not the flow.
    PolicyBinding.objects.using(db_alias).update_or_create(
        policy=prefill_policy, target=user_write_binding, defaults={"order": 0}
    )
    FlowStageBinding.objects.using(db_alias).update_or_create(
        target=flow,
        stage=login_stage,
        defaults={
            "order": 100,
        },
    )
class Migration(migrations.Migration):
    """Data migration that runs ``create_default_oob_flow`` to seed the setup flow."""

    # Migrations that must be applied before this one runs.
    dependencies = [
        ("authentik_flows", "0017_auto_20210329_1334"),
        ("authentik_stages_user_write", "__latest__"),
        ("authentik_stages_user_login", "__latest__"),
        ("authentik_stages_password", "0002_passwordstage_change_flow"),
        ("authentik_policies", "0001_initial"),
        ("authentik_policies_expression", "0001_initial"),
    ]

    operations = [
        # Without reverse_code, unapplying this migration raises
        # IrreversibleError. The reverse is a deliberate no-op: objects
        # created by the forward step are left in place.
        migrations.RunPython(create_default_oob_flow, migrations.RunPython.noop),
    ]