From 121efcbd096517fd50cbc2ea5035bb22a5919020 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Thu, 16 Jan 2025 13:13:33 +0100 Subject: [PATCH] the script base --- migration-script.py | 108 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 migration-script.py diff --git a/migration-script.py b/migration-script.py new file mode 100644 index 0000000..5751130 --- /dev/null +++ b/migration-script.py @@ -0,0 +1,108 @@ +import os +import json +import django +import logging + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dhub.settings') + +django.setup() + +from django.contrib.auth import get_user_model + +from utils.save_snapshots import move_json, save_in_disk +from evidence.parse import Build +from evidence.models import Annotation + + +logger = logging.getLogger('django') + +User = get_user_model() + +SEPARATOR = ";" +PATH_SNAPTHOPS = "examples/snapshots" +email_user = "user@example.org" + + +### read csv ### +def get_dict(row, header): + if not row or not header: + return + if len(row) != len(header): + return + + return {row[i]: header[i] for i in range(len(header))} + + +def open_csv(csv): + # return a list of dictionaries whith the header of csv as keys of the dicts + with open(csv) as f: + _file = f.read() + + rows = _file.split("\n") + if len(rows) < 2: + return [] + + header = rows[0].split(SEPARATOR) + return [get_dict(row, header) for row in rows[1:]] +### end read csv ### + + +### read snapshot ### +def search_snapshot_from_uuid(uuid): + # search snapshot from uuid + for root, _, files in os.walk(PATH_SNAPTHOPS): + for f in files: + if uuid in f: + return os.path.join(root, f) + + +def open_snapshot(uuid): + snapshot_path = search_snapshot_from_uuid(uuid) + if not snapshot_path: + return None, None + + with open(snapshot_path) as f: + try: + snap = json.loads(f.read()) + except Exception as err: + logger.error("uuid: {}, error: {}".format(uuid, err)) + return None, None + return snap, snapshot_path +### end read snapshot ### + + +def create_custom_id(dhid, uuid, user): + tag = Annotation.objects.filter( + uuid=uuid, + type=Annotation.Type.SYSTEM, + key='CUSTOM_ID', + owner=user.institution + ).first() + + if tag: + return + + Annotation.objects.create( + uuid=uuid, + type=Annotation.Type.SYSTEM, + key='CUSTOM_ID', + value=dhid, + owner=user.institution, + user=user + ) + + +def migrate_snapshots(row, user): + dhid = row.get("dhid") + uuid = row.get("uuid") + snapshot, snapshot_path = open_snapshot(uuid) + if not snapshot or not snapshot_path: + return + + # insert snapshot + path_name = save_in_disk(snapshot, user.institution.name) + Build(path_name, user) + move_json(path_name, user.institution.name) + + # insert dhid + create_custom_id(dhid, uuid)