This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
ssikit_trustchain/idhub_ssikit/__init__.py

200 lines
7.2 KiB
Python

import asyncio
import base64
import copy
import zlib
from typing import Callable, Any
import didkit
import json
from ast import literal_eval
from pyroaring import BitMap
def deep_merge_dict_inplace(d1: dict, d2: dict):
    """
    Recursive counterpart of `d1 |= d2`.

    Every key of d2 is written into d1. When both sides hold a dict under the
    same key, the two dicts are merged recursively instead of the d1 value
    being replaced. Keys that only exist in d1 are untouched.
    d1 is mutated; nothing is returned.
    """
    for name, incoming in d2.items():
        existing = d1.get(name)
        if isinstance(existing, dict) and isinstance(incoming, dict):
            # Both sides are dicts: descend rather than overwrite.
            deep_merge_dict_inplace(existing, incoming)
        else:
            # Any other combination: the value from d2 wins (stored by reference).
            d1[name] = incoming
def deep_merge_dict(d1: dict, d2: dict) -> dict:
    """
    Implements d1 | d2, but recursively.

    Merges d1 and d2, giving preference to keys in d2.
    Keys in d1 but not in d2 are left as-is.

    Neither argument is modified, and the returned dict shares no mutable
    state with either of them: both inputs are deep-copied before merging.
    """
    merged = copy.deepcopy(d1)
    # d2 is copied as well. Otherwise nested dicts/lists taken from d2 would be
    # stored in the result by reference, and a later in-place merge into the
    # result would silently mutate the caller's d2.
    deep_merge_dict_inplace(merged, copy.deepcopy(d2))
    return merged
def deep_filter_dict(f: Callable[[Any], bool], d: dict):
    """
    Recursive counterpart of the builtin filter(), for dicts.

    Lazily yields (key, value) pairs of d:
    * a value that is itself a dict is never passed to f; it is always kept,
      replaced by a new dict holding its own filtered contents (which may be
      empty);
    * any other value is kept only when f(value) is truthy.
    The predicate only ever sees values, never keys.
    """
    for name, item in d.items():
        if not isinstance(item, dict):
            if f(item):
                yield name, item
            continue
        # Nested dict: filter its contents and materialize the result.
        yield name, dict(deep_filter_dict(f, item))
def generate_did_controller_key():
    """
    Generate a new Ed25519 key with didkit and return it.

    The return value is exactly what `didkit.generate_ed25519_key()` produces
    (presumably a serialized JWK -- confirm against the didkit documentation).
    """
    controller_key = didkit.generate_ed25519_key()
    return controller_key
def keydid_from_controller_key(key):
    """
    Derive the did:key identifier ("did:key:<...>") for a controller key.

    `key` is passed unchanged to `didkit.key_to_did` using the "key" DID method.
    """
    did_method = "key"
    keydid = didkit.key_to_did(did_method, key)
    return keydid
def resolve_did(keydid):
    """
    Synchronously resolve a DID and return didkit's result.

    didkit exposes resolution as an awaitable, so the call is wrapped in a
    coroutine and driven to completion with `asyncio.run`. The second argument
    handed to didkit is the literal string "{}".
    Callers in this module feed the returned value to `json.loads`.
    """
    async def _resolve():
        resolved = await didkit.resolve_did(keydid, "{}")
        return resolved

    return asyncio.run(_resolve())
