Save

This module provides functions for saving processed power flow data to partitioned parquet files.

save_node_edge_data

Save processed power system data to partitioned parquet files using parallel processing.

This function saves bus, generator, branch, and Y-bus data to separate partitioned parquet directories using parallel processing for improved performance. Data is partitioned by scenario with 1000 scenarios per partition.
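With 1000 scenarios per partition, a scenario's partition follows from integer division of its absolute index. The sketch below shows only that mapping; the actual on-disk directory naming is handled internally (by the `_process_and_save` helper called in the source below) and may differ.

```python
# Illustration only: mapping an absolute scenario index to its partition
# under a 1000-scenarios-per-partition scheme. The real partition naming
# used by gridfm_datakit may differ.
PARTITION_SIZE = 1000

def partition_index(scenario_idx: int) -> int:
    return scenario_idx // PARTITION_SIZE

assert partition_index(999) == 0   # last scenario of the first partition
assert partition_index(1000) == 1  # first scenario of the second partition
```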

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `net` | `Network` | Network object containing system topology information. | required |
| `node_path` | `str` | Path for saving bus/node data partitioned parquet directory. | required |
| `branch_path` | `str` | Path for saving branch data partitioned parquet directory. | required |
| `gen_path` | `str` | Path for saving generator data partitioned parquet directory. | required |
| `y_bus_path` | `str` | Path for saving Y-bus data partitioned parquet directory. | required |
| `runtime_path` | `str` | Path for saving runtime data partitioned parquet directory. | required |
| `processed_data` | `List[Tuple[ndarray, ndarray, ndarray, ndarray, ndarray]]` | List of tuples containing processed data arrays for each scenario. | required |
| `include_dc_res` | `bool` | Whether DC power flow data is included in the output. | `False` |
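A minimal usage sketch, assuming `net` is a `Network` loaded elsewhere and using placeholder arrays; the real per-scenario array shapes and column layouts are determined by the upstream processing code, not by this example.

```python
import numpy as np
from gridfm_datakit.save import save_node_edge_data

# Assumption: `net` is a Network object obtained from the library's
# network-loading utilities; only `net.buses` is used here.
n_buses = net.buses.shape[0]

# Placeholder per-scenario tuples (bus, gen, branch, Y-bus, runtime arrays).
# Shapes are illustrative only.
processed_data = [
    (
        np.zeros((n_buses, 8)),        # bus data
        np.zeros((3, 6)),              # generator data
        np.zeros((5, 10)),             # branch data
        np.zeros((n_buses, n_buses)),  # Y-bus data
        np.zeros(1),                   # runtime data
    )
    for _ in range(10)  # ten scenarios
]

save_node_edge_data(
    net=net,
    node_path="out/bus",
    branch_path="out/branch",
    gen_path="out/gen",
    y_bus_path="out/y_bus",
    runtime_path="out/runtime",
    processed_data=processed_data,
    include_dc_res=False,
)
```

Because `node_path` is `"out/bus"`, the scenario-count metadata file is written to `out/n_scenarios.txt` (the directory containing `node_path`), so repeated calls continue the scenario numbering rather than restarting at zero.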
Source code in gridfm_datakit/save.py
def save_node_edge_data(
    net: Network,
    node_path: str,
    branch_path: str,
    gen_path: str,
    y_bus_path: str,
    runtime_path: str,
    processed_data: List[
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]
    ],
    include_dc_res: bool = False,
) -> None:
    """Save processed power system data to partitioned parquet files using parallel processing.

    This function saves bus, generator, branch, and Y-bus data to separate partitioned parquet directories
    using parallel processing for improved performance. Data is partitioned by scenario with 1000 scenarios per partition.

    Args:
        net: Network object containing system topology information.
        node_path: Path for saving bus/node data partitioned parquet directory.
        branch_path: Path for saving branch data partitioned parquet directory.
        gen_path: Path for saving generator data partitioned parquet directory.
        y_bus_path: Path for saving Y-bus data partitioned parquet directory.
        runtime_path: Path for saving runtime data partitioned parquet directory.
        processed_data: List of tuples containing processed data arrays for each scenario.
        include_dc_res: Whether DC power flow data is included in the output.
    """
    n_buses = net.buses.shape[0]

    # Determine last scenario index from n_scenarios metadata file
    last_scenario = -1
    base_path = os.path.dirname(node_path)
    n_scenarios_file = os.path.join(base_path, "n_scenarios.txt")
    if os.path.exists(n_scenarios_file):
        with open(n_scenarios_file, "r") as f:
            last_scenario = int(f.read().strip()) - 1

    # Define arguments per data type
    tasks = [
        ("bus", processed_data, node_path, last_scenario, n_buses, include_dc_res),
        ("gen", processed_data, gen_path, last_scenario, n_buses, include_dc_res),
        ("branch", processed_data, branch_path, last_scenario, n_buses, include_dc_res),
        ("y_bus", processed_data, y_bus_path, last_scenario, n_buses, include_dc_res),
        (
            "runtime",
            processed_data,
            runtime_path,
            last_scenario,
            n_buses,
            include_dc_res,
        ),
    ]

    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(_process_and_save, task) for task in tasks]
        for f in futures:
            f.result()  # wait for each task to finish

    # Write n_scenarios metadata file
    total_scenarios = last_scenario + 1 + len(processed_data)
    with open(n_scenarios_file, "w") as f:
        f.write(str(total_scenarios))
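
Each output path is a partitioned parquet directory, so the results can be read back with standard tooling. A sketch assuming the paths from the usage example above and a pandas/pyarrow installation:

```python
import os
import pandas as pd

out_dir = "out"  # directory containing node_path in the example above

# save_node_edge_data records the cumulative scenario count next to the
# bus/node directory.
with open(os.path.join(out_dir, "n_scenarios.txt")) as f:
    n_scenarios = int(f.read().strip())
print(f"{n_scenarios} scenarios written so far")

# pandas (with pyarrow installed) reads a partitioned parquet directory as a
# single DataFrame, concatenating all partitions.
bus_df = pd.read_parquet(os.path.join(out_dir, "bus"))
print(bus_df.shape)
```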