2023-11-07 16:43:23 +00:00
|
|
|
import os
|
|
|
|
import csv
|
2023-12-12 17:00:21 +00:00
|
|
|
import json
|
2023-11-07 16:43:23 +00:00
|
|
|
|
|
|
|
from pathlib import Path
|
2023-12-12 17:00:21 +00:00
|
|
|
from utils import credtools
|
|
|
|
from django.conf import settings
|
2024-02-22 13:58:52 +00:00
|
|
|
from django.core.management.base import BaseCommand
|
2023-11-03 12:29:15 +00:00
|
|
|
from django.contrib.auth import get_user_model
|
2025-01-28 17:15:35 +00:00
|
|
|
from django.core.cache import cache
|
2025-01-27 19:34:24 +00:00
|
|
|
from django.urls import reverse
|
|
|
|
from pyvckit.did import (
|
|
|
|
generate_did,
|
|
|
|
gen_did_document,
|
|
|
|
)
|
|
|
|
|
|
|
|
from idhub.models import Schemas, DID
|
2023-11-24 15:36:05 +00:00
|
|
|
from oidc4vp.models import Organization
|
2025-01-23 16:58:07 +00:00
|
|
|
from webhook.models import Token
|
2023-11-03 12:29:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
|
|
|
|
|
|
class Command(BaseCommand):
|
2025-01-27 19:34:24 +00:00
|
|
|
help = "Insert minimum data for the project"
|
2024-03-04 08:44:53 +00:00
|
|
|
DOMAIN = settings.DOMAIN
|
|
|
|
OIDC_ORGS = settings.OIDC_ORGS
|
2023-11-03 12:29:15 +00:00
|
|
|
|
2025-01-23 16:58:07 +00:00
|
|
|
def add_arguments(self, parser):
|
|
|
|
parser.add_argument('predefined_token', nargs='?', default='', type=str, help='predefined token')
|
2025-01-28 17:15:35 +00:00
|
|
|
parser.add_argument('predefined_did', nargs='?', default='', type=str, help='predefined did')
|
2025-01-23 16:58:07 +00:00
|
|
|
|
2023-11-03 12:29:15 +00:00
|
|
|
def handle(self, *args, **kwargs):
|
2024-03-07 10:13:26 +00:00
|
|
|
ADMIN_EMAIL = settings.INITIAL_ADMIN_EMAIL
|
|
|
|
ADMIN_PASSWORD = settings.INITIAL_ADMIN_PASSWORD
|
2025-01-23 16:58:07 +00:00
|
|
|
self.predefined_token = kwargs['predefined_token']
|
2025-01-27 19:34:24 +00:00
|
|
|
self.predefined_did = kwargs['predefined_did']
|
2025-01-29 17:02:17 +00:00
|
|
|
# on demo situation, encrypted vault is hardcoded with password DEMO
|
|
|
|
cache.set("KEY_DIDS", "DEMO", None)
|
2023-11-07 16:43:23 +00:00
|
|
|
|
2024-03-04 08:44:53 +00:00
|
|
|
self.org = Organization.objects.create(
|
2025-01-23 16:58:07 +00:00
|
|
|
name=self.DOMAIN,
|
|
|
|
domain=self.DOMAIN,
|
2024-03-04 08:44:53 +00:00
|
|
|
main=True
|
|
|
|
)
|
2025-01-28 17:15:35 +00:00
|
|
|
self.org.set_encrypted_sensitive_data()
|
|
|
|
self.org.save()
|
|
|
|
|
|
|
|
self.create_admin_users(ADMIN_EMAIL, ADMIN_PASSWORD)
|
|
|
|
if settings.CREATE_TEST_USERS:
|
|
|
|
for u in range(1, 6):
|
|
|
|
user = 'user{}@example.org'.format(u)
|
|
|
|
self.create_users(user, '1234')
|
2024-03-04 08:44:53 +00:00
|
|
|
|
|
|
|
if self.OIDC_ORGS:
|
|
|
|
self.create_organizations()
|
2024-02-23 08:27:18 +00:00
|
|
|
|
2023-12-12 17:00:21 +00:00
|
|
|
self.create_schemas()
|
2023-11-03 12:29:15 +00:00
|
|
|
|
|
|
|
def create_admin_users(self, email, password):
|
2024-01-04 15:27:27 +00:00
|
|
|
su = User.objects.create_superuser(email=email, password=password)
|
|
|
|
su.save()
|
2023-11-03 12:29:15 +00:00
|
|
|
|
2025-01-28 17:15:35 +00:00
|
|
|
if self.predefined_token:
|
|
|
|
tk = Token.objects.filter(token=self.predefined_token).first()
|
|
|
|
if not tk:
|
|
|
|
Token.objects.create(token=self.predefined_token)
|
2023-11-03 12:29:15 +00:00
|
|
|
|
2025-01-28 17:15:35 +00:00
|
|
|
self.create_default_did()
|
|
|
|
|
|
|
|
def create_default_did(self):
|
|
|
|
|
2025-01-27 19:34:24 +00:00
|
|
|
fdid = self.open_example_did()
|
|
|
|
if not fdid:
|
|
|
|
return
|
|
|
|
|
2025-01-28 17:15:35 +00:00
|
|
|
did = DID(type=DID.Types.WEB)
|
2025-01-27 19:34:24 +00:00
|
|
|
new_key_material = fdid.get("key_material", "")
|
|
|
|
label = fdid.get("label", "")
|
|
|
|
if not new_key_material:
|
|
|
|
return
|
2025-01-28 17:15:35 +00:00
|
|
|
|
2025-01-27 19:34:24 +00:00
|
|
|
did.set_key_material(new_key_material)
|
|
|
|
|
|
|
|
if label:
|
|
|
|
did.label = label
|
|
|
|
|
|
|
|
if did.type == did.Types.KEY:
|
|
|
|
did.did = generate_did(new_key_material)
|
|
|
|
elif did.type == did.Types.WEB:
|
|
|
|
url = "https://{}".format(settings.DOMAIN)
|
|
|
|
path = reverse("idhub:serve_did", args=["a"])
|
|
|
|
|
|
|
|
if path:
|
|
|
|
path = path.split("/a/did.json")[0]
|
|
|
|
url = "https://{}/{}".format(settings.DOMAIN, path)
|
|
|
|
|
|
|
|
did.did = generate_did(new_key_material, url)
|
|
|
|
key = json.loads(new_key_material)
|
2025-01-28 17:15:35 +00:00
|
|
|
url, did.didweb_document = gen_did_document(did.did, key)
|
2025-01-27 19:34:24 +00:00
|
|
|
|
|
|
|
did.save()
|
|
|
|
|
|
|
|
def open_example_did(self):
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent.parent.parent.parent
|
|
|
|
didweb_path = os.path.join(BASE_DIR, "examples", "keys_did.json")
|
|
|
|
|
2025-01-28 17:15:35 +00:00
|
|
|
if self.predefined_did:
|
|
|
|
didweb_path = self.predefined_did
|
2025-01-27 19:34:24 +00:00
|
|
|
|
|
|
|
data = ''
|
2025-01-28 17:15:35 +00:00
|
|
|
with open(didweb_path) as _file:
|
2025-01-27 19:34:24 +00:00
|
|
|
try:
|
|
|
|
data = json.loads(_file.read())
|
|
|
|
except Exception:
|
|
|
|
pass
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
2023-11-03 12:29:15 +00:00
|
|
|
def create_users(self, email, password):
|
2024-01-04 15:27:27 +00:00
|
|
|
u = User.objects.create(email=email, password=password)
|
2023-11-03 12:29:15 +00:00
|
|
|
u.set_password(password)
|
|
|
|
u.save()
|
|
|
|
|
2024-03-04 08:44:53 +00:00
|
|
|
def create_organizations(self):
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent.parent.parent.parent
|
|
|
|
ORGANIZATION = os.path.join(BASE_DIR, self.OIDC_ORGS)
|
|
|
|
DOMAIN = self.DOMAIN
|
|
|
|
|
|
|
|
with open(ORGANIZATION, newline='\n') as csvfile:
|
|
|
|
f = csv.reader(csvfile, delimiter=';', quotechar='"')
|
|
|
|
exist_main_domain = False
|
|
|
|
for r in f:
|
|
|
|
if DOMAIN == r[2].strip():
|
|
|
|
exist_main_domain = True
|
|
|
|
self.create_one_organization(r[0].strip(), r[1].strip(), r[2].strip())
|
|
|
|
|
|
|
|
assert exist_main_domain, f"{DOMAIN} is not in {ORGANIZATION}"
|
|
|
|
|
|
|
|
if settings.SYNC_ORG_DEV == 'y':
|
|
|
|
self.sync_credentials_organizations("pangea.org", "somconnexio.coop")
|
|
|
|
self.sync_credentials_organizations("local 8000", "local 9000")
|
2023-11-03 12:29:15 +00:00
|
|
|
|
2024-03-04 08:44:53 +00:00
|
|
|
def create_one_organization(self, name, url, domain):
|
|
|
|
if self.DOMAIN == domain:
|
|
|
|
self.org.name = name
|
|
|
|
self.org.response_uri = url
|
|
|
|
self.org.save()
|
2024-02-27 08:27:55 +00:00
|
|
|
else:
|
2024-03-04 08:44:53 +00:00
|
|
|
Organization.objects.create(name=name, response_uri=url, domain=domain)
|
2023-12-11 12:55:56 +00:00
|
|
|
|
2023-12-11 19:06:53 +00:00
|
|
|
def sync_credentials_organizations(self, test1, test2):
|
|
|
|
org1 = Organization.objects.get(name=test1)
|
|
|
|
org2 = Organization.objects.get(name=test2)
|
2023-12-11 12:55:56 +00:00
|
|
|
org2.my_client_id = org1.client_id
|
|
|
|
org2.my_client_secret = org1.client_secret
|
|
|
|
org1.my_client_id = org2.client_id
|
|
|
|
org1.my_client_secret = org2.client_secret
|
2023-12-11 14:38:16 +00:00
|
|
|
org1.save()
|
|
|
|
org2.save()
|
2024-02-20 16:50:45 +00:00
|
|
|
|
2023-12-12 17:00:21 +00:00
|
|
|
def create_schemas(self):
|
|
|
|
schemas_files = os.listdir(settings.SCHEMAS_DIR)
|
|
|
|
for x in schemas_files:
|
|
|
|
if Schemas.objects.filter(file_schema=x).exists():
|
|
|
|
continue
|
|
|
|
self._create_schemas(x)
|
|
|
|
|
|
|
|
def _create_schemas(self, file_name):
|
|
|
|
data = self.open_file(file_name)
|
|
|
|
try:
|
|
|
|
ldata = json.loads(data)
|
|
|
|
assert credtools.validate_schema(ldata)
|
2024-01-15 18:10:55 +00:00
|
|
|
dname = ldata.get('name')
|
2024-01-21 12:46:01 +00:00
|
|
|
title = ldata.get('title')
|
2024-01-15 18:10:55 +00:00
|
|
|
assert dname
|
2024-01-21 12:46:01 +00:00
|
|
|
assert title
|
2023-12-12 17:00:21 +00:00
|
|
|
except Exception:
|
2024-11-25 19:18:47 +00:00
|
|
|
ldata = {}
|
2024-01-21 12:46:01 +00:00
|
|
|
title = ''
|
2024-01-22 17:13:06 +00:00
|
|
|
_name = ''
|
|
|
|
|
|
|
|
_name = json.dumps(ldata.get('name', ''))
|
|
|
|
_description = json.dumps(ldata.get('description', ''))
|
|
|
|
|
|
|
|
Schemas.objects.create(
|
|
|
|
file_schema=file_name,
|
|
|
|
data=data,
|
|
|
|
type=title,
|
|
|
|
_name=_name,
|
|
|
|
_description=_description
|
|
|
|
)
|
2023-12-12 17:00:21 +00:00
|
|
|
|
|
|
|
def open_file(self, file_name):
|
|
|
|
data = ''
|
|
|
|
filename = Path(settings.SCHEMAS_DIR).joinpath(file_name)
|
|
|
|
with filename.open() as schema_file:
|
|
|
|
data = schema_file.read()
|
|
|
|
|
|
|
|
return data
|