Skip to content
Snippets Groups Projects
Select Git revision
  • de2fdc577e265daa85215e5ff88fdec0892cbb50
  • master default protected
  • android-7.1.2_r28_klist
  • pie-cts-release
  • pie-vts-release
  • pie-cts-dev
  • oreo-mr1-iot-release
  • sdk-release
  • oreo-m6-s4-release
  • oreo-m4-s12-release
  • pie-release
  • pie-r2-release
  • pie-r2-s1-release
  • oreo-vts-release
  • oreo-cts-release
  • oreo-dev
  • oreo-mr1-dev
  • pie-gsi
  • pie-platform-release
  • pie-dev
  • oreo-cts-dev
  • android-o-mr1-iot-release-1.0.4
  • android-9.0.0_r8
  • android-9.0.0_r7
  • android-9.0.0_r6
  • android-9.0.0_r5
  • android-8.1.0_r46
  • android-8.1.0_r45
  • android-n-iot-release-smart-display-r2
  • android-vts-8.1_r5
  • android-cts-8.1_r8
  • android-cts-8.0_r12
  • android-cts-7.1_r20
  • android-cts-7.0_r24
  • android-o-mr1-iot-release-1.0.3
  • android-cts-9.0_r1
  • android-8.1.0_r43
  • android-8.1.0_r42
  • android-n-iot-release-smart-display
  • android-p-preview-5
  • android-9.0.0_r3
41 results

file_contexts

