import json import base64 import qrcode import datetime import weasyprint import qrcode.image.svg from io import BytesIO from pathlib import Path from pyhanko.sign import fields, signers from pyhanko import stamp from pyhanko.pdf_utils import text from pyhanko.pdf_utils.incremental_writer import IncrementalPdfFileWriter from django.utils.translation import gettext_lazy as _ from django.views.generic import View from django.views.generic.edit import ( UpdateView, CreateView, DeleteView, FormView ) from django.views.generic.base import TemplateView from django.shortcuts import get_object_or_404, redirect from django.core.cache import cache from django.urls import reverse_lazy from django.http import HttpResponse from django.contrib import messages from django_tables2 import SingleTableView from idhub.user.tables import ( DashboardTable, PersonalInfoTable, RolesTable, DIDTable, CredentialsTable ) from idhub.user.forms import ( RequestCredentialForm, DemandAuthorizationForm, TermsConditionsForm ) from utils import certs from idhub.mixins import UserView from idhub.models import DID, VerificableCredential, Event, Membership from idhub_auth.models import User class MyProfile(UserView): title = _("My profile") section = "MyProfile" class MyWallet(UserView): title = _("My wallet") section = "MyWallet" class DashboardView(UserView, SingleTableView): template_name = "idhub/user/dashboard.html" table_class = DashboardTable title = _('Dashboard') subtitle = _('Events') icon = 'bi bi-bell' section = "Home" def get_queryset(self, **kwargs): events_for_users = self.get_user_events() queryset = Event.objects.select_related('user').filter( user=self.request.user).filter( type__in=events_for_users).order_by("-created") return queryset def get_user_events(self): events_for_users = [ Event.Types.EV_USR_WELCOME, # User welcomed Event.Types.EV_USR_UPDATED, # Your data updated by admin Event.Types.EV_DID_CREATED, # DID created Event.Types.EV_DID_DELETED, # DID deleted Event.Types.EV_CREDENTIAL_DELETED, # Credential deleted Event.Types.EV_CREDENTIAL_ISSUED, # Credential issued Event.Types.EV_CREDENTIAL_PRESENTED, # Credential presented Event.Types.EV_CREDENTIAL_CAN_BE_REQUESTED, # Credential available Event.Types.EV_CREDENTIAL_REVOKED, # Credential revoked Event.Types.EV_USR_SEND_VP, # User send verificable presentation ] return events_for_users class ProfileView(MyProfile, UpdateView, SingleTableView): template_name = "idhub/user/profile.html" table_class = PersonalInfoTable subtitle = _('My personal data') icon = 'bi bi-person-gear' fields = ('first_name', 'last_name', 'email') success_url = reverse_lazy('idhub:user_profile') model = User def get_queryset(self, **kwargs): queryset = Membership.objects.select_related('user').filter( user=self.request.user) return queryset def get_object(self): return self.request.user def get_form(self): form = super().get_form() form.fields['first_name'].disabled = True form.fields['last_name'].disabled = True form.fields['email'].disabled = True return form def form_valid(self, form): return super().form_valid(form) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context.update({ 'lang': self.request.LANGUAGE_CODE, }) return context class RolesView(MyProfile, SingleTableView): template_name = "idhub/user/roles.html" table_class = RolesTable subtitle = _('My roles') icon = 'fa-brands fa-critical-role' def get_queryset(self, **kwargs): queryset = self.request.user.roles.all() return queryset class GDPRView(MyProfile, TemplateView): template_name = "idhub/user/gdpr.html" subtitle = _('Data protection') icon = 'bi bi-file-earmark-medical' class CredentialsView(MyWallet, SingleTableView): template_name = "idhub/user/credentials.html" table_class = CredentialsTable subtitle = _('Credential management') icon = 'bi bi-patch-check-fill' def get_queryset(self): queryset = VerificableCredential.objects.filter( user=self.request.user) return queryset class TermsAndConditionsView(UserView, FormView): template_name = "idhub/user/terms_conditions.html" title = _("Data Protection") section = "" subtitle = _('Terms and Conditions') icon = 'bi bi-file-earmark-medical' form_class = TermsConditionsForm success_url = reverse_lazy('idhub:user_dashboard') def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs['user'] = self.request.user if self.request.user.accept_gdpr: kwargs['initial'] = { "accept_privacy": True, "accept_legal": True, "accept_cookies": True } return kwargs def form_valid(self, form): form.save() return super().form_valid(form) class WaitingView(UserView, TemplateView): template_name = "idhub/user/waiting.html" title = _("Comunication with admin") subtitle = _('Service temporary close') section = "" icon = 'bi bi-file-earmark-medical' success_url = reverse_lazy('idhub:user_dashboard') def get(self, request, *args, **kwargs): if cache.get("KEY_DIDS"): return redirect(self.success_url) return super().get(request, *args, **kwargs) class CredentialView(MyWallet, TemplateView): template_name = "idhub/user/credential.html" subtitle = _('Credential') icon = 'bi bi-patch-check-fill' def get(self, request, *args, **kwargs): self.pk = kwargs['pk'] self.object = get_object_or_404( VerificableCredential, pk=self.pk, user=self.request.user ) return super().get(request, *args, **kwargs) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) url_ca = reverse_lazy('idhub:user_credential_pdf', args=[self.object.id, 'ca']) url_es = reverse_lazy('idhub:user_credential_pdf', args=[self.object.id, 'es']) context.update({ 'object': self.object, 'url_ca': url_ca, 'url_es': url_es, }) return context class CredentialPdfView(MyWallet, TemplateView): template_name = "certificates/{}_{}.html" template_name = "certificates/{}_{}.html" subtitle = _('Credential management') icon = 'bi bi-patch-check-fill' file_name = "certificate.pdf" def get(self, request, *args, **kwargs): if not cache.get("KEY_DIDS"): return redirect(reverse_lazy('idhub:user_dashboard')) pk = kwargs['pk'] lang = kwargs.get('lang', 'ca') self.user = self.request.user self.object = get_object_or_404( VerificableCredential, pk=pk, eidas1_did__isnull=False, user=self.request.user ) self.credential_type = self.object.schema.file_schema.split(".json")[0] self.template_name = self.template_name.format( self.credential_type, lang ) self.url_id = "{}://{}/public/credentials/{}".format( self.request.scheme, self.request.get_host(), self.object.hash ) data = self.build_certificate() if self.object.eidas1_did: doc = self.insert_signature(data) else: doc = data response = HttpResponse(doc, content_type="application/pdf") response['Content-Disposition'] = 'attachment; filename={}'.format(self.file_name) return response def get_img_sign(self): path_img_sig = "idhub/static/images/4_Model_Certificat_html_58d7f7eeb828cf29.jpg" img_signature = next(Path.cwd().glob(path_img_sig)) with open(img_signature, 'rb') as _f: img_sig = base64.b64encode(_f.read()).decode('utf-8') return img_sig def get_img_header(self): path_img_head = "idhub/static/images/4_Model_Certificat_html_7a0214c6fc8f2309.jpg" img_header= next(Path.cwd().glob(path_img_head)) with open(img_header, 'rb') as _f: img_head = base64.b64encode(_f.read()).decode('utf-8') return img_head def get_img_footer(self): path_img_foot = "idhub/static/images/4_Model_Certificat_html_941e7b967953b3f3.jpg" img_foot= next(Path.cwd().glob(path_img_foot)) with open(img_foot, 'rb') as _f: img_foot = base64.b64encode(_f.read()).decode('utf-8') return img_foot def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) img_sig = self.get_img_sign() img_head = self.get_img_header() img_foot = self.get_img_footer() qr = self.generate_qr_code(self.url_id) issue_date_now = datetime.datetime.now() context.update(dict(self.object.get_datas())) context.update({ 'object': self.object, "image_signature": img_sig, "image_header": img_head, "image_footer": img_foot, "issue_date_now": issue_date_now.strftime("%d/%m/%Y"), "qr": qr, }) return context def build_certificate(self): try: doc = self.render_to_response(context=self.get_context_data()) except Exception: self.template_name = "certificates/4_Model_Certificat_ca.html" doc = self.render_to_response(context=self.get_context_data()) doc.render() pdf = weasyprint.HTML(string=doc.content) return pdf.write_pdf() def generate_qr_code(self, data): qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(data) qr.make(fit=True) img_buffer = BytesIO() img = qr.make_image(fill_color="black", back_color="white") img.save(img_buffer, format="PNG") return base64.b64encode(img_buffer.getvalue()).decode('utf-8') def get_pfx_data(self): did = self.object.eidas1_did if not did: return None, None key_material = json.loads(did.get_key_material()) cert = key_material.get("cert") passphrase = key_material.get("passphrase") if cert and passphrase: return base64.b64decode(cert), passphrase.encode('utf-8') return None, None def signer_init(self): pfx_data, passphrase = self.get_pfx_data() if not pfx_data or not passphrase: return s = certs.load_cert( pfx_data, passphrase ) return s def insert_signature(self, doc): sig = self.signer_init() if not sig: return _buffer = BytesIO() _buffer.write(doc) w = IncrementalPdfFileWriter(_buffer) fields.append_signature_field( w, sig_field_spec=fields.SigFieldSpec( 'Signature', box=(150, 75, 450, 100) ) ) meta = signers.PdfSignatureMetadata(field_name='Signature') pdf_signer = signers.PdfSigner( meta, signer=sig, stamp_style=stamp.TextStampStyle( stamp_text='Signed by: %(signer)s\nTime: %(ts)s\nURL: %(url)s', text_box_style=text.TextBoxStyle() ) ) _bf_out = BytesIO() pdf_signer.sign_pdf(w, output=_bf_out, appearance_text_params={'url': self.url_id}) return _bf_out.read() class CredentialJsonView(MyWallet, TemplateView): def get(self, request, *args, **kwargs): pk = kwargs['pk'] self.object = get_object_or_404( VerificableCredential, pk=pk, user=self.request.user ) data = self.object.get_data() response = HttpResponse(data, content_type="application/json") response['Content-Disposition'] = 'attachment; filename={}'.format("credential.json") return response class PublicCredentialJsonView(View): def get(self, request, *args, **kwargs): pk = kwargs['pk'] self.object = get_object_or_404( VerificableCredential, hash=pk, eidas1_did__isnull=False, ) response = HttpResponse(self.object.get_data(), content_type="application/json") response['Content-Disposition'] = 'attachment; filename={}'.format("credential.json") return response class CredentialsRequestView(MyWallet, FormView): template_name = "idhub/user/credentials_request.html" subtitle = _('Credential request') icon = 'bi bi-patch-check-fill' form_class = RequestCredentialForm success_url = reverse_lazy('idhub:user_credentials') def get(self, *args, **kwargs): response = super().get(*args, **kwargs) if not DID.objects.filter(user=self.request.user).exists(): return redirect(reverse_lazy('idhub:user_dids_new')) return response def get_form_kwargs(self): kwargs = super().get_form_kwargs() self.if_credentials = VerificableCredential.objects.filter( user=self.request.user, status=VerificableCredential.Status.ENABLED.value, ).exists() kwargs['user'] = self.request.user kwargs['lang'] = self.request.LANGUAGE_CODE domain = "{}://{}".format(self.request.scheme, self.request.get_host()) kwargs['domain'] = domain kwargs['if_credentials'] = self.if_credentials return kwargs def form_valid(self, form): cred = form.save() if cred: messages.success(self.request, _("The credential was issued successfully!")) Event.set_EV_CREDENTIAL_ISSUED_FOR_USER(cred) Event.set_EV_CREDENTIAL_ISSUED(cred) url = self.request.session.pop('next_url', None) if url: return redirect(url) else: messages.error(self.request, _("The credential does not exist!")) return super().form_valid(form) class DemandAuthorizationView(MyWallet, FormView): template_name = "idhub/user/credentials_presentation.html" subtitle = _('Credential presentation') icon = 'bi bi-patch-check-fill' form_class = DemandAuthorizationForm success_url = reverse_lazy('idhub:user_demand_authorization') def get(self, *args, **kwargs): response = super().get(*args, **kwargs) creds_enable = VerificableCredential.objects.filter( user=self.request.user, status=VerificableCredential.Status.ENABLED.value, ).exists() if not self.if_credentials and creds_enable: return redirect(reverse_lazy('idhub:user_credentials_request')) return response def get_form_kwargs(self): kwargs = super().get_form_kwargs() self.if_credentials = VerificableCredential.objects.filter( user=self.request.user, status=VerificableCredential.Status.ISSUED.value, ).exists() kwargs['user'] = self.request.user kwargs['if_credentials'] = self.if_credentials return kwargs def form_valid(self, form): try: authorization = form.save() except Exception: txt = _("Problems connecting with {url}").format( url=form.org.response_uri ) messages.error(self.request, txt) return super().form_valid(form) if authorization: return redirect(authorization) else: messages.error(self.request, _("Error sending credential!")) return super().form_valid(form) class DidsView(MyWallet, SingleTableView): template_name = "idhub/user/dids.html" table_class = DIDTable subtitle = _('Identities (DIDs)') icon = 'bi bi-patch-check-fill' def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context.update({ 'dids': self.request.user.dids, }) return context def get_queryset(self, **kwargs): queryset = DID.objects.filter(user=self.request.user) return queryset class DidRegisterView(MyWallet, CreateView): template_name = "idhub/user/did_register.html" subtitle = _('Add a new Identity (DID)') icon = 'bi bi-patch-check-fill' wallet = True model = DID fields = ('label', 'type') success_url = reverse_lazy('idhub:user_dids') object = None def form_valid(self, form): form.instance.user = self.request.user form.instance.set_did() form.save() messages.success(self.request, _('DID created successfully')) Event.set_EV_DID_CREATED(form.instance) Event.set_EV_DID_CREATED_BY_USER(form.instance) return super().form_valid(form) class DidEditView(MyWallet, UpdateView): template_name = "idhub/user/did_register.html" subtitle = _('Identities (DIDs)') icon = 'bi bi-patch-check-fill' wallet = True model = DID fields = ('label',) success_url = reverse_lazy('idhub:user_dids') def get(self, request, *args, **kwargs): self.pk = kwargs['pk'] self.object = get_object_or_404(self.model, pk=self.pk) return super().get(request, *args, **kwargs) def form_valid(self, form): form.save() messages.success(self.request, _('DID updated successfully')) return super().form_valid(form) class DidDeleteView(MyWallet, DeleteView): subtitle = _('Identities (DIDs)') icon = 'bi bi-patch-check-fill' wallet = True model = DID success_url = reverse_lazy('idhub:user_dids') def get(self, request, *args, **kwargs): self.pk = kwargs['pk'] self.object = get_object_or_404(self.model, pk=self.pk) Event.set_EV_DID_DELETED(self.object) self.object.delete() messages.success(self.request, _('DID delete successfully')) return redirect(self.success_url)