Compare commits

...

2 Commits

2 changed files with 95 additions and 39 deletions

View File

@ -1,6 +1,8 @@
[settings]
url = http://localhost:8000/api/snapshot/
token = '1234'
# wb_sign_token = "27de6ad7-cee2-4fe8-84d4-c7eea9c969c8"
# url_wallet = "http://localhost"
# path = /path/to/save
# device = your_device_name
# # erase = basic

View File

@ -6,18 +6,37 @@ import uuid
import hashlib
import argparse
import configparser
import urllib.parse
import urllib.request
import gettext
import locale
import logging
from pathlib import Path
from string import Template
from datetime import datetime
BASE_DIR = Path(__file__).resolve().parent
SNAPSHOT_BASE = {
'timestamp': str(datetime.now()),
'type': 'Snapshot',
'uuid': str(uuid.uuid4()),
'software': "workbench-script",
'version': "0.0.1",
'token_hash': "",
'data': {},
'erase': []
}
## Legacy Functions ##
def convert_to_legacy_snapshot(snapshot):
snapshot["sid"] = str(uuid.uuid4()).split("-")[0]
snapshot["sid"] = str(uuid.uuid4()).split("-")[1]
snapshot["software"] = "workbench-script"
snapshot["version"] = "dev"
snapshot["schema_api"] = "1.0.0"
@ -25,9 +44,9 @@ def convert_to_legacy_snapshot(snapshot):
snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T")
snapshot["data"]["smart"] = snapshot["data"]["disks"]
snapshot["data"].pop("disks")
snapshot.pop("code")
snapshot.pop("erase")
snapshot.pop("token_hash")
## End Legacy Functions ##
@ -54,26 +73,19 @@ def exec_cmd_erase(cmd):
return ''
# return os.popen(cmd).read()
def gen_code():
uid = str(uuid.uuid4()).encode('utf-8')
return hashlib.shake_256(uid).hexdigest(3)
## End Utility functions ##
SNAPSHOT_BASE = {
'timestamp': str(datetime.now()),
'type': 'Snapshot',
'uuid': str(uuid.uuid4()),
'code': gen_code(),
'software': "workbench-script",
'version': "0.0.1",
'data': {},
'erase': []
}
def convert_to_credential(snapshot):
snapshot["data"] = json.dumps(snapshot["data"])
file_path = os.path.join(BASE_DIR, "templates", "snapshot.json")
with open(file_path) as f:
ff = f.read()
template = Template(ff)
cred = template.substitute(**snapshot)
return cred
## Command Functions ##
## Erase Functions ##
## Xavier Functions ##
@ -270,20 +282,20 @@ def gen_snapshot(all_disks):
return snapshot
def save_snapshot_in_disk(snapshot, path):
def save_snapshot_in_disk(snapshot, path, snap_uuid):
snapshot_path = os.path.join(path, 'snapshots')
filename = "{}/{}_{}.json".format(
snapshot_path,
datetime.now().strftime("%Y%m%d-%H_%M_%S"),
snapshot['uuid'])
snap_uuid)
try:
if not os.path.exists(snapshot_path):
os.makedirs(snapshot_path)
logger.info(_("Created snapshots directory at '%s'"), snapshot_path)
with open(filename, "w") as f:
f.write(json.dumps(snapshot))
f.write(snapshot)
logger.info(_("Snapshot written in path '%s'"), filename)
except Exception as e:
try:
@ -291,16 +303,49 @@ def save_snapshot_in_disk(snapshot, path):
fallback_filename = "{}/{}_{}.json".format(
path,
datetime.now().strftime("%Y%m%d-%H_%M_%S"),
snapshot['uuid'])
snap_uuid)
with open(fallback_filename, "w") as f:
f.write(json.dumps(snapshot))
f.write(snapshot)
logger.warning(_("Snapshot written in fallback path '%s'"), fallback_filename)
except Exception as e:
logger.error(_("Could not save snapshot locally. Reason: Failed to write in fallback path:\n %s"), e)
def send_to_sign_credential(cred, token, url):
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
try:
data = json.dumps(cred).encode('utf-8')
request = urllib.request.Request(url, data=data, headers=headers)
with urllib.request.urlopen(request) as response:
status_code = response.getcode()
#response_text = response.read().decode('utf-8')
if 200 <= status_code < 300:
logger.info(_("Credential successfully signed"))
else:
logger.error(_("Credential cannot signed in '%s'"), url)
except Exception as e:
logger.error(_("Credential not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), url, e)
# TODO sanitize url, if url is like this, it fails
# url = 'http://127.0.0.1:8000/api/snapshot/'
def send_snapshot_to_devicehub(snapshot, token, url):
url_components = urllib.parse.urlparse(url)
ev_path = "evidence/{}".format(snapshot["uuid"])
components = (url_components.schema, url_components.netloc, ev_path, '', '', '')
ev_url = urllib.parse.urlunparse(components)
# apt install qrencode
qr = "echo {} | qrencode -t ANSI".format(ev_url)
print(exec_cmd(qr))
print(ev_url)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
@ -310,26 +355,17 @@ def send_snapshot_to_devicehub(snapshot, token, url):
request = urllib.request.Request(url, data=data, headers=headers)
with urllib.request.urlopen(request) as response:
status_code = response.getcode()
response_text = response.read().decode('utf-8')
#response_text = response.read().decode('utf-8')
if 200 <= status_code < 300:
logger.info(_("Snapshot successfully sent to '%s'"), url)
try:
response = json.loads(response_text)
if response.get('url'):
# apt install qrencode
qr = "echo {} | qrencode -t ANSI".format(response['url'])
print(exec_cmd(qr))
print("url: {}".format(response['url']))
if response.get("dhid"):
print("dhid: {}".format(response['dhid']))
except Exception:
logger.error(response_text)
else:
logger.error(_("Snapshot cannot sent to '%s'"), url)
except Exception as e:
logger.error(_("Snapshot not remotely sent to URL '%s'. Do you have internet? Is your server up & running? Is the url token authorized?\n %s"), url, e)
def load_config(config_file="settings.ini"):
"""
Tries to load configuration from a config file.
@ -349,6 +385,8 @@ def load_config(config_file="settings.ini"):
device = config.get('settings', 'device', fallback=None)
erase = config.get('settings', 'erase', fallback=None)
legacy = config.get('settings', 'legacy', fallback=None)
url_wallet = config.get('settings', 'url_wallet', fallback=None)
wb_sign_token = config.get('settings', 'wb_sign_token', fallback=None)
else:
logger.error(_("Config file '%s' not found. Using default values."), config_file)
path = os.path.join(os.getcwd())
@ -360,7 +398,9 @@ def load_config(config_file="settings.ini"):
'token': token,
'device': device,
'erase': erase,
'legacy': legacy
'legacy': legacy,
'wb_sign_token': wb_sign_token,
'url_wallet': url_wallet
}
def parse_args():
@ -422,16 +462,30 @@ def main():
all_disks = get_disks()
snapshot = gen_snapshot(all_disks)
snap_uuid = snapshot["uuid"]
if config['erase'] and config['device'] and not config.get("legacy"):
snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device'])
elif config['erase'] and not config.get("legacy"):
snapshot['erase'] = gen_erase(all_disks, config['erase'])
if config.get("wb_sign_token"):
tk = config.get("wb_sign_token").encode("utf8")
snapshot["token_hash"] = hashlib.hash256(tk).hexdigest()
if config.get("legacy"):
convert_to_legacy_snapshot(snapshot)
snapshot = json.dumps(snapshot)
else:
snapshot = convert_to_credential(snapshot)
url_wallet = config.get("url_wallet")
wb_sign_token = config.get("wb_sign_token")
if url_wallet and wb_sign_token:
snapshot = send_to_sign_credential(snapshot, wb_sign_token, url_wallet)
save_snapshot_in_disk(snapshot, config['path'])
save_snapshot_in_disk(snapshot, config['path'], snap_uuid)
if config['url']:
send_snapshot_to_devicehub(snapshot, config['token'], config['url'])