# Select Git revision
# file_contexts
# drive_handle.py 11.68 KiB
import ctypes
import errno
import os
import shutil
import subprocess
from distutils.dir_util import copy_tree
from hashlib import sha1
import psutil
from progress.spinner import Spinner
from termcolor import colored
class DriveHandle:
    """
    Handle for a single Windows volume that is addressed by its drive letter.

    The constructor validates (and optionally formats) the volume. The methods
    query sizes, resolve the clusters of a file through ``fsutil``, read raw
    clusters, hash the raw volume and toggle disk properties through PowerShell.
    """

    _DEFAULT_CLUSTER_SIZE: int = 4_096
    _MAX_RAM_ALLOCATION: int = 1_024 * 1_024 * 1_024  # equal to 1 GiB
    _drive_letter: str = None

    def __init__(self, drive_letter: str, format_drive: bool, max_memory: int) -> None:
        """
        Validates the selected volume and optionally formats it to NTFS

        Parameters:
            drive_letter (str): The single letter (A-Z, any case) of the volume
            format_drive (bool): Asks for confirmation and formats the volume, if True
            max_memory (int): The maximum chunk size used while hashing, measured in MiB

        Raises:
            ValueError: If an argument is invalid, the volume cannot be accessed, lives on
                        the boot disk, is not NTFS or is not empty
        """
        # argument checks come first, so nothing touches the system with bad input
        if max_memory <= 0:
            raise ValueError("The maximum memory allocation must be a positive number of MiB.")

        # The letter is interpolated into shell and PowerShell command lines below,
        # therefore only a single letter from A to Z is accepted.
        normalized_letter: str = str(drive_letter).upper()
        if len(normalized_letter) != 1 or not "A" <= normalized_letter <= "Z":
            raise ValueError("Specified partition cannot be accessed.")
        drive_letter = normalized_letter
        root_path: str = f"{drive_letter}:\\"

        # sanity checks
        if not os.path.isdir(root_path):
            raise ValueError("Specified partition cannot be accessed.")

        # The boot disk check has to happen before anything destructive is executed.
        boot_query_output: bytes = subprocess.check_output(['powershell.exe', f'(Get-Disk -Number (Get-Partition -DriveLetter {drive_letter}).DiskNumber).IsBoot'])
        if b"True" in boot_query_output:
            raise ValueError("Selected partition is the boot drive.")

        if format_drive:
            print(colored(f"[DriveHandle] Do you want to format drive {drive_letter}:\\? All information will be lost.", "red"))
            confirmation = input("y(es)/n(o): ")
            if confirmation.lower() in ("yes", "y"):
                print(colored(f"[DriveHandle] Formatting drive {drive_letter}:\\", "light_grey"))
                # drive_letter is validated above, so the command line cannot be extended
                os.system(f"format {drive_letter}: /FS:NTFS /A:{DriveHandle._DEFAULT_CLUSTER_SIZE} /x /Y")
                print(colored("[DriveHandle] Formatting completed.", "light_grey"))
            else:
                print(colored("[DriveHandle] Skipping the format process.", "light_grey"))

        # compare case-insensitively, the last matching partition wins
        filesystem: str = ""
        for partition in psutil.disk_partitions():
            if root_path.startswith(partition.mountpoint.upper()):
                filesystem = partition.fstype
        if filesystem != "NTFS":
            raise ValueError("Selected partition is not formatted to NTFS.")

        if len(os.listdir(root_path)) > 1:  # Magic value of 1 is used due to the 'System Volume Information' file of each partition
            raise ValueError("Selected partition is not empty.")

        # great, everything works
        # save path
        self._MAX_RAM_ALLOCATION = max_memory * 1_024 * 1_024  # convert from MiB to B
        self._drive_letter = drive_letter

    def get_free_drive_space(self) -> int:
        """
        Determines the free usable disk space on the drive

        Returns:
            free_disk_space (int): The free disk space measured in bytes
        """
        return psutil.disk_usage(self.get_path()).free

    def get_drive_size(self) -> int:
        """
        Determines the total drive size

        Returns:
            drive_size (int): The drive size measured in bytes
        """
        return psutil.disk_usage(self.get_path()).total

    def get_allocation_unit_sizes(self) -> tuple:
        """
        Determines the allocation unit sizes of the used volume

        Returns:
            (bytes_per_sector, sectors_per_cluster, bytes_per_cluster) (int, int, int): The allocation unit sizes

        Raises:
            OSError: If the GetDiskFreeSpaceW call reports a failure
        """
        # GetDiskFreeSpaceW fills DWORD out-parameters; a DWORD is a c_ulong on Windows
        sectors_per_cluster: ctypes.c_ulong = ctypes.c_ulong(0)
        bytes_per_sector: ctypes.c_ulong = ctypes.c_ulong(0)
        encoded_path: ctypes.c_wchar_p = ctypes.c_wchar_p(self.get_path())
        succeeded = ctypes.windll.kernel32.GetDiskFreeSpaceW(encoded_path, ctypes.byref(sectors_per_cluster), ctypes.byref(bytes_per_sector), None, None)
        if not succeeded:
            # do not hand out zero sizes, callers would silently read or write nothing
            raise ctypes.WinError()
        return (bytes_per_sector.value, sectors_per_cluster.value, sectors_per_cluster.value * bytes_per_sector.value)

    @staticmethod
    def _parse_extent_lcns(query_output: str) -> list:
        """
        Extracts the first cluster number (LCN) of every extent line of a 'fsutil file queryextents' output

        Parameters:
            query_output (str): The decoded output of the query

        Returns:
            first_clusters (list<int>): The hexadecimal LCN values converted to integers, in output order

        Raises:
            ValueError: If the text following an 'LCN: ' marker is not a hexadecimal number
        """
        marker: str = "LCN: "
        first_clusters: list = []
        for line in query_output.splitlines():
            marker_index: int = line.find(marker)
            if marker_index == -1:
                # blank lines and lines without an extent carry no cluster number
                continue
            first_clusters.append(int(line[marker_index + len(marker):].strip(), 16))
        return first_clusters

    def get_clusters_for_file(self, path: str) -> list:
        """
        Determines the cluster numbers associated with a file

        Parameters:
            path (str): The path to the file

        Returns:
            cluster_numbers (list<int>): The cluster numbers associated with the file specified in the path
        """
        print(colored(f"[DriveHandle] Gathering the cluster numbers associated with file {path}", "light_grey"))
        file_name: str = path.split("\\")[-1]
        cluster_numbers: list = []
        progressbar = Spinner('[DriveHandle] Gathering clusters')
        query_output: str = subprocess.check_output(['fsutil', 'file', 'queryextents', path]).decode("utf-8")
        # the extent (LCN) marks the first cluster of the (maybe fragmented) chunk
        for extents_first_cluster in self._parse_extent_lcns(query_output):
            current_cluster: int = extents_first_cluster
            # find associated clusters from that point on
            while True:
                progressbar.next()
                if not self.__check_cluster_file_association(current_cluster, file_name):
                    # no more clusters from this point on
                    # go to next extent
                    break
                cluster_numbers.append(current_cluster)
                current_cluster += 1
        # all extents are checked
        # return final clusters
        progressbar.finish()
        return cluster_numbers

    def __check_cluster_file_association(self, cluster_number: int, file_name: str) -> bool:
        """
        Checks whether a cluster number is associated with a file

        Parameters:
            cluster_number (int): The cluster number, that needs to be checked
            file_name (str): The name of the file, the cluster might be associated with

        Returns:
            association (bool): True, if the cluster is associated with the file, False otherwise
        """
        try:
            response: str = subprocess.check_output(['fsutil', 'volume', 'querycluster', f'{self._drive_letter}:', f'{cluster_number}']).decode("utf-8")
        except subprocess.CalledProcessError:
            # a failing query is treated as "not associated"
            return False
        return file_name in response

    def write_pattern(self, symbol: str, path: str) -> None:
        """
        Writes the pattern file to the disk until no space is left

        Parameters:
            symbol (str): The symbol the pattern file shall be filled with
            path (str): The path to the pattern file

        Raises:
            OSError: If writing fails for a reason other than a full disk
        """
        print(colored(f"[DriveHandle] Writing a file ({path}) with 0x{symbol.encode().hex()} pattern to fill the disk.", "light_grey"))
        progressbar = Spinner('[DriveHandle] Writing the pattern file')
        # Write the very bytes that are logged above (same encoding as the overwrite file).
        symbol_bytes: bytes = symbol.encode()
        repetitions: int = 1024 * 1024 * 5  # 5 Mi symbols per write
        write_chunk: bytes = symbol_bytes * repetitions
        # Unbuffered, so a failed write leaves no pending data behind that
        # would raise the "no space left" error again while closing the file.
        with open(path, "wb", buffering=0) as file_descriptor:
            while True:
                try:
                    file_descriptor.write(write_chunk)
                    progressbar.next()
                except OSError as error:
                    if error.errno != errno.ENOSPC:
                        raise
                    if repetitions <= 1:
                        # not even a single symbol fits anymore
                        break
                    # retry with half the amount of whole symbols
                    repetitions //= 2
                    write_chunk = symbol_bytes * repetitions
                    if self.get_free_drive_space() == 0:
                        break
        progressbar.finish()
        print(colored("[DriveHandle] Completed writing the pattern file.", "light_grey"))

    def create_overwrite_file(self, symbol: str, path: str) -> None:
        """
        Writes the overwrite file to the disk

        The file size is one cluster plus half a cluster plus half a sector.

        Parameters:
            symbol (str): The symbol the overwrite file shall be filled with
            path (str): The path to the overwrite file
        """
        (sector_size, _, cluster_size) = self.get_allocation_unit_sizes()
        half_cluster: int = cluster_size // 2
        half_sector: int = sector_size // 2
        file_size: int = cluster_size + half_cluster + half_sector
        print(colored(f"[DriveHandle] Writing a overwriting file ({path}) with the size {file_size}B.", "light_grey"))
        with open(path, "wb") as file_descriptor:
            print(colored(f"[DriveHandle] Writing the symbol '{symbol}' (0x{symbol.encode().hex()}) to the overwrite file.", "light_grey"))
            file_descriptor.write(symbol.encode() * file_size)
        print(colored("[DriveHandle] Completed writing the overwrite file.", "light_grey"))

    def copy_demo_files(self) -> None:
        """
        Copies the files of the demo folder to the drive

        The demo folder is looked up in the parent directory of this file's directory.
        """
        working_dir = os.path.abspath(os.path.join((os.path.dirname(os.path.abspath(__file__))), os.pardir))
        copy_tree(f'{working_dir}\\demo', f'{self._drive_letter}:\\demo')

    def remove_demo_files(self) -> None:
        """
        Removes all demo files from the disk
        """
        demo_path: str = f'{self._drive_letter}:\\demo'
        if os.path.isdir(demo_path):
            shutil.rmtree(demo_path)

    def delete_file(self, path: str) -> None:
        """
        Deletes the specified file without moving it to the recycle bin

        Parameters:
            path (str): The path to the deletable file
        """
        print(colored(f"[DriveHandle] Deleting file {path}.", "light_grey"))
        os.remove(path)

    def read_cluster(self, cluster_number: int) -> bytes:
        """
        Reads a cluster from the disk

        Parameters:
            cluster_number (int): The number of the readable cluster

        Returns:
            cluster (bytes): The values stored in the cluster with the specified number
        """
        (_, _, cluster_size) = self.get_allocation_unit_sizes()
        # the raw volume is opened, so the offset is relative to the volume start
        with open(f'\\\\.\\{self._drive_letter}:', 'rb') as disk_cursor:
            disk_cursor.seek(cluster_number * cluster_size)
            return disk_cursor.read(cluster_size)

    def generate_hash(self) -> str:
        """
        Generates the forensic hash value of the volume.
        The hash algorithm is SHA1

        Returns:
            hash_values (str): The SHA1 hash of the volume
        """
        print(colored("[DriveHandle] Generating the hash of the drive.", "light_grey"))
        hash_function = sha1()
        progressbar = Spinner('[DriveHandle] Generating hash')
        # the chunk size does not change while hashing, so it is determined only once
        chunk_size: int = min(self._MAX_RAM_ALLOCATION, self.get_drive_size())
        with open(f'\\\\.\\{self._drive_letter}:', 'rb') as disk_cursor:
            while True:
                read_chunk = disk_cursor.read(chunk_size)
                progressbar.next()
                if not read_chunk:
                    # an empty read marks the end of the volume
                    break
                hash_function.update(read_chunk)
        volume_hash: str = hash_function.hexdigest()
        progressbar.finish()
        return volume_hash

    def get_path(self) -> str:
        """
        Determines the path to the selected volume

        Returns:
            path (str): The path to the volume
        """
        return f"{self._drive_letter}:\\"

    def set_read_only(self, read_only: bool) -> None:
        """
        Sets the read-only property of the disk that contains the drive

        Parameters:
            read_only (bool): Sets the drive to read-only, if True, writeable otherwise
        """
        print(colored(f"[DriveHandle] Setting drive {self._drive_letter}'s read-only state to {read_only}.", "light_grey"))
        powershell_bool: str = "$true" if read_only else "$false"
        subprocess.check_output(['powershell.exe', f'Get-Disk -Number (Get-Partition -DriveLetter {self._drive_letter}).DiskNumber | Set-Disk -IsReadOnly {powershell_bool}'])

    def flush_cache(self) -> None:
        """
        Flushes the write cache of the volume
        """
        print(colored("[DriveHandle] Flushing drive's write cache.", "light_grey"))
        subprocess.check_output(['powershell.exe', f'Write-VolumeCache -DriveLetter {self._drive_letter}'])