parser module

This commit is contained in:
Cayo Puigdefabregas 2022-03-24 13:13:28 +01:00
parent 417276b88b
commit f7b149f926
2 changed files with 519 additions and 79 deletions

View file

@ -6,15 +6,19 @@ from contextlib import suppress
from datetime import datetime
from enum import Enum, unique
from fractions import Fraction
from subprocess import CalledProcessError, PIPE, run
from subprocess import PIPE, CalledProcessError, run
from typing import Iterator, List, Optional, Tuple, Type, TypeVar
from warnings import catch_warnings, filterwarnings
import dateutil.parser
import pySMART
from ereuse_utils import cmd, getter as g, text
from ereuse_utils.nested_lookup import get_nested_dicts_with_key_containing_value, \
get_nested_dicts_with_key_value
from ereuse_utils import cmd
from ereuse_utils import getter as g
from ereuse_utils import text
from ereuse_utils.nested_lookup import (
get_nested_dicts_with_key_containing_value,
get_nested_dicts_with_key_value,
)
from numpy import hypot
from ereuse_devicehub.parser import base2, unit, utils
@ -37,11 +41,13 @@ class Device(Dumpeable):
def from_lshw(self, lshw_node: dict):
self.manufacturer = g.dict(lshw_node, 'vendor', default=None, type=str)
self.model = g.dict(lshw_node,
self.model = g.dict(
lshw_node,
'product',
remove={self.manufacturer} if self.manufacturer else set(),
default=None,
type=str)
type=str,
)
self.serial_number = g.dict(lshw_node, 'serial', default=None, type=str)
def __str__(self) -> str:
@ -64,13 +70,16 @@ class Processor(Component):
# We want only the physical cpu's, not the logic ones
# In some cases we may get empty cpu nodes, we can detect them because
# all regular cpus have at least a description (Intel Core i5...)
return (cls(node) for node in nodes if
'logical' not in node['id']
return (
cls(node)
for node in nodes
if 'logical' not in node['id']
and node.get('description', '').lower() != 'co-processor'
and not node.get('disabled')
and 'co-processor' not in node.get('model', '').lower()
and 'co-processor' not in node.get('description', '').lower()
and 'width' in node)
and 'width' in node
)
def __init__(self, node: dict) -> None:
super().__init__(node)
@ -89,7 +98,6 @@ class Processor(Component):
assert not hasattr(self, 'cores') or 1 <= self.cores <= 16
@staticmethod
def processor_brand_generation(model: str):
"""Generates the ``brand`` and ``generation`` fields for the given model.
@ -148,7 +156,9 @@ class RamModule(Component):
memories = get_nested_dicts_with_key_value(lshw, 'class', 'memory')
TYPES = {'ddr', 'sdram', 'sodimm'}
for memory in memories:
physical_ram = any(t in memory.get('description', '').lower() for t in TYPES)
physical_ram = any(
t in memory.get('description', '').lower() for t in TYPES
)
not_empty = 'size' in memory
if physical_ram and not_empty:
yield cls(memory)
@ -183,14 +193,17 @@ class DataStorage(Component):
usb_disks = list() # List of disks that are plugged in an USB host
for usb in get_nested_dicts_with_key_containing_value(lshw, 'id', 'usbhost'):
usb_disks.extend(get_nested_dicts_with_key_containing_value(usb, 'id', 'disk'))
usb_disks.extend(
get_nested_dicts_with_key_containing_value(usb, 'id', 'disk')
)
for disk in (n for n in disks if n not in usb_disks):
# We can get nodes that are not truly disks as they don't have size
if 'size' in disk:
interface = DataStorage.get_interface(disk)
removable = interface == 'usb' or \
disk.get('capabilities', {}).get('removable', False)
removable = interface == 'usb' or disk.get('capabilities', {}).get(
'removable', False
)
if not removable:
yield cls(disk, interface)
@ -210,7 +223,9 @@ class DataStorage(Component):
super().__init__(node)
self.from_lshw(node)
self.size = unit.Quantity(node['size'], node.get('units', 'B')).to('MB').m
self.interface = self.DataStorageInterface(interface.upper()) if interface else None
self.interface = (
self.DataStorageInterface(interface.upper()) if interface else None
)
self._logical_name = node['logicalname']
self.variant = node['version']
@ -225,22 +240,29 @@ class DataStorage(Component):
self.serial_number = self.serial_number or smart.serial
self.model = self.model or smart.model
assert 1.0 < self.size < 1000000000000000.0, \
'Invalid HDD size {}'.format(self.size)
assert 1.0 < self.size < 1000000000000000.0, 'Invalid HDD size {}'.format(
self.size
)
def __str__(self) -> str:
return '{} {} {} with {} MB'.format(super().__str__(), self.interface, self.type,
self.size)
return '{} {} {} with {} MB'.format(
super().__str__(), self.interface, self.type, self.size
)
@staticmethod
def get_interface(node: dict):
interface = run('udevadm info '
interface = run(
'udevadm info '
'--query=all '
'--name={} | '
'grep '
'ID_BUS | '
'cut -c 11-'.format(node['logicalname']),
check=True, universal_newlines=True, shell=True, stdout=PIPE).stdout
check=True,
universal_newlines=True,
shell=True,
stdout=PIPE,
).stdout
# todo not sure if ``interface != usb`` is needed
return interface.strip()
@ -260,13 +282,17 @@ class GraphicCard(Component):
def _memory(bus_info):
"""The size of the memory of the gpu."""
try:
lines = cmd.run('lspci',
lines = cmd.run(
'lspci',
'-v -s {bus} | ',
'grep \'prefetchable\' | ',
'grep -v \'non-prefetchable\' | ',
'egrep -o \'[0-9]{{1,3}}[KMGT]+\''.format(bus=bus_info),
shell=True).stdout.splitlines()
return max((base2.Quantity(value).to('MiB') for value in lines), default=None)
shell=True,
).stdout.splitlines()
return max(
(base2.Quantity(value).to('MiB') for value in lines), default=None
)
except subprocess.CalledProcessError:
return None
@ -281,23 +307,29 @@ class Motherboard(Component):
def new(cls, lshw, hwinfo, **kwargs) -> C:
node = next(get_nested_dicts_with_key_value(lshw, 'description', 'Motherboard'))
bios_node = next(get_nested_dicts_with_key_value(lshw, 'id', 'firmware'))
memory_array = next(g.indents(hwinfo, 'Physical Memory Array', indent=' '), None)
memory_array = next(
g.indents(hwinfo, 'Physical Memory Array', indent=' '), None
)
return cls(node, bios_node, memory_array)
def __init__(self, node: dict, bios_node: dict, memory_array: Optional[List[str]]) -> None:
def __init__(
self, node: dict, bios_node: dict, memory_array: Optional[List[str]]
) -> None:
super().__init__(node)
self.from_lshw(node)
self.usb = self.num_interfaces(node, 'usb')
self.firewire = self.num_interfaces(node, 'firewire')
self.serial = self.num_interfaces(node, 'serial')
self.pcmcia = self.num_interfaces(node, 'pcmcia')
self.slots = int(run('dmidecode -t 17 | '
'grep -o BANK | '
'wc -l',
self.slots = int(
run(
'dmidecode -t 17 | ' 'grep -o BANK | ' 'wc -l',
check=True,
universal_newlines=True,
shell=True,
stdout=PIPE).stdout)
stdout=PIPE,
).stdout
)
self.bios_date = dateutil.parser.parse(bios_node['date'])
self.version = bios_node['version']
self.ram_slots = self.ram_max_size = None
@ -311,8 +343,11 @@ class Motherboard(Component):
def num_interfaces(node: dict, interface: str) -> int:
interfaces = get_nested_dicts_with_key_containing_value(node, 'id', interface)
if interface == 'usb':
interfaces = (c for c in interfaces
if 'usbhost' not in c['id'] and 'usb' not in c['businfo'])
interfaces = (
c
for c in interfaces
if 'usbhost' not in c['id'] and 'usb' not in c['businfo']
)
return len(tuple(interfaces))
def __str__(self) -> str:
@ -341,14 +376,17 @@ class NetworkAdapter(Component):
# and to parse it
# https://www.redhat.com/archives/redhat-list/2010-October/msg00066.html
# workbench-live includes proprietary firmwares
self.serial_number = self.serial_number or utils.get_hw_addr(node['logicalname'])
self.serial_number = self.serial_number or utils.get_hw_addr(
node['logicalname']
)
self.variant = node.get('version', None)
self.wireless = bool(node.get('configuration', {}).get('wireless', False))
def __str__(self) -> str:
return '{} {} {}'.format(super().__str__(), self.speed,
'wireless' if self.wireless else 'ethernet')
return '{} {} {}'.format(
super().__str__(), self.speed, 'wireless' if self.wireless else 'ethernet'
)
class SoundCard(Component):
@ -387,18 +425,25 @@ class Display(Component):
self.resolution_width, self.resolution_height = text.numbers(
g.kv(timings, 'Resolution')
)
x, y = (unit.Quantity(v, 'millimeter').to('inch') for v in
text.numbers(g.kv(node, 'Size')))
x, y = (
unit.Quantity(v, 'millimeter').to('inch')
for v in text.numbers(g.kv(node, 'Size'))
)
self.size = float(hypot(x, y).m)
self.technology = next((t for t in self.TECHS if t in node[0]), None)
d = '{} {} 0'.format(g.kv(node, 'Year of Manufacture'), g.kv(node, 'Week of Manufacture'))
d = '{} {} 0'.format(
g.kv(node, 'Year of Manufacture'), g.kv(node, 'Week of Manufacture')
)
# We assume it has been produced the first day of such week
self.production_date = datetime.strptime(d, '%Y %W %w')
self._aspect_ratio = Fraction(self.resolution_width, self.resolution_height)
def __str__(self) -> str:
return '{0} {1.resolution_width}x{1.resolution_height} {1.size} inches {2}'.format(
super().__str__(), self, self._aspect_ratio)
return (
'{0} {1.resolution_width}x{1.resolution_height} {1.size} inches {2}'.format(
super().__str__(), self, self._aspect_ratio
)
)
class Battery(Component):
@ -407,6 +452,7 @@ class Battery(Component):
the Linux Kernel convention, from
https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-power.
"""
LiIon = 'Li-ion'
NiCd = 'NiCd'
NiMH = 'NiMH'
@ -419,9 +465,9 @@ class Battery(Component):
@classmethod
def new(cls, lshw, hwinfo, **kwargs) -> Iterator[C]:
try:
uevent = cmd \
.run('cat', '/sys/class/power_supply/BAT*/uevent', shell=True) \
.stdout.splitlines()
uevent = cmd.run(
'cat', '/sys/class/power_supply/BAT*/uevent', shell=True
).stdout.splitlines()
except CalledProcessError:
return
yield cls(uevent)
@ -429,17 +475,21 @@ class Battery(Component):
def __init__(self, node: List[str]) -> None:
super().__init__(node)
try:
self.serial_number = g.kv(node, self.PRE + 'SERIAL_NUMBER', sep='=', type=str)
self.serial_number = g.kv(
node, self.PRE + 'SERIAL_NUMBER', sep='=', type=str
)
self.manufacturer = g.kv(node, self.PRE + 'MANUFACTURER', sep='=')
self.model = g.kv(node, self.PRE + 'MODEL_NAME', sep='=')
self.size = g.kv(node, self.PRE + 'CHARGE_FULL_DESIGN', sep='=', default=0)
if self.size is not None:
self.size = self.size // 1000
self.technology = g.kv(node, self.PRE + 'TECHNOLOGY', sep='=', type=self.Technology)
self.technology = g.kv(
node, self.PRE + 'TECHNOLOGY', sep='=', type=self.Technology
)
measure = MeasureBattery(
size=g.kv(node, self.PRE + 'CHARGE_FULL', sep='='),
voltage=g.kv(node, self.PRE + 'VOLTAGE_NOW', sep='='),
cycle_count=g.kv(node, self.PRE + 'CYCLE_COUNT', sep='=')
cycle_count=g.kv(node, self.PRE + 'CYCLE_COUNT', sep='='),
)
try:
measure.size = measure.size.m
@ -447,28 +497,51 @@ class Battery(Component):
except AttributeError:
pass
self.actions.add(measure)
self._wear = round(1 - measure.size / self.size, 2) \
if self.size and measure.size else None
self._wear = (
round(1 - measure.size / self.size, 2)
if self.size and measure.size
else None
)
self._node = node
except NoBatteryInfo:
self._node = None
def __str__(self) -> str:
try:
return '{0} {1.technology}. Size: {1.size} Wear: {1._wear:%}'.format(super().__str__(),
self)
return '{0} {1.technology}. Size: {1.size} Wear: {1._wear:%}'.format(
super().__str__(), self
)
except TypeError:
return 'There is not currently battery information'
class Computer(Device):
CHASSIS_TYPE = {
'Desktop': {'desktop', 'low-profile', 'tower', 'docking', 'all-in-one', 'pizzabox',
'mini-tower', 'space-saving', 'lunchbox', 'mini', 'stick'},
'Laptop': {'portable', 'laptop', 'convertible', 'tablet', 'detachable', 'notebook',
'handheld', 'sub-notebook'},
'Desktop': {
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
},
'Laptop': {
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
},
'Server': {'server'},
'Computer': {'_virtual'}
'Computer': {'_virtual'},
}
"""
A translation dictionary whose keys are Devicehub types and values
@ -489,7 +562,7 @@ class Computer(Device):
'Convertible': {'convertible'},
'Detachable': {'detachable'},
'Tablet': {'tablet'},
'Virtual': {'_virtual'}
'Virtual': {'_virtual'},
}
"""
A conversion table from DMI's chassis type value Devicehub
@ -503,9 +576,13 @@ class Computer(Device):
def __init__(self, node: dict) -> None:
super().__init__(node)
self.from_lshw(node)
chassis = node['configuration'].get('chassis', '_virtual')
self.type = next(t for t, values in self.CHASSIS_TYPE.items() if chassis in values)
self.chassis = next(t for t, values in self.CHASSIS_DH.items() if chassis in values)
chassis = node.get('configuration', {}).get('chassis', '_virtual')
self.type = next(
t for t, values in self.CHASSIS_TYPE.items() if chassis in values
)
self.chassis = next(
t for t, values in self.CHASSIS_DH.items() if chassis in values
)
self.sku = g.dict(node, ('configuration', 'sku'), default=None, type=str)
self.version = g.dict(node, 'version', default=None, type=str)
self._ram = None
@ -529,7 +606,9 @@ class Computer(Device):
components.extend(Component.new(lshw=lshw, hwinfo=hwinfo))
components.append(Motherboard.new(lshw, hwinfo))
computer._ram = sum(ram.size for ram in components if isinstance(ram, RamModule))
computer._ram = sum(
ram.size for ram in components if isinstance(ram, RamModule)
)
return computer, components
def __str__(self) -> str:

