We are looking to download certain table data, transform it, and then upload it to our Azure Data Lake storage.
The solution I have been working on involves a Python Azure Function app. Since the DAP parquet files use "key", "value", and "meta" columns to represent the data, something like pandas is needed to normalize the nested JSON data into the actual columns of the table being consumed.
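(For illustration, roughly the kind of normalization involved, sketched as an assumption rather than the actual function code; the file name is a placeholder.)
import pandas as pd
import pyarrow.parquet as pq

table = pq.read_table("accounts.parquet")   # placeholder DAP snapshot file
records = table.to_pylist()                 # rows of {"key": {...}, "value": {...}, "meta": {...}}
df = pd.json_normalize(records)             # flat columns: key.id, value.name, meta.ts, ...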
The downside is that this approach is very resource intensive, since the files can be large depending on the table and time range being transformed. This causes memory errors when processing the data with Durable Functions in the Azure Function app.
I have used Dask to help with some of the memory issues, but I am still getting out-of-memory errors at around 70 MB parquet files from the DAP endpoint.
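(One mitigation worth noting, sketched here as an untested idea: stream the file in batches with pyarrow and flatten each batch, so only a bounded slice of rows is in memory at any time; the file name and batch size are placeholders.)
import pyarrow as pa
import pyarrow.parquet as pq

pf = pq.ParquetFile("web_logs.parquet")              # placeholder DAP snapshot file
for batch in pf.iter_batches(batch_size=50_000):     # bounded number of rows per pass
    flat = pa.Table.from_batches([batch]).flatten()  # key/value/meta structs -> top-level columns
    chunk = flat.to_pandas()
    # transform and upload the chunk here, then let it be garbage collected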
Does anyone have an idea or another strategy they have implemented to overcome this type of scenario?
Thanks,
Lucas
@lschmidt71 Is there a reason not to fetch your data from DAP in TSV or CSV files rather than parquet?
Hey @stimme,
File size mostly. As an example, TSV/CSV are 2GB whereas the parquet counterparts are 150MB.
That makes sense. Since your goal is to transform the data in the parquet files, I don't imagine you're keeping them longterm. But the smaller files would be faster and potentially less costly to download. I'm not sure if that outweighs compute costs of transformation in Azure. Just wanted to ask about the possibility of starting from a different format.
Good call @stimme. I was too intent on figuring out the parquet file conversion, which led me astray.
Dask can process the gzipped CSV, so I gave that a try and it worked excellently: no issues, and performant as well.
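For anyone finding this later, a minimal sketch of that route, assuming Dask and DAP's gzipped CSV output; the paths are placeholders. Gzip isn't splittable, so blocksize=None keeps each file as a single partition:
import dask.dataframe as dd

ddf = dd.read_csv(
    "cd2/canvas/accounts/*.csv.gz",   # placeholder path to the downloaded DAP files
    compression="gzip",
    blocksize=None,                   # gzipped files can't be split into blocks
)
# transform as needed, then write out lazily, e.g. to parquet for the data lake upload
ddf.to_parquet("cd2/canvas/accounts_flat/")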
Thanks for the responses.
Would be interested in how you're using the Functions.
Unsure if this is helpful given your landscape and what you're trying to do, and I don't have experience with Dask.
I'm currently getting/transforming parquet files via Azure Databricks (having found and adapted a really helpful Fabric script). The parquet files are "flattened" in the line flat_df = df.select("key.*", "value.*", "meta.*").
We're only a small Canvas instance (~400 courses), and the script takes about 10 minutes for the snapshot and under 5 minutes for the daily incremental updates.
# add namespaces as required and other variables
import multiprocessing
import subprocess
import time
from datetime import datetime

namespaces = ['canvas', 'canvas_logs']
tables = {}
commands = []
now = datetime.now()
date_ts = now.strftime("%Y%m%d%H%M%S")

def run_bash_command(cmd):
    # helper elided in the original post; assumed to shell out to the dap CLI
    subprocess.run(cmd, shell=True, check=True)
#get list of tables from each namespace
...
# Build bash commands for the snapshot of each table
for namespace, table_list in tables.items():
    for table in table_list:
        output_directory = f"/tmp/cd2/{namespace}/{table}/{date_ts}"  # only able to write to /tmp when using client and then move - frustrating!
        commands.append(
            f"dap --base-url {base_url} --client-id {client_id} --client-secret {client_secret} "
            f"snapshot --format {format} --output-directory {output_directory} --table {table} --namespace {namespace}"
        )

# Create and start a separate process for each command
processes = [multiprocessing.Process(target=run_bash_command, args=(cmd,)) for cmd in commands]

# Start the processes
for process in processes:
    process.start()
    time.sleep(2)  # trying to prevent hitting the canvas api rate limiter

# Wait for all processes to complete
for process in processes:
    process.join()
...
# Iterate over the tables to create a delta table for each snapshot dataframe
for namespace, table_list in tables.items():
    action = f"CREATE SCHEMA IF NOT EXISTS {namespace}"
    spark.sql(action)
    # iterate through tables to extract/write key.*, value.* and meta.*
    for table in table_list:
        df = spark.read.format("parquet").option("path", f"{dbfs_directory}/{namespace}/{table}/{date_ts}/*.parquet").load()
        flat_df = df.select("key.*", "value.*", "meta.*")
        flat_df.write.format("delta").mode("overwrite").saveAsTable(f"{namespace}.{table}")
        # df.show()
Both snapshot and incremental queries are working reliably.
I need to further extend this to create a dap_instructure table containing the table name and max meta.ts because if a table is empty on the snapshot query, the incremental query still needs a valid ts.
# assuming a Spark dataframe here: take max(meta.ts), falling back to the run timestamp for empty tables
from pyspark.sql import functions as F

max_ts = df.agg(F.max("meta.ts")).first()[0]
if max_ts is None:
    max_ts = date_ts
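Extending that into the dap_instructure table itself, a hedged sketch assuming Spark/Delta on Databricks; only the dap_instructure name and the idea of storing table name plus max meta.ts come from the plan above, the column names and string casts are assumptions:
from pyspark.sql import functions as F

records = []
for namespace, table_list in tables.items():
    for table in table_list:
        tbl = spark.table(f"{namespace}.{table}")
        # meta.ts surfaces as a top-level "ts" column after the key.*/value.*/meta.* flattening
        max_ts = tbl.agg(F.max("ts").cast("string")).first()[0]
        records.append((namespace, table, max_ts if max_ts is not None else date_ts))

spark.createDataFrame(records, "namespace string, table_name string, max_ts string") \
    .write.format("delta").mode("overwrite").saveAsTable("dap_instructure")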
Can an Azure Function call a command shell?
I'd really like to use Azure Functions or Automation(?) to simply run the dap command to connect to an Azure PostgreSQL Flexible Database per the dap client examples:
dap dropdb --connection-string postgresql://scott:password@server.example.com/testdb --namespace canvas --table accounts
I can run a shell from Databricks (as per my other post), but MS documentation (and Dr Google) hasn't helped me confirm the best place to create a list of commands and run something like:
# Create and start a separate process for each command
processes = [multiprocessing.Process(target=run_bash_command, args=(cmd,)) for cmd in commands]
For our use case, I suspect Postgres would be more cost-effective than Databricks. And while Databricks offers some amazing functionality, we're unlikely to use it in the short term.
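To the shell question above: a Python Azure Function can shell out with subprocess, as long as the dap client is installed in the function app (e.g. via requirements.txt). A minimal sketch, assuming the v2 Python programming model and a timer trigger; the schedule, the syncdb subcommand, and the hard-coded connection string are placeholders modeled on the examples above:
import subprocess
import azure.functions as func

app = func.FunctionApp()

@app.timer_trigger(schedule="0 0 2 * * *", arg_name="timer", run_on_startup=False)
def run_dap(timer: func.TimerRequest) -> None:
    # placeholder command modeled on the dap client examples above;
    # swap in whichever subcommand is actually needed (initdb/syncdb/dropdb)
    cmd = (
        "dap syncdb "
        "--connection-string postgresql://scott:password@server.example.com/testdb "
        "--namespace canvas --table accounts"
    )
    # DAP credentials can be supplied as app settings, which the function
    # sees as environment variables (e.g. DAP_CLIENT_ID / DAP_CLIENT_SECRET)
    subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
One caveat: on the Consumption plan the default function timeout is 5 minutes (10 at most), which may be too tight for a full snapshot, so a Premium/Dedicated plan, or the Databricks/Automation route, may suit long-running dap commands better.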