def webdid_from_controller_key(key, domain: str = "idhub.pangea.org"):
    """
    Build a did:web identifier and DID document from a controller key.

    Follows the steps for generating a did:web from a did:key (documented in
    the SpruceID documentation): the did:key document for `key` is resolved
    and every identifier in it that must change is rewritten to the did:web
    form.

    Parameters:
    * key: the controller key, as accepted by `keydid_from_controller_key`.
    * domain: host under which the did:web is published. Defaults to
      "idhub.pangea.org", the value that used to be hard-coded here.
      NOTE(review): `domain` is inserted verbatim. The did:web method requires
      the ":" of a port to be percent-encoded ("%3A"); callers passing a
      host:port must encode it themselves.

    Returns a `(webdid_url, document)` tuple, where `document` is the
    rewritten DID document serialized as a JSON string.
    """
    keydid = keydid_from_controller_key(key)  # "did:key:<...>"
    pubkeyid = keydid.rpartition(":")[2]  # "<...>": everything after the last colon
    document = json.loads(resolve_did(keydid))  # DID document expressed in did:key terms
    webdid_url = f"did:web:{domain}:{pubkeyid}"  # new identifier: "did:web:<domain>:<...>"
    webdid_url_owner = webdid_url + "#owner"

    # Replace the fields of the DID document that refer to the old did:key.
    verification_method = document["verificationMethod"][0]
    document["id"] = webdid_url
    verification_method["id"] = webdid_url_owner
    verification_method["controller"] = webdid_url
    document["authentication"][0] = webdid_url_owner
    document["assertionMethod"][0] = webdid_url_owner

    document_fixed_serialized = json.dumps(document)
    return webdid_url, document_fixed_serialized
def render_and_sign_credential(unsigned_vc: dict, jwk_issuer):
    """
    Sign an unsigned verifiable credential with the issuer's key.

    `unsigned_vc` is serialized to JSON and passed to
    `didkit.issue_credential` together with the "ldp" proof format option and
    `jwk_issuer`. The value returned by didkit (the signed credential) is
    returned unchanged.

    No field is added or defaulted here (in particular, no issuance date is
    generated): `unsigned_vc` must already be complete.
    """
    proof_options = '{"proofFormat": "ldp"}'

    async def _sign():
        payload = json.dumps(unsigned_vc)
        return await didkit.issue_credential(payload, proof_options, jwk_issuer)

    return asyncio.run(_sign())
def verify_credential(vc):
    """
    Verify a JSON-serialized verifiable credential.

    Returns a (bool, str) tuple indicating whether the credential is valid.
    If the boolean is true, the credential is valid and the second argument can be ignored.
    If it is false, the VC is invalid and the second argument contains further
    information: either didkit's JSON verification result or a plain message.

    Checks performed, in order:
    1. Proof verification through didkit ("ldp" proof format). Any warning or
       error reported by didkit makes the credential invalid.
    2. Schema validation -- not implemented yet.
    3. Revocation, only when the credential has a "credentialStatus" entry:
       the DID named by its "id" is resolved, the first service of the DID
       document must be of type "RevocationBitmap2022", and the credential's
       "revocationBitmapIndex" must not be set in that bitmap.
    """
    async def inner():
        try:
            str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}')
        except Exception:
            # Deliberately broad: whatever didkit raises means the input could
            # not be verified. `Exception` (rather than a bare except) lets
            # KeyboardInterrupt and task cancellation propagate.
            return False, "Invalid, corrupt, or tampered-with credential."
        # didkit returns JSON, so parse it as JSON. ast.literal_eval would
        # reject JSON literals such as true/false/null.
        res = json.loads(str_res)
        ok = res["warnings"] == [] and res["errors"] == []
        return ok, str_res

    valid, reason = asyncio.run(inner())
    if not valid:
        return valid, reason

    # Credential passes basic signature verification. Now check it against its schema.
    # TODO: check against schema

    # Now check revocation status.
    credential = json.loads(vc)
    if "credentialStatus" in credential:
        status = credential["credentialStatus"]
        vc_issuer = status["id"]  # Either a DID:WEB or the special value in the line below
        if vc_issuer == "https://revocation.not.supported/":
            return True, "This credential does not support revocation"
        # NOTE: this field should be serialized as an integer, but the IOTA
        # documentation serializes it as a string. Defensive cast just in case.
        revocation_index = int(status["revocationBitmapIndex"])
        # TODO: implement a caching layer so we don't have to fetch the DID
        # (and thus the revocation list) every time a VC is validated.
        issuer_did_document = json.loads(resolve_did(vc_issuer))
        issuer_revocation_list = issuer_did_document["service"][0]
        # Explicit check instead of `assert`: asserts are stripped under
        # `python -O`, which would let an unrelated service endpoint be decoded
        # as a revocation list. Fail closed instead.
        if issuer_revocation_list["type"] != "RevocationBitmap2022":
            return False, "Issuer DID document does not expose a RevocationBitmap2022 revocation list"
        # The endpoint is split on "," and the second piece is taken as the
        # base64 payload, which is zlib-compressed serialized bitmap data.
        encoded_bitmap = issuer_revocation_list["serviceEndpoint"].rsplit(",")[1]
        revocation_bitmap = BitMap.deserialize(
            zlib.decompress(base64.b64decode(encoded_bitmap))
        )
        if revocation_index in revocation_bitmap:
            return False, "Credential has been revoked by the issuer"

    # Fallthrough means all is good.
    return True, "Credential passes all checks"