View file

@ -0,0 +1,361 @@
import json
from dmidecode import DMIParse
from ereuse_devicehub.parser.computer import (
Display,
GraphicCard,
NetworkAdapter,
SoundCard,
)
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot["data"]["demidecode"]
self.smart_raw = snapshot["data"]["smart"]
self.hwinfo_raw = snapshot["data"]["hwinfo"]
self.device = {"actions": []}
self.components = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.hwinfo = self.parse_hwinfo()
self.set_basic_datas()
self.computer = {
"device": self.device,
"software": "Workbench",
"components": self.components(),
"uuid": snapshot['uuid'],
"type": snapshot['type'],
"version": snapshot["version"],
"endTime": snapshot["endTime"],
"elapsed": 0,
"closed": True,
}
def set_basic_datas(self):
self.device['manufacturer'] = self.dmi.manufacturer()
self.device['model'] = self.dmi.model()
self.device['serialNumber'] = self.dmi.serial_number()
self.device['type'] = self.get_type()
self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version()
self.device['uuid'] = self.get_uuid()
def set_components(self):
self.get_cpu()
self.get_ram()
self.get_mother_board()
def get_cpu(self):
# TODO @cayop generation, brand and address not exist in dmidecode
for cpu in self.dmi.get('Processor'):
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": cpu.get('Max Speed'),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": cpu.get('Serial Number'),
"generation": cpu.get('Generation'),
"brand": cpu.get('Brand'),
"address": cpu.get('Address'),
}
)
def get_ram(self):
# TODO @cayop format and model not exist in dmidecode
for ram in self.dmi.get("Memory Device"):
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": ram.get("Type", self.default),
"format": ram.get("Format", self.default), # "DIMM",
"model": ram.get(
"Model", self.default
), # "48594D503131325336344350362D53362020",
}
)
def get_mother_board(self):
# TODO @cayop model, not exist in dmidecode
for moder_board in self.dmi.get("Baseboard"):
self.components.append(
{
"actions": [],
"type": "Motherboard",
"version": moder_board.get("Version"),
"serialNumber": moder_board.get("Serial Number"),
"manufacturer": moder_board.get("Manufacturer"),
"ramSlots": self.get_ram_slots(),
"ramMaxSize": self.get_max_ram_size(),
"slots": len(self.dmi.get("Number Of Devices")),
"biosDate": self.get_bios_date(),
"firewire": self.get_firmware(),
"model": moder_board.get("Product Name"), # ??
"pcmcia": self.get_pcmcia_num(), # ??
"serial": self.get_serial_num(), # ??
"usb": self.get_usb_num(),
}
)
def get_usb_num(self):
return len(
[u for u in self.get("Port Connector") if u.get("Port Type") == "USB"]
)
def get_serial_num(self):
return len(
[u for u in self.get("Port Connector") if u.get("Port Type") == "SERIAL"]
)
def get_pcmcia_num(self):
return len(
[u for u in self.get("Port Connector") if u.get("Port Type") == "PCMCIA"]
)
def get_bios_date(self):
return self.get("BIOS")[0].get("Release Date", self.default)
def get_firmware(self):
return self.get("BIOS")[0].get("Firmware Revision", self.default)
def get_max_ram_size(self):
size = self.dmi.get("Physical Memory Array")
if size:
size = size.get("Maximum Capacity")
return size.split(" GB")[0] if size else self.default
def get_ram_slots(self):
slots = self.dmi.get("Physical Memory Array")
if slots:
slots = slots.get("Number Of Devices")
return int(slots) if slots else self.default
def get_ram_size(self, ram):
size = ram.get("Size")
return size.split(" MB")[0] if size else self.default
def get_ram_speed(self, ram):
size = ram.get("Speed")
return size.split(" MT/s")[0] if size else self.default
def get_sku(self):
return self.get("System")[0].get("SKU Number", self.default)
def get_version(self):
return self.get("System")[0].get("Version", self.default)
def get_uuid(self):
return self.get("System")[0].get("UUID", self.default)
def get_chassis(self):
return self.get("Chassis")[0].get("Type", self.default)
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_data_storage(self):
for sm in self.smart:
model = sm.get('model_name')
manufacturer = None
if len(model.split(" ")) == 2:
manufacturer, model = model.split(" ")
self.components.append(
{
"actions": [],
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
return SSD if type_dev in SSDS else HDD
def get_data_storage_interface(self, x):
return x.get('device', {}).get('protocol', 'ATA')
def get_data_storage_size(self, x):
type_dev = x.get('device', {}).get('type')
total_capacity = "{type}_total_capacity".format(type=type_dev)
# convert bytes to Mb
return x.get(total_capacity) / 1024**2
def get_networks(self):
addr = []
for line in self.hwinfo:
for y in line:
if "Permanent HW Address:" in y:
mac = y.split(" Permanent HW Address: ")[1]
addr.extend(mac)
return addr
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x):
if isinstance(x, dict) or isinstance(x, list):
return x
return json.loads(x)
class LsHw:
def __init__(self, dmi, jshw, hwinfo, default="n/a"):
self.default = default
self.hw = self.loads(jshw)
self.hwinfo = hwinfo.splitlines()
self.childrens = self.hw.get('children', [])
self.dmi = dmi
self.components = dmi.components
self.device = dmi.device
self.add_components()
def add_components(self):
self.get_cpu_addr()
self.get_networks()
self.get_display()
self.get_sound_card()
self.get_graphic_card()
def get_cpu_addr(self):
for cpu in self.components:
if not cpu['type'] == "Processor":
continue
cpu["address"] = self.hw.get("width")
def get_networks(self):
networks = NetworkAdapter.new(self.lshw, self.hwinfo)
for x in networks:
self.components.append(
{
"actions": [],
"type": "NetworkAdapter",
"serialNumber": x.serial_number,
"speed": x.speed,
"model": x.model,
"manufacturer": x.manufacturer,
"variant": x.variant,
"wireless": x.wireless,
}
)
def get_display(self):
if not self.device['type'] == 'Laptop':
return
displays = Display.new(self.lshw, self.hwinfo)
for x in displays:
self.components.append(
{
"actions": [],
"type": "Display",
"model": x.model,
"manufacturer": x.manufacturer,
"serialNumber": x.serial_number,
"resolutionWidth": x.resolution_width,
"resolutionHeight": x.resolution_height,
"refreshRate": x.refresh_rate,
"technology": x.technology,
"productionDate": x.production_date,
"size": x.size,
}
)
def get_sound_card(self):
soundcards = SoundCard.new(self.lshw, self.hwinfo)
for x in soundcards:
self.components.append(
{
"actions": [],
"type": "SoundCard",
"model": x.model,
"manufacturer": x.manufacturer,
"serialNumber": x.serial_number,
}
)
def get_graphic_card(self):
# TODO @cayop memory get info from lspci on fly
graphicards = GraphicCard.new(self.lshw, self.hwinfo)
for x in graphicards:
self.components.append(
{
"actions": [],
"type": "GraphicCard",
"model": x.model,
"manufacturer": x.manufacturer,
"serialNumber": x.serial_number,
"memory": x.memory,
}
)
def loads(jshw):
if isinstance(jshw, dict):
return jshw
return json.loads(jshw)