From a3b7b0aab3121ae81ed7f578fbbfedbdf52d2520 Mon Sep 17 00:00:00 2001
From: Benjamin Zaitlen
Date: Fri, 24 Apr 2026 21:23:44 -0400
Subject: [PATCH] add scaled data-gen scripts and outputs

---
 Data-Gen/create_join_tables.py | 473 +++++++++++++++++++++++++++++++++
 Data-Gen/new_datagen_faster.py | 251 +++++++++++++++++
 Data-Gen/notes.md              |  71 +++++
 3 files changed, 795 insertions(+)
 create mode 100644 Data-Gen/create_join_tables.py
 create mode 100644 Data-Gen/new_datagen_faster.py
 create mode 100644 Data-Gen/notes.md

diff --git a/Data-Gen/create_join_tables.py b/Data-Gen/create_join_tables.py
new file mode 100644
index 0000000..9bdefc1
--- /dev/null
+++ b/Data-Gen/create_join_tables.py
@@ -0,0 +1,473 @@
+#!/usr/bin/env python3
+import pandas as pd
+from pyspark.sql import SparkSession, Row
+import pyspark.sql.functions as F
+from pyspark.sql.types import (
+    StructType, StructField, StringType, IntegerType, LongType,
+    FloatType, DoubleType, BooleanType, DateType, TimestampType, ArrayType
+)
+
+spark = (
+    SparkSession.builder
+    .master("local[*]")
+    .appName("CreateJoinTables")
+    .config("spark.driver.host", "127.0.0.1")
+    .config("spark.driver.bindAddress", "127.0.0.1")
+    .config("spark.driver.memory", "32g")
+    .config("spark.executor.heartbeatInterval", "60s")
+    .config("spark.network.timeout", "300s")
+    .getOrCreate()
+)
+
+target_size = 1 * 1024**3  # 1 GB
+num_nodes_per_graph = 500_000
+PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-{target_size // 1024**3}GB"
+
+# -------------------------------------
+# Step 1. Read all sheets from the Excel file.
+# -------------------------------------
+excel_path = "../datagen_schema.xlsx"  # update this path as necessary
+
+# Read every sheet into a dictionary: keys are sheet names, values are DataFrames.
+sheets = pd.read_excel(excel_path, sheet_name=None)
+sheet_names = list(sheets.keys())
+print("Found sheets:", sheet_names)
+
+# table_a is the pre-generated edge table written to PATH_PREFIX by new_datagen_faster.py.
+table_a = spark.read.parquet(PATH_PREFIX)
+
+# -------------------------------------
+# Step 2. Process the tables overview (first sheet)
+# -------------------------------------
+# Assumption: The first sheet (e.g. "Tables") lists the table names and approximate row counts.
+tables_overview_df = sheets[sheet_names[0]]
+# Adjust these column names if your Excel file uses different names.
+table_names = tables_overview_df["masked_table_id"].tolist()
+approx_row_counts = tables_overview_df["num_rows_approx"].tolist()
+
+print("Tables and approximate row counts:")
+for tbl, cnt in zip(table_names, approx_row_counts):
+    print(f"  {tbl}: ~{cnt} rows")
+
+# -------------------------------------
+# Step 3. Read each table's metadata (columns, types, etc.)
+# -------------------------------------
+# Here we assume that the sheet name for each table is the same as the table name.
+table_metadata = {}
+for tbl in table_names:
+    if tbl in sheets:
+        meta_df = sheets[tbl]
+        table_metadata[tbl] = meta_df
+        print(f"Loaded metadata for table '{tbl}'.")
+    else:
+        print(f"Warning: No metadata sheet found for table '{tbl}'.")
+
+# -------------------------------------
+# Step 4. Define a mapping from your Excel type names to Spark types.
+# -------------------------------------
+spark_type_mapping = {
+    "StringType()": StringType(),
+    "StringType": StringType(),
+    "IntegerType()": IntegerType(),
+    "LongType()": LongType(),
+    "FloatType()": FloatType(),
+    "DoubleType()": DoubleType(),
+    "BooleanType()": BooleanType(),
+    "DateType()": DateType(),
+    "TimestampType()": TimestampType(),
+    "ArrayType(IntegerType(), True)": ArrayType(IntegerType(), True),
+    "ArrayType(StringType(), True)": ArrayType(StringType(), True)
+}
+
+def create_schema(meta_df):
+    """
+    Create a Spark schema (StructType) from the metadata DataFrame.
+    For numerical types, if "min" and "max" are provided, they are stored in the field metadata.
+    This version ensures that the type from the spreadsheet is used (if it matches).
+    """
+    fields = []
+    # Ensure that the range columns exist in the DataFrame.
+    has_range = ("min" in meta_df.columns) and ("max" in meta_df.columns)
+
+    for idx, row in meta_df.iterrows():
+        col_name = row["masked_column_name"]
+        # Normalize the type string from the spreadsheet; fall back to "string" when missing.
+        type_str = str(row["spark_data_type"]).strip() if pd.notna(row["spark_data_type"]) else "string"
+        spark_type = spark_type_mapping.get(type_str)
+
+        if spark_type is None:
+            # If the type is not recognized, warn and default to StringType.
+            print(f"Warning: Unrecognized type '{row['spark_data_type']}' for column '{col_name}'. Using StringType.")
+            spark_type = StringType()
+
+        md = {}
+        # For numerical types, if min and max values are provided, store them in metadata.
+        if isinstance(spark_type, (IntegerType, LongType, FloatType, DoubleType)) and has_range:
+            if pd.notna(row["min"]) and pd.notna(row["max"]):
+                md["min"] = row["min"]
+                md["max"] = row["max"]
+
+        fields.append(StructField(col_name, spark_type, True, metadata=md))
+
+    return StructType(fields)
+
+# Create a dictionary of schemas for each table.
+schemas = {}
+for tbl, meta_df in table_metadata.items():
+    schema = create_schema(meta_df)
+    schemas[tbl] = schema
+    print(f"Schema for table '{tbl}': {schema}")
+
+
+# -------------------------------------
+# Step 5. Process join information.
+# -------------------------------------
+# Assumption: the join definitions live in the second sheet (sheet_names[1]).
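+# The parsing loop below expects the joins sheet to provide these columns:
+#   table1, column1, table2, column2, join_method
+# Hypothetical example row (the spreadsheet itself is not part of this patch,
+# so the values are only illustrative of the expected shape):
+#   table1=table_b, column1=col_b_8, table2=table_e, column2=col_e_0, join_method=inner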
+join_info_df = sheets[sheet_names[1]] +joins = [] +# Here we assume join_info_df has columns: "LeftTable", "LeftColumn", "RightTable", "RightColumn", and optionally "JoinType" +for idx, row in join_info_df.iterrows(): + join_detail = { + "left_table": row["table1"], + "right_table": row["table2"], + "join_method": row["join_method"], + "left_column": row["column1"], + "right_column": row["column2"] + } + joins.append(join_detail) + +# ======================================== +# PART 2: Generate random data for each table and register as temp views +# ======================================== + +def generate_random_dataframe(schema, num_rows): + """ + Simpler version with basic array generation + """ + df = spark.range(num_rows) + + for field in schema.fields: + col_name = field.name + dt = field.dataType + md = field.metadata or {} + + if isinstance(dt, (IntegerType, LongType)): + min_val = md.get("min", 1) + max_val = md.get("max", 1000) + expr = (F.rand() * (float(max_val) - float(min_val)) + float(min_val)) + if isinstance(dt, IntegerType): + df = df.withColumn(col_name, expr.cast("int")) + else: + df = df.withColumn(col_name, expr.cast("long")) + + elif isinstance(dt, (FloatType, DoubleType)): + min_val = md.get("min", 0.0) + max_val = md.get("max", 1000.0) + expr = (F.rand() * (float(max_val) - float(min_val)) + float(min_val)) + if isinstance(dt, FloatType): + df = df.withColumn(col_name, expr.cast("float")) + else: + df = df.withColumn(col_name, expr.cast("double")) + + elif isinstance(dt, BooleanType): + df = df.withColumn(col_name, F.rand() > 0.5) + + elif isinstance(dt, DateType): + df = df.withColumn(col_name, F.expr("date_add('2000-01-01', cast(rand() * 9000 as int))")) + + elif isinstance(dt, TimestampType): + df = df.withColumn(col_name, F.expr("to_timestamp(date_add('2000-01-01', cast(rand() * 9000 as int)))")) + + elif isinstance(dt, StringType): + df = df.withColumn(col_name, + F.concat(F.lit("str_"), + F.abs(F.hash(F.col("id"), F.rand())).cast("string"))) + + elif isinstance(dt, ArrayType): + # Simpler array generation - fixed size arrays + element_type = dt.elementType + + if isinstance(element_type, IntegerType): + # Create array of 3 random integers + df = df.withColumn(col_name, + F.array( + (F.rand() * 100).cast("int"), + (F.rand() * 100).cast("int"), + (F.rand() * 100).cast("int") + )) + + elif isinstance(element_type, LongType): + # Create array of 3 random longs + df = df.withColumn(col_name, + F.array( + (F.rand() * 1000).cast("long"), + (F.rand() * 1000).cast("long"), + (F.rand() * 1000).cast("long") + )) + + elif isinstance(element_type, StringType): + # Create array of 3 random strings + df = df.withColumn(col_name, + F.array( + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")), + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")), + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")) + )) + + else: + # Default to empty array for unsupported types + df = df.withColumn(col_name, F.array()) + + else: + df = df.withColumn(col_name, F.lit(None)) + + return df.drop("id") + +# Create and register a DataFrame for each table using the distributed random data generation. +# NOTE: THIS WAS SCALED DOWN FOR TESTING PURPOSES. 
UNCOMMENT LINE 74 AND COMMENT OUT LINES 68-73 FOR REAL TESTING +dfs = {} +for tbl, count in zip(table_names, approx_row_counts): + if tbl != 'table_a': + schema = schemas[tbl] + if tbl == 'table_c': + num_rows = 21000000 + else: + num_rows = int(count) + # num_rows = int(count) + df = generate_random_dataframe(schema, num_rows) + dfs[tbl] = df + print(f"Created DataFrame for table '{tbl}' with {num_rows} random rows.") + +table_b = dfs['table_b'] +table_c = dfs['table_c'] +table_d = dfs['table_d'] +table_e = dfs['table_e'] + + +# ========================= +# CONFIGURATION - Set your desired match percentages here +# ========================= +MATCH_PERCENTAGE_A = 0.001 # 0.1% of table_a rows will match +MATCH_PERCENTAGE_C = 0.01 # 1% of table_c rows will match +MATCH_PERCENTAGE_D = 0.01 # 1% of table_d rows will match +MATCH_PERCENTAGE_E = 0.01 # 1% of table_e rows will match + +print("=" * 60) +print("FORCING TABLES TO MATCH TABLE_B VALUES") +print(f"Match percentages: A={MATCH_PERCENTAGE_A*100}%, C={MATCH_PERCENTAGE_C*100}%, D={MATCH_PERCENTAGE_D*100}%, E={MATCH_PERCENTAGE_E*100}%") +print("=" * 60) + +# ========================= +# Force table_a (4 columns: col_a, col_c, col_b, col_d) +# ========================= +print("\n1. Processing table_a...") +table_a_combos_list = ( + table_b + .select("col_b_8", "col_b_3", "col_b_9", "col_b_1") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_3").isNotNull() & + F.col("col_b_9").isNotNull() & + F.col("col_b_1").isNotNull() + ) + .collect() +) + +combo_count_a = len(table_a_combos_list) +print(f" Distinct combinations for table_a: {combo_count_a}") + +combos_a_with_id = [ + Row( + new_col_a=combo['col_b_8'], + new_col_c=combo['col_b_3'], + new_col_b=combo['col_b_9'], + new_col_d=combo['col_b_1'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_a_combos_list) +] + +table_a_combos_df = spark.createDataFrame(combos_a_with_id) + +# Add a random number to each row to decide if it should be forced +table_a_forced = ( + table_a + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_A) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_a) + 1) + .otherwise(F.lit(None)) + ) + # Left join to preserve all rows + .join( + F.broadcast(table_a_combos_df), + "combo_id", + "left" + ) + # For forced rows, use new values; for others, keep original + .withColumn("col_a", F.coalesce("new_col_a", "col_a")) + .withColumn("col_c", F.coalesce("new_col_c", "col_c")) + .withColumn("col_b", F.coalesce("new_col_b", "col_b")) + .withColumn("col_d", F.coalesce("new_col_d", "col_d")) + .drop("should_force", "combo_id", "new_col_a", "new_col_c", "new_col_b", "new_col_d") + .select(*table_a.columns) +) + +table_a_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_a") +table_a_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_a") +print(f" ✓ table_a_forced created ({MATCH_PERCENTAGE_A*100}% forced)") + +# ========================= +# Force table_c (3 columns: col_c_10, col_c_9, col_c_11) +# ========================= +print("\n2. 
Processing table_c...") +table_c_combos_list = ( + table_b + .select("col_b_8", "col_b_9", "col_b_3") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_9").isNotNull() & + F.col("col_b_3").isNotNull() + ) + .collect() +) + +combo_count_c = len(table_c_combos_list) +print(f" Distinct combinations for table_c: {combo_count_c}") + +combos_c_with_id = [ + Row( + new_col_c_10=combo['col_b_8'], + new_col_c_9=combo['col_b_9'], + new_col_c_11=combo['col_b_3'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_c_combos_list) +] + +table_c_combos_df = spark.createDataFrame(combos_c_with_id) + +table_c_forced = ( + table_c + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_C) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_c) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_c_combos_df), + "combo_id", + "left" + ) + .withColumn("col_c_10", F.coalesce("new_col_c_10", "col_c_10")) + .withColumn("col_c_9", F.coalesce("new_col_c_9", "col_c_9")) + .withColumn("col_c_11", F.coalesce("new_col_c_11", "col_c_11")) + .drop("should_force", "combo_id", "new_col_c_10", "new_col_c_9", "new_col_c_11") + .select(*table_c.columns) +) + +table_c_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_c") +table_c_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_c") +print(f" ✓ table_c_forced created ({MATCH_PERCENTAGE_C*100}% forced)") + +# ========================= +# Force table_d (2 columns: col_d_0, col_d_1) +# ========================= +print("\n3. Processing table_d...") +table_d_combos_list = ( + table_b + .select("col_b_8", "col_b_9") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_9").isNotNull() + ) + .collect() +) + +combo_count_d = len(table_d_combos_list) +print(f" Distinct combinations for table_d: {combo_count_d}") + +combos_d_with_id = [ + Row( + new_col_d_0=combo['col_b_8'], + new_col_d_1=combo['col_b_9'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_d_combos_list) +] + +table_d_combos_df = spark.createDataFrame(combos_d_with_id) + +table_d_forced = ( + table_d + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_D) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_d) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_d_combos_df), + "combo_id", + "left" + ) + .withColumn("col_d_0", F.coalesce("new_col_d_0", "col_d_0")) + .withColumn("col_d_1", F.coalesce("new_col_d_1", "col_d_1")) + .drop("should_force", "combo_id", "new_col_d_0", "new_col_d_1") + .select(*table_d.columns) +) + +table_d_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_d") +table_d_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_d") +print(f" ✓ table_d_forced created ({MATCH_PERCENTAGE_D*100}% forced)") + +# ========================= +# Force table_e (1 column: col_e_0) +# ========================= +print("\n4. 
Processing table_e...") +table_e_values_list = ( + table_b + .select("col_b_8") + .distinct() + .filter(F.col("col_b_8").isNotNull()) + .collect() +) + +value_count_e = len(table_e_values_list) +print(f" Distinct values for table_e: {value_count_e}") + +values_e_with_id = [ + Row( + new_col_e_0=value['col_b_8'], + value_id=idx + 1 + ) + for idx, value in enumerate(table_e_values_list) +] + +table_e_values_df = spark.createDataFrame(values_e_with_id) + +table_e_forced = ( + table_e + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_E) + .withColumn("value_id", + F.when(F.col("should_force"), F.floor(F.rand() * value_count_e) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_e_values_df), + "value_id", + "left" + ) + .withColumn("col_e_0", F.coalesce("new_col_e_0", "col_e_0")) + .drop("should_force", "value_id", "new_col_e_0") + .select(*table_e.columns) +) + +table_e_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_e") +table_e_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_e") +print(f" ✓ table_e_forced created ({MATCH_PERCENTAGE_E*100}% forced)") + +table_b.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_table_b") +table_b = spark.read.parquet(f"{PATH_PREFIX}/temp_table_b") \ No newline at end of file diff --git a/Data-Gen/new_datagen_faster.py b/Data-Gen/new_datagen_faster.py new file mode 100644 index 0000000..5e09e49 --- /dev/null +++ b/Data-Gen/new_datagen_faster.py @@ -0,0 +1,251 @@ +import random +import subprocess + +import networkx as nx +import numpy as np +import pandas as pd +from pyspark.sql import SparkSession, Row + +target_size = 1 * 1024**3 # 1 GB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-{target_size // 1024**3}GB" +directory_path = PATH_PREFIX + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("LargeGraph") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "32g") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .getOrCreate() +) +def create_synthetic_distribution(params, plot=True): + + slope = params.get('slope', -2) + min_degree = params.get('min_degree', 1) + max_degree = params.get('max_degree', 200_000) + max_prob = params.get('max_prob', 0.5) + + degrees = np.arange(min_degree, max_degree + 1, dtype=float) + + A = max_prob / (min_degree ** slope) + + y_values = A * degrees ** slope + + degrees_int = degrees.astype(int) + + decay_dict = dict(zip(degrees_int, y_values)) + + return decay_dict + +params = { + 'slope': -2, + 'intercpet': 0.8, + 'r_squared': 0.98, + 'max_degree': 200_000, + 'min_degree': 1, + 'max_prob': 0.5, + 'degree_range': list(np.arange(1, 200_000)) +} + +target_distribution = create_synthetic_distribution(params, 200_000) + +def get_disk_usage(path): + try: + result = subprocess.run( + ['du', '-sb', path], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + size_in_bytes = int(result.stdout.split()[0]) + return size_in_bytes + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get disk usage: {e.stderr.strip()}") + +def random_node(): + return int(np.random.randint(1_000_000, 10_000_000_000)) + +def random_feature(): + return int(np.random.randint(1, 70000)) # cast to native int + +def random_col_e(): + return str(np.random.choice(['col_e_A', 'col_e_B'])) # cast to native str + +num_graphs = spark.sparkContext.defaultParallelism 
# number of cores available + +def configuration_model_with_distribution(n, degree_distribution,seed): + """ + Generate a graph with a specific degree distribution + """ + degrees = [] + remaining_nodes = n + + for degree, prob in sorted(degree_distribution.items()): + if remaining_nodes <= 0: + break + count = min(int(n * prob + 0.5), remaining_nodes) + if count > 0: + degrees.extend([int(degree)] * count) + remaining_nodes -= count + + if remaining_nodes > 0: + min_degree = min(degree_distribution.keys()) + degrees.extend([min_degree] * remaining_nodes) + + if len(degrees) < 2: + degrees = [1, 1] + + if sum(degrees) % 2 != 0: + degrees[0] += 1 + + try: + g = nx.configuration_model(degrees, seed=seed) + g = nx.Graph(g) + + if g.number_of_edges() == 0: + raise nx.NetworkXError("Generated graph has no edges") + + return g + except Exception as e: + print(f"Error generating graph: {e}") + return nx.barabasi_albert_graph(n, 2) + +def generate_graph_partition(pdf_iterator): + """ + Generator function that yeilds edges for each partition. + pdf_iterator yields pandas DataFrames (one per partition) + """ + # Get broadcasted values + degree_dist = target_distribution_bc.value + seed_val = seed_bc.value + + for pdf in pdf_iterator: + all_edges = [] + + for partition_id in pdf['id'].values: + partition_id = int(partition_id) + + g = configuration_model_with_distribution( + num_nodes_per_graph, + degree_dist, + seed_val + partition_id + ) + + node_map = {node: random_node() for node in g.nodes()} + + for edge in g.edges(): + all_edges.append({ + 'col_a': int(node_map[edge[0]]), + 'col_b': int(node_map[edge[1]]), + 'col_c': int(random_feature()), + 'col_d': int(random_feature()), + 'col_e': random_col_e() + }) + + if all_edges: + yield pd.DataFrame(all_edges) + +target_distribution_bc = spark.sparkContext.broadcast(target_distribution) + +seed = 1000 +seed_bc = spark.sparkContext.broadcast(seed) + +edge_df = ( + spark.range(num_graphs) + .repartition(num_graphs) + .mapInPandas( + generate_graph_partition, + schema="col_a long, col_b long, col_c int, col_d int, col_e string" + ) + .distinct() +) + +edge_df.write.mode("overwrite").parquet(directory_path) + +print(f"Initial write complete. 
Size: {get_disk_usage(directory_path) / 1024**3:.2f} GB") + + +# ======================================================================================= +# ================================ REVAMPED DATA SCALER ================================= +# ======================================================================================= + +from pyspark.sql.functions import col, floor, rand, lit, when, hash as spark_hash +import math + +initial_dir_size = get_disk_usage(directory_path) + +print(f"Initial dataset size: {round(initial_dir_size / 1024**3, 2)} GB") + +copies_needed = math.ceil(target_size / initial_dir_size) + +print(f"Target size: {round(target_size / 1024**3, 2)} GB") +print(f"Copies needed (including original): {copies_needed}") + +long_cols = ["col_a", "col_b"] +integer_cols = ["col_c", "col_d"] +string_cols = ["col_e"] + +print(f"Reading data from: {directory_path}") +df_original = spark.read.parquet(directory_path) + +if 'source' in df_original.columns: + df_original = df_original.drop('source') + +# Create a "copy_id" Dataframe +df_copies = spark.range(copies_needed).toDF("copy_id") + +# Cross join to create all copies at once +df_expanded = df_original.crossJoin(df_copies) + +print(f"Creating {copies_needed} versions in parallel...") + +# Define noise range +NOISE_MIN = -1 +NOISE_MAX = 1 + +# Create a seed column based on copy_id and row hash +# This gives us different randomness for each day +df_with_seed = df_expanded.withColumn( + "rand_seed", + (spark_hash(col("col_a"), col("col_b"), col("copy_id")) % 1000000).cast("integer") +) + +# Apply noise based on copy_id (copy_id=0 is orignal, no noise) +# Use rand() with a base seed, then add variation based on the row's values +df_augmented = df_with_seed.select( + *[ + when(col("copy_id") == 0, col(c)).otherwise( + (col(c) + floor(rand(42) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN) + ).cast("long").alias(c) + for c in long_cols + ], + *[ + when(col("copy_id") == 0, col(c)).otherwise( + (col(c) + floor(rand(1042) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN) + ).cast("integer").alias(c) + for c in integer_cols + ], + *[col(c) for c in string_cols] +) + +# Calculate partitions based on target file size +target_file_size_mb = 250 +estimated_size = initial_dir_size * copies_needed +repartitions = max(1, int(estimated_size / (target_file_size_mb * 1024**2))) + +print(f"Writing combined dataset with {repartitions} partitions...") +print(f"Estimated final size: {round(estimated_size / 1024**3, 2)} GB") + +# Write everytihing in one shot +df_augmented.repartition(repartitions).write.mode("overwrite").parquet(directory_path) + +# Verify final size +final_size = get_disk_usage(directory_path) +print(f"\nFinal size: {round(final_size / 1024**3, 2)} GB") +print(f"Target was: {round(target_size / 1024**3, 2)} GB") +print(f"Achieved: {round(100 * final_size / target_size, 1)}% of target") \ No newline at end of file diff --git a/Data-Gen/notes.md b/Data-Gen/notes.md new file mode 100644 index 0000000..d658c3b --- /dev/null +++ b/Data-Gen/notes.md @@ -0,0 +1,71 @@ +## 1GB + +target_size = 1 * 1024**3 # 1 GB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB" + +Reading data from: /scratch/prestouser/test-data/500000-1GB +Creating 1 versions in parallel... +Writing combined dataset with 10 partitions... +Estimated final size: 2.68 GB + Final size: 2.76 GB +Target was: 1.0 GB +Achieved: 276.0% of target + + +1. Processing table_a... 
+   Distinct combinations for table_a: 14000
+   ✓ table_a_forced created (0.1% forced)
+
+2. Processing table_c...
+   Distinct combinations for table_c: 14000
+   ✓ table_c_forced created (1.0% forced)
+
+3. Processing table_d...
+   Distinct combinations for table_d: 14000
+   ✓ table_d_forced created (1.0% forced)
+
+4. Processing table_e...
+   Distinct values for table_e: 14000
+   ✓ table_e_forced created (1.0% forced)
+
+
+## 10GB
+target_size = 10 * 1024**3  # 10 GB
+num_nodes_per_graph = 500_000
+PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB"
+
+Reading data from: /scratch/prestouser/test-data/500000-1GB
+Creating 4 versions in parallel...
+Writing combined dataset with 43 partitions...
+Estimated final size: 10.7 GB
+
+Final size: 11.04 GB
+Target was: 10.0 GB
+Achieved: 110.4% of target
+
+Created DataFrame for table 'table_b' with 14000 random rows.
+Created DataFrame for table 'table_c' with 21000000 random rows.
+Created DataFrame for table 'table_d' with 12000000 random rows.
+Created DataFrame for table 'table_e' with 33000000 random rows.
+============================================================
+FORCING TABLES TO MATCH TABLE_B VALUES
+Match percentages: A=0.1%, C=1.0%, D=1.0%, E=1.0%
+============================================================
+
+1. Processing table_a...
+   Distinct combinations for table_a: 14000
+   ✓ table_a_forced created (0.1% forced)
+
+2. Processing table_c...
+   Distinct combinations for table_c: 14000
+   ✓ table_c_forced created (1.0% forced)
+
+3. Processing table_d...
+   Distinct combinations for table_d: 14000
+   ✓ table_d_forced created (1.0% forced)
+
+4. Processing table_e...
+   Distinct values for table_e: 14000
+   ✓ table_e_forced created (1.0% forced)
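+
+## Sanity-checking the forced matches
+
+A rough, illustrative way to spot-check the forced match rate for table_a
+(assumes the same Spark session as create_join_tables.py, so table_b,
+table_a_forced, and MATCH_PERCENTAGE_A are still in scope; the other forced
+tables can be checked the same way with their own key columns):
+
+keys_b = (
+    table_b
+    .select(
+        F.col("col_b_8").alias("col_a"),
+        F.col("col_b_3").alias("col_c"),
+        F.col("col_b_9").alias("col_b"),
+        F.col("col_b_1").alias("col_d"),
+    )
+    .distinct()
+)
+matched = table_a_forced.join(keys_b, ["col_a", "col_c", "col_b", "col_d"], "left_semi").count()
+total = table_a_forced.count()
+print(f"table_a match rate: {matched / total:.4%} (target ~{MATCH_PERCENTAGE_A:.1%})")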