first base for dpp

This commit is contained in:
Cayo Puigdefabregas 2024-11-15 18:56:11 +01:00
parent b7d7b9041d
commit 9857891b63
10 changed files with 232 additions and 0 deletions

View File

@ -89,6 +89,7 @@ INSTALLED_APPS = [
"dashboard", "dashboard",
"admin", "admin",
"api", "api",
"dpp",
] ]

0
dpp/__init__.py Normal file
View File

3
dpp/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

96
dpp/api_dlt.py Normal file
View File

@ -0,0 +1,96 @@
from ereuseapi.methods import API
def connect_api():
if not session.get('token_dlt'):
return
token_dlt = session.get('token_dlt')
api_dlt = app.config.get('API_DLT')
return API(api_dlt, token_dlt, "ethereum")
def register_dlt():
api = self.connect_api()
if not api:
return
snapshot = [x for x in self.actions if x.t == 'Snapshot']
if not snapshot:
return
snapshot = snapshot[0]
from ereuse_devicehub.modules.dpp.models import ALGORITHM
from ereuse_devicehub.resources.enums import StatusCode
cny_a = 1
while cny_a:
api = self.connect_api()
result = api.register_device(
self.chid,
ALGORITHM,
snapshot.phid_dpp,
app.config.get('ID_FEDERATED')
)
try:
assert result['Status'] == StatusCode.Success.value
assert result['Data']['data']['timestamp']
cny_a = 0
except Exception:
if result.get("Data") != "Device already exists":
logger.error("API return: %s", result)
time.sleep(10)
else:
cny_a = 0
register_proof(result)
if app.config.get('ID_FEDERATED'):
cny = 1
while cny:
try:
api.add_service(
self.chid,
'DeviceHub',
app.config.get('ID_FEDERATED'),
'Inventory service',
'Inv',
)
cny = 0
except Exception:
time.sleep(10)
def register_proof(self, result):
from ereuse_devicehub.modules.dpp.models import PROOF_ENUM, Proof
from ereuse_devicehub.resources.enums import StatusCode
if result['Status'] == StatusCode.Success.value:
timestamp = result.get('Data', {}).get('data', {}).get('timestamp')
if not timestamp:
return
snapshot = [x for x in self.actions if x.t == 'Snapshot']
if not snapshot:
return
snapshot = snapshot[0]
d = {
"type": PROOF_ENUM['Register'],
"device": self,
"action": snapshot,
"timestamp": timestamp,
"issuer_id": g.user.id,
"documentId": snapshot.id,
"documentSignature": snapshot.phid_dpp,
"normalizeDoc": snapshot.json_hw,
}
proof = Proof(**d)
db.session.add(proof)
if not hasattr(self, 'components'):
return
for c in self.components:
if isinstance(c, DataStorage):
c.register_dlt()

6
dpp/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class DppConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "dpp"

View File

@ -0,0 +1,90 @@
# Generated by Django 5.0.6 on 2024-11-15 18:55
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("user", "0001_initial"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="Dpp",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("timestamp", models.IntegerField()),
("key", models.CharField(max_length=256)),
("uuid", models.UUIDField()),
("signature", models.CharField(max_length=256)),
("normalizeDoc", models.TextField()),
("type", models.CharField(max_length=256)),
(
"owner",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="user.institution",
),
),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to=settings.AUTH_USER_MODEL,
),
),
],
),
migrations.CreateModel(
name="Proof",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("timestamp", models.IntegerField()),
("uuid", models.UUIDField()),
("signature", models.CharField(max_length=256)),
("normalizeDoc", models.TextField()),
("type", models.CharField(max_length=256)),
("action", models.CharField(max_length=256)),
(
"owner",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="user.institution",
),
),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to=settings.AUTH_USER_MODEL,
),
),
],
),
]

View File

30
dpp/models.py Normal file
View File

@ -0,0 +1,30 @@
from django.db import models
from user.models import User, Institution
from utils.constants import STR_EXTEND_SIZE
# Create your models here.
class Proof(models.Model):
timestamp = models.IntegerField()
uuid = models.UUIDField()
signature = models.CharField(max_length=STR_EXTEND_SIZE)
normalizeDoc = models.TextField()
type = models.CharField(max_length=STR_EXTEND_SIZE)
action = models.CharField(max_length=STR_EXTEND_SIZE)
owner = models.ForeignKey(Institution, on_delete=models.CASCADE)
user = models.ForeignKey(
User, on_delete=models.SET_NULL, null=True, blank=True)
class Dpp(models.Model):
timestamp = models.IntegerField()
key = models.CharField(max_length=STR_EXTEND_SIZE)
uuid = models.UUIDField()
signature = models.CharField(max_length=STR_EXTEND_SIZE)
normalizeDoc = models.TextField()
type = models.CharField(max_length=STR_EXTEND_SIZE)
owner = models.ForeignKey(Institution, on_delete=models.CASCADE)
user = models.ForeignKey(
User, on_delete=models.SET_NULL, null=True, blank=True)

3
dpp/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

3
dpp/views.py Normal file
View File

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.