diff --git a/Time_stamp_comments/taste_time_stamp_creater.py b/Time_stamp_comments/taste_time_stamp_creater.py
index b3ee571bf181d05e29cbc9def70f5bb67728f342..2d59e61978799070685fbd7499b3d021f8cb2e9b 100644
--- a/Time_stamp_comments/taste_time_stamp_creater.py
+++ b/Time_stamp_comments/taste_time_stamp_creater.py
@@ -13,7 +13,7 @@ def split_seconds_in_time(time:str)-> float:
     #hrs = int(temp[0])*3600
     min =int(temp[1])*60
     sec =float(temp[2])
-    return min+sec
+    return min+sec
 
 def get_excel_time(time:str)->float:
     """
@@ -119,7 +119,7 @@ def read_single_csv_file(filename: str = "../taste_test_dir/for_charles_Taste_fi
     labchart_initial_time = adi.read_file(adicht_file).records[0].record_time.rec_datetime
     event_datetime = unix_to_datetime(end_time)
 
-    # Iryna found the time line skey resulting in 0 at the end of the time line
+    # Iryna found the timeline skew resulting in 0 at the end of the timeline
     # synchronised_time = event_datetime - event_datetime[-2] #labchart_initial_time
     synchronised_time = event_datetime - labchart_initial_time
     event_ticks = get_ticks(synchronised_time).astype(int)
@@ -152,7 +152,7 @@ def convert_for_all_files(directory: str = "test_dir") -> None:
             generate_taste_macro(time_array=time, flavour_array=events, filename=os.path.splitext(entry.path)[0] +".txt")
     #if n != file_count:
    #    raise Exception(" Please make sure the directory only contains the .csv file if possible,"
-    #                    " But Never the less this time the new text files has been written so ignore this Error")
+    #                    " But nevertheless the new text files have been written this time, so ignore this Error")
     print(fr"There are totally : {n} txt files written in the same path as {directory} .")
     print("please consider reviewing the files ")
     pass
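Note: `split_seconds_in_time` above deliberately ignores the hour field (the `hrs` line is commented out) and returns minutes plus seconds as a float. A minimal, self-contained sketch of that parsing logic, with a hypothetical helper name and sample value (not part of the patch):

    # Sketch of the mm:ss parsing in split_seconds_in_time; hours are dropped,
    # matching the commented-out `hrs` line in the hunk above.
    def to_seconds(time: str) -> float:
        temp = time.split(":")       # "00:02:03.5" -> ["00", "02", "03.5"]
        minutes = int(temp[1]) * 60  # minutes converted to seconds
        seconds = float(temp[2])     # fractional seconds are preserved
        return minutes + seconds

    assert to_seconds("00:02:03.5") == 123.5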
diff --git a/cognitive_load _analysis/adicht_utils.py b/cognitive_load _analysis/adicht_utils.py
index f3099e1a06ea3eaf4e52a8f5d30c612c28849f2b..0541187c83c3ff4e1e697802802747d9a5ba3c11 100644
--- a/cognitive_load _analysis/adicht_utils.py
+++ b/cognitive_load _analysis/adicht_utils.py
@@ -11,6 +11,8 @@
 from neurokit2.signal import signal_rate, signal_sanitize
 from neurokit2.rsp.rsp_amplitude import rsp_amplitude
 from neurokit2.rsp.rsp_clean import rsp_clean
+from scipy import signal
+
 
 DATA = {"signal": np.ndarray, "block_name": np.ndarray}
@@ -132,7 +134,7 @@ def rsp_process(rsp_signal, sampling_rate=1000, method="khodadad2018", min_heigh
 
 def puls_process(hr_signal, sampling_rate=1000):
     signals, info = nk.ppg_process(hr_signal, sampling_rate=sampling_rate)
-    rate = nk.signal_rate(info["PPG_Peaks"],sampling_rate=sampling_rate,desired_length=len(hr_signal))
+    rate = nk.signal_rate(info["PPG_Peaks"], sampling_rate=sampling_rate, desired_length=len(hr_signal))
 
     return rate
 
@@ -300,10 +302,11 @@ def slice_channels(channels: list, indx_start: np.ndarray, indx_end: np.ndarray,
 
     for id, channel in enumerate(channels):  # will fail for puls , have to bypass it
         if channel.name in channel_names:
-            # TODO implement the post processing for Puls from HR
+            # TODO implement the post processing for the breathing signal
             Error = None
             if channel.name == "HR":
+                # the default post-processing factor is 1
                 data = slice_signals(channel, indx_start, indx_end, take_mean, post_process)
             if channel.name == "Hautleitfähigkeit":
                 post_process = 2 * post_process
@@ -322,7 +325,8 @@ def slice_channels(channels: list, indx_start: np.ndarray, indx_end: np.ndarray,
 
     return channels_df, Error
 
 def slice_channels_for_analysis(channels: list, indx_start: np.ndarray, indx_end: np.ndarray, block_names: np.ndarray,
-                                channel_names: list=['Atmung', 'HR','Hautleitfähigkeit' ,'Channel 4'],subject:str="subject_name", mean:bool = False) -> Tuple[np.ndarray, np.ndarray]:
+                                channel_names: list=['Atmung', 'HR','Hautleitfähigkeit','Puls', 'Atmungsrate'],subject:str="subject_name", mean:bool = False,
+                                base_normalization:bool=True) -> Tuple[np.ndarray, np.ndarray]:
     """
     slice the signals for all the channels and return signal
     :param channels: array of adi object channel
@@ -335,7 +339,7 @@ def slice_channels_for_analysis(channels: list, indx_start: np.ndarray, indx_end
     """
     channels_df = pd.DataFrame()
     block_names = np.array(block_names, dtype=str)
-    block_names = np.delete(block_names, 0)
+
     subject_ids = [subject] * block_names.size
     channels_df["subject"] = subject_ids
     channels_df["comments"] = block_names
@@ -352,35 +356,38 @@ def slice_channels_for_analysis(channels: list, indx_start: np.ndarray, indx_end
     else:
         take_mean = np.zeros_like(indx_start, dtype=bool)
 
-    list_features = []
-
-
     for id, channel in enumerate(channels):  # will fail for puls , have to bypass it
-        #if channel.name in channel_names:
+        if channel.name in channel_names:
 
-            # TODO implement the post processing for Puls from HR
+            # TODO implement the post processing for other channels
+            if channel.name == "HR":
+                data = slice_signals(channel, indx_start, indx_end, take_mean, post_process)
+            if channel.name == "Hautleitfähigkeit":
+                post_process = 2 * post_process
+                data = slice_signals(channel, indx_start, indx_end, take_mean, post_process)
-        #if channel.name == "HR":
-        #    data = slice_signals(channel, indx_start, indx_end, take_mean, post_process)
-        if channel.name == "Hautleitfähigkeit":
-            post_process = 2 * post_process
-            data = slice_signals(channel, indx_start, indx_end, take_mean, post_process)
+            else:
+                data = slice_signals(channel, indx_start, indx_end, take_mean, post_process)
+
+            # subtracting baseline values
-        #else:
-            #data = slice_signals(channel, indx_start, indx_end)
-        # subtracting baseline values
-        baseline = np.zeros_like(data)
-        min_shape = data[-1].shape[0]
-        baseline.fill(data[data[0].shape[0]-min_shape:])
+            # verify baseline subtraction
+            if base_normalization:
+                baseline = np.mean(data[0])
+                data = np.subtract(data, baseline).astype(np.float16)
+                #data = signal.detrend(data, axis=0, type="constant", bp= baseline,overwrite_data=False)
-        data = data - baseline
+            # baseline subtraction is not relevant for neurokit tonic signals unless there is little change in the regression
-        #list_features.append(normalized_data)
+            # store all the channel blocks per subject in a data frame, including the baseline
+            channels_df[channel.name] = data
+        else:
+            Error = None  # "channel name not found in the channel list"
 
-    return channels_df, data
+    return channels_df, Error
 
 def get_slice_range(tick_position: np.ndarray, intend_front:int=2e4, intend_back:int=2e4,edit_last_comment:bool=False) -> Tuple[np.ndarray, np.ndarray]:
     """
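Note: the new `base_normalization` branch replaces the old `np.zeros_like`/`fill` baseline construction with the mean of the first sliced block. A minimal sketch of that intent, assuming `data` stacks equally long block slices with the baseline block in row 0 (toy values, not from the repo):

    import numpy as np

    # Toy stand-in for the sliced channel blocks; row 0 plays the baseline block.
    data = np.array([[1.0, 2.0, 3.0],    # baseline block
                     [4.0, 5.0, 6.0]])   # task block

    baseline = np.mean(data[0])                            # scalar baseline level
    data = np.subtract(data, baseline).astype(np.float16)  # shift every block by it

    print(data)  # the baseline block is now centred around zero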
"subject_id", + retun_signal: bool = False) -> pd.DataFrame: """ :param file: filename for each participant @@ -56,24 +57,36 @@ def read_adicht_file(file: str, front_offset: int = 28e3, back_offset:int=28e3, # Removing baseline_end tick positions red_block_names = np.delete(block_names_per_channel, baseline_ids[0, 1]) - sliced_channel_df, Error = slice_channels(channels=adi_object.channels, - block_names=red_block_names, - indx_start=slice_start, - indx_end=slice_end, - channel_names=channel_names, - subject=subject_id) + if retun_signal: + sliced_channel_df, Error = slice_channels_for_analysis(channels=adi_object.channels, + block_names=red_block_names, + indx_start=slice_start, + indx_end=slice_end, + channel_names=channel_names, + subject=subject_id) + + else: + feature_data = None + sliced_channel_df, Error = slice_channels(channels=adi_object.channels, + block_names=red_block_names, + indx_start=slice_start, + indx_end=slice_end, + channel_names=channel_names, + subject=subject_id) + if Error is not None: - raise AssertionError("The comment name/s: ", *Error.flatten(), rf" in the file{file} does not matches the default comment order.") + raise AssertionError(Error + " in the file{file} does not matches the default comment order.") return sliced_channel_df def get_data_for_analysis(directory: str = "taste_adicht_files/", - channels: list = ['Hautleitfähigkeit','HR'], + channels: list = ['Hautleitfähigkeit','HR', 'Puls'] , start_block: list = ['baseline_start'], - end_block: list = ['baseline_end'],offeset_sec:int=22) \ + end_block: list = ['baseline_end'],offeset_sec:int=22, + return_signals:bool=True) \ -> [np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ - Main function thats collects data from all subjects + Main function that collects data from all subjects :param directory: directory where adicht files are located :param channels: list of channel that you want to consider :param start_block: list of block starting names @@ -81,7 +94,7 @@ def get_data_for_analysis(directory: str = "taste_adicht_files/", :return: all participant data, odd_data, even_data, subject id """ - offeset_window = offeset_sec*1e3 + offeset_window = offeset_sec * 1e3 files_df = collect_adicht_files(directory_=directory) participants_data = [] @@ -89,10 +102,14 @@ def get_data_for_analysis(directory: str = "taste_adicht_files/", for Subject_ID, participant in tqdm(zip(files_df.subject,files_df.filename), desc="looping over subjects", total=len(files_df), unit="subjects"): - single_participant_df = read_adicht_file(participant, + single_participant_df= read_adicht_file(participant, channel_names=channels, start_block=start_block, end_block=end_block, subject_id=Subject_ID, - back_offset=offeset_window,front_offset=offeset_window) + back_offset=offeset_window,front_offset=offeset_window, + retun_signal=return_signals) + + + # TODO: try to handle the feature in a suitable HDF5 format participants_data.append(single_participant_df) return pd.concat(participants_data,ignore_index=True) @@ -103,6 +120,7 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Average over cognitive load ") parser.add_argument("-w", dest="working_dir", type=str, help="absolute path to directory containing taste adicht files", default=None) parser.add_argument("-s", dest="offset_sec", type=str, help="Seconds to be taken out on either side of the tick position", default=28) + parser.add_argument("-g", dest="graph", type=bool, help="If you like to take the mean and plot", default=False) args = parser.parse_args() 
@@ -103,6 +120,7 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Average over cognitive load ")
     parser.add_argument("-w", dest="working_dir", type=str, help="absolute path to directory containing taste adicht files", default=None)
     parser.add_argument("-s", dest="offset_sec", type=str, help="Seconds to be taken out on either side of the tick position", default=28)
+    parser.add_argument("-g", dest="graph", action="store_true", help="take the mean and plot", default=False)
     args = parser.parse_args()
 
     data_df = None
@@ -115,9 +133,11 @@ if __name__ == "__main__":
     window.title("Cognitive load Analysis ")
 
     def graph():
-        data_df = get_data_for_analysis(directory=args.working_dir, offeset_sec=get_current_value())
+        data_df = get_data_for_analysis(directory=args.working_dir, offeset_sec=get_current_value(), return_signals=False)
         data_df.to_csv("taste_cognitive_load_analysis.csv")
         print("A csv file is written in the same folder ")
+
+
         grouped_data = data_df.groupby("comments").agg({'Hautleitfähigkeit': ['mean']})
         axes = grouped_data.plot.bar(figsize=(10,8))
         axes.set_xlabel("taste_load")
@@ -126,6 +146,12 @@ if __name__ == "__main__":
         axes.autoscale_view()
         plt.show()
         return None
+    def analyse():
+        data_df = get_data_for_analysis(directory=args.working_dir, offeset_sec=get_current_value(), return_signals=True)
+        data_df.to_csv("taste_cognitive_load_analysis.csv")
+        print("A csv file is written in the same folder ")
+
+        return None
 
     window.columnconfigure(0, weight=1)
     window.columnconfigure(1, weight=3)
@@ -158,7 +184,9 @@ if __name__ == "__main__":
     value_label.pack()
     args.working_dir = filedialog.askdirectory(title="select a taste directory",)
     button = Button(window, text="plot", command=graph)
+    button1 = Button(window, text="Analyse", command=analyse)
     button.pack()
+    button1.pack()
 
     #Label(window, text=args.working_dir, font=13).pack()
     print(f"Analysis will be performed on taste adicht files in {args.working_dir} folder")
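Note: the `graph()` callback aggregates the skin-conductance column per comment block before plotting. A self-contained sketch of that aggregation on hypothetical data (the comment labels and values here are made up, not from the repo):

    import pandas as pd
    import matplotlib.pyplot as plt

    # Hypothetical per-block means, mimicking the columns built by slice_channels
    data_df = pd.DataFrame({
        "comments": ["baseline_start", "sweet", "sweet", "sour"],
        "Hautleitfähigkeit": [0.10, 0.32, 0.28, 0.45],
    })

    grouped_data = data_df.groupby("comments").agg({'Hautleitfähigkeit': ['mean']})
    axes = grouped_data.plot.bar(figsize=(10, 8))
    axes.set_xlabel("taste_load")
    plt.show()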