Utils

This module provides utility functions for the project.

Functions

get_num_scenarios

Get the total number of scenarios from a data directory.

Reads the n_scenarios.txt metadata file in the data directory; if the file is missing, falls back to counting scenarios in bus_data.parquet.

Parameters:

Name      Type  Description                                              Default
data_dir  str   Directory containing parquet files and n_scenarios.txt  required

Returns:

Type  Description
int   Total number of scenarios

Raises:

Type               Description
ValueError         If n_scenarios.txt exists but does not contain a valid integer
FileNotFoundError  If neither n_scenarios.txt nor bus_data.parquet exists in data_dir

Source code in gridfm_datakit/utils/utils.py
def get_num_scenarios(data_dir: str) -> int:
    """Get total number of scenarios from data directory.

    Reads from n_scenarios.txt metadata file in the data directory.

    Args:
        data_dir: Directory containing parquet files and n_scenarios.txt

    Returns:
        Total number of scenarios

    Raises:
        ValueError: If n_scenarios.txt metadata file not found
    """
    n_scenarios_file = os.path.join(data_dir, "n_scenarios.txt")
    if os.path.exists(n_scenarios_file):
        with open(n_scenarios_file, "r") as f:
            return int(f.read().strip())
    else:
        print(
            f"No n_scenarios metadata file found in {data_dir}, using bus_data.parquet to get total number of scenarios",
        )
        return int(
            pd.read_parquet(
                os.path.join(data_dir, "bus_data.parquet"),
                engine="pyarrow",
            )["scenario"].max()
            + 1,
        )
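
A minimal usage sketch; the data directory below is hypothetical and stands in for any directory produced by the data pipeline:

from gridfm_datakit.utils.utils import get_num_scenarios

# "data/case118" is a hypothetical directory containing bus_data.parquet
# and (optionally) the n_scenarios.txt metadata file
n_scenarios = get_num_scenarios("data/case118")
print(f"{n_scenarios} scenarios available")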

read_partitions

Read sampled partition folders in parallel and concatenate them.

Source code in gridfm_datakit/utils/utils.py
def read_partitions(
    base_path: str,
    sampled: list,
    max_workers: Optional[int] = None,
) -> pd.DataFrame:
    """Read sampled partition folders in parallel and concatenate them.

    Args:
        base_path: Directory containing scenario_partition=<k> subfolders
        sampled: Partition indices to read
        max_workers: Thread pool size; defaults to min(32, cpu_count())

    Returns:
        Concatenated DataFrame of all sampled partitions
    """
    if max_workers is None:
        from os import cpu_count

        # sensible default; cpu_count() may return None, so fall back to 1
        max_workers = min(32, cpu_count() or 1)

    dfs = []

    # Submit all partition reads to the ThreadPool
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                pd.read_parquet,
                os.path.join(base_path, f"scenario_partition={k}"),
                engine="pyarrow",
            ): k
            for k in sampled
        }

        # Collect results as they complete with tqdm
        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc=f"Reading {len(sampled)} partitions from {base_path}",
        ):
            df = future.result()
            dfs.append(df)

    # Concatenate all partitions
    return pd.concat(dfs, ignore_index=True)
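
A usage sketch; the base path is hypothetical and is assumed to contain subfolders named scenario_partition=<k>, the layout this function expects:

import random

from gridfm_datakit.utils.utils import read_partitions

# Hypothetical dataset split into 16 partition folders:
# scenario_partition=0 ... scenario_partition=15
sampled = random.sample(range(16), k=4)
df = read_partitions("data/case118/bus", sampled=sampled, max_workers=8)
print(df.shape)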

write_ram_usage_distributed

Write the combined RAM usage of the current process and all of its child processes to a text stream.
Source code in gridfm_datakit/utils/utils.py
def write_ram_usage_distributed(tqdm_log: TextIO) -> None:
    """Write combined RAM usage of this process and its children to tqdm_log."""
    process = psutil.Process(os.getpid())  # Parent process
    mem_usage = process.memory_info().rss / 1024**2  # Parent memory in MB

    # Sum memory usage of all child processes
    for child in process.children(recursive=True):
        mem_usage += child.memory_info().rss / 1024**2

    tqdm_log.write(f"Total RAM usage (Parent + Children): {mem_usage:.2f} MB\n")
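
A short sketch; the function only needs an object with a write method, so here the report goes to stderr:

import sys

from gridfm_datakit.utils.utils import write_ram_usage_distributed

# Print combined parent + children RAM usage to stderr
write_ram_usage_distributed(sys.stderr)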

Classes

Tee

Duplicate writes across multiple output streams (e.g., stdout and a log file).
Source code in gridfm_datakit/utils/utils.py
class Tee:
    """Duplicate writes across multiple streams (e.g., stdout and a log file)."""

    def __init__(self, *streams):
        self.streams = streams

    def write(self, data):
        # Forward data to every stream and flush so output appears immediately
        for s in self.streams:
            s.write(data)
            s.flush()

    def flush(self):
        for s in self.streams:
            s.flush()
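
A sketch combining Tee with write_ram_usage_distributed so output reaches both the console and a log file; the log path is illustrative:

import sys

from gridfm_datakit.utils.utils import Tee, write_ram_usage_distributed

with open("run.log", "a") as log_file:
    log = Tee(sys.stdout, log_file)  # every write is duplicated to both streams
    log.write("starting scenario generation\n")
    write_ram_usage_distributed(log)  # Tee duck-types the TextIO interface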