Merge pull request 'encrypt2' (#149) from encrypt2 into release

Reviewed-on: https://gitea.pangea.org/trustchain-oc1-orchestral/IdHub/pulls/149
This commit is contained in:
cayop 2024-02-26 10:10:38 +00:00
commit 4ed3225ce5
9 changed files with 152 additions and 37 deletions

View file

@ -75,6 +75,10 @@ class EncryptionKeyForm(forms.Form):
if commit:
cache.set("KEY_DIDS", self._key, None)
if not DID.objects.exists():
did = DID.objects.create(label='Default', type=DID.Types.WEB)
did.set_did()
did.save()
return
@ -155,9 +159,8 @@ class ImportForm(forms.Form):
self.rows = {}
self.properties = {}
self.users = []
self.user = kwargs.pop('user', None)
super().__init__(*args, **kwargs)
dids = DID.objects.filter(user=self.user)
dids = DID.objects.filter(user__isnull=True)
self.fields['did'].choices = [
(x.did, x.label) for x in dids.filter(eidas1=False)
]
@ -176,7 +179,7 @@ class ImportForm(forms.Form):
def clean(self):
data = self.cleaned_data["did"]
did = DID.objects.filter(
user=self.user,
user__isnull=True,
did=data
)
@ -188,7 +191,7 @@ class ImportForm(forms.Form):
eidas1 = self.cleaned_data.get('eidas1')
if eidas1:
self._eidas1 = DID.objects.filter(
user=self.user,
user__isnull=True,
eidas1=True,
did=eidas1
).first()

View file

@ -759,7 +759,7 @@ class DidsView(Credentials, SingleTableView):
def get_context_data(self, **kwargs):
queryset = kwargs.pop('object_list', None)
dids = DID.objects.filter(user=self.request.user)
dids = DID.objects.filter(user__isnull=True)
if queryset is None:
self.object_list = dids.all()
@ -781,7 +781,6 @@ class DidRegisterView(Credentials, CreateView):
object = None
def form_valid(self, form):
form.instance.user = self.request.user
form.instance.set_did()
form.save()
messages.success(self.request, _('DID created successfully'))
@ -1063,11 +1062,6 @@ class ImportAddView(NotifyActivateUserByEmail, ImportExport, FormView):
form_class = ImportForm
success_url = reverse_lazy('idhub:admin_import')
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
creds = form.save()
if creds:

View file

@ -7,9 +7,8 @@ from utils import credtools
from django.conf import settings
from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from django.core.cache import cache
from decouple import config
from idhub.models import DID, Schemas
from idhub.models import Schemas
from oidc4vp.models import Organization
@ -22,8 +21,6 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
ADMIN_EMAIL = config('ADMIN_EMAIL', 'admin@example.org')
ADMIN_PASSWORD = config('ADMIN_PASSWORD', '1234')
KEY_DIDS = config('KEY_DIDS', '1234')
cache.set("KEY_DIDS", KEY_DIDS, None)
self.create_admin_users(ADMIN_EMAIL, ADMIN_PASSWORD)
if settings.CREATE_TEST_USERS:
@ -37,6 +34,10 @@ class Command(BaseCommand):
f = csv.reader(csvfile, delimiter=';', quotechar='"')
for r in f:
self.create_organizations(r[0].strip(), r[1].strip())
# You need to confirm than your Organization is created
assert Organization.objects.filter(name=settings.ORGANIZATION).exists()
if settings.SYNC_ORG_DEV == 'y':
self.sync_credentials_organizations("pangea.org", "somconnexio.coop")
self.sync_credentials_organizations("local 8000", "local 9000")
@ -44,17 +45,13 @@ class Command(BaseCommand):
def create_admin_users(self, email, password):
su = User.objects.create_superuser(email=email, password=password)
su.set_encrypted_sensitive_data()
su.save()
self.create_defaults_dids(su)
def create_users(self, email, password):
u = User.objects.create(email=email, password=password)
u.set_password(password)
u.set_encrypted_sensitive_data()
u.save()
self.create_defaults_dids(u)
def create_organizations(self, name, url):
@ -70,11 +67,6 @@ class Command(BaseCommand):
org1.save()
org2.save()
def create_defaults_dids(self, u):
did = DID(label="Default", user=u, type=DID.Types.WEB)
did.set_did()
did.save()
def create_schemas(self):
schemas_files = os.listdir(settings.SCHEMAS_DIR)
for x in schemas_files:

View file

@ -16,6 +16,7 @@ from utils.idhub_ssikit import (
webdid_from_controller_key,
verify_credential,
)
from oidc4vp.models import Organization
from idhub_auth.models import User
@ -419,8 +420,8 @@ class Event(models.Model):
class DID(models.Model):
class Types(models.IntegerChoices):
KEY = 1, "Key"
WEB = 2, "Web"
WEB = 1, "Web"
KEY = 2, "Key"
type = models.PositiveSmallIntegerField(
_("Type"),
choices=Types.choices,
@ -442,18 +443,23 @@ class DID(models.Model):
# JSON-serialized DID document
didweb_document = models.TextField()
def get_key_material(self):
return self.user.decrypt_data(self.key_material)
def set_key_material(self, value):
self.key_material = self.user.encrypt_data(value)
@property
def is_organization_did(self):
if not self.user:
return True
return False
def get_key_material(self):
user = self.user or self.get_organization()
return user.decrypt_data(self.key_material)
def set_key_material(self, value):
user = self.user or self.get_organization()
if not user.encrypted_sensitive_data:
user.set_encrypted_sensitive_data()
user.save()
self.key_material = user.encrypt_data(value)
def set_did(self):
new_key_material = generate_did_controller_key()
self.set_key_material(new_key_material)
@ -468,6 +474,9 @@ class DID(models.Model):
def get_key(self):
return json.loads(self.key_material)
def get_organization(self):
return Organization.objects.get(name=settings.ORGANIZATION)
class Schemas(models.Model):
type = models.CharField(max_length=250)
file_schema = models.CharField(max_length=250)

View file

@ -43,6 +43,15 @@
{{ object.email }}
</div>
</div>
{% if object.is_admin %}
<div class="row mt-3">
<div class="col-3">
{% trans "Is Admin" %}
</div>
<div class="col-9 text-secondary">
</div>
</div>
{% endif %}
</div>
</div>
</div>

View file

@ -11,7 +11,7 @@ class ProfileForm(forms.ModelForm):
class Meta:
model = User
fields = ['first_name', 'last_name', 'email']
fields = ['first_name', 'last_name', 'email', 'is_admin']
def clean_first_name(self):
first_name = super().clean()['first_name']

View file

@ -145,17 +145,19 @@ class User(AbstractBaseUser):
self.encrypted_sensitive_data = key_crypted
def encrypt_data(self, data):
sb = self.get_secret_box()
pw = self.decrypt_sensitive_data().encode('utf-8')
sb = self.get_secret_box(pw)
value_enc = sb.encrypt(data.encode('utf-8'))
return base64.b64encode(value_enc).decode('utf-8')
def decrypt_data(self, data):
sb = self.get_secret_box()
pw = self.decrypt_sensitive_data().encode('utf-8')
sb = self.get_secret_box(pw)
value = base64.b64decode(data.encode('utf-8'))
return sb.decrypt(value).decode('utf-8')
def get_secret_box(self):
sb_key = self.derive_key_from_password()
def get_secret_box(self, password):
sb_key = self.derive_key_from_password(password)
return secret.SecretBox(sb_key)
def change_password_key(self, new_password):

View file

@ -0,0 +1,22 @@
# Generated by Django 4.2.5 on 2024-02-23 13:01
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oidc4vp', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='organization',
name='encrypted_sensitive_data',
field=models.CharField(default=None, max_length=255, null=True),
),
migrations.AddField(
model_name='organization',
name='salt',
field=models.CharField(default=None, max_length=255, null=True),
),
]

