Skip to content

Data Loading (.MAT)

Functions for loading model files into a form that can be read by the GUI

idx2map(ch_idx)

Given a channel index, return the channel's row and col

Parameters:

Name Type Description Default
ch_idx int

single numerical index for array (0 to 1023)

required

Returns:

Type Description

channel row and channel column

Source code in src/model/data_loading_mat.py
def idx2map(ch_idx: int):
    """Convert a flat channel index into its (row, col) position.

    The array is treated as 32 columns wide, so the row is the index
    divided by 32 and the column is the remainder.

    Args:
        ch_idx: flat channel index; valid values are 0 through 1023

    Returns:
        A ``(ch_row, ch_col)`` tuple of ints for a valid index. For an
        out-of-range index a message is printed and the bare integer ``-1``
        is returned (note: not a tuple).
    """
    # Guard clause: reject anything outside the 1024-channel range.
    if ch_idx < 0 or ch_idx > 1023:
        print('Chan num out of range')
        return -1

    quotient, remainder = divmod(ch_idx, 32)
    return int(quotient), int(remainder)

init_data_loading(path)

Parameters:

Name Type Description Default
path str required
Source code in src/model/data_loading_mat.py
def init_data_loading(path : str):
    """Gather metadata about a recording directory and preallocate its arrays.

    Lists the entries of ``path`` (each entry is counted as one buffer),
    derives the run / piece names from the path components, and allocates
    zero-filled arrays sized for the largest number of samples expected.

    Args:
        path: directory containing the buffer files of one data run

    Returns:
        dict with keys ``path``, ``datarun`` (basename of ``path``),
        ``datapiece`` (basename of the parent directory), ``bufDir``
        (directory listing), ``num_of_buf``, ``bramDepth``, and the
        zero-initialized arrays ``dataAll`` / ``cntAll`` (32 x 32 x samples)
        and ``times`` (samples,).
    """
    bram_depth = 65536
    buffer_entries = os.listdir(path)
    buffer_count = len(buffer_entries)

    # Upper bound on the sample count. Per the original author's note this is
    # the largest possible size: a perfect recording with only double counts.
    max_samples = int(bram_depth * buffer_count / 2)
    grid_shape = (32, 32, max_samples)

    return {
        "path": path,
        "datarun": os.path.basename(path),
        "datapiece": os.path.basename(os.path.dirname(path)),
        "bufDir": buffer_entries,
        "num_of_buf": buffer_count,
        "bramDepth": bram_depth,
        "dataAll": np.zeros(grid_shape),
        "cntAll": np.zeros(grid_shape),
        "times": np.zeros(max_samples),
    }

load_first_buffer_info(app)

Parameters:

Name Type Description Default
app

MainWindow

required
Source code in src/model/data_loading_mat.py
def load_first_buffer_info(app):
    """Load buffer 0 of the current data run and queue it for serialization.

    The file is located at ``<path>/<basename of path>_0.mat``, where
    ``path`` comes from ``app.settings``. The resulting packet is pushed onto
    ``app.data.to_serialize`` and ``app.curr_buf_idx`` is set to 1.

    Args:
        app: MainWindow; must expose ``settings`` (with the keys ``path``,
            ``filter``, ``spikeThreshold`` and ``binSize``) and
            ``data.to_serialize``

    Returns:
        The length of the loaded packet's ``"packet_data"`` entry, used as
        the number of channels per buffer.
    """
    settings = app.settings
    run_path = settings["path"]
    run_name = os.path.basename(run_path)

    first_packet = load_one_mat_file({
        # The separator is always "/", regardless of platform.
        "file_dir": "/".join((run_path, run_name + "_" + str(0) + ".mat")),
        "filter_type": settings["filter"],
        "packet_idx": 0,
        "SPIKING_THRESHOLD": settings["spikeThreshold"],
        "BIN_SIZE": settings["binSize"],
    })

    app.data.to_serialize.put(first_packet)
    # Buffer 0 has been consumed, so the next buffer index is 1.
    app.curr_buf_idx = 1

    return len(first_packet["packet_data"])

load_one_mat_file(params)

Parameters:

