Limpiado el sistema de tests previo a integración en la pipeline automática

This commit is contained in:
daniel 2024-02-22 12:09:08 +01:00
parent 594c1854a5
commit d269adb2aa
3 changed files with 157 additions and 161 deletions

View File

@ -1,18 +1,50 @@
import asyncio
import base64
import datetime
import copy
import zlib
from typing import Any
from typing import Callable, Any
import didkit
import json
import jinja2
from jinja2 import Environment, FileSystemLoader, select_autoescape
from ast import literal_eval
from pyroaring import BitMap
def deep_merge_dict_inplace(d1: dict, d2: dict):
"""
Implements d1 |= d2, but recursively.
Merges d1 and d2, giving preference to keys in d2.
Keys in d1 but not in d2 are left as-is.
"""
for key, val in d2.items():
if isinstance(d1.get(key, None), dict) and isinstance(val, dict):
deep_merge_dict_inplace(d1[key], val)
continue
d1[key] = val
def deep_merge_dict(d1: dict, d2: dict) -> dict:
"""
Implements d1 | d2, but recursively.
Merges d1 and d2, giving preference to keys in d2.
Keys in d1 but not in d2 are left as-is.
"""
d1 = copy.deepcopy(d1)
deep_merge_dict_inplace(d1, d2)
return d1
def deep_filter_dict(f: Callable[[Any], bool], d: dict):
"""
Implements builtin filter(), but recursively.
Applies f to all k,v pairs in d. If some v is a dict, recurse into v instead of applying f(v) directly.
"""
for key, val in d.items():
if isinstance(val, dict):
yield key, dict(deep_filter_dict(f, val))
elif f(val):
yield key, val
def generate_did_controller_key():
return didkit.generate_ed25519_key()
@ -21,11 +53,6 @@ def keydid_from_controller_key(key):
return didkit.key_to_did("key", key)
def generate_generic_vc_id():
# TODO agree on a system for Verifiable Credential IDs
return "https://pangea.org/credentials/42"
def resolve_did(keydid):
async def inner():
return await didkit.resolve_did(keydid, "{}")
@ -81,21 +108,6 @@ def render_and_sign_credential(unsigned_vc: dict, jwk_issuer):
return asyncio.run(inner())
def sign_credential(unsigned_vc: str, jwk_issuer):
"""
Signs the unsigned credential with the provided key.
"""
async def inner():
signed_vc = await didkit.issue_credential(
unsigned_vc,
'{"proofFormat": "ldp"}',
jwk_issuer
)
return signed_vc
return asyncio.run(inner())
def verify_credential(vc):
"""
Returns a (bool, str) tuple indicating whether the credential is valid.
@ -103,7 +115,10 @@ def verify_credential(vc):
If it is false, the VC is invalid and the second argument contains a JSON object with further information.
"""
async def inner():
str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}')
try:
str_res = await didkit.verify_credential(vc, '{"proofFormat": "ldp"}')
except:
return False, "Invalid, corrupt, or tampered-with credential."
res = literal_eval(str_res)
ok = res["warnings"] == [] and res["errors"] == []
return ok, str_res
@ -117,8 +132,10 @@ def verify_credential(vc):
# Credential verifies against its schema. Now check revocation status.
vc = json.loads(vc)
if "credentialStatus" in vc:
vc_issuer = vc["credentialStatus"]["id"] # Either a DID:WEB or the special value in the line below
if vc_issuer == "https://revocation.not.supported/":
return True, "This credential does not support revocation"
revocation_index = int(vc["credentialStatus"]["revocationBitmapIndex"]) # NOTE: THIS FIELD SHOULD BE SERIALIZED AS AN INTEGER, BUT IOTA DOCUMENTAITON SERIALIZES IT AS A STRING. DEFENSIVE CAST ADDED JUST IN CASE.
vc_issuer = vc["issuer"]["id"] # This is a DID
issuer_did_document = json.loads(resolve_did(vc_issuer)) # TODO: implement a caching layer so we don't have to fetch the DID (and thus the revocation list) every time a VC is validated.
issuer_revocation_list = issuer_did_document["service"][0]
assert issuer_revocation_list["type"] == "RevocationBitmap2022"
@ -135,9 +152,8 @@ def verify_credential(vc):
return True, "Credential passes all checks"
def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str) -> str:
async def inner():
unsigned_vp = unsigned_vp_template.render(data)
def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_did: str, presentation_id: str) -> str:
async def inner(unsigned_vp):
signed_vp = await didkit.issue_presentation(
unsigned_vp,
'{"proofFormat": "ldp"}',
@ -145,27 +161,40 @@ def issue_verifiable_presentation(vc_list: list[str], jwk_holder: str, holder_di
)
return signed_vp
env = Environment(
loader=FileSystemLoader("vc_templates"),
autoescape=select_autoescape()
)
unsigned_vp_template = env.get_template("verifiable_presentation.json")
data = {
"holder_did": holder_did,
"verifiable_credential_list": "[" + ",".join(vc_list) + "]"
}
unsigned_vp = json.dumps({
"@context": [
"https://www.w3.org/2018/credentials/v1"
],
"id": presentation_id,
"type": [
"VerifiablePresentation"
],
"holder": holder_did,
"verifiableCredential": vc_list
})
return asyncio.run(inner())
return asyncio.run(inner(unsigned_vp))
def verify_presentation(vp):
def verify_presentation(vp: str):
"""
Returns a (bool, str) tuple indicating whether the presentation is valid.
If the boolean is true, the presentation is valid and the second argument can be ignored.
If it is false, the VC is invalid and the second argument contains a JSON object with further information.
If it is false, the VC is invalid and the second argument contains further information.
"""
async def inner():
proof_options = '{"proofFormat": "ldp"}'
return await didkit.verify_presentation(vp, proof_options)
str_res = await didkit.verify_presentation(vp, '{"proofFormat": "ldp"}')
res = literal_eval(str_res)
ok = res["warnings"] == [] and res["errors"] == []
return ok, str_res
return asyncio.run(inner())
valid, reason = asyncio.run(inner())
if not valid:
return valid, f"Presentation is invalid: {reason}"
vp = json.loads(vp)
for idx, credential in enumerate(vp["verifiableCredential"]):
valid, reason = verify_credential(json.dumps(credential))
if not valid:
return valid, f"Credential at index {idx} is invalid with reason: {reason}"
# Fallthrough means all is good.
return True, "Verifiable presentation passes all checks"

195
main.py
View File

@ -1,50 +1,11 @@
import asyncio
from typing import Callable, Any
import didkit
import json
from jinja2 import Environment, FileSystemLoader, select_autoescape
import idhub_ssikit
from ast import literal_eval
import copy
import logging
from sys import stderr
def deep_merge_dict_inplace(d1: dict, d2: dict):
"""
Implements d1 |= d2, but recursively.
Merges d1 and d2, giving preference to keys in d2.
Keys in d1 but not in d2 are left as-is.
"""
for key, val in d2.items():
if isinstance(d1.get(key, None), dict) and isinstance(val, dict):
deep_merge_dict_inplace(d1[key], val)
continue
d1[key] = val
def deep_merge_dict(d1: dict, d2: dict) -> dict:
"""
Implements d1 | d2, but recursively.
Merges d1 and d2, giving preference to keys in d2.
Keys in d1 but not in d2 are left as-is.
"""
d1 = copy.deepcopy(d1)
deep_merge_dict_inplace(d1, d2)
return d1
def deep_filter_dict(f: Callable[[Any], bool], d: dict):
"""
Implements builtin filter(), but recursively.
Applies f to all k,v pairs in d. If some v is a dict, recurse into v instead of applying f(v) directly.
"""
for key, val in d.items():
if isinstance(val, dict):
yield key, dict(deep_filter_dict(f, val))
elif f(val):
yield key, val
def test_all_vcs(use_webdid=False):
def test_all_use_cases():
vcs = [
'membership-card',
'financial-vulnerability',
@ -52,24 +13,71 @@ def test_all_vcs(use_webdid=False):
'federation-membership',
'e-operator-claim'
]
# Test basic VC issuance: did:key, no revocation checks (they are not supported with did:key)
for vc in vcs:
print(f"trying {vc}... ", end="")
print(f"trying {vc} in did:key mode... ", end="", file=stderr)
try:
signed_cred = issue_vc_test_newstyle(vc, use_webdid)
signed_cred = issue_vc_test_newstyle(vc, use_web=False)
ok, err = idhub_ssikit.verify_credential(signed_cred)
if ok:
print("OK")
print("OK", file=stderr)
else:
print("FAILED!", err)
open(f'/tmp/{vc}', mode='w').write(signed_cred)
print("FAILED!", err, file=stderr)
except Exception as e:
print("FAILED! With exception:")
print(e)
logging.exception("FAILED! With exception:")
# Test VC issuance using did:web DIDs for the issuer, unrevoked, and check revocation
print("", file=stderr)
for vc in vcs:
print(f"trying {vc} in did:web mode, unrevoked... ", end="", file=stderr)
try:
signed_cred = issue_vc_test_newstyle(vc, use_web=True, did_revokes_vc=False)
ok, err = idhub_ssikit.verify_credential(signed_cred)
if ok:
print("OK", file=stderr)
else:
print("FAILED!", err, file=stderr)
except Exception as e:
logging.exception("FAILED! With exception:")
# Test VC issuance using did:web DIDs for the issuer, *revoked*, and check revocation
print("", file=stderr)
for vc in vcs:
print(f"trying {vc} in did:web mode, *REVOKED*... ", end="", file=stderr)
try:
signed_cred = issue_vc_test_newstyle(vc, use_web=True, did_revokes_vc=True)
ok, err = idhub_ssikit.verify_credential(signed_cred)
if not ok:
print("OK", file=stderr)
else:
print("FAILED! Credential ", err, file=stderr)
except Exception as e:
logging.exception("FAILED! With exception:")
# Test VC resistance to tampering by modifying a credential after signature
print("", file=stderr)
for vc in vcs:
print(f"tampering {vc} after issuance, check that it fails to verify... ", end="", file=stderr)
try:
issue_vc_test_and_fail_verification(vc) # All went well if this doesn't raise an exception
print("OK", file=stderr)
except Exception as e:
logging.exception("FAILED!")
# Test VP issuance and signature
print("", file=stderr)
print(f"doing end-to-end VP test, expected success... ", end="", file=stderr)
ok, reason = issue_and_sign_vp_test(revoked_credential=False)
assert ok is True
print("OK", file=stderr)
print(f"doing end-to-end VP test, expected failure... ", end="", file=stderr)
ok, reason = issue_and_sign_vp_test(revoked_credential=True)
assert ok is False
print("OK", file=stderr)
def issue_vc_test_newstyle(vc_name, use_web=True, did_revokes_vc=False, check_revocation=False):
def issue_vc_test_newstyle(vc_name,
use_web=True,
did_revokes_vc=False,
holder_jwk=None):
jwk_issuer = '{"kty":"OKP","crv":"Ed25519","x":"piojLFIHQ4Z6heRuPI87nrfMJKdet1dJIPG15iGjmDE","d":"zpOBTDrp_iNQTY5nZlIxLA34Sl7FXWXNGehFktznxTM"}'
jwk_subject = '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}'
jwk_subject = holder_jwk or '{"kty":"OKP","crv":"Ed25519","x":"BuKyt44QKYSX6kmAt771ai37lIFNwYlhugWXPiqcyYU","d":"qbvMhSCPKvQ-vSkqNr3q8gWY5zPUj7ry0t2YnmT7agc"}'
did_subject = didkit.key_to_did("key", jwk_subject)
if use_web:
if did_revokes_vc:
@ -81,39 +89,34 @@ def issue_vc_test_newstyle(vc_name, use_web=True, did_revokes_vc=False, check_re
vc_template = json.load(open(f'schemas/vc_templates/{vc_name}.json'))
data_base = json.load(open(f'schemas/vc_examples/base--data.json'))
data_specific = json.load(open(f'schemas/vc_examples/{vc_name}--data.json'))
data = deep_merge_dict(data_base, data_specific)
data = idhub_ssikit.deep_merge_dict(data_base, data_specific)
data["issuer"]["id"] = did_issuer
data["credentialSubject"]["id"] = did_subject
data["credentialStatus"]["id"] = did_issuer
data["credentialStatus"]["revocationBitmapIndex"] = "420"
vc_rendered_unsigned = deep_merge_dict(vc_template, data)
if use_web:
data["credentialStatus"]["id"] = did_issuer
data["credentialStatus"]["revocationBitmapIndex"] = 42
vc_rendered_unsigned = idhub_ssikit.deep_merge_dict(vc_template, data)
signed_credential = idhub_ssikit.render_and_sign_credential(
vc_rendered_unsigned,
jwk_issuer,
)
if check_revocation:
assert use_web is True
ok, reason = idhub_ssikit.verify_credential(signed_credential)
print(ok)
print(reason)
return signed_credential
def issue_vc_test_and_fail_verification(vc_name):
signed_credential = issue_vc_test_newstyle(vc_name)
verification_result = idhub_ssikit.verify_credential(signed_credential)
print(verification_result)
def replace(s, position, character):
return s[:position] + character + s[position+1:]
signed_credential = issue_vc_test_newstyle(vc_name)
ok, reason = idhub_ssikit.verify_credential(signed_credential)
assert ok is True, (ok, reason)
signed_credential = replace(signed_credential, (len(signed_credential)//4)*3, ".")
verification_result = idhub_ssikit.verify_credential(signed_credential)
print(verification_result)
ok, reason = idhub_ssikit.verify_credential(signed_credential)
assert ok is False, (ok, reason)
def issue_and_sign_vp_test():
def issue_and_sign_vp_test(revoked_credential=False):
"""
In this example execution two Verifiable Credentials associated with a single Holder are issued and then
combined into a single Verifiable Presentation.
@ -123,51 +126,15 @@ def issue_and_sign_vp_test():
- Issuer B being "EXO" foundation,
- Verifier (not pictured) being "Som Connexio", which wants verifiable data of the Holder from both Issuers.
"""
jwk_issuer = didkit.generate_ed25519_key()
jwk_issuer2 = didkit.generate_ed25519_key()
jwk_subject = didkit.generate_ed25519_key()
did_issuer = didkit.key_to_did("key", jwk_issuer)
did_issuer2 = didkit.key_to_did("key", jwk_issuer2)
did_subject = didkit.key_to_did("key", jwk_subject)
print(did_issuer)
print(did_issuer2)
print(did_subject)
# TODO: WE'RE NO LONGER USING JINJA2
env = Environment(
loader=FileSystemLoader("vc_templates"),
autoescape=select_autoescape()
holder_jwk = didkit.generate_ed25519_key()
holder_did = didkit.key_to_did("key", holder_jwk)
vc1 = issue_vc_test_newstyle('membership-card', use_web=True, holder_jwk=holder_jwk)
vc2 = issue_vc_test_newstyle('course-credential', use_web=True, did_revokes_vc=revoked_credential, holder_jwk=holder_jwk)
signed_presentation = idhub_ssikit.issue_verifiable_presentation(
[json.loads(vc1), json.loads(vc2)],
holder_jwk,
holder_did,
"https://idhub.pangea.org/presentations/42"
)
unsigned_vc_template = env.get_template("member.json")
data = {
"vc_id": "http://example.org/credentials/3731",
"issuer_did": did_issuer,
"subject_did": did_subject,
"issuance_date": "2020-08-19T21:41:50Z",
"subject_is_member_of": "Pangea"
}
signed_credential = idhub_ssikit.render_and_sign_credential(
unsigned_vc_template,
jwk_issuer,
data
)
data2 = data
data2["issuer_did"] = did_issuer2
signed_credential2 = idhub_ssikit.render_and_sign_credential(
unsigned_vc_template,
jwk_issuer2,
data2
)
signed_presentation = idhub_ssikit.issue_verifiable_presentation([signed_credential, signed_credential2], jwk_subject, did_subject)
print("##############--- SIGNED PRESENTATION ---##############")
print(signed_presentation)
print("##############--- ------------------- ---##############")
res = idhub_ssikit.verify_presentation(signed_presentation)
print(res)
def scratch():
jwk_issuer = didkit.generate_ed25519_key()
did_issuer = didkit.key_to_did("key", jwk_issuer)
return idhub_ssikit.verify_presentation(signed_presentation)

@ -1 +1 @@
Subproject commit f1e6a33d0801a2104bf297292b953b1d138f675b
Subproject commit 90ce148f88e18dead0d9b2f94803236d6312ee09