devicehub-django/snapshot/parse.py

674 lines
23 KiB
Python

import os
import json
import shutil
import hashlib
from datetime import datetime
from django.conf import settings
from device.models import Device, Computer
from snapshot.models import Snapshot
HID = [
"manufacturer",
"model",
"chassis",
"serialNumber",
"sku"
]
class Build:
def __init__(self, snapshot_json, user):
self.json = snapshot_json
self.user = user
self.hid = None
self.result = False
self.save_disk()
self.json_device = self.json["device"]
self.json_components = self.json["components"]
self.uuid = self.json["uuid"]
self.get_hid()
self.gen_computer()
self.gen_components()
self.gen_actions()
self.gen_snapshot()
self.result = True
self.move_json()
def save_disk(self):
snapshot_path = settings.SNAPSHOT_PATH
user = self.user.email
uuid = self.json.get("uuid", "")
now = datetime.now().strftime("%Y-%m-%d-%H-%M")
filename = f"{now}_{user}_{uuid}.json"
path_dir_base = os.path.join(snapshot_path, user)
path_upload = os.path.join(path_dir_base, 'upload')
path_name = os.path.join(path_upload, filename)
self.filename = path_name
self.path_dir_base = path_dir_base
if not os.path.isdir(path_dir_base):
os.system(f'mkdir -p {path_upload}')
with open(path_name, 'w') as file:
file.write(json.dumps(self.json))
def move_json(self):
if not self.result:
return
shutil.copy(self.filename, self.path_dir_base)
os.remove(self.filename)
def get_hid(self):
hid = ""
for f in HID:
hid += "-" + self.json_device[f]
self.hid = hid
def gen_computer(self):
self.device = Device.objects.filter(
hid=self.hid,
active=True,
reliable=True,
owner=self.user
).first()
if self.device:
return
chid = hashlib.sha3_256(self.hid.encode()).hexdigest()
self.device = Device.objects.create(
serial_number=self.json_device["serialNumber"],
manufacturer=self.json_device["manufacturer"],
version=self.json_device["version"],
model=self.json_device["model"],
type=self.json_device["type"],
hid=self.hid,
chid=chid,
owner=self.user
)
Computer.objects.create(
device=self.device,
sku=self.json_device["sku"],
chassis=self.json_device["chassis"]
)
def gen_snapshot(self):
self.snapshot = Snapshot.objects.create(
uuid = self.uuid,
version = self.json["version"],
computer = self.device.computer,
sid = self.json.get("sid", ""),
settings_version = self.json.get("settings_version", ""),
end_time = self.json.get("endTime"),
owner=self.user
)
def gen_components(self):
pass
def gen_actions(self):
pass
# class ParseSnapshot:
# def __init__(self, snapshot, default="n/a"):
# self.default = default
# self.dmidecode_raw = snapshot["hwmd"]["dmidecode"]
# self.smart_raw = snapshot["hwmd"]["smart"]
# self.hwinfo_raw = snapshot["hwmd"]["hwinfo"]
# self.lshw_raw = snapshot["hwmd"]["lshw"]
# self.lscpi_raw = snapshot["hwmd"]["lspci"]
# self.sanitize_raw = snapshot.get("sanitize", [])
# self.device = {"actions": []}
# self.components = []
# self.monitors = []
# self.dmi = DMIParse(self.dmidecode_raw)
# self.smart = self.loads(self.smart_raw)
# self.lshw = self.loads(self.lshw_raw)
# self.hwinfo = self.parse_hwinfo()
# self.set_computer()
# self.get_hwinfo_monitors()
# self.set_components()
# self.snapshot_json = {
# "type": "Snapshot",
# "device": self.device,
# "software": "Workbench",
# # "software": snapshot["software"],
# "components": self.components,
# "uuid": snapshot['uuid'],
# "version": "15.0.0",
# # "version": snapshot['version'],
# "settings_version": snapshot['settings_version'],
# "endTime": snapshot["timestamp"],
# "elapsed": 1,
# "sid": snapshot["sid"],
# }
# def get_snapshot(self):
# return Snapshot().load(self.snapshot_json)
# def set_computer(self):
# self.device['manufacturer'] = self.dmi.manufacturer().strip()
# self.device['model'] = self.dmi.model().strip()
# self.device['serialNumber'] = self.dmi.serial_number()
# self.device['type'] = self.get_type()
# self.device['sku'] = self.get_sku()
# self.device['version'] = self.get_version()
# self.device['system_uuid'] = self.get_uuid()
# self.device['family'] = self.get_family()
# self.device['chassis'] = self.get_chassis_dh()
# def set_components(self):
# self.get_cpu()
# self.get_ram()
# self.get_mother_board()
# self.get_graphic()
# self.get_data_storage()
# self.get_display()
# self.get_sound_card()
# self.get_networks()
# def get_cpu(self):
# for cpu in self.dmi.get('Processor'):
# serial = cpu.get('Serial Number')
# if serial == 'Not Specified' or not serial:
# serial = cpu.get('ID').replace(' ', '')
# self.components.append(
# {
# "actions": [],
# "type": "Processor",
# "speed": self.get_cpu_speed(cpu),
# "cores": int(cpu.get('Core Count', 1)),
# "model": cpu.get('Version'),
# "threads": int(cpu.get('Thread Count', 1)),
# "manufacturer": cpu.get('Manufacturer'),
# "serialNumber": serial,
# "generation": None,
# "brand": cpu.get('Family'),
# "address": self.get_cpu_address(cpu),
# }
# )
# def get_ram(self):
# for ram in self.dmi.get("Memory Device"):
# if ram.get('size') == 'No Module Installed':
# continue
# if not ram.get("Speed"):
# continue
# self.components.append(
# {
# "actions": [],
# "type": "RamModule",
# "size": self.get_ram_size(ram),
# "speed": self.get_ram_speed(ram),
# "manufacturer": ram.get("Manufacturer", self.default),
# "serialNumber": ram.get("Serial Number", self.default),
# "interface": ram.get("Type", "DDR"),
# "format": ram.get("Form Factor", "DIMM"),
# "model": ram.get("Part Number", self.default),
# }
# )
# def get_mother_board(self):
# for moder_board in self.dmi.get("Baseboard"):
# self.components.append(
# {
# "actions": [],
# "type": "Motherboard",
# "version": moder_board.get("Version"),
# "serialNumber": moder_board.get("Serial Number", "").strip(),
# "manufacturer": moder_board.get("Manufacturer", "").strip(),
# "biosDate": self.get_bios_date(),
# "ramMaxSize": self.get_max_ram_size(),
# "ramSlots": len(self.dmi.get("Memory Device")),
# "slots": self.get_ram_slots(),
# "model": moder_board.get("Product Name", "").strip(),
# "firewire": self.get_firmware_num(),
# "pcmcia": self.get_pcmcia_num(),
# "serial": self.get_serial_num(),
# "usb": self.get_usb_num(),
# }
# )
# def get_graphic(self):
# nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'display')
# for c in nodes:
# if not c['configuration'].get('driver', None):
# continue
# self.components.append(
# {
# "actions": [],
# "type": "GraphicCard",
# "memory": self.get_memory_video(c),
# "manufacturer": c.get("vendor", self.default),
# "model": c.get("product", self.default),
# "serialNumber": c.get("serial", self.default),
# }
# )
# def get_memory_video(self, c):
# # get info of lspci
# # pci_id = c['businfo'].split('@')[1]
# # lspci.get(pci_id) | grep size
# # lspci -v -s 00:02.0
# return None
# def get_data_storage(self):
# for sm in self.smart:
# if sm.get('smartctl', {}).get('exit_status') == 1:
# continue
# model = sm.get('model_name')
# manufacturer = None
# if model and len(model.split(" ")) > 1:
# mm = model.split(" ")
# model = mm[-1]
# manufacturer = " ".join(mm[:-1])
# self.components.append(
# {
# "actions": self.sanitize(sm),
# "type": self.get_data_storage_type(sm),
# "model": model,
# "manufacturer": manufacturer,
# "serialNumber": sm.get('serial_number'),
# "size": self.get_data_storage_size(sm),
# "variant": sm.get("firmware_version"),
# "interface": self.get_data_storage_interface(sm),
# }
# )
# def sanitize(self, disk):
# disk_sanitize = None
# for d in self.sanitize_raw:
# s = d.get('device_info', {}).get('export_data', {})
# s = s.get('block', {}).get('serial')
# if s == disk.get('serial_number'):
# disk_sanitize = d
# break
# if not disk_sanitize:
# return []
# steps = []
# step_type = 'EraseBasic'
# if d.get("method", {}).get('name') == 'Baseline Cryptographic':
# step_type = 'EraseCrypto'
# if disk.get('type') == 'EraseCrypto':
# step_type = 'EraseCrypto'
# erase = {
# 'type': step_type,
# 'severity': "Info",
# 'steps': steps,
# 'startTime': None,
# 'endTime': None,
# }
# severities = []
# for step in disk_sanitize.get('steps', []):
# severity = "Info"
# if not step['success']:
# severity = "Error"
# steps.append(
# {
# 'severity': severity,
# 'startTime': unix_isoformat(step['start_time']),
# 'endTime': unix_isoformat(step['end_time']),
# 'type': 'StepRandom',
# }
# )
# severities.append(severity)
# erase['endTime'] = unix_isoformat(step['end_time'])
# if not erase['startTime']:
# erase['startTime'] = unix_isoformat(step['start_time'])
# if "Error" in severities:
# erase['severity'] = "Error"
# return [erase]
# def get_networks(self):
# nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'network')
# for c in nodes:
# capacity = c.get('capacity')
# units = c.get('units')
# speed = None
# if capacity and units:
# speed = unit.Quantity(capacity, units).to('Mbit/s').m
# wireless = bool(c.get('configuration', {}).get('wireless', False))
# self.components.append(
# {
# "actions": [],
# "type": "NetworkAdapter",
# "model": c.get('product'),
# "manufacturer": c.get('vendor'),
# "serialNumber": c.get('serial'),
# "speed": speed,
# "variant": c.get('version', 1),
# "wireless": wireless,
# }
# )
# def get_sound_card(self):
# nodes = get_nested_dicts_with_key_value(self.lshw, 'class', 'multimedia')
# for c in nodes:
# self.components.append(
# {
# "actions": [],
# "type": "SoundCard",
# "model": c.get('product'),
# "manufacturer": c.get('vendor'),
# "serialNumber": c.get('serial'),
# }
# )
# def get_display(self): # noqa: C901
# TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
# for c in self.monitors:
# resolution_width, resolution_height = (None,) * 2
# refresh, serial, model, manufacturer, size = (None,) * 5
# year, week, production_date = (None,) * 3
# for x in c:
# if "Vendor: " in x:
# manufacturer = x.split('Vendor: ')[-1].strip()
# if "Model: " in x:
# model = x.split('Model: ')[-1].strip()
# if "Serial ID: " in x:
# serial = x.split('Serial ID: ')[-1].strip()
# if " Resolution: " in x:
# rs = x.split(' Resolution: ')[-1].strip()
# if 'x' in rs:
# resolution_width, resolution_height = [
# int(r) for r in rs.split('x')
# ]
# if "Frequencies: " in x:
# try:
# refresh = int(float(x.split(',')[-1].strip()[:-3]))
# except Exception:
# pass
# if 'Year of Manufacture' in x:
# year = x.split(': ')[1]
# if 'Week of Manufacture' in x:
# week = x.split(': ')[1]
# if "Size: " in x:
# size = self.get_size_monitor(x)
# technology = next((t for t in TECHS if t in c[0]), None)
# if year and week:
# d = '{} {} 0'.format(year, week)
# production_date = datetime.strptime(d, '%Y %W %w').isoformat()
# self.components.append(
# {
# "actions": [],
# "type": "Display",
# "model": model,
# "manufacturer": manufacturer,
# "serialNumber": serial,
# 'size': size,
# 'resolutionWidth': resolution_width,
# 'resolutionHeight': resolution_height,
# "productionDate": production_date,
# 'technology': technology,
# 'refreshRate': refresh,
# }
# )
# def get_hwinfo_monitors(self):
# for c in self.hwinfo:
# monitor = None
# external = None
# for x in c:
# if 'Hardware Class: monitor' in x:
# monitor = c
# if 'Driver Info' in x:
# external = c
# if monitor and not external:
# self.monitors.append(c)
# def get_size_monitor(self, x):
# i = 1 / 25.4
# t = x.split('Size: ')[-1].strip()
# tt = t.split('mm')
# if not tt:
# return 0
# sizes = tt[0].strip()
# if 'x' not in sizes:
# return 0
# w, h = [int(x) for x in sizes.split('x')]
# return numpy.sqrt(w**2 + h**2) * i
# def get_cpu_address(self, cpu):
# default = 64
# for ch in self.lshw.get('children', []):
# for c in ch.get('children', []):
# if c['class'] == 'processor':
# return c.get('width', default)
# return default
# def get_usb_num(self):
# return len(
# [
# u
# for u in self.dmi.get("Port Connector")
# if "USB" in u.get("Port Type", "").upper()
# ]
# )
# def get_serial_num(self):
# return len(
# [
# u
# for u in self.dmi.get("Port Connector")
# if "SERIAL" in u.get("Port Type", "").upper()
# ]
# )
# def get_firmware_num(self):
# return len(
# [
# u
# for u in self.dmi.get("Port Connector")
# if "FIRMWARE" in u.get("Port Type", "").upper()
# ]
# )
# def get_pcmcia_num(self):
# return len(
# [
# u
# for u in self.dmi.get("Port Connector")
# if "PCMCIA" in u.get("Port Type", "").upper()
# ]
# )
# def get_bios_date(self):
# return self.dmi.get("BIOS")[0].get("Release Date", self.default)
# def get_firmware(self):
# return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
# def get_max_ram_size(self):
# size = 0
# for slot in self.dmi.get("Physical Memory Array"):
# capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
# size += int(capacity)
# return size
# def get_ram_slots(self):
# slots = 0
# for x in self.dmi.get("Physical Memory Array"):
# slots += int(x.get("Number Of Devices", 0))
# return slots
# def get_ram_size(self, ram):
# try:
# memory = ram.get("Size", "0")
# memory = memory.split(' ')
# if len(memory) > 1:
# size = int(memory[0])
# units = memory[1]
# return base2.Quantity(size, units).to('MiB').m
# return int(size.split(" ")[0])
# except Exception as err:
# logger.error("get_ram_size error: {}".format(err))
# return 0
# def get_ram_speed(self, ram):
# size = ram.get("Speed", "0")
# return int(size.split(" ")[0])
# def get_cpu_speed(self, cpu):
# speed = cpu.get('Max Speed', "0")
# return float(speed.split(" ")[0]) / 1024
# def get_sku(self):
# return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
# def get_version(self):
# return self.dmi.get("System")[0].get("Version", self.default).strip()
# def get_uuid(self):
# return self.dmi.get("System")[0].get("UUID", '').strip()
# def get_family(self):
# return self.dmi.get("System")[0].get("Family", '')
# def get_chassis(self):
# return self.dmi.get("Chassis")[0].get("Type", '_virtual')
# def get_type(self):
# chassis_type = self.get_chassis()
# return self.translation_to_devicehub(chassis_type)
# def translation_to_devicehub(self, original_type):
# lower_type = original_type.lower()
# CHASSIS_TYPE = {
# 'Desktop': [
# 'desktop',
# 'low-profile',
# 'tower',
# 'docking',
# 'all-in-one',
# 'pizzabox',
# 'mini-tower',
# 'space-saving',
# 'lunchbox',
# 'mini',
# 'stick',
# ],
# 'Laptop': [
# 'portable',
# 'laptop',
# 'convertible',
# 'tablet',
# 'detachable',
# 'notebook',
# 'handheld',
# 'sub-notebook',
# ],
# 'Server': ['server'],
# 'Computer': ['_virtual'],
# }
# for k, v in CHASSIS_TYPE.items():
# if lower_type in v:
# return k
# return self.default
# def get_chassis_dh(self):
# CHASSIS_DH = {
# 'Tower': {'desktop', 'low-profile', 'tower', 'server'},
# 'Docking': {'docking'},
# 'AllInOne': {'all-in-one'},
# 'Microtower': {'mini-tower', 'space-saving', 'mini'},
# 'PizzaBox': {'pizzabox'},
# 'Lunchbox': {'lunchbox'},
# 'Stick': {'stick'},
# 'Netbook': {'notebook', 'sub-notebook'},
# 'Handheld': {'handheld'},
# 'Laptop': {'portable', 'laptop'},
# 'Convertible': {'convertible'},
# 'Detachable': {'detachable'},
# 'Tablet': {'tablet'},
# 'Virtual': {'_virtual'},
# }
# chassis = self.get_chassis()
# lower_type = chassis.lower()
# for k, v in CHASSIS_DH.items():
# if lower_type in v:
# return k
# return self.default
# def get_data_storage_type(self, x):
# # TODO @cayop add more SSDS types
# SSDS = ["nvme"]
# SSD = 'SolidStateDrive'
# HDD = 'HardDrive'
# type_dev = x.get('device', {}).get('type')
# trim = x.get('trim', {}).get("supported") in [True, "true"]
# return SSD if type_dev in SSDS or trim else HDD
# def get_data_storage_interface(self, x):
# interface = x.get('device', {}).get('protocol', 'ATA')
# try:
# DataStorageInterface(interface.upper())
# except ValueError as err:
# txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
# self.sid, interface
# )
# self.errors("{}".format(err))
# self.errors(txt, severity=Severity.Warning)
# return "ATA"
# def get_data_storage_size(self, x):
# total_capacity = x.get('user_capacity', {}).get('bytes')
# if not total_capacity:
# return 1
# # convert bytes to Mb
# return total_capacity / 1024**2
# def parse_hwinfo(self):
# hw_blocks = self.hwinfo_raw.split("\n\n")
# return [x.split("\n") for x in hw_blocks]
# def loads(self, x):
# if isinstance(x, str):
# return json.loads(x)
# return x
# def errors(self, txt=None, severity=Severity.Error):
# if not txt:
# return self._errors
# logger.error(txt)
# self._errors.append(txt)
# error = SnapshotsLog(
# description=txt,
# snapshot_uuid=self.uuid,
# severity=severity,
# sid=self.sid,
# version=self.version,
# )
# error.save()