Name Type Description Default
params required
Source code in src/model/data_loading_mat.py
def load_one_mat_file(params):
    """Load a single .mat buffer file and run it through the full pipeline.

    Pipeline: read the ``gmem1`` variable from the file, remove multiple
    counts, preprocess into per-channel data, filter, then compute channel
    statistics. Written so that it can be run in a separate process per file.

    Args:
        params: dict with the keys ``file_dir`` (path of the .mat file),
            ``filter_type``, ``packet_idx``, ``SPIKING_THRESHOLD`` and
            ``BIN_SIZE``

    Returns:
        The packet dict (``packet_data``, ``packet_idx``, ``file_dir``,
        ``filter_type``) as returned by ``calculate_channel_stats`` after
        filtering.
    """
    mat_path = params["file_dir"]
    filter_kind = params["filter_type"]
    spike_threshold = params["SPIKING_THRESHOLD"]
    bin_size = params["BIN_SIZE"]

    raw = sio.loadmat(mat_path)['gmem1'][0][:]

    from src.model.raw_data_helpers import removeMultipleCounts
    data_real, cnt_real, N = removeMultipleCounts(raw)

    # No timestamping happens here: absolute time tracking has to be done
    # sequentially, which would prevent processing files in parallel.
    packet = {
        "packet_data": preprocess_raw_data(data_real, cnt_real, N),
        "packet_idx": params["packet_idx"],
        "file_dir": mat_path,
        "filter_type": filter_kind,
    }

    from src.model.filters import filter_preprocessed_data
    filtered = filter_preprocessed_data(packet, filter_type=filter_kind)

    from src.model.statistics import calculate_channel_stats
    return calculate_channel_stats(filtered, spike_threshold, bin_size)

map2idx(ch_row, ch_col)

Given a channel's row and col, return channel's index

Parameters:

Name Type Description Default
ch_row int

row index of channel in array (0 to 31)

required
ch_col int

column index of channel in array (0 to 31)

required

Returns: numerical index of array

Source code in src/model/data_loading_mat.py
def map2idx(ch_row: int, ch_col: int):
    """ Given a channel's row and col, return channel's index

    The index is computed as ``ch_row * 32 + ch_col``.

    Args:
        ch_row: row index of channel in array (0 to 31)
        ch_col: column index of channel in array (0 to 31)

    Returns:
        Numerical index of the channel in the array. If either coordinate is
        out of range, a message is printed and ``-1`` is returned (the same
        sentinel ``idx2map`` uses) instead of failing on an unassigned
        variable.
    """
    if ch_row > 31 or ch_row < 0:
        print('Row out of range')
        return -1
    if ch_col > 31 or ch_col < 0:
        print('Col out of range')
        return -1
    return int(ch_row * 32 + ch_col)

preprocess_raw_data(data_real, cnt_real, N, SAMPLING_PERIOD=0.05)

Parameters:

Name Type Description Default
data_real required
cnt_real required
N required
SAMPLING_PERIOD 0.05
Source code in src/model/data_loading_mat.py
def preprocess_raw_data(data_real, cnt_real, N, SAMPLING_PERIOD=0.05):
    """Split one buffer's data into a list of per-channel records.

    Asks ``identify_relevant_channels`` which channels were recorded, then
    builds one dict per recorded channel holding that channel's slice of
    ``data_real``.

    Args:
        data_real: data array indexed as ``[row, col, sample]``
        cnt_real: count data accompanying ``data_real``; stored unchanged in
            every record
        N: sample count; used as the end of each channel's slice
        SAMPLING_PERIOD: time per sample (0.05 ms, i.e. 20 kHz, by default)

    Returns:
        list of dicts, one per recorded channel, with the keys ``data_real``,
        ``cnt_real``, ``N``, ``channel_idx`` and ``preprocessed_data``.
    """
    # Estimated duration of the buffer: 20kHz sampling rate, so
    # time_recording (ms) = num_samples * 0.05ms. This is only an estimate
    # because communication delays are not taken into account.
    duration = N * SAMPLING_PERIOD

    # Relative time axis only: absolute time must be tracked sequentially,
    # which would prevent parallel processing. Currently not used below.
    time_axis = np.linspace(0, duration, N + 1)

    # Only the last returned value (the recorded channels) is used here.
    from src.model.raw_data_helpers import identify_relevant_channels
    _num_channels, _channel_map, _channel_id, _start_idx, _find_coords, recorded_channels = \
        identify_relevant_channels(data_real)

    packet_data = []
    for entry in recorded_channels:
        channel_idx = int(entry[0])
        row, col = idx2map(channel_idx)

        # NOTE(review): the slice start is read from column 0, the same
        # column used for the channel index above -- confirm against
        # identify_relevant_channels that this is the intended column.
        first_sample = entry[0]
        samples = data_real[row, col, first_sample:N]

        # TODO: prune the samples (and matching times) where nothing was
        # recorded, i.e. where the data equals 0; per-channel times cannot be
        # computed here because absolute time is not tracked in this function.

        packet_data.append({
            "data_real": data_real,
            "cnt_real": cnt_real,
            "N": N,
            "channel_idx": channel_idx,
            "preprocessed_data": samples,
        })

    return packet_data