Pipeline

🧠 L.A.S.S.I.E. Pipeline Summary

| Step | Method | Purpose |
|------|--------|---------|
| L | `LASSIE.load()` | Load raw data from a supported source (e.g., MongoDB, UAS, MetricWire). |
| A | `LASSIE.assure()` | Validate that required columns exist before processing. |
| S | `LASSIE.score()` | Apply scoring logic based on predefined or custom rules. |
| S | `LASSIE.summarize()` | Aggregate scored data by participant, session, or custom groups. |
| I | `LASSIE.inspect()` | Visualize distributions or pairwise plots for quality checks. |
| E | `LASSIE.export()` | Save scored and summarized data to tidy files and, optionally, metadata. |
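
The stage order matters: the `summarize` source on this page returns `self` and raises a `ValueError` when scoring has not run yet. The sketch below imitates that chaining and guarding pattern with a toy stand-in. `ToyPipeline`, its method signatures, and the sample rows are all hypothetical and are not LASSIE's actual API.

```python
from typing import Callable, Dict, List, Optional

Record = Dict[str, object]


class ToyPipeline:
    """Hypothetical stand-in mimicking the load -> assure -> score -> summarize order."""

    def __init__(self) -> None:
        self.rows: Optional[List[Record]] = None
        self.validated: bool = False
        self.scored: Optional[List[Record]] = None
        self.summary: Optional[Dict[str, float]] = None

    def load(self, rows: List[Record]) -> "ToyPipeline":
        self.rows = list(rows)
        return self  # returning self is what allows method chaining

    def assure(self, required: List[str]) -> "ToyPipeline":
        if self.rows is None:
            raise ValueError("Load data before validating.")
        missing = [c for c in required if any(c not in r for r in self.rows)]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
        self.validated = True
        return self

    def score(self, rule: Callable[[Record], float]) -> "ToyPipeline":
        if not self.validated:
            raise ValueError("Validation must be completed before scoring.")
        self.scored = [{**r, "score": rule(r)} for r in self.rows]
        return self

    def summarize(self) -> "ToyPipeline":
        if self.scored is None:
            raise ValueError("Scoring must be completed before summarizing.")
        per_participant: Dict[str, List[float]] = {}
        for r in self.scored:
            per_participant.setdefault(str(r["participant_id"]), []).append(r["score"])
        # Mean score per participant
        self.summary = {pid: sum(v) / len(v) for pid, v in per_participant.items()}
        return self


rows = [
    {"participant_id": "p1", "correct": 1},
    {"participant_id": "p1", "correct": 0},
    {"participant_id": "p2", "correct": 1},
]
pipeline = (
    ToyPipeline()
    .load(rows)
    .assure(["participant_id", "correct"])
    .score(lambda r: float(r["correct"]))
    .summarize()
)
print(pipeline.summary)
```

Calling `ToyPipeline().load(rows).summarize()` without the intermediate stages raises `ValueError`, which is the same kind of guard the real `summarize` applies to `grouped_scored`.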

🔌 Supported Sources

| Source Type | Loader Class | Key Arguments | Notes |
|-------------|--------------|---------------|-------|
| `mongodb` | `MongoDBImporter` | `source_path` (JSON) | Expects flat or nested JSON documents. |
| `uas` | `UASImporter` | `source_path` (URL) | Parses newline-delimited JSON. |
| `metricwire` | `MetricWireImporter` | `source_path` (glob pattern or default) | Processes JSON files from an unzipped export. |
| `multicsv` | `MultiCSVImporter` | `source_map` (dict of CSV paths) | Each activity type is its own file. |
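
Newline-delimited JSON, the format noted for the `uas` source, stores one JSON document per line. The sketch below shows how such input can be parsed with the standard library only. It is not the `UASImporter` implementation; `iter_ndjson` and the field names in the sample are hypothetical.

```python
import io
import json
from typing import Dict, Iterable, Iterator


def iter_ndjson(lines: Iterable[str]) -> Iterator[Dict]:
    """Yield one parsed JSON document per non-empty line (hypothetical helper)."""
    for line_number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # tolerate blank lines between documents
        try:
            yield json.loads(text)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON on line {line_number}: {exc.msg}") from exc


# Sample payload with assumed field names, including one blank line
sample = io.StringIO(
    '{"participant_id": "p1", "activity_name": "Symbol Search"}\n'
    "\n"
    '{"participant_id": "p2", "activity_name": "Grid Memory"}\n'
)
records = list(iter_ndjson(sample))
print(len(records))
```

Because the helper accepts any iterable of strings, the same function works on an open file handle or on the decoded lines of an HTTP response.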

pipeline

LASSIE()

Source code in m2c2_datakit/core/pipeline.py
def __init__(self):
    self.flat: Optional[pd.DataFrame] = None
    self.grouped: Optional[Dict[str, pd.DataFrame]] = None
    self.flat_scored: Optional[pd.DataFrame] = None
    self.grouped_scored: Optional[Dict[str, pd.DataFrame]] = None
    self.grouped_summary: Optional[pd.DataFrame] = None
    self.scoring_func_map: Optional[Dict[str, List[Tuple[str, Callable]]]] = None
    self.validated: bool = False
    self.activities: Optional[Dict[str, set]] = None
    self.timestamp: Optional[str] = None
    self.source_name: Optional[str] = None
    self.source_path: Optional[str] = None
    self.errors: Optional[List[str]] = None
    self.activity_name_col: Optional[str] = None
    self.raw_data_stats: Optional[Dict[str, pd.DataFrame]] = None

summarize(summary_func_map, groupby_cols=['participant_id', 'session_id', 'session_uuid'], **kwargs)

Summarize scored data using task-specific summary functions, with metadata.

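Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `summary_func_map` | `Dict[str, Callable]` | Map of activity/task -> summary function. | required |
| `groupby_cols` | `List[str]` | Columns to group by; forwarded to `summarize_by_group_key`. | `['participant_id', 'session_id', 'session_uuid']` |
| `**kwargs` | | Additional arguments passed to the summarization functions. | `{}` |
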
Source code in m2c2_datakit/core/pipeline.py
def summarize(
    self,
    summary_func_map: Dict[str, Callable],
    groupby_cols: List[str] = ['participant_id', 'session_id', 'session_uuid'],
    **kwargs,
):
    """
    Summarize scored data using task-specific summary functions, with metadata.

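    Parameters:
        summary_func_map (Dict[str, Callable]): Map of activity/task -> summary function.
        groupby_cols (List[str]): Columns to group by; forwarded to summarize_by_group_key.
        **kwargs: Additional arguments passed to the summarization functions.

    Returns:
        The pipeline instance (self), so calls can be chained.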
    """
    if self.grouped_scored is None:
        raise ValueError("Scoring must be completed before summarizing.")

    self.summary_func_map = summary_func_map

    # TODO: add a default func map
    # summary_func_map = {
    #     "Symbol Search": m2c2.tasks.symbol_search.summarize,
    #     "Grid Memory": m2c2.tasks.grid_memory.summarize,
    # }

    try:
        # Determine columns based on data source (diff keys, diff platforms)

        # Apply group-specific summarization
        self.grouped_summary = summarize_by_group_key(
            self.grouped_scored,
            summary_func_map,
            groupby_cols=groupby_cols,
            **kwargs,
        )

        # Stack the per-activity summaries row-wise into a single dataframe
        self.flat_summary = pd.concat(
            self.grouped_summary.values(), ignore_index=True
        )

        log_info(
            "[S] Summarization by activity completed.",
            {"session_timestamp": self.timestamp},
        )
    except Exception as e:
        log_exception(
            "Summarization by activity failed",
            e,
            {
                "session_timestamp": self.timestamp,
                "activities": list(self.grouped_scored.keys()),
            },
        )
        raise

    return self
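
The source above delegates the actual grouping to `summarize_by_group_key`, whose body is not shown on this page. The sketch below is one plausible reading of that step, assuming each summary function receives the trials of a single group as a DataFrame and returns a dict of metrics. `summarize_groups_sketch`, `summarize_symbol_search`, and the `correct` column are hypothetical names used for illustration only.

```python
from typing import Callable, Dict, List

import pandas as pd


def summarize_symbol_search(trials: pd.DataFrame) -> Dict[str, float]:
    """Hypothetical summary function: metrics for one group of trials."""
    return {
        "n_trials": float(len(trials)),
        "accuracy": float(trials["correct"].mean()),
    }


def summarize_groups_sketch(
    grouped_scored: Dict[str, pd.DataFrame],
    summary_func_map: Dict[str, Callable[[pd.DataFrame], Dict[str, float]]],
    groupby_cols: List[str],
) -> Dict[str, pd.DataFrame]:
    """Assumed behavior: one summary row per group, one dataframe per activity."""
    summaries: Dict[str, pd.DataFrame] = {}
    for activity, func in summary_func_map.items():
        if activity not in grouped_scored:
            continue  # no scored data for this activity
        rows = []
        for keys, trials in grouped_scored[activity].groupby(groupby_cols):
            if not isinstance(keys, tuple):
                keys = (keys,)
            row = dict(zip(groupby_cols, keys))  # group identifiers first
            row.update(func(trials))             # then the computed metrics
            rows.append(row)
        summaries[activity] = pd.DataFrame(rows)
    return summaries


grouped_scored = {
    "Symbol Search": pd.DataFrame(
        {
            "participant_id": ["p1", "p1", "p2"],
            "session_id": ["s1", "s1", "s1"],
            "correct": [1, 0, 1],
        }
    )
}
summaries = summarize_groups_sketch(
    grouped_scored,
    {"Symbol Search": summarize_symbol_search},
    groupby_cols=["participant_id", "session_id"],
)
# Mirrors the pd.concat step in summarize(): stack per-activity summaries row-wise
flat_summary = pd.concat(list(summaries.values()), ignore_index=True)
print(flat_summary)
```

With more than one activity in the map, the concatenated frame holds the union of all metric columns, and metrics an activity does not produce are filled with `NaN`.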