View file

@ -1,6 +1,11 @@
import json
import requests
import secrets
import nacl
import base64
from nacl import pwhash, secret
from django.core.cache import cache
from django.conf import settings
from django.http import QueryDict
@ -64,6 +69,8 @@ class Organization(models.Model):
help_text=_("Url where to send the verificable presentation"),
max_length=250
)
encrypted_sensitive_data = models.CharField(max_length=255, default=None, null=True)
salt = models.CharField(max_length=255, default=None, null=True)
def send(self, vp, code):
"""
@ -90,6 +97,83 @@ class Organization(models.Model):
auth = (self.my_client_id, self.my_client_secret)
return requests.get(url, auth=auth)
def derive_key_from_password(self, password=None):
if not password:
password = cache.get("KEY_DIDS").encode('utf-8')
kdf = pwhash.argon2i.kdf
ops = pwhash.argon2i.OPSLIMIT_INTERACTIVE
mem = pwhash.argon2i.MEMLIMIT_INTERACTIVE
return kdf(
secret.SecretBox.KEY_SIZE,
password,
self.get_salt(),
opslimit=ops,
memlimit=mem
)
def decrypt_sensitive_data(self, data=None):
sb_key = self.derive_key_from_password()
sb = secret.SecretBox(sb_key)
if not data:
data = self.get_encrypted_sensitive_data()
if not isinstance(data, bytes):
data = data.encode('utf-8')
return sb.decrypt(data).decode('utf-8')
def encrypt_sensitive_data(self, data):
sb_key = self.derive_key_from_password()
sb = secret.SecretBox(sb_key)
if not isinstance(data, bytes):
data = data.encode('utf-8')
return base64.b64encode(sb.encrypt(data)).decode('utf-8')
def get_salt(self):
if not self.salt:
return ''
return base64.b64decode(self.salt.encode('utf-8'))
def set_salt(self):
self.salt = base64.b64encode(nacl.utils.random(16)).decode('utf-8')
def get_encrypted_sensitive_data(self):
return base64.b64decode(self.encrypted_sensitive_data.encode('utf-8'))
def set_encrypted_sensitive_data(self):
key = base64.b64encode(nacl.utils.random(64))
self.set_salt()
key_crypted = self.encrypt_sensitive_data(key)
self.encrypted_sensitive_data = key_crypted
def encrypt_data(self, data):
pw = self.decrypt_sensitive_data().encode('utf-8')
sb = self.get_secret_box(pw)
value_enc = sb.encrypt(data.encode('utf-8'))
return base64.b64encode(value_enc).decode('utf-8')
def decrypt_data(self, data):
pw = self.decrypt_sensitive_data().encode('utf-8')
sb = self.get_secret_box(pw)
value = base64.b64decode(data.encode('utf-8'))
return sb.decrypt(value).decode('utf-8')
def get_secret_box(self, password):
sb_key = self.derive_key_from_password(password=password)
return secret.SecretBox(sb_key)
def change_password_key(self, new_password):
data = self.decrypt_sensitive_data()
sb_key = self.derive_key_from_password(password=new_password)
sb = secret.SecretBox(sb_key)
if not isinstance(data, bytes):
data = data.encode('utf-8')
encrypted_data = base64.b64encode(sb.encrypt(data)).decode('utf-8')
self.encrypted_sensitive_data = encrypted_data
def __str__(self):
return self.name