"""This is the main module of the application. It contains the main function which processes ACM files and applies
corrections.
Functions:
----------
- apply_corrections: Applies corrections based on user input.
- read_files: Reads and parses ACM and TXT files.
- generate_plots: Generates plots and animations.
- calculate_dose_values: Calculates dose values and dose rate values.
- snc_format_array: Formats the array to be compatible with the SNC measured txt file.
- get_user_input: Gets user input for batch folder path and correction type.
- main: Main function to process ACM files and apply corrections.
"""
import os
import numpy as np
import io_snc
import plots
from corrections import apply_jager_corrections, get_intrinsic_corrections
def apply_corrections(counts_accumulated_df, bkrnd_and_calibration_df, include_intrinsic_corrections, array_data):
    """
    Run the Jager corrections, optionally including intrinsic corrections.

    Parameters
    ----------
    counts_accumulated_df : pandas.DataFrame
        DataFrame with accumulated counts.
    bkrnd_and_calibration_df : pandas.DataFrame
        DataFrame with background and calibration data.
    include_intrinsic_corrections : str
        ``'y'`` to derive intrinsic corrections from ``array_data``; any other
        value passes ``None`` to ``apply_jager_corrections`` instead.
    array_data : numpy.ndarray
        Array data handed to ``get_intrinsic_corrections``.

    Returns
    -------
    numpy.ndarray
        Corrected count array, exactly as returned by
        ``apply_jager_corrections``.
    """
    wants_intrinsic = include_intrinsic_corrections == 'y'
    intrinsic = get_intrinsic_corrections(array_data) if wants_intrinsic else None
    return apply_jager_corrections(counts_accumulated_df, bkrnd_and_calibration_df, intrinsic)
def read_files(acml_path, txt_path):
    """
    Parse one ACM file together with its companion TXT file.

    Parameters
    ----------
    acml_path : str
        Path to the ACM file.
    txt_path : str
        Path to the TXT file.

    Returns
    -------
    tuple
        Five items, in order: the three results of
        ``io_snc.parse_acm_file`` (frame data, accumulated counts, background
        and calibration data), then the header from
        ``io_snc.parse_arccheck_header`` and the arrays from
        ``io_snc.parse_arrays_from_file``.
    """
    # The ACM parser yields three objects; unpack them so a mismatch fails here.
    frames, accumulated_counts, background_calibration = io_snc.parse_acm_file(acml_path)

    # Both remaining parsers read the same TXT file.
    txt_header = io_snc.parse_arccheck_header(txt_path)
    txt_arrays = io_snc.parse_arrays_from_file(txt_path)

    return frames, accumulated_counts, background_calibration, txt_header, txt_arrays
def generate_plots(dose_rate_arrays, dose_df, dose_rate_df, dose_accumulated_df, startframe, endframe, detector_number):
    """
    Generate plots and animations.

    Three plots are produced, in this order: an animation of
    ``dose_rate_arrays``, a cumulative-dose scatter plot restricted to the
    frames ``startframe:endframe``, and a dose-rate histogram.

    Parameters
    ----------
    dose_rate_arrays : numpy.ndarray
        Dose rate arrays. Passed to the animation in full (all frames).
    dose_df : pandas.DataFrame
        DataFrame with dose data.
    dose_rate_df : pandas.DataFrame
        DataFrame with dose rate data.
    dose_accumulated_df : pandas.DataFrame
        DataFrame with accumulated dose data.
    startframe : int
        Starting frame (inclusive) of the scatter-plot selection.
    endframe : int
        Ending frame (exclusive) of the scatter-plot selection.
    detector_number : int
        Detector number.

    Raises
    ------
    IndexError
        If ``detector_number`` does not occur in the layout returned by
        ``io_snc.diode_numbers_in_snc_array``.
    """
    # Forwarded unchanged to plots.create_animation.
    xn, yn = 31, 15
    # Third argument of plots.bar_doserate_histogram.
    histogram_levels = [630, 610, 590, 570, 550]

    # Fail before any plotting, with a readable message, when the requested
    # detector is not part of the SNC layout.
    diode_layout = io_snc.diode_numbers_in_snc_array()
    if not np.any(diode_layout == detector_number):
        raise IndexError(f"Detector number {detector_number} not found in the SNC diode array.")

    # NOTE(review): the animation receives every frame and the full array; no
    # detector-centred crop is applied - confirm that this is intended.
    plots.create_animation(dose_rate_arrays, xn, yn, detector_number)
    plots.scatter_cumulative_dose(
        dose_rate_df[startframe:endframe],
        dose_accumulated_df[startframe:endframe],
        detector_number,
    )
    plots.bar_doserate_histogram(dose_df, dose_rate_df, histogram_levels)
