Would be interested in how you're using the Functions.
Unsure if this is helpful given your landscape and what you're trying to do, and I don't have experience with Dask.
I'm currently getting/transforming parquet files via Azure Databricks (having found and adapted a really helpful Fabric script). The parquet files are "flattened" in the line flat_df = df.select("key.*", "value.*", "meta.*").
We're only a small Canvas instance (~400 courses), and the script takes about 10 minutes for the snapshot and under 5 minutes for the daily incremental updates.
import multiprocessing
import time
from datetime import datetime

#add namespaces as required and other variables
namespaces = ['canvas','canvas_logs']
tables = {}
commands = []
date_ts = datetime.now().strftime("%Y%m%d%H%M%S")
#get list of tables from each namespace
...
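(The "..." above is where the tables dict gets populated per namespace. One way to do that is to shell out to the CLI's list command; this is just a sketch from memory, so check the exact command and output format against your dap version.)

# Sketch only: fill tables[namespace] by calling "dap list" for each namespace.
# Assumes the CLI prints one table name per line - verify against your dap version.
import subprocess

for ns in namespaces:
    result = subprocess.run(
        f"dap --base-url {base_url} --client-id {client_id} --client-secret {client_secret} "
        f"list --namespace {ns}",
        shell=True, capture_output=True, text=True, check=True
    )
    tables[ns] = [line.strip() for line in result.stdout.splitlines() if line.strip()]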
# Build bash commands for the snapshot of each table
for namespace, table_list in tables.items():
    for table in table_list:
        output_directory = f"/tmp/cd2/{namespace}/{table}/{date_ts}"  # only able to write to /tmp when using the client and then move - frustrating!
        commands.append(
            f"dap --base-url {base_url} --client-id {client_id} --client-secret {client_secret} "
            f"snapshot --format {format} --output-directory {output_directory} --table {table} --namespace {namespace}"
        )
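For reference, the incremental version of that append would look something like this (a sketch only; last_ts stands in for whatever timestamp you have stored per table, and the --since flag is worth double-checking against the dap docs):

# Sketch: incremental variant of the same command build, inside the same loop.
# "last_ts" is a hypothetical per-table ISO-8601 timestamp pulled from wherever it's stored.
commands.append(
    f"dap --base-url {base_url} --client-id {client_id} --client-secret {client_secret} "
    f"incremental --format {format} --output-directory {output_directory} "
    f"--table {table} --namespace {namespace} --since {last_ts}"
)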
# Create and start a separate process for each command
processes = [multiprocessing.Process(target=run_bash_command, args=(cmd,)) for cmd in commands]
# Start the processes
for process in processes:
    process.start()
    time.sleep(2)  # trying to prevent hitting the Canvas API rate limiter
# Wait for all processes to complete
for process in processes:
    process.join()
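(run_bash_command isn't shown in the snippet; a minimal version, not necessarily identical to what I'm running, would be:)

import subprocess

def run_bash_command(cmd):
    # Run the dap CLI command in a shell and surface output/errors in the driver log
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Command failed ({result.returncode}): {cmd}\n{result.stderr}")
    else:
        print(result.stdout)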
...
# Iterate over the tables to create a delta table for each snapshot dataframe
for namespace, table_list in tables.items():
    action = f"CREATE SCHEMA IF NOT EXISTS {namespace}"
    spark.sql(action)
    # iterate through tables to extract/write key.*, value.* and meta.*
    for table in table_list:
        df = spark.read.format("parquet").option("path", f"{dbfs_directory}/{namespace}/{table}/{date_ts}/*.parquet").load()
        flat_df = df.select("key.*", "value.*", "meta.*")
        flat_df.write.format("delta").mode("overwrite").saveAsTable(f"{namespace}.{table}")
        # df.show()
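Not shown here is how the incremental files get applied to the Delta tables. If anyone wants a starting point, a Delta merge on the flattened key column looks roughly like this (a sketch only: inc_path is a placeholder for wherever the incremental parquet landed, it assumes the key flattens to an "id" column as it does for most CD2 tables, and it ignores deletes, which would need handling from the meta columns):

from delta.tables import DeltaTable

# Sketch: upsert one incremental parquet drop into its Delta table (run per table, like the loop above).
# "inc_path" is a placeholder; deletes are not handled here.
inc_df = spark.read.parquet(inc_path)
inc_flat = inc_df.select("key.*", "value.*", "meta.*")

(DeltaTable.forName(spark, f"{namespace}.{table}")
    .alias("t")
    .merge(inc_flat.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())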
Both snapshot and incremental queries are working reliably.
I need to extend this further to create a dap_instructure table containing the table name and max meta.ts, because if a table is empty on the snapshot query the incremental query still needs a valid ts.
from pyspark.sql import functions as F

max_ts = df.agg(F.max("meta.ts")).first()[0]
if max_ts is None:
    max_ts = date_ts
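For the dap_instructure piece, something like this is where I'd start (just a sketch: the schema name, column names and string fallback are placeholder choices, and it's worth checking what timestamp format the incremental --since actually expects before relying on the fallback):

from pyspark.sql.types import StructType, StructField, StringType

# Sketch: one row per table with its max ts (or the run timestamp as a fallback).
# Assumes the flattened meta timestamp column is called "ts"; names are placeholders.
rows = []
for namespace, table_list in tables.items():
    for table in table_list:
        tbl_df = spark.table(f"{namespace}.{table}")
        max_ts = tbl_df.agg(F.max("ts")).first()[0]
        rows.append((namespace, table, str(max_ts) if max_ts is not None else date_ts))

schema = StructType([
    StructField("namespace", StringType()),
    StructField("table_name", StringType()),
    StructField("max_ts", StringType()),
])
spark.createDataFrame(rows, schema).write.format("delta").mode("overwrite").saveAsTable("canvas.dap_instructure")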