diff --git a/deploy-workbench.sh b/deploy-workbench.sh index d0884bc..3effbef 100755 --- a/deploy-workbench.sh +++ b/deploy-workbench.sh @@ -327,7 +327,7 @@ echo 'Install requirements' apt-get install -y --no-install-recommends \ sudo locales keyboard-configuration console-setup qrencode \ python-is-python3 python3 python3-dev python3-pip pipenv \ - dmidecode smartmontools hwinfo pciutils lshw nfs-common < /dev/null + dmidecode smartmontools hwinfo pciutils lshw nfs-common inxi < /dev/null # Install lshw B02.19 utility using backports (DEPRECATED in Debian 12) #apt install -y -t ${VERSION_CODENAME}-backports lshw < /dev/null diff --git a/install-dependencies.sh b/install-dependencies.sh index bb97b97..e1f7345 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -9,7 +9,7 @@ set -u set -x main() { - sudo apt install smartmontools lshw hwinfo dmidecode + sudo apt install qrencode smartmontools lshw hwinfo dmidecode inxi } main "${@}" diff --git a/settings.ini.example b/settings.ini.example index d63b976..33f4dad 100644 --- a/settings.ini.example +++ b/settings.ini.example @@ -1,9 +1,13 @@ [settings] -url = http://localhost:8000/api/v1/snapshot/ +url = http://localhost:8000/api/snapshot/ #url = https://demo.ereuse.org/api/v1/snapshot/ # sample token that works with default deployment such as the previous two urls token = 5018dd65-9abd-4a62-8896-80f34ac66150 +# Idhub +# wb_sign_token = "27de6ad7-cee2-4fe8-84d4-c7eea9c969c8" +# url_wallet = "http://localhost" + # path = /path/to/save # device = your_device_name # # erase = basic diff --git a/workbench-script.py b/workbench-script.py index cc85629..7d49b9b 100644 --- a/workbench-script.py +++ b/workbench-script.py @@ -16,21 +16,17 @@ import logging from datetime import datetime -## Legacy Functions ## +SNAPSHOT_BASE = { + 'timestamp': str(datetime.now()), + 'type': 'Snapshot', + 'uuid': str(uuid.uuid4()), + 'software': "workbench-script", + 'version': "0.0.1", + 'operator_id': "", + 'data': {}, + 'erase': [] +} -def convert_to_legacy_snapshot(snapshot): - snapshot["sid"] = str(uuid.uuid4()).split("-")[0] - snapshot["software"] = "workbench-script" - snapshot["version"] = "dev" - snapshot["schema_api"] = "1.0.0" - snapshot["settings_version"] = "No Settings Version (NaN)" - snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T") - snapshot["data"]["smart"] = snapshot["data"]["disks"] - snapshot["data"]["lshw"] = json.loads(snapshot["data"]["lshw"]) - snapshot["data"].pop("disks") - snapshot.pop("erase") - -## End Legacy Functions ## ## Utility Functions ## @@ -50,6 +46,7 @@ def exec_cmd(cmd): logger.info(_('Running command `%s`'), cmd) return os.popen(cmd).read() + @logs def exec_cmd_erase(cmd): logger.info(_('Running command `%s`'), cmd) @@ -59,15 +56,33 @@ def exec_cmd_erase(cmd): ## End Utility functions ## -SNAPSHOT_BASE = { - 'timestamp': str(datetime.now()), - 'type': 'Snapshot', - 'uuid': str(uuid.uuid4()), - 'software': "workbench-script", - 'version': "0.0.1", - 'data': {}, - 'erase': [] -} +## Legacy Functions ## + +def convert_to_legacy_snapshot(snapshot): + snapshot["sid"] = str(uuid.uuid4()).split("-")[1] + snapshot["software"] = "workbench-script" + snapshot["version"] = "dev" + snapshot["schema_api"] = "1.0.0" + snapshot["settings_version"] = "No Settings Version (NaN)" + snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T") + snapshot["data"]["smart"] = json.loads(snapshot["data"]["smartctl"]) + snapshot["data"].pop("smartctl") + snapshot["data"].pop("inxi") + snapshot.pop("operator_id") + snapshot.pop("erase") + + lshw = 'sudo lshw -json' + hwinfo = 'sudo hwinfo --reallyall' + lspci = 'sudo lspci -vv' + + data = { + 'lshw': exec_cmd(lshw) or "{}", + 'hwinfo': exec_cmd(hwinfo), + 'lspci': exec_cmd(lspci) + } + snapshot['data'].update(data) + +## End Legacy Functions ## ## Command Functions ## @@ -237,7 +252,7 @@ def smartctl(all_disks, disk=None): data = exec_smart(disk['name']) data_list.append(data) - return data_list + return json.dumps(data_list) ## End Command Functions ## @@ -245,16 +260,13 @@ def smartctl(all_disks, disk=None): # TODO permitir selección # TODO permitir que vaya más rápido def get_data(all_disks): - lshw = 'sudo lshw -json' - hwinfo = 'sudo hwinfo --reallyall' dmidecode = 'sudo dmidecode' - lspci = 'sudo lspci -vv' + inxi = "sudo inxi -afmnGEMABD -x 3 --edid --output json --output-file print" + data = { - 'lshw': exec_cmd(lshw) or "{}", - 'disks': smartctl(all_disks), - 'hwinfo': exec_cmd(hwinfo), + 'smartctl': smartctl(all_disks), 'dmidecode': exec_cmd(dmidecode), - 'lspci': exec_cmd(lspci) + 'inxi': exec_cmd(inxi) } return data @@ -266,20 +278,20 @@ def gen_snapshot(all_disks): return snapshot -def save_snapshot_in_disk(snapshot, path): +def save_snapshot_in_disk(snapshot, path, snap_uuid): snapshot_path = os.path.join(path, 'snapshots') filename = "{}/{}_{}.json".format( snapshot_path, datetime.now().strftime("%Y%m%d-%H_%M_%S"), - snapshot['uuid']) + snap_uuid) try: if not os.path.exists(snapshot_path): os.makedirs(snapshot_path) logger.info(_("Created snapshots directory at '%s'"), snapshot_path) with open(filename, "w") as f: - f.write(json.dumps(snapshot)) + f.write(snapshot) logger.info(_("Snapshot written in path '%s'"), filename) except Exception as e: try: @@ -287,18 +299,59 @@ def save_snapshot_in_disk(snapshot, path): fallback_filename = "{}/{}_{}.json".format( path, datetime.now().strftime("%Y%m%d-%H_%M_%S"), - snapshot['uuid']) + snap_uuid) with open(fallback_filename, "w") as f: - f.write(json.dumps(snapshot)) + f.write(snapshot) logger.warning(_("Snapshot written in fallback path '%s'"), fallback_filename) except Exception as e: logger.error(_("Could not save snapshot locally. Reason: Failed to write in fallback path:\n %s"), e) + +def send_to_sign_credential(snapshot, token, url): + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + + try: + cred = { + "type": "DeviceSnapshotV1", + "save": False, + "data": { + "operator_id": snapshot["operator_id"], + "dmidecode": snapshot["data"]["dmidecode"], + "inxi": snapshot["data"]["inxi"], + "smartctl": snapshot["data"]["smartctl"], + "uuid": snapshot["uuid"], + } + } + + data = json.dumps(cred).encode('utf-8') + request = urllib.request.Request(url, data=data, headers=headers) + with urllib.request.urlopen(request) as response: + status_code = response.getcode() + response_text = response.read().decode('utf-8') + + if 200 <= status_code < 300: + logger.info(_("Credential successfully signed")) + res = json.loads(response_text) + if res.get("status") == "success" and res.get("data"): + return res["data"] + return json.dumps(snapshot) + else: + logger.error(_("Credential cannot signed in '%s'"), url) + return json.dumps(snapshot) + + except Exception as e: + logger.error(_("Credential not remotely builded to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), url, e) + return json.dumps(snapshot) + + # TODO sanitize url, if url is like this, it fails # url = 'http://127.0.0.1:8000/api/snapshot/' -def send_snapshot_to_devicehub(snapshot, token, url, legacy): +def send_snapshot_to_devicehub(snapshot, token, url, ev_uuid, legacy): url_components = urllib.parse.urlparse(url) - ev_path = "evidence/{}".format(snapshot["uuid"]) + ev_path = f"evidence/{ev_uuid}" components = (url_components.scheme, url_components.netloc, ev_path, '', '', '') ev_url = urllib.parse.urlunparse(components) # apt install qrencode @@ -308,7 +361,7 @@ def send_snapshot_to_devicehub(snapshot, token, url, legacy): "Content-Type": "application/json" } try: - data = json.dumps(snapshot).encode('utf-8') + data = snapshot.encode('utf-8') request = urllib.request.Request(url, data=data, headers=headers) with urllib.request.urlopen(request) as response: status_code = response.getcode() @@ -335,15 +388,10 @@ def send_snapshot_to_devicehub(snapshot, token, url, legacy): print(exec_cmd(qr)) print(f"url: {ev_url}") else: - logger.error(_("Snapshot %s could not be sent to URL '%s'"), snapshot["uuid"], url) - # TODO review all the try-except thing here; maybe the try inside legacy does not make sense anymore - except urllib.error.HTTPError as e: - error_details = e.read().decode('utf-8') # Get the error response body - logger.error(_("Snapshot %s not remotely sent to URL '%s'. Server responded with error:\n %s"), - snapshot["uuid"], url, error_details) + logger.error(_("Snapshot %s not remotely sent to URL '%s'. Server responded with error:\n %s"), ev_uuid, url, response_text) except Exception as e: - logger.error(_("Snapshot %s not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), snapshot["uuid"], url, e) + logger.error(_("Snapshot not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), url, e) def load_config(config_file="settings.ini"): @@ -365,10 +413,12 @@ def load_config(config_file="settings.ini"): device = config.get('settings', 'device', fallback=None) erase = config.get('settings', 'erase', fallback=None) legacy = config.get('settings', 'legacy', fallback=None) + url_wallet = config.get('settings', 'url_wallet', fallback=None) + wb_sign_token = config.get('settings', 'wb_sign_token', fallback=None) else: logger.error(_("Config file '%s' not found. Using default values."), config_file) path = os.path.join(os.getcwd()) - url, token, device, erase, legacy = None, None, None, None, None + url, token, device, erase, legacy, url_wallet, wb_sign_token = (None,)*7 return { 'path': path, @@ -376,7 +426,9 @@ def load_config(config_file="settings.ini"): 'token': token, 'device': device, 'erase': erase, - 'legacy': legacy + 'legacy': legacy, + 'wb_sign_token': wb_sign_token, + 'url_wallet': url_wallet } def parse_args(): @@ -439,19 +491,39 @@ def main(): all_disks = get_disks() snapshot = gen_snapshot(all_disks) + snap_uuid = snapshot["uuid"] - if config['erase'] and config['device'] and not config.get("legacy"): + if config['erase'] and config['device'] and not legacy: snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device']) - elif config['erase'] and not config.get("legacy"): + elif config['erase'] and not legacy: snapshot['erase'] = gen_erase(all_disks, config['erase']) if legacy: convert_to_legacy_snapshot(snapshot) + snapshot = json.dumps(snapshot) + else: + url_wallet = config.get("url_wallet") + wb_sign_token = config.get("wb_sign_token") - save_snapshot_in_disk(snapshot, config['path']) + if wb_sign_token: + tk = wb_sign_token.encode("utf8") + snapshot["operator_id"] = hashlib.sha3_256(tk).hexdigest() + + if url_wallet and wb_sign_token: + snapshot = send_to_sign_credential(snapshot, wb_sign_token, url_wallet) + else: + snapshot = json.dumps(snapshot) + + save_snapshot_in_disk(snapshot, config['path'], snap_uuid) if config['url']: - send_snapshot_to_devicehub(snapshot, config['token'], config['url'], legacy) + send_snapshot_to_devicehub( + snapshot, + config['token'], + config['url'], + snap_uuid, + legacy + ) logger.info(_("END"))