def calculate_dose_values(counts_accumulated_df, dose_per_count, frame_interval_ms=50):
    """
    Calculate dose values and dose rate values.

    Parameters
    ----------
    counts_accumulated_df : pandas.DataFrame
        DataFrame with accumulated counts.
    dose_per_count : float
        Dose per count.
    frame_interval_ms : float, optional
        Time between two consecutive frames, in milliseconds. Defaults to 50,
        the interval this function previously assumed.

    Returns
    -------
    tuple
        ``(dose_df, dose_accumulated_df, dose_rate_df, dose_rate_arrays)``:
        per-frame dose, accumulated dose (first row dropped so it lines up
        with ``dose_df``), dose rate, and the result of
        ``io_snc.detector_arrays`` applied to the dose rate.

    Raises
    ------
    ValueError
        If ``frame_interval_ms`` is not strictly positive.
    """
    if frame_interval_ms <= 0:
        raise ValueError("frame_interval_ms must be a positive number of milliseconds")

    dose_accumulated_df = counts_accumulated_df * dose_per_count  # cGy

    # Per-frame dose is the row-to-row difference of the accumulated dose.
    # diff() leaves NaN in the first row, so drop that row by position.
    dose_df = dose_accumulated_df.diff().iloc[1:]  # cGy
    dose_accumulated_df = dose_accumulated_df.iloc[1:]  # Matching the length of dose_df

    # Convert the frame interval from milliseconds to minutes (60000 ms per minute).
    time_interval = frame_interval_ms / 60000  # in minutes
    dose_rate_df = dose_df / time_interval  # cGy/min

    # Create a ndarray of detector arrays arranged in the SNC Patient display configuration
    # Each array represents a frame of the detector array at a specific time point
    dose_rate_arrays = io_snc.detector_arrays(dose_rate_df)
    return dose_df, dose_accumulated_df, dose_rate_df, dose_rate_arrays
def main():
    """
    Batch-process every ``.acm`` file in the folder chosen by the user.

    For each ``.acm`` file that has a same-named ``.txt`` file next to it, the
    two files are read, corrections are applied, and the ``'Corrected Counts'``
    entry of the parsed array data is replaced before the result is written to
    ``<name>_corrected_<correction_type>[_intrinsic].txt``. Files without a
    matching ``.txt`` are skipped, and any error while processing one file is
    printed without stopping the batch.
    """
    batch_folder_path, correction_type, include_intrinsic_corrections = get_user_input()

    # Both values depend only on the user's answers, so compute them once.
    intrinsic_suffix = '_intrinsic' if include_intrinsic_corrections == 'y' else ''
    # Element 0 of the corrected result is used for 'pr', element 1 otherwise.
    result_index = 0 if correction_type == 'pr' else 1

    for file_name in os.listdir(batch_folder_path):
        if not file_name.endswith(".acm"):
            continue

        print(f"Processing file: {file_name}")
        acm_file_path = os.path.join(batch_folder_path, file_name)
        # Swap the four-character ".acm" suffix for ".txt".
        txt_file_path = os.path.join(batch_folder_path, file_name[:-4] + ".txt")

        if not os.path.isfile(txt_file_path):
            print(f"Matching .txt file for {file_name} not found. Skipping this file.")
            continue

        try:
            (_frame_data_df,
             counts_accumulated_df,
             bkrnd_and_calibration_df,
             header_data,
             array_data) = read_files(acm_file_path, txt_file_path)

            corrected = apply_corrections(counts_accumulated_df,
                                          bkrnd_and_calibration_df,
                                          include_intrinsic_corrections,
                                          array_data)

            # Work on a copy so the parsed input is left untouched.
            output_arrays = array_data.copy()
            output_arrays['Corrected Counts'] = snc_format_array(corrected[result_index],
                                                                 output_arrays['Corrected Counts'])

            output_stem = txt_file_path[:-4] + '_corrected_' + correction_type
            output_path = output_stem + intrinsic_suffix + '.txt'
            io_snc.write_snc_txt_file(output_arrays, header_data, output_path)
        except FileNotFoundError:
            print(f"File {file_name} not found.")
        except Exception as error:
            print(f"An error occurred while processing {file_name}: {str(error)}")
# Run the batch processing only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()