Merge pull request 'legacy-mode' (#3) from legacy-mode into main
Reviewed-on: #3
This commit is contained in:
commit
9954f70e6c
|
@ -4,3 +4,4 @@ token = '1234'
|
||||||
# path = /path/to/save
|
# path = /path/to/save
|
||||||
# device = your_device_name
|
# device = your_device_name
|
||||||
# # erase = basic
|
# # erase = basic
|
||||||
|
# legacy = true
|
||||||
|
|
|
@ -6,13 +6,27 @@ import uuid
|
||||||
import hashlib
|
import hashlib
|
||||||
import argparse
|
import argparse
|
||||||
import configparser
|
import configparser
|
||||||
|
import urllib.request
|
||||||
import requests
|
|
||||||
|
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
## Legacy Functions ##
|
||||||
|
def convert_to_legacy_snapshot(snapshot):
|
||||||
|
snapshot["sid"] = str(uuid.uuid4()).split("-")[0]
|
||||||
|
snapshot["software"] = "workbench-script"
|
||||||
|
snapshot["version"] = "dev"
|
||||||
|
snapshot["schema_api"] = "1.0.0"
|
||||||
|
snapshot["settings_version"] = "No Settings Version (NaN)"
|
||||||
|
snapshot["timestamp"] = snapshot["timestamp"].replace(" ", "T")
|
||||||
|
snapshot["data"]["smart"] = snapshot["data"]["disks"]
|
||||||
|
snapshot["data"].pop("disks")
|
||||||
|
snapshot.pop("code")
|
||||||
|
snapshot.pop("erase")
|
||||||
|
|
||||||
|
## End Legacy Functions ##
|
||||||
|
|
||||||
|
|
||||||
## Utility Functions ##
|
## Utility Functions ##
|
||||||
def logs(f):
|
def logs(f):
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
|
@ -27,6 +41,7 @@ def logs(f):
|
||||||
|
|
||||||
@logs
|
@logs
|
||||||
def exec_cmd(cmd):
|
def exec_cmd(cmd):
|
||||||
|
print(f'workbench: INFO: running command `{cmd}`')
|
||||||
return os.popen(cmd).read()
|
return os.popen(cmd).read()
|
||||||
|
|
||||||
@logs
|
@logs
|
||||||
|
@ -206,7 +221,7 @@ def gen_erase(all_disks, type_erase, user_disk=None):
|
||||||
|
|
||||||
@logs
|
@logs
|
||||||
def exec_smart(disk):
|
def exec_smart(disk):
|
||||||
cmd = f'smartctl -x --json=cosviu /dev/{disk}'
|
cmd = f'sudo smartctl -x --json=cosviu /dev/{disk}'
|
||||||
return json.loads(exec_cmd(cmd))
|
return json.loads(exec_cmd(cmd))
|
||||||
|
|
||||||
|
|
||||||
|
@ -214,7 +229,7 @@ def exec_smart(disk):
|
||||||
def smartctl(all_disks, disk=None):
|
def smartctl(all_disks, disk=None):
|
||||||
|
|
||||||
if disk:
|
if disk:
|
||||||
return exec_smart(disk)
|
return [exec_smart(disk)]
|
||||||
|
|
||||||
data_list = []
|
data_list = []
|
||||||
for disk in all_disks:
|
for disk in all_disks:
|
||||||
|
@ -233,11 +248,13 @@ def get_data(all_disks):
|
||||||
lshw = 'sudo lshw -json'
|
lshw = 'sudo lshw -json'
|
||||||
hwinfo = 'sudo hwinfo --reallyall'
|
hwinfo = 'sudo hwinfo --reallyall'
|
||||||
dmidecode = 'sudo dmidecode'
|
dmidecode = 'sudo dmidecode'
|
||||||
|
lspci = 'sudo lspci -vv'
|
||||||
data = {
|
data = {
|
||||||
'lshw': exec_cmd(lshw),
|
'lshw': exec_cmd(lshw) or "{}",
|
||||||
'disks': smartctl(all_disks),
|
'disks': smartctl(all_disks),
|
||||||
'hwinfo': exec_cmd(hwinfo),
|
'hwinfo': exec_cmd(hwinfo),
|
||||||
'dmidecode': exec_cmd(dmidecode)
|
'dmidecode': exec_cmd(dmidecode),
|
||||||
|
'lspci': exec_cmd(lspci)
|
||||||
}
|
}
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
@ -282,14 +299,39 @@ def save_snapshot_in_disk(snapshot, path):
|
||||||
# url = 'http://127.0.0.1:8000/api/snapshot/'
|
# url = 'http://127.0.0.1:8000/api/snapshot/'
|
||||||
def send_snapshot_to_devicehub(snapshot, token, url):
|
def send_snapshot_to_devicehub(snapshot, token, url):
|
||||||
headers = {
|
headers = {
|
||||||
f"Authorization": "Basic {token}",
|
"Authorization": f"Bearer {token}",
|
||||||
"Content-Type": "application/json"
|
"Content-Type": "application/json"
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
requests.post(url, data=json.dumps(snapshot), headers=headers)
|
data = json.dumps(snapshot).encode('utf-8')
|
||||||
print(f"workbench: INFO: Snapshot sent to '{url}'")
|
request = urllib.request.Request(url, data=data, headers=headers)
|
||||||
|
with urllib.request.urlopen(request) as response:
|
||||||
|
status_code = response.getcode()
|
||||||
|
response_text = response.read().decode('utf-8')
|
||||||
|
|
||||||
|
if 200 <= status_code < 300:
|
||||||
|
print(f"workbench: INFO: Snapshot successfully sent to '{url}'")
|
||||||
|
else:
|
||||||
|
txt = "workbench: ERROR: Failed to send snapshot. HTTP {}: {}".format(
|
||||||
|
status_code,
|
||||||
|
response_text
|
||||||
|
)
|
||||||
|
raise Exception(txt)
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = json.loads(response_text)
|
||||||
|
if response.get('url'):
|
||||||
|
# apt install qrencode
|
||||||
|
qr = "echo {} | qrencode -t ANSI".format(response['url'])
|
||||||
|
print(exec_cmd(qr))
|
||||||
|
print("url: {}".format(response['url']))
|
||||||
|
if response.get("dhid"):
|
||||||
|
print("dhid: {}".format(response['dhid']))
|
||||||
|
except Exception:
|
||||||
|
print(response_text)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"workbench: ERROR: Snapshot not remotely sent. URL '{url}' is unreachable. Do you have internet? Is your server up & running?\n {e}")
|
print(f"workbench: ERROR: Snapshot not remotely sent to URL '{url}'. Do you have internet? Is your server up & running? Is the url token authorized?\n {e}")
|
||||||
|
|
||||||
def load_config(config_file="settings.ini"):
|
def load_config(config_file="settings.ini"):
|
||||||
"""
|
"""
|
||||||
|
@ -309,17 +351,19 @@ def load_config(config_file="settings.ini"):
|
||||||
# TODO validate that the device exists?
|
# TODO validate that the device exists?
|
||||||
device = config.get('settings', 'device', fallback=None)
|
device = config.get('settings', 'device', fallback=None)
|
||||||
erase = config.get('settings', 'erase', fallback=None)
|
erase = config.get('settings', 'erase', fallback=None)
|
||||||
|
legacy = config.get('settings', 'legacy', fallback=None)
|
||||||
else:
|
else:
|
||||||
print(f"workbench: ERROR: Config file '{config_file}' not found. Using default values.")
|
print(f"workbench: ERROR: Config file '{config_file}' not found. Using default values.")
|
||||||
path = os.path.join(os.getcwd())
|
path = os.path.join(os.getcwd())
|
||||||
url, token, device, erase = None, None, None, None
|
url, token, device, erase, legacy = None, None, None, None, None
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'path': path,
|
'path': path,
|
||||||
'url': url,
|
'url': url,
|
||||||
'token': token,
|
'token': token,
|
||||||
'device': device,
|
'device': device,
|
||||||
'erase': erase
|
'erase': erase,
|
||||||
|
'legacy': legacy
|
||||||
}
|
}
|
||||||
|
|
||||||
def parse_args():
|
def parse_args():
|
||||||
|
@ -354,11 +398,14 @@ def main():
|
||||||
all_disks = get_disks()
|
all_disks = get_disks()
|
||||||
snapshot = gen_snapshot(all_disks)
|
snapshot = gen_snapshot(all_disks)
|
||||||
|
|
||||||
if config['erase'] and config['device']:
|
if config['erase'] and config['device'] and not config.get("legacy"):
|
||||||
snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device'])
|
snapshot['erase'] = gen_erase(all_disks, config['erase'], user_disk=config['device'])
|
||||||
elif config['erase']:
|
elif config['erase'] and not config.get("legacy"):
|
||||||
snapshot['erase'] = gen_erase(all_disks, config['erase'])
|
snapshot['erase'] = gen_erase(all_disks, config['erase'])
|
||||||
|
|
||||||
|
if config.get("legacy"):
|
||||||
|
convert_to_legacy_snapshot(snapshot)
|
||||||
|
|
||||||
save_snapshot_in_disk(snapshot, config['path'])
|
save_snapshot_in_disk(snapshot, config['path'])
|
||||||
|
|
||||||
if config['url']:
|
if config['url']:
|
||||||
|
|
Loading…
Reference in a new issue