def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str, presentation_id: str) -> str:
    """
    Build a verifiable presentation around `vc_list` and sign it with the holder's key.

    Parameters:
    * vc_list: the credentials to present. Each element may be a credential
      serialized as a JSON string or an already-decoded credential dict.
    * jwk_holder: the holder's key, passed to `didkit.issue_presentation`.
    * holder_did: value of the presentation's "holder" field.
    * presentation_id: value of the presentation's "id" field.

    Returns whatever `didkit.issue_presentation` returns for the "ldp" proof
    format (the signed presentation).
    """
    def _embeddable(vc):
        # A credential given as a JSON string must be embedded as an object.
        # Dumping the string as-is would nest it as a JSON *string* value, which
        # `verify_presentation` cannot hand back to `verify_credential` as a
        # credential. Anything that is not a JSON object (e.g. a dict, or a
        # string in some other encoding) is passed through untouched.
        if not isinstance(vc, str):
            return vc
        try:
            decoded = json.loads(vc)
        except ValueError:
            return vc
        return decoded if isinstance(decoded, dict) else vc

    async def inner(unsigned_vp):
        signed_vp = await didkit.issue_presentation(
            unsigned_vp,
            '{"proofFormat": "ldp"}',
            jwk_holder
        )
        return signed_vp

    unsigned_vp = json.dumps({
        "@context": [
            "https://www.w3.org/2018/credentials/v1"
        ],
        "id": presentation_id,
        "type": [
            "VerifiablePresentation"
        ],
        "holder": holder_did,
        "verifiableCredential": [_embeddable(vc) for vc in vc_list]
    })
    return asyncio.run(inner(unsigned_vp))
def verify_presentation(vp: str):
    """
    Verify a JSON-serialized verifiable presentation and every credential in it.

    Returns a (bool, str) tuple indicating whether the presentation is valid.
    If the boolean is true, the presentation is valid and the second argument can be ignored.
    If it is false, the presentation is invalid and the second argument contains further information.

    The presentation's own proof is checked first through didkit ("ldp" proof
    format); then each entry of "verifiableCredential" is re-serialized and
    checked with `verify_credential`. Verification stops at the first failure.
    Exceptions raised by didkit while verifying the presentation itself are
    not caught here and propagate to the caller.
    """
    async def inner():
        str_res = await didkit.verify_presentation(vp, '{"proofFormat": "ldp"}')
        # didkit returns JSON, so parse it as JSON. ast.literal_eval would
        # reject JSON literals such as true/false/null.
        res = json.loads(str_res)
        ok = res["warnings"] == [] and res["errors"] == []
        return ok, str_res

    valid, reason = asyncio.run(inner())
    if not valid:
        return valid, f"Presentation is invalid: {reason}"

    credentials = json.loads(vp)["verifiableCredential"]
    if isinstance(credentials, dict):
        # A single credential given as an object rather than a one-element
        # list. Iterating the dict directly would walk its keys, not the
        # credential.
        credentials = [credentials]
    for idx, credential in enumerate(credentials):
        valid, reason = verify_credential(json.dumps(credential))
        if not valid:
            return valid, f"Credential at index {idx} is invalid with reason: {reason}"

    # Fallthrough means all is good.
    return True, "Verifiable presentation passes all checks"