Blame
  • drive_handle.py 11.68 KiB
    import ctypes
    import errno
    import os
    import shutil
    import subprocess
    from distutils.dir_util import copy_tree
    from hashlib import sha1
    
    import psutil
    from progress.spinner import Spinner
    from termcolor import colored
    
    
    class DriveHandle:
        """
        Handle for a single volume that is addressed by its drive letter

        The constructor validates the volume (accessible, NTFS, empty, not on the boot disk);
        the methods query, fill, read, hash and reconfigure that volume.
        """

        # cluster size in bytes that is passed to the format command (/A:) in __init__
        _DEFAULT_CLUSTER_SIZE: int = 4_096
        # default upper bound in bytes for a single read in generate_hash; __init__ overrides it per instance
        _MAX_RAM_ALLOCATION: int = 1_024 * 1_024 * 1_024  # equal to 1 GiB
        # letter of the handled volume without colon or backslash; assigned at the end of __init__
        _drive_letter: chr = None
    
        def __init__(self, drive_letter: chr, format_drive: bool, max_memory: int) -> None:
    
            # sanity checks
            if (not os.path.isdir(f"{drive_letter}:\\")):
                raise ValueError("Specified partition cannot be accessed.")
    
            if (format_drive):
                print(colored(f"[DriveHandle] Do you want to format drive {drive_letter}:\\? All information will be lost.", "red"))
                confirmation = input("y(es)/n(o): ")
                if (confirmation.lower() == "yes" or confirmation.lower() == "y"):
                    print(colored(f"[DriveHandle] Formatting drive {drive_letter}:\\", "light_grey"))
                    os.system(f"format {drive_letter}: /FS:NTFS /A:{DriveHandle._DEFAULT_CLUSTER_SIZE} /x /Y")
                    print(colored("[DriveHandle] Formatting completed.", "light_grey"))
                else:
                    print(colored("[DriveHandle] Skipping the format process.", "light_grey"))
    
            filesystem: str = ""
            for partition in psutil.disk_partitions():
                if f"{drive_letter}:\\".startswith(partition.mountpoint):
                    filesystem = partition.fstype
    
            if filesystem != "NTFS":
                raise ValueError("Selected partition is not formatted to NTFS.")
    
            if (len(os.listdir(f"{drive_letter}:\\")) > 1):  # Magic value of 1 is used due to the 'System Volume Information' file of each partition
                raise ValueError("Selected partition is not empty.")
    
            if (str(subprocess.check_output(['powershell.exe', f'(Get-Disk -Number (Get-Partition -DriveLetter {drive_letter}).DiskNumber).IsBoot'])).find("True") != -1):
                raise ValueError("Selected partition is the boot drive.")
    
            # great, everything works
            # save path
            self._MAX_RAM_ALLOCATION = max_memory * 1_024 * 1_024  # convert from MiB to B
            self._drive_letter = drive_letter
    
        def get_free_drive_space(self) -> int:
            """
            Queries how much space is still available on the drive

            Returns:
                free_disk_space (int): The amount of free space in bytes, as reported by psutil
            """
            usage = psutil.disk_usage(self.get_path())
            return usage.free
    
        def get_drive_size(self) -> int:
            """
            Queries the total capacity of the drive

            Returns:
                drive_size (int): The total size in bytes, as reported by psutil
            """
            usage = psutil.disk_usage(self.get_path())
            return usage.total
    
        def get_allocation_unit_sizes(self) -> tuple:
            """
            Determines the allocation unit sizes of the used volume

            Returns:
                (bytes_per_sector, sectors_per_cluster, bytes_per_cluster) (int, int, int): The allocation unit sizes

            Raises:
                OSError: If the GetDiskFreeSpaceW call reports a failure
            """
            # GetDiskFreeSpaceW fills 32-bit DWORDs, so the out-parameters have to be c_ulong
            sectors_per_cluster: ctypes.c_ulong = ctypes.c_ulong(0)
            bytes_per_sector: ctypes.c_ulong = ctypes.c_ulong(0)
            encoded_path: ctypes.c_wchar_p = ctypes.c_wchar_p(self.get_path())

            # a private library handle with use_last_error keeps the error code of this very call
            kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
            succeeded = kernel32.GetDiskFreeSpaceW(encoded_path, ctypes.byref(sectors_per_cluster), ctypes.byref(bytes_per_sector), None, None)
            if not succeeded:
                # do not hand out zeroed sizes; callers would silently read or write nothing
                raise ctypes.WinError(ctypes.get_last_error())
            return (bytes_per_sector.value, sectors_per_cluster.value, sectors_per_cluster.value * bytes_per_sector.value)
    
        def get_clusters_for_file(self, path: str) -> list:
            """
            Determines the cluster numbers associated with a file

            Every extent reported by 'fsutil file queryextents' is walked cluster by cluster,
            starting at its LCN, for as long as the cluster is still attributed to the file.

            Parameters:
                path (str): The path to the file

            Returns:
                cluster_numbers (list<int>): The cluster numbers associated with the file specified in the path

            Raises:
                subprocess.CalledProcessError: If the fsutil extents query fails
            """
            print(colored(f"[DriveHandle] Gathering the cluster numbers associated with file {path}", "light_grey"))
            file_name: str = path.split("\\")[-1]
            cluster_numbers: list = []
            progressbar = Spinner('[DriveHandle] Gathering clusters')
            lcn_marker: str = "LCN: "

            # undecodable bytes are replaced, as only the hexadecimal LCN values are parsed
            query_output: str = subprocess.check_output(['fsutil', 'file', 'queryextents', path]).decode("utf-8", errors="replace")
            for extent_line in query_output.splitlines():
                if extent_line == "":
                    break

                marker_index: int = extent_line.find(lcn_marker)
                if marker_index == -1:
                    # no LCN on this line, so there is no extent to parse
                    continue

                # the extent (LCN) marks the first cluster of the (maybe fragmented) chunk
                extents_first_cluster: int = int(extent_line[marker_index + len(lcn_marker):].strip(), 16)
                current_cluster: int = extents_first_cluster
                # find associated clusters from that point on
                while True:
                    progressbar.next()
                    if not self.__check_cluster_file_association(current_cluster, file_name):
                        # no more clusters from this point on
                        # go to next extent
                        break
                    cluster_numbers.append(current_cluster)
                    current_cluster += 1

            # all extents are checked
            # return final clusters
            progressbar.finish()
            return cluster_numbers
    
        def __check_cluster_file_association(self, cluster_number: int, file_name: str) -> bool:
            """
            Tells whether the given cluster belongs to the given file

            The answer is derived from the output of 'fsutil volume querycluster':
            the cluster counts as associated, if the output mentions the file name.

            Parameters:
                cluster_number (int): The cluster number to look up
                file_name (str): The name of the file the cluster might belong to

            Returns:
                association (bool): True, if the file name occurs in the query output, False otherwise
                    (also False, if the query itself exits with an error)
            """
            query_command: list = ['fsutil', 'volume', 'querycluster', f'{self._drive_letter}:', f'{cluster_number}']
            try:
                response: str = subprocess.check_output(query_command).decode("utf-8")
            except subprocess.CalledProcessError:
                return False
            return file_name in response
    
        def write_pattern(self, symbol: chr, path: str) -> None:
            """
            Writes the pattern file to the disk

            The file is filled with the symbol in chunks until the drive reports that no space
            is left; the chunk is then halved repeatedly to fill the remaining space.

            Parameters:
                symbol (chr): The symbol the pattern file shall be filled with
                path (str): The path to the pattern file

            Raises:
                OSError: If writing fails for any other reason than a full drive
            """
            print(colored(f"[DriveHandle] Writing a file ({path}) with 0x{symbol.encode().hex()} pattern to fill the disk.", "light_grey"))
            progressbar = Spinner('[DriveHandle] Writing the pattern file')

            write_string = str(symbol) * 1024 * 1024 * 5  # 5 Mi symbols per write
            # UTF-8 without newline translation: the bytes on disk then equal symbol.encode(),
            # i.e. the hex value logged above, instead of depending on the locale encoding
            with open(path, "w", encoding="utf-8", newline="") as file_descriptor:
                while True:
                    try:
                        file_descriptor.write(write_string)
                        file_descriptor.flush()
                        progressbar.next()
                    except OSError as error:
                        if error.errno == errno.ENOSPC:
                            if len(write_string) > 1:
                                # the chunk did not fit any more; retry with half of it
                                write_string = write_string[:(len(write_string) // 2)]
                                if (self.get_free_drive_space() == 0):
                                    break
                            else:
                                # not even a single symbol fits any more
                                break
                        else:
                            raise

            progressbar.finish()
            print(colored("[DriveHandle] Completed writing the pattern file.", "light_grey"))
    
        def create_overwrite_file(self, symbol: chr, path: str) -> None:
            """
            Creates the overwrite file on the disk

            The symbol is repeated once per unit of the computed size, which is
            one cluster plus half a cluster plus half a sector.

            Parameters:
                symbol (chr): The symbol the overwrite file shall be filled with
                path (str): The path to the overwrite file
            """
            sector_size, _, cluster_size = self.get_allocation_unit_sizes()
            # one full cluster + half a cluster + half a sector
            file_size: int = cluster_size + (cluster_size // 2) + (sector_size // 2)
            print(colored(f"[DriveHandle] Writing a overwriting file ({path}) with the size {file_size}B.", "light_grey"))

            with open(path, "wb") as overwrite_file:
                print(colored(f"[DriveHandle] Writing the symbol '{symbol}' (0x{symbol.encode().hex()}) to the overwrite file.", "light_grey"))
                payload: bytes = symbol.encode() * file_size
                overwrite_file.write(payload)
            print(colored("[DriveHandle] Completed writing the overwrite file.", "light_grey"))
    
        def copy_demo_files(self) -> None:
            """
            Copies the files of the demo folder to the drive

            The demo folder is looked up in the parent directory of the directory containing
            this file; an already existing demo folder on the drive is merged into.
            """
            working_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir))
            # shutil.copytree replaces distutils' copy_tree: copy_tree remembers the directories
            # it created, so copying again after remove_demo_files() deleted them fails
            # within the same process
            shutil.copytree(f'{working_dir}\\demo', f'{self._drive_letter}:\\demo', dirs_exist_ok=True)
    
        def remove_demo_files(self) -> None:
            """
            Removes all demo files from the disk
            """
            if (os.path.isdir(f'{self._drive_letter}:\\demo')):
                shutil.rmtree(f'{self._drive_letter}:\\demo')
    
        def delete_file(self, path: str) -> None:
            """
            Removes the specified file directly, i.e. without moving it to the recycle bin

            Parameters:
                path (str): The path to the file that shall be deleted
            """
            announcement: str = f"[DriveHandle] Deleting file {path}."
            print(colored(announcement, "light_grey"))
            os.remove(path)
    
        def read_cluster(self, cluster_number: int) -> bytes:
            """
            Reads a single cluster directly from the volume

            Parameters:
                cluster_number (int): The number of the cluster to read

            Returns:
                cluster (bytes): The content of the cluster with the specified number
            """
            _, _, cluster_size = self.get_allocation_unit_sizes()
            raw_volume_path: str = f'\\\\.\\{self._drive_letter}:'
            with open(raw_volume_path, 'rb') as volume:
                # clusters are addressed by their byte offset from the start of the volume
                volume.seek(cluster_number * cluster_size)
                cluster: bytes = volume.read(cluster_size)
                return cluster
    
        def generate_hash(self) -> str:
            """
            Generates the forensic hash value of the volume.
            The hash algorithm is SHA1

            The volume is read in chunks of at most the configured memory limit
            (and never more than the drive size) until no more data is returned.

            Returns:
                hash_values (str): The SHA1 hash of the volume
            """
            print(colored("[DriveHandle] Generating the hash of the drive.", "light_grey"))
            hash_function = sha1()
            progressbar = Spinner('[DriveHandle] Generating hash')

            chunk_size: int = self._MAX_RAM_ALLOCATION
            if chunk_size <= 0:
                # a non-positive limit would read nothing (hash of no data) or the whole
                # volume at once; fall back to the class default instead
                chunk_size = DriveHandle._MAX_RAM_ALLOCATION
            # the drive size does not change while hashing, so it is queried only once
            chunk_size = min(chunk_size, self.get_drive_size())

            with open(f'\\\\.\\{self._drive_letter}:', 'rb') as disk_cursor:
                while True:
                    read_chunk = disk_cursor.read(chunk_size)
                    if not read_chunk:
                        # end of the volume reached
                        break
                    hash_function.update(read_chunk)
                    progressbar.next()
            volume_hash: str = hash_function.hexdigest()
            progressbar.finish()
            return volume_hash
    
        def get_path(self) -> str:
            """
            Determines the path to the selected volume
    
            Returns:
                path (str): The path to the volume
            """
            return f"{self._drive_letter}:\\"
    
        def set_read_only(self, read_only: bool) -> None:
            """
            Switches the read-only property of the disk that holds the drive

            Parameters:
                read_only (bool): The disk becomes read-only, if True, writeable otherwise
            """
            print(colored(f"[DriveHandle] Setting drive {self._drive_letter}'s read-only state to {read_only}.", "light_grey"))
            # PowerShell spells its boolean literals with a leading dollar sign
            if read_only:
                powershell_bool: str = "$true"
            else:
                powershell_bool = "$false"
            command: list = ['powershell.exe', f'Get-Disk -Number (Get-Partition -DriveLetter {self._drive_letter}).DiskNumber | Set-Disk -IsReadOnly {powershell_bool}']
            subprocess.check_output(command)
    
        def flush_cache(self) -> None:
            """
            Writes the cached data of the volume to the disk via PowerShell's Write-VolumeCache
            """
            print(colored("[DriveHandle] Flushing drive's write cache.", "light_grey"))
            command: list = ['powershell.exe', f'Write-VolumeCache -DriveLetter {self._drive_letter}']
            subprocess.check_output(command)