2024-09-18 14:13:40 +00:00
# -*- coding: utf-8 -*-
import os
import json
import uuid
import hashlib
import argparse
2024-09-19 15:42:03 +00:00
import configparser
2024-10-31 15:30:45 +00:00
import urllib . parse
2024-10-15 11:32:36 +00:00
import urllib . request
2024-09-18 14:13:40 +00:00
2024-10-15 22:26:20 +00:00
import gettext
import locale
import logging
2024-09-18 14:13:40 +00:00
from datetime import datetime
2024-11-06 13:10:02 +00:00
SNAPSHOT_BASE = {
' timestamp ' : str ( datetime . now ( ) ) ,
' type ' : ' Snapshot ' ,
' uuid ' : str ( uuid . uuid4 ( ) ) ,
' software ' : " workbench-script " ,
' version ' : " 0.0.1 " ,
' token_hash ' : " " ,
' data ' : { } ,
' erase ' : [ ]
}
2024-09-27 16:07:48 +00:00
## Legacy Functions ##
2024-10-31 15:30:45 +00:00
2024-09-27 16:07:48 +00:00
def convert_to_legacy_snapshot ( snapshot ) :
2024-11-06 13:10:02 +00:00
snapshot [ " sid " ] = str ( uuid . uuid4 ( ) ) . split ( " - " ) [ 1 ]
2024-09-27 16:07:48 +00:00
snapshot [ " software " ] = " workbench-script "
snapshot [ " version " ] = " dev "
snapshot [ " schema_api " ] = " 1.0.0 "
snapshot [ " settings_version " ] = " No Settings Version (NaN) "
snapshot [ " timestamp " ] = snapshot [ " timestamp " ] . replace ( " " , " T " )
snapshot [ " data " ] [ " smart " ] = snapshot [ " data " ] [ " disks " ]
2024-11-11 01:30:30 +00:00
snapshot [ " data " ] [ " lshw " ] = json . loads ( snapshot [ " data " ] [ " lshw " ] )
2024-09-27 16:07:48 +00:00
snapshot [ " data " ] . pop ( " disks " )
snapshot . pop ( " erase " )
2024-11-06 13:10:02 +00:00
snapshot . pop ( " token_hash " )
2024-10-31 15:30:45 +00:00
2024-09-27 16:07:48 +00:00
## End Legacy Functions ##
2024-09-18 14:13:40 +00:00
## Utility Functions ##
def logs ( f ) :
def wrapper ( * args , * * kwargs ) :
try :
return f ( * args , * * kwargs )
except Exception as err :
2024-10-15 22:26:20 +00:00
logger . error ( err )
2024-09-18 14:13:40 +00:00
return ' '
return wrapper
@logs
def exec_cmd ( cmd ) :
2024-10-15 22:26:20 +00:00
logger . info ( _ ( ' Running command ` %s ` ' ) , cmd )
2024-09-18 14:13:40 +00:00
return os . popen ( cmd ) . read ( )
@logs
def exec_cmd_erase ( cmd ) :
2024-10-15 22:26:20 +00:00
logger . info ( _ ( ' Running command ` %s ` ' ) , cmd )
2024-09-18 14:13:40 +00:00
return ' '
# return os.popen(cmd).read()
## End Utility functions ##
2024-11-06 13:10:02 +00:00
def convert_to_credential ( snapshot ) :
snapshot [ " data " ] = json . dumps ( snapshot [ " data " ] )
file_path = os . path . join ( BASE_DIR , " templates " , " snapshot.json " )
with open ( file_path ) as f :
ff = f . read ( )
template = Template ( ff )
cred = template . substitute ( * * snapshot )
return cred
2024-09-18 14:13:40 +00:00
## Command Functions ##
## Erase Functions ##
## Xavier Functions ##
def erase_basic ( disk ) :
"""
Basic Erasure
https : / / tsapps . nist . gov / publication / get_pdf . cfm ? pub_id = 917935
Settings for basic data erasure using shred Linux command .
A software - based fast non - 100 % - secured way of erasing data storage .
Performs 1 pass overwriting one round using all zeros .
Compliant with NIST SP - 800 - 8 y8 .
In settings appear :
WB_ERASE = EraseBasic
WB_ERASE_STEPS = 1
WB_ERASE_LEADING_ZEROS = False
"""
cmd = f ' shred -vn 1 /dev/ { disk } '
return [ exec_cmd_erase ( cmd ) ]
def erase_baseline ( disk ) :
"""
Baseline Secure Erasure
Settings for advanced data erasure using badblocks Linux software .
A secured - way of erasing data storages , erase hidden areas ,
checking the erase sector by sector .
Performs 1 pass overwriting each sector with zeros and a final verification .
Compliant with HMG Infosec Standard 5 Baseline .
In settings appear :
WB_ERASE = EraseSectors
WB_ERASE_STEPS = 1
WB_ERASE_LEADING_ZEROS = True
WB_ERASE_1_METHOD = EraseBasic
WB_ERASE_1_STEP_TYPE = 0
WB_ERASE_2_METHOD = EraseSectors
WB_ERASE_2_STEP_TYPE = 1
"""
result = [ ]
cmd = f ' shred -zvn 0 /dev/ { disk } '
result . append ( exec_cmd_erase ( cmd ) )
cmd = f ' badblocks -st random -w /dev/ { disk } '
result . append ( exec_cmd_erase ( cmd ) )
return result
def erase_enhanced ( disk ) :
"""
Enhanced Secure Erasure
Settings for advanced data erasure using badblocks Linux software .
A secured - way of erasing data storages , erase hidden areas ,
checking the erase sector by sector .
Performs 3 passes overwriting every sector with zeros and ones ,
and final verification . Compliant with HMG Infosec Standard 5 Enhanced .
In settings appear :
WB_ERASE = EraseSectors
WB_ERASE_LEADING_ZEROS = True
WB_ERASE_1_METHOD = EraseBasic
WB_ERASE_1_STEP_TYPE = 1
WB_ERASE_2_METHOD = EraseBasic
WB_ERASE_2_STEP_TYPE = 0
WB_ERASE_3_METHOD = EraseSectors
WB_ERASE_3_STEP_TYPE = 1
"""
result = [ ]
cmd = f ' shred -vn 1 /dev/ { disk } '
result . append ( exec_cmd_erase ( cmd ) )
cmd = f ' shred -zvn 0 /dev/ { disk } '
result . append ( exec_cmd_erase ( cmd ) )
## creo que realmente seria asi (3 pases y una extra poniendo a ceros):
# shred -zvn 3 /def/{disk}
# tampoco estoy seguro que el badblocks haga un proceso de verificacion.
cmd = f ' badblocks -st random -w /dev/ { disk } '
result . append ( exec_cmd_erase ( cmd ) )
return result
## End Xavier Functions ##
def ata_secure_erase_null ( disk ) :
cmd_baseline = f ' hdparm --user-master u --security-erase NULL /dev/ { disk } '
return [ exec_cmd_erase ( cmd_baseline ) ]
def ata_secure_erase_enhanced ( disk ) :
cmd_enhanced = f ' hdparm --user-master u --security-erase-enhanced /dev/ { disk } '
return [ exec_cmd_erase ( cmd_enhanced ) ]
def nvme_secure_erase ( disk ) :
cmd_encrypted = f ' nvme format /dev/ { disk } --ses=1 '
return [ exec_cmd_erase ( cmd_encrypted ) ]
## End Erase Functions ##
@logs
def get_disks ( ) :
disks = json . loads (
exec_cmd ( ' lsblk -Jdo NAME,TYPE,MOUNTPOINTS,ROTA,TRAN ' )
)
return disks . get ( ' blockdevices ' , [ ] )
@logs
def gen_erase ( all_disks , type_erase , user_disk = None ) :
erase = [ ]
for disk in all_disks :
if user_disk and disk [ ' name ' ] not in user_disk :
continue
if disk [ ' type ' ] != ' disk ' :
continue
if ' boot ' in disk [ ' mountpoints ' ] :
continue
if not disk [ ' rota ' ] :
# if soport nvme erase
erase . append ( nvme_secure_erase ( disk [ ' name ' ] ) )
elif disk [ ' tran ' ] in [ ' ata ' , ' sata ' ] :
# if soport ata erase
if type_erase == ' basic ' :
erase . append ( ata_secure_erase_null ( disk [ ' name ' ] ) )
elif type_erase == ' baseline ' :
erase . append ( ata_secure_erase_null ( disk [ ' name ' ] ) )
elif type_erase == ' enhanced ' :
erase . append ( ata_secure_erase_enhanced ( disk [ ' name ' ] ) )
else :
# For old disks
if type_erase == ' basic ' :
erase . append ( erase_basic ( disk [ ' name ' ] ) )
elif type_erase == ' baseline ' :
erase . append ( erase_baseline ( disk [ ' name ' ] ) )
elif type_erase == ' enhanced ' :
erase . append ( erase_enhanced ( disk [ ' name ' ] ) )
return erase
@logs
def exec_smart ( disk ) :
2024-10-15 11:38:01 +00:00
cmd = f ' sudo smartctl -x --json=cosviu /dev/ { disk } '
2024-09-18 14:13:40 +00:00
return json . loads ( exec_cmd ( cmd ) )
@logs
def smartctl ( all_disks , disk = None ) :
if disk :
2024-09-30 15:48:38 +00:00
return [ exec_smart ( disk ) ]
2024-09-18 14:13:40 +00:00
data_list = [ ]
for disk in all_disks :
if disk [ ' type ' ] == ' disk ' :
data = exec_smart ( disk [ ' name ' ] )
data_list . append ( data )
return data_list
## End Command Functions ##
2024-09-26 11:49:53 +00:00
# TODO permitir selección
# TODO permitir que vaya más rápido
2024-09-18 14:13:40 +00:00
def get_data ( all_disks ) :
2024-11-07 16:33:36 +00:00
lshw = ' sudo lshw -xml '
2024-09-19 15:40:04 +00:00
hwinfo = ' sudo hwinfo --reallyall '
dmidecode = ' sudo dmidecode '
2024-10-10 17:10:46 +00:00
lspci = ' sudo lspci -vv '
2024-09-18 14:13:40 +00:00
data = {
2024-10-15 11:54:16 +00:00
' lshw ' : exec_cmd ( lshw ) or " {} " ,
2024-09-18 14:13:40 +00:00
' disks ' : smartctl ( all_disks ) ,
' hwinfo ' : exec_cmd ( hwinfo ) ,
2024-10-10 17:10:46 +00:00
' dmidecode ' : exec_cmd ( dmidecode ) ,
' lspci ' : exec_cmd ( lspci )
2024-09-18 14:13:40 +00:00
}
return data
def gen_snapshot ( all_disks ) :
snapshot = SNAPSHOT_BASE . copy ( )
snapshot [ ' data ' ] = get_data ( all_disks )
return snapshot
2024-11-06 13:10:02 +00:00
def save_snapshot_in_disk ( snapshot , path , snap_uuid ) :
2024-09-25 14:51:07 +00:00
snapshot_path = os . path . join ( path , ' snapshots ' )
2024-09-18 14:13:40 +00:00
filename = " {} / {} _ {} .json " . format (
2024-09-25 14:51:07 +00:00
snapshot_path ,
2024-09-18 14:13:40 +00:00
datetime . now ( ) . strftime ( " % Y % m %d - % H_ % M_ % S " ) ,
2024-11-06 13:10:02 +00:00
snap_uuid )
2024-09-18 14:13:40 +00:00
2024-09-25 14:51:07 +00:00
try :
if not os . path . exists ( snapshot_path ) :
os . makedirs ( snapshot_path )
2024-10-15 22:26:20 +00:00
logger . info ( _ ( " Created snapshots directory at ' %s ' " ) , snapshot_path )
2024-09-25 14:51:07 +00:00
with open ( filename , " w " ) as f :
2024-11-06 13:10:02 +00:00
f . write ( snapshot )
2024-10-15 22:26:20 +00:00
logger . info ( _ ( " Snapshot written in path ' %s ' " ) , filename )
2024-09-25 14:51:07 +00:00
except Exception as e :
2024-09-26 14:24:38 +00:00
try :
2024-10-15 22:26:20 +00:00
logger . warning ( _ ( " Attempting to save file in actual path. Reason: Failed to write in snapshots directory: \n %s . " ) , e )
2024-09-26 14:24:38 +00:00
fallback_filename = " {} / {} _ {} .json " . format (
path ,
datetime . now ( ) . strftime ( " % Y % m %d - % H_ % M_ % S " ) ,
2024-11-06 13:10:02 +00:00
snap_uuid )
2024-09-26 15:04:05 +00:00
with open ( fallback_filename , " w " ) as f :
2024-11-06 13:10:02 +00:00
f . write ( snapshot )
2024-10-15 22:26:20 +00:00
logger . warning ( _ ( " Snapshot written in fallback path ' %s ' " ) , fallback_filename )
2024-09-26 14:24:38 +00:00
except Exception as e :
2024-10-15 22:26:20 +00:00
logger . error ( _ ( " Could not save snapshot locally. Reason: Failed to write in fallback path: \n %s " ) , e )
2024-09-18 14:13:40 +00:00
2024-11-06 13:10:02 +00:00
def send_to_sign_credential ( cred , token , url ) :
headers = {
" Authorization " : f " Bearer { token } " ,
" Content-Type " : " application/json "
}
try :
data = json . dumps ( cred ) . encode ( ' utf-8 ' )
request = urllib . request . Request ( url , data = data , headers = headers )
with urllib . request . urlopen ( request ) as response :
status_code = response . getcode ( )
#response_text = response.read().decode('utf-8')
if 200 < = status_code < 300 :
logger . info ( _ ( " Credential successfully signed " ) )
else :
logger . error ( _ ( " Credential cannot signed in ' %s ' " ) , url )
except Exception as e :
logger . error ( _ ( " Credential not remotely sent to URL ' %s ' . Do you have internet? Is your server up & running? Is the url token authorized? \n %s " ) , url , e )
2024-09-19 17:01:54 +00:00
# TODO sanitize url, if url is like this, it fails
# url = 'http://127.0.0.1:8000/api/snapshot/'
2024-11-07 09:04:34 +00:00
def send_snapshot_to_devicehub ( snapshot , token , url , legacy ) :
2024-10-31 15:30:45 +00:00
url_components = urllib . parse . urlparse ( url )
ev_path = " evidence/ {} " . format ( snapshot [ " uuid " ] )
2024-11-08 19:14:08 +00:00
components = ( url_components . scheme , url_components . netloc , ev_path , ' ' , ' ' , ' ' )
2024-10-31 15:30:45 +00:00
ev_url = urllib . parse . urlunparse ( components )
# apt install qrencode
2024-09-18 14:13:40 +00:00
headers = {
2024-09-30 15:48:38 +00:00
" Authorization " : f " Bearer { token } " ,
2024-09-18 14:13:40 +00:00
" Content-Type " : " application/json "
}
2024-09-20 19:31:12 +00:00
try :
2024-10-15 11:32:36 +00:00
data = json . dumps ( snapshot ) . encode ( ' utf-8 ' )
request = urllib . request . Request ( url , data = data , headers = headers )
with urllib . request . urlopen ( request ) as response :
status_code = response . getcode ( )
2024-11-07 09:04:34 +00:00
response_text = response . read ( ) . decode ( ' utf-8 ' )
2024-10-15 11:32:36 +00:00
if 200 < = status_code < 300 :
2024-10-15 22:26:20 +00:00
logger . info ( _ ( " Snapshot successfully sent to ' %s ' " ) , url )
2024-11-07 09:04:34 +00:00
if legacy :
try :
response = json . loads ( response_text )
2024-11-12 00:15:34 +00:00
public_url = response . get ( ' public_url ' )
dhid = response . get ( ' dhid ' )
if public_url :
2024-11-07 09:04:34 +00:00
# apt install qrencode
2024-11-12 00:15:34 +00:00
qr = " echo {} | qrencode -t ANSI " . format ( public_url )
2024-11-07 09:04:34 +00:00
print ( exec_cmd ( qr ) )
2024-11-12 00:15:34 +00:00
print ( " url: {} " . format ( public_url ) )
if dhid :
print ( " dhid: {} " . format ( dhid ) )
2024-11-07 09:04:34 +00:00
except Exception :
logger . error ( response_text )
2024-11-07 16:42:03 +00:00
else :
qr = " echo {} | qrencode -t ANSI " . format ( ev_url )
print ( exec_cmd ( qr ) )
print ( f " url: { ev_url } " )
2024-10-31 15:30:45 +00:00
else :
2024-11-08 10:27:31 +00:00
logger . error ( _ ( " Snapshot %s could not be sent to URL ' %s ' " ) , snapshot [ " uuid " ] , url )
2024-11-11 01:24:06 +00:00
# TODO review all the try-except thing here; maybe the try inside legacy does not make sense anymore
except urllib . error . HTTPError as e :
error_details = e . read ( ) . decode ( ' utf-8 ' ) # Get the error response body
logger . error ( _ ( " Snapshot %s not remotely sent to URL ' %s ' . Server responded with error: \n %s " ) ,
snapshot [ " uuid " ] , url , error_details )
2024-09-30 11:09:23 +00:00
2024-09-27 12:17:48 +00:00
except Exception as e :
2024-11-08 10:27:31 +00:00
logger . error ( _ ( " Snapshot %s not remotely sent to URL ' %s ' . Do you have internet? Is your server up & running? Is the url token authorized? \n %s " ) , snapshot [ " uuid " ] , url , e )
2024-09-18 14:13:40 +00:00
2024-10-31 15:30:45 +00:00
2024-10-31 15:30:45 +00:00
2024-09-19 15:42:03 +00:00
def load_config ( config_file = " settings.ini " ) :
"""
Tries to load configuration from a config file .
"""
config = configparser . ConfigParser ( )
if os . path . exists ( config_file ) :
# If config file exists, read from it
2024-10-15 22:26:20 +00:00
logger . info ( _ ( " Found config file in path: %s . " ) , config_file )
2024-09-19 15:42:03 +00:00
config . read ( config_file )
path = config . get ( ' settings ' , ' path ' , fallback = os . getcwd ( ) )
# TODO validate that has http:// start
url = config . get ( ' settings ' , ' url ' , fallback = None )
token = config . get ( ' settings ' , ' token ' , fallback = None )
# TODO validate that the device exists?
device = config . get ( ' settings ' , ' device ' , fallback = None )
erase = config . get ( ' settings ' , ' erase ' , fallback = None )
2024-09-27 16:07:48 +00:00
legacy = config . get ( ' settings ' , ' legacy ' , fallback = None )
2024-11-06 13:10:02 +00:00
url_wallet = config . get ( ' settings ' , ' url_wallet ' , fallback = None )
wb_sign_token = config . get ( ' settings ' , ' wb_sign_token ' , fallback = None )
2024-09-19 15:42:03 +00:00
else :
2024-10-15 22:26:20 +00:00
logger . error ( _ ( " Config file ' %s ' not found. Using default values. " ) , config_file )
2024-09-19 15:42:03 +00:00
path = os . path . join ( os . getcwd ( ) )
2024-09-27 16:07:48 +00:00
url , token , device , erase , legacy = None , None , None , None , None
2024-09-19 15:42:03 +00:00
return {
' path ' : path ,
' url ' : url ,
' token ' : token ,
' device ' : device ,
2024-09-27 16:07:48 +00:00
' erase ' : erase ,
2024-11-06 13:10:02 +00:00
' legacy ' : legacy ,
' wb_sign_token ' : wb_sign_token ,
' url_wallet ' : url_wallet
2024-09-19 15:42:03 +00:00
}
2024-09-18 14:13:40 +00:00
2024-09-19 15:42:03 +00:00
def parse_args ( ) :
"""
Parse config argument , if available
"""
2024-10-15 22:26:20 +00:00
parser = argparse . ArgumentParser (
usage = _ ( " workbench-script.py [-h] [--config CONFIG] " ) ,
description = _ ( " Optional config loader for workbench. " ) )
2024-09-18 14:13:40 +00:00
parser . add_argument (
2024-09-19 15:42:03 +00:00
' --config ' ,
2024-10-15 22:26:20 +00:00
help = _ ( " path to the config file. Defaults to ' settings.ini ' in the current directory. " ) ,
2024-09-19 15:42:03 +00:00
default = " settings.ini " # Fallback to 'settings.ini' by default
2024-09-18 14:13:40 +00:00
)
2024-09-19 15:42:03 +00:00
return parser . parse_args ( )
2024-10-15 22:26:20 +00:00
def prepare_lang ( ) :
locale_path = os . path . join ( os . path . dirname ( __file__ ) , ' locale ' )
domain = ' messages '
gettext . bindtextdomain ( domain , locale_path )
gettext . textdomain ( domain )
global _
# with LANG=es_ES.UTF-8, it detects spanish
_ = gettext . gettext
# # this would force it to spanish
# lang = gettext.translation(domain, localedir=locale_path, languages=['es'])
# lang.install()
# _ = lang.gettext
def prepare_logger ( ) :
global logger
logger = logging . getLogger ( __name__ )
logger . setLevel ( logging . INFO )
console_handler = logging . StreamHandler ( )
console_handler . setLevel ( logging . INFO )
formatter = logging . Formatter ( ' [ %(asctime)s ] workbench: %(levelname)s : %(message)s ' )
console_handler . setFormatter ( formatter )
logger . addHandler ( console_handler )
2024-09-19 15:42:03 +00:00
def main ( ) :
2024-10-15 22:26:20 +00:00
prepare_lang ( )
prepare_logger ( )
logger . info ( _ ( " START " ) )
2024-09-19 15:42:03 +00:00
# Parse the command-line arguments
args = parse_args ( )
# Load the config file, either specified via --config or the default 'settings.ini'
config_file = args . config
config = load_config ( config_file )
2024-11-07 09:04:34 +00:00
legacy = config . get ( " legacy " )
2024-09-18 14:13:40 +00:00
2024-09-26 11:49:53 +00:00
# TODO show warning if non root, means data is not complete
# if annotate as potentially invalid snapshot (pending the new API to be done)
if os . geteuid ( ) != 0 :
2024-10-15 22:26:20 +00:00
logger . warning ( _ ( " This script must be run as root. Collected data will be incomplete or unusable " ) )
2024-09-26 11:49:53 +00:00
2024-09-18 14:13:40 +00:00
all_disks = get_disks ( )
snapshot = gen_snapshot ( all_disks )
2024-11-06 13:10:02 +00:00
snap_uuid = snapshot [ " uuid " ]
2024-09-18 14:13:40 +00:00
2024-09-27 16:07:48 +00:00
if config [ ' erase ' ] and config [ ' device ' ] and not config . get ( " legacy " ) :
2024-09-19 15:42:03 +00:00
snapshot [ ' erase ' ] = gen_erase ( all_disks , config [ ' erase ' ] , user_disk = config [ ' device ' ] )
2024-09-27 16:07:48 +00:00
elif config [ ' erase ' ] and not config . get ( " legacy " ) :
2024-09-19 15:42:03 +00:00
snapshot [ ' erase ' ] = gen_erase ( all_disks , config [ ' erase ' ] )
2024-11-07 09:04:34 +00:00
if legacy :
2024-09-27 16:07:48 +00:00
convert_to_legacy_snapshot ( snapshot )
2024-11-06 13:10:02 +00:00
snapshot = json . dumps ( snapshot )
else :
url_wallet = config . get ( " url_wallet " )
wb_sign_token = config . get ( " wb_sign_token " )
2024-11-08 11:23:45 +00:00
if wb_sign_token :
tk = wb_sign_token . encode ( " utf8 " )
snapshot [ " token_hash " ] = hashlib . hash256 ( tk ) . hexdigest ( )
2024-11-06 13:10:02 +00:00
if url_wallet and wb_sign_token :
snapshot = send_to_sign_credential ( snapshot , wb_sign_token , url_wallet )
2024-10-15 22:26:20 +00:00
2024-11-06 13:10:02 +00:00
save_snapshot_in_disk ( snapshot , config [ ' path ' ] , snap_uuid )
2024-09-18 14:13:40 +00:00
2024-09-19 15:42:03 +00:00
if config [ ' url ' ] :
2024-11-07 09:04:34 +00:00
send_snapshot_to_devicehub ( snapshot , config [ ' token ' ] , config [ ' url ' ] , legacy )
2024-09-18 14:13:40 +00:00
2024-10-15 22:26:20 +00:00
logger . info ( _ ( " END " ) )
2024-09-18 14:13:40 +00:00
if __name__ == ' __main__ ' :
main ( )