Skip to content
Snippets Groups Projects
Commit c608fbaa authored by Justus Müller's avatar Justus Müller :bee:
Browse files

doc: Add documentation to key methods

parent 0b068ce8
No related branches found
No related tags found
No related merge requests found
...@@ -52,12 +52,30 @@ class DriveHandle: ...@@ -52,12 +52,30 @@ class DriveHandle:
self._drive_letter = drive_letter self._drive_letter = drive_letter
def get_free_drive_space(self) -> int: def get_free_drive_space(self) -> int:
"""
Determines the free usable disk space on the drive
Returns:
free_disk_space (int): The free disk space measured in bytes
"""
return psutil.disk_usage(self.get_path()).free return psutil.disk_usage(self.get_path()).free
def get_drive_size(self) -> int: def get_drive_size(self) -> int:
"""
Determines the total drive size
Returns:
drive_size (int): The drive size measured in bytes
"""
return psutil.disk_usage(self.get_path()).total return psutil.disk_usage(self.get_path()).total
def get_allocation_unit_sizes(self) -> tuple: def get_allocation_unit_sizes(self) -> tuple:
"""
Determines the allocation unit sizes of the used volume
Returns:
(bytes_per_sector, sectors_per_cluster, bytes_per_cluster) (int, int, int): The allocation unit sizes
"""
sectors_per_cluster: ctypes.c_ulong = ctypes.c_ulonglong(0) sectors_per_cluster: ctypes.c_ulong = ctypes.c_ulonglong(0)
bytes_per_sector: ctypes.c_ulong = ctypes.c_ulonglong(0) bytes_per_sector: ctypes.c_ulong = ctypes.c_ulonglong(0)
encoded_path: ctypes.c_wchar_p = ctypes.c_wchar_p(self.get_path()) encoded_path: ctypes.c_wchar_p = ctypes.c_wchar_p(self.get_path())
...@@ -66,6 +84,15 @@ class DriveHandle: ...@@ -66,6 +84,15 @@ class DriveHandle:
return (bytes_per_sector.value, sectors_per_cluster.value, sectors_per_cluster.value * bytes_per_sector.value) return (bytes_per_sector.value, sectors_per_cluster.value, sectors_per_cluster.value * bytes_per_sector.value)
def get_clusters_for_file(self, path: str) -> list: def get_clusters_for_file(self, path: str) -> list:
"""
Determines the cluster numbers associated with a file
Parameters:
path (str): The path to the file
Returns:
cluster_numbers (list<int>): The cluster numbers associated with the file specified in the path
"""
print(colored(f"[DriveHandle] Gathering the cluster numbers associated with file {path}", "light_grey")) print(colored(f"[DriveHandle] Gathering the cluster numbers associated with file {path}", "light_grey"))
file_name: str = path.split("\\")[-1] file_name: str = path.split("\\")[-1]
cluster_numbers: list = [] cluster_numbers: list = []
...@@ -96,6 +123,16 @@ class DriveHandle: ...@@ -96,6 +123,16 @@ class DriveHandle:
return cluster_numbers return cluster_numbers
def __check_cluster_file_association(self, cluster_number: int, file_name: str) -> bool: def __check_cluster_file_association(self, cluster_number: int, file_name: str) -> bool:
"""
Checks wether a cluster number is associated with a file
Parameters:
cluster_number (int): The cluster number, that needs to be checked
file_name (str): The name of the file, the cluster might be associated with
Returns:
association (bool): True, if the cluster is associated with the file, False otherwise
"""
try: try:
response: str = subprocess.check_output(['fsutil', 'volume', 'querycluster', f'{self._drive_letter}:', f'{cluster_number}']).decode("utf-8") response: str = subprocess.check_output(['fsutil', 'volume', 'querycluster', f'{self._drive_letter}:', f'{cluster_number}']).decode("utf-8")
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
...@@ -106,6 +143,12 @@ class DriveHandle: ...@@ -106,6 +143,12 @@ class DriveHandle:
return False return False
def write_pattern(self, path: str) -> None: def write_pattern(self, path: str) -> None:
"""
Writes the pattern file to the disk
Parameters:
path (str): The path to the pattern file
"""
print(colored(f"[DriveHandle] Writing a file ({path}) with a rolling pattern to fill the disk.", "light_grey")) print(colored(f"[DriveHandle] Writing a file ({path}) with a rolling pattern to fill the disk.", "light_grey"))
pattern_start_symbol: chr = 'A' pattern_start_symbol: chr = 'A'
iteration: int = 0 iteration: int = 0
...@@ -121,6 +164,13 @@ class DriveHandle: ...@@ -121,6 +164,13 @@ class DriveHandle:
print(colored("[DriveHandle] Completed writing the pattern file.", "light_grey")) print(colored("[DriveHandle] Completed writing the pattern file.", "light_grey"))
def create_overwrite_file(self, symbol: chr, path: str) -> None: def create_overwrite_file(self, symbol: chr, path: str) -> None:
"""
Writes the overwrite file to the disk
Parameters:
symbol (chr): The symbol the overwrite file shall be filled with
path (str): The path to the overwrite file
"""
(sector_size, _, cluster_size) = self.get_allocation_unit_sizes() (sector_size, _, cluster_size) = self.get_allocation_unit_sizes()
half_cluster: int = cluster_size // 2 half_cluster: int = cluster_size // 2
half_sector: int = sector_size // 2 half_sector: int = sector_size // 2
...@@ -133,25 +183,53 @@ class DriveHandle: ...@@ -133,25 +183,53 @@ class DriveHandle:
print(colored("[DriveHandle] Completed writing the overwrite file.", "light_grey")) print(colored("[DriveHandle] Completed writing the overwrite file.", "light_grey"))
def copy_demo_files(self) -> None: def copy_demo_files(self) -> None:
"""
Copies the files of the demo folder to the drive
"""
working_dir = os.path.abspath(os.path.join((os.path.dirname(os.path.abspath(__file__))), os.pardir)) working_dir = os.path.abspath(os.path.join((os.path.dirname(os.path.abspath(__file__))), os.pardir))
copy_tree(f'{working_dir}\\demo', f'{self._drive_letter}:\\demo') copy_tree(f'{working_dir}\\demo', f'{self._drive_letter}:\\demo')
def remove_demo_files(self) -> None: def remove_demo_files(self) -> None:
"""
Removes all demo files from the disk
"""
if (os.path.isdir(f'{self._drive_letter}:\\demo')): if (os.path.isdir(f'{self._drive_letter}:\\demo')):
shutil.rmtree(f'{self._drive_letter}:\\demo') shutil.rmtree(f'{self._drive_letter}:\\demo')
def delete_file(self, path: str) -> None: def delete_file(self, path: str) -> None:
"""
Deletes the specified file without moving it to the recyle bin
Parameters:
path (str): The path to the deletable file
"""
print(colored(f"[DriveHandle] Deleting file {path}.", "light_grey")) print(colored(f"[DriveHandle] Deleting file {path}.", "light_grey"))
os.remove(path) os.remove(path)
def read_cluster(self, cluster: int) -> bytes: def read_cluster(self, cluster_number: int) -> bytes:
"""
Reads a cluster from the disk
Parameters:
cluster_number (int): The number of the readable cluster
Returns:
cluster (bytes): The values stored in the cluster with the specified number
"""
# print(colored(f"[DriveHandle] Reading cluster number {cluster}.", "light_grey")) # print(colored(f"[DriveHandle] Reading cluster number {cluster}.", "light_grey"))
(_, _, cluster_size) = self.get_allocation_unit_sizes() (_, _, cluster_size) = self.get_allocation_unit_sizes()
with open(f'\\\\.\\{self._drive_letter}:', 'rb') as disk_cursor: with open(f'\\\\.\\{self._drive_letter}:', 'rb') as disk_cursor:
disk_cursor.seek(cluster * cluster_size) disk_cursor.seek(cluster_number * cluster_size)
return disk_cursor.read(cluster_size) return disk_cursor.read(cluster_size)
def generate_hash(self) -> str: def generate_hash(self) -> str:
"""
Generates the forensic hash value of the volume.
The hash algorithm is SHA1
Returns:
hash_values (str): The SHA1 hash of the volume
"""
print(colored("[DriveHandle] Generating the hash of the drive.", "light_grey")) print(colored("[DriveHandle] Generating the hash of the drive.", "light_grey"))
hash_function = sha1() hash_function = sha1()
progressbar = Spinner('[DriveHandle] Generating hash') progressbar = Spinner('[DriveHandle] Generating hash')
...@@ -167,13 +245,28 @@ class DriveHandle: ...@@ -167,13 +245,28 @@ class DriveHandle:
return volume_hash return volume_hash
def get_path(self) -> str: def get_path(self) -> str:
"""
Determines the path to the selected volume
Returns:
path (str): The path to the volume
"""
return f"{self._drive_letter}:\\" return f"{self._drive_letter}:\\"
def set_read_only(self, read_only: bool) -> None: def set_read_only(self, read_only: bool) -> None:
"""
Sets the drive's read-only property
Parameters:
read_only (bool): Sets the drive to read-only, if True, writeable otherwise
"""
print(colored(f"[DriveHandle] Setting drive {self._drive_letter}'s read-only state to {read_only}.", "light_grey")) print(colored(f"[DriveHandle] Setting drive {self._drive_letter}'s read-only state to {read_only}.", "light_grey"))
powershell_bool: str = "$true" if read_only else "$false" powershell_bool: str = "$true" if read_only else "$false"
subprocess.check_output(['powershell.exe', f'Get-Disk -Number (Get-Partition -DriveLetter {self._drive_letter}).DiskNumber | Set-Disk -IsReadOnly {powershell_bool}']) subprocess.check_output(['powershell.exe', f'Get-Disk -Number (Get-Partition -DriveLetter {self._drive_letter}).DiskNumber | Set-Disk -IsReadOnly {powershell_bool}'])
def flush_cache(self) -> None: def flush_cache(self) -> None:
"""
Flushes the write cache of the volume
"""
print(colored("[DriveHandle] Flushing drive's write cache.", "light_grey")) print(colored("[DriveHandle] Flushing drive's write cache.", "light_grey"))
subprocess.check_output(['powershell.exe', f'Write-VolumeCache -DriveLetter {self._drive_letter}']) subprocess.check_output(['powershell.exe', f'Write-VolumeCache -DriveLetter {self._drive_letter}'])
...@@ -15,12 +15,24 @@ class TestOrchistrator: ...@@ -15,12 +15,24 @@ class TestOrchistrator:
@classmethod @classmethod
def get_instance(cls): def get_instance(cls):
"""
Singleton implementation of getInstance()
Returns:
instance (TestOrchistrator): The instance, if it has been created successfully, None otherwise
"""
if cls._test_orchistrator is None: if cls._test_orchistrator is None:
raise RuntimeError("No instance has been created.") raise RuntimeError("No instance has been created.")
return cls._test_orchistrator return cls._test_orchistrator
@classmethod @classmethod
def create_instance(cls, drive_handle: DriveHandle) -> None: def create_instance(cls, drive_handle: DriveHandle) -> None:
"""
Creates the Singleton instance with the specified parameters
Parameters:
drive_handle (DriveHandle): The wrapping handle to the drive
"""
print(colored("[TestOrchistrator] Creating Orchistrator instance.", "light_grey")) print(colored("[TestOrchistrator] Creating Orchistrator instance.", "light_grey"))
if cls._test_orchistrator is not None: if cls._test_orchistrator is not None:
return cls.get_instance() return cls.get_instance()
...@@ -29,6 +41,12 @@ class TestOrchistrator: ...@@ -29,6 +41,12 @@ class TestOrchistrator:
cls._drive_handle = drive_handle cls._drive_handle = drive_handle
def generate_tests(self, **kwargs) -> None: def generate_tests(self, **kwargs) -> None:
"""
Generates the tests specified in the arguments
Parameters:
kwargs (**kwargs): The keyword arguments for the specified tests
"""
print(colored("[TestOrchistrator] Generating test instances.", "light_grey")) print(colored("[TestOrchistrator] Generating test instances.", "light_grey"))
if (kwargs.get("slack")): if (kwargs.get("slack")):
self._tests.append(SlackTest(self._drive_handle, bool(kwargs.get("check_cluster_overlap")))) self._tests.append(SlackTest(self._drive_handle, bool(kwargs.get("check_cluster_overlap"))))
...@@ -38,6 +56,12 @@ class TestOrchistrator: ...@@ -38,6 +56,12 @@ class TestOrchistrator:
print(colored("[TestOrchistrator] Generated hash test instance.", "light_grey")) print(colored("[TestOrchistrator] Generated hash test instance.", "light_grey"))
def calculate_SSD_indication_score(self) -> int: def calculate_SSD_indication_score(self) -> int:
"""
Calculates an SSD likelyhood score based on the selected tests
Returns:
cummulated_score (int): The SSD likelyhood score
"""
cummulated_score: int = 0 cummulated_score: int = 0
max_score: int = 0 max_score: int = 0
for test in self._tests: for test in self._tests:
......
...@@ -15,34 +15,44 @@ class HashTest(DecisionTest): ...@@ -15,34 +15,44 @@ class HashTest(DecisionTest):
def __init__(self, drive_handle: DriveHandle) -> None: def __init__(self, drive_handle: DriveHandle) -> None:
self._drive_handle = drive_handle self._drive_handle = drive_handle
def determine_SSD_indication(self) -> float: def determine_SSD_indication(self) -> bool:
"""
Determines wether the drive abstracted by the DriveHandle uses SSD technology
with the help of forensic hash value analysis.
Underlying is the dynamic wear-levelling, that might occurr with SSDs
Returns:
SSD_indication (bool): True, if the analysis found indications for SSD usage, False otherwise
"""
print(colored("[HashTest] Running the hash test!", 'light_blue')) print(colored("[HashTest] Running the hash test!", 'light_blue'))
print(colored("[HashTest] Copying the demo files.", 'light_grey')) print(colored("[HashTest] Copying the demo files.", 'light_grey'))
self._drive_handle.copy_demo_files() self.drive_handle.copy_demo_files()
print(colored("[HashTest] Waiting 120s for the OS to flush the write cache.", "light_grey")) print(colored("[HashTest] Waiting 120s for the OS to flush the write cache.", "light_grey"))
self._drive_handle.flush_cache() self.drive_handle.flush_cache()
time.sleep(120) time.sleep(120)
print(colored(f"[HashTest] The current hash of the drive is equal to {self._drive_handle.generate_hash()}", "yellow")) print(colored(f"[HashTest] The current hash of the drive is equal to {self.drive_handle.generate_hash()}", "yellow"))
# remember a cluster of a random file # remember a cluster of a random file
# if the cluster is set to zero immediately after deletion, # if the cluster is set to zero immediately after deletion,
# the likelyhood of DRAT or DZAT is very high # the likelyhood of DRAT or DZAT is very high
compare_cluster_number: int = self._drive_handle.get_clusters_for_file(f'{self._drive_handle.get_path()}demo\\t5\\000503.text')[0] compare_cluster_number: int = self.drive_handle.get_clusters_for_file(f'{self.drive_handle.get_path()}demo\\t5\\000503.text')[0]
compare_cluster_values: bytes = self._drive_handle.read_cluster(compare_cluster_number) compare_cluster_values: bytes = self.drive_handle.read_cluster(compare_cluster_number)
print(colored("[HashTest] Deleting the demo files.", 'light_grey')) print(colored("[HashTest] Deleting the demo files.", 'light_grey'))
self._drive_handle.remove_demo_files() self.drive_handle.remove_demo_files()
self._drive_handle.flush_cache() self.drive_handle.flush_cache()
self._drive_handle.set_read_only(True) self.drive_handle.set_read_only(True)
# check for DRAT or DZAT # check for DRAT or DZAT
print(colored("[HashTest] Testing for DRAT/DZAT.", "light_grey")) print(colored("[HashTest] Testing for DRAT/DZAT.", "light_grey"))
cluster_values: bytes = self._drive_handle.read_cluster(compare_cluster_number) cluster_values: bytes = self.drive_handle.read_cluster(compare_cluster_number)
if (cluster_values != compare_cluster_values): if (cluster_values != compare_cluster_values):
# DRAT or DZAT # DRAT or DZAT
print(colored(f"[HashTest] Cluster {compare_cluster_number} read a different value immediately after deletion. DRAT or DZAT is likely.", "yellow")) print(colored(f"[HashTest] Cluster {compare_cluster_number} read a different value immediately after deletion. DRAT or DZAT is likely.", "yellow"))
print(colored("[HashTest] SSD usage likely due to DRAT/DZAT. Skipping test.", "red")) print(colored("[HashTest] SSD usage likely due to DRAT/DZAT. Skipping test.", "red"))
self._drive_handle.set_read_only(False) self.drive_handle.set_read_only(False)
# SSD usage likely # SSD usage likely
return True return True
else: else:
...@@ -52,7 +62,7 @@ class HashTest(DecisionTest): ...@@ -52,7 +62,7 @@ class HashTest(DecisionTest):
prior_hash: str = None prior_hash: str = None
iteration = 0 iteration = 0
while True: while True:
current_hash = self._drive_handle.generate_hash() current_hash = self.drive_handle.generate_hash()
print(colored(f"[HashTest] The current hash of the drive is {current_hash}", "yellow")) print(colored(f"[HashTest] The current hash of the drive is {current_hash}", "yellow"))
iteration += 1 iteration += 1
if prior_hash is None: if prior_hash is None:
...@@ -77,7 +87,7 @@ class HashTest(DecisionTest): ...@@ -77,7 +87,7 @@ class HashTest(DecisionTest):
time.sleep(60 * 60) time.sleep(60 * 60)
# release drive # release drive
self._drive_handle.set_read_only(False) self.drive_handle.set_read_only(False)
print(colored("[HashTest] Hash test completed.", "light_blue")) print(colored("[HashTest] Hash test completed.", "light_blue"))
return detected_wear_levelling return detected_wear_levelling
......
...@@ -18,31 +18,40 @@ class SlackTest(DecisionTest): ...@@ -18,31 +18,40 @@ class SlackTest(DecisionTest):
self._check_cluster_overlap = check_cluster_overlap self._check_cluster_overlap = check_cluster_overlap
def determine_SSD_indication(self) -> bool: def determine_SSD_indication(self) -> bool:
"""
Determines wether the drive abstracted by the DriveHandle uses SSD technology
with the help of slack space analysis
Returns:
SSD_indication (bool): True, if the analysis found indications for SSD usage, False otherwise
"""
print(colored("[SlackTest] Running the slack test!", 'light_blue')) print(colored("[SlackTest] Running the slack test!", 'light_blue'))
# create rolling pattern # create rolling pattern
pattern_file_path: str = f'{self._drive_handle.get_path()}transsd-slacktest-pattern' pattern_file_path: str = f'{self.drive_handle.get_path()}transsd-slacktest-pattern'
self._drive_handle.write_pattern(pattern_file_path) self.drive_handle.write_pattern(pattern_file_path)
self._drive_handle.flush_cache() self.drive_handle.flush_cache()
pattern_file_clusters: list = [] pattern_file_clusters: list = []
if self._check_cluster_overlap: if self._check_cluster_overlap:
pattern_file_clusters = self._drive_handle.get_clusters_for_file(pattern_file_path) pattern_file_clusters = self.drive_handle.get_clusters_for_file(pattern_file_path)
print(colored(f"[SlackTest] First cluster of the pattern file ({pattern_file_path}) is {pattern_file_clusters[0]}", "light_grey")) print(colored(f"[SlackTest] First cluster of the pattern file ({pattern_file_path}) is {pattern_file_clusters[0]}", "light_grey"))
print(colored(f"[SlackTest] Last cluster of the pattern file ({pattern_file_path}) is {pattern_file_clusters[-1]}", "light_grey")) print(colored(f"[SlackTest] Last cluster of the pattern file ({pattern_file_path}) is {pattern_file_clusters[-1]}", "light_grey"))
# delete file # delete file
self._drive_handle.delete_file(pattern_file_path) self.drive_handle.delete_file(pattern_file_path)
print("[SlackTest] Waiting 120s for OS to clean up the disk...") print("[SlackTest] Waiting 120s for OS to clean up the disk...")
self._drive_handle.flush_cache() self.drive_handle.flush_cache()
time.sleep(120) time.sleep(120)
print(colored("[SlackTest] File should be removed from disk without remainder.", "light_grey")) print(colored("[SlackTest] File should be removed from disk without remainder.", "light_grey"))
# create overwriting file # create overwriting file
overwrite_file_path: str = f'{self._drive_handle.get_path()}transsd-slacktest-overwrite' overwrite_file_path: str = f'{self.drive_handle.get_path()}transsd-slacktest-overwrite'
self._drive_handle.create_overwrite_file('!', overwrite_file_path) self.drive_handle.create_overwrite_file('!', overwrite_file_path)
overwrite_file_clusters: list = self._drive_handle.get_clusters_for_file(overwrite_file_path) overwrite_file_clusters: list = self.drive_handle.get_clusters_for_file(overwrite_file_path)
print(colored(f"[SlackTest] The cluster numbers associated with the overwrite file ({overwrite_file_path}) are {overwrite_file_clusters}", "light_grey")) print(colored(f"[SlackTest] The cluster numbers associated with the overwrite file ({overwrite_file_path}) are {overwrite_file_clusters}", "light_grey"))
overwrite_file_last_cluster: int = self._drive_handle.read_cluster(overwrite_file_clusters[-1]) overwrite_file_last_cluster: int = self.drive_handle.read_cluster(overwrite_file_clusters[-1])
if self._check_cluster_overlap: if self._check_cluster_overlap:
if overwrite_file_clusters[-1] in pattern_file_clusters: if overwrite_file_clusters[-1] in pattern_file_clusters:
...@@ -53,7 +62,7 @@ class SlackTest(DecisionTest): ...@@ -53,7 +62,7 @@ class SlackTest(DecisionTest):
raise RuntimeError("SlackTest could not be performed.") raise RuntimeError("SlackTest could not be performed.")
# analyse the slack space # analyse the slack space
(sector_size, sectors_per_cluster, _) = self._drive_handle.get_allocation_unit_sizes() (sector_size, sectors_per_cluster, _) = self.drive_handle.get_allocation_unit_sizes()
half_cluster: int = sector_size * (sectors_per_cluster // 2) half_cluster: int = sector_size * (sectors_per_cluster // 2)
print("[SlackTest] Showing the RAM slack:") print("[SlackTest] Showing the RAM slack:")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment