
Sharing: Airflow (tested in 2.10) DAG workflow for syncing Canvas Data to Postgres

phanley
Community Contributor

In case it's helpful for anyone, here's my working-and-also-work-in-progress DAG ("Directed Acyclic Graph" -- it's a fancy word for "workflow" or "batch job") for Apache Airflow.

It will probably work as far back as 2.4, and I currently have it running on Airflow 2.10. If you're familiar with Python it should be pretty decipherable, although apart from the individual functions it's probably not terribly useful for people who aren't running Airflow.

The main part of the workflow is that it uses DAPClient to fetch the list of tables 

the_tables = dap_get_table_list()

which feeds into an Airflow feature called dynamic task mapping [ .expand() ], where a task definition is applied to every item in a list -- in this case, the dap_init_or_sync_db() task is called once for each table in the list

dap_init_or_sync_db.expand(table_name=the_tables)
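
If you haven't run into dynamic task mapping before, here's a minimal, self-contained sketch of what .expand() does (everything in it is made up for illustration): the mapped task gets one task instance per item in the list it's expanded over.

from airflow.decorators import dag, task
from pendulum import datetime

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def expand_demo():
    @task
    def make_list() -> list[str]:
        return ["accounts", "courses", "users"]

    @task
    def handle(table_name: str):
        # one mapped task instance per item returned by make_list()
        print(f"would sync {table_name}")

    handle.expand(table_name=make_list())

expand_demo()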

dap_init_or_sync_db() is a bash task, which is to say each instance generates a bash command (either dap initdb or dap syncdb, determined by

if dap_db_exists():

) which is then executed in a docker-style container in Kubernetes, although I'm pretty sure that if you installed Airflow locally and pointed it at a Postgres instance the DAG would work just as well. It takes quite a while to run the first time.
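
Rendered out, each mapped task instance ends up running one of these two commands (courses is just an example table name):

# table not initialized yet (or its sync bookkeeping row is missing)
set -e; dap initdb --namespace canvas --table courses

# table already initialized on a previous run
set -e; dap syncdb --namespace canvas --table courses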

I'm not entirely happy with how I'm currently handling dap_db_exists(), but it's been working pretty well for a while now, and most of the tables take less than 20 seconds to sync (there are about ten, the ones you'd expect, that take around 20 minutes combined). I feel a little like I'm bragging or something, but real talk: if no one here cares about this post, then I'm probably the only one who does. 🤣
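
(On the dap_db_exists() grumble: one alternative would be to let Airflow's Postgres provider own the connection instead of hand-rolling psycopg2. This is just an untested sketch, and "canvas_postgres" is a made-up connection id you'd have to configure yourself.)

def dap_db_exists_via_hook() -> bool:
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    # "canvas_postgres" is a hypothetical Airflow connection id pointing at the canvas database
    hook = PostgresHook(postgres_conn_id="canvas_postgres")
    row = hook.get_first(
        "SELECT EXISTS (SELECT FROM pg_tables "
        "WHERE schemaname = 'instructure_dap' AND tablename = 'table_sync');"
    )
    return bool(row and row[0])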

 


P.S. if you're wondering why I'm logging with print() instead of doing "professional" logging with the standard library or whatever: Airflow handles task logging by capturing stdout & stderr and writing them to a text file stored somewhere (S3 is the norm), so people just log with print(). I thought it was weird at first too.

from __future__ import annotations
from airflow import DAG
from airflow.decorators import task
from pendulum import datetime, duration

with DAG(
        dag_id="canvas_data_sync",
        schedule="15 12 * * *",
        start_date=datetime(2023, 6, 25),
        catchup=False,
        tags=["canvas", "canvas data 2", "instructure-dap"],
        default_args={
            "retries": 3,
            "retry_delay": duration(seconds=5),
            "retry_exponential_backoff": True,
            "max_retry_delay": duration(hours=2),
        },
) as dag:
    def canvas_data_sync():

        def get_sql() -> str:
            pass

        def dap_db_exists() -> bool:
            import os
            import psycopg2

            conn = psycopg2.connect(
                database="canvas",
                host=os.getenv("PG_HOST"),
                user=os.getenv("PG_OP_USER"),
                password=os.getenv("PG_OP_PASS"),
                port=5432,
            )
            sql_canvas_db_exists = """
                SELECT EXISTS (
                SELECT FROM pg_tables WHERE schemaname = 'instructure_dap' AND tablename = 'table_sync'
                );
                """
            try:
                conn.autocommit = True
                cursor = conn.cursor()
                cursor.execute(sql_canvas_db_exists)
                # natural result == [(True,)]
                return cursor.fetchall()[0][0]
            except Exception as err:
                print("Oops! An exception has occurred:", err)
                print("Exception TYPE:", type(err))
                return False
            finally:
                conn.close()


        def cd_table_exists(table_name: str) -> bool:
            import os
            import psycopg2

            conn = psycopg2.connect(
                database="canvas",
                host=os.getenv("PG_HOST"),
                user=os.getenv("PG_OP_USER"),
                password=os.getenv("PG_OP_PASS"),
                port=5432,
            )
            sql = """
            SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'canvas' AND tablename  = %s);
            """
            try:
                conn.autocommit = True
                cursor = conn.cursor()
                cursor.execute(sql, (table_name,))
                # natural result == [(True,)]
                return cursor.fetchall()[0][0]
            except Exception as err:
                print("sql err:", err)
                print("Exception TYPE:", type(err))
                return False
            finally:
                conn.close()


        def sync_table_row_exists(table_name: str) -> bool:
            import os
            import psycopg2

            conn = psycopg2.connect(
                database="canvas",
                host=os.getenv("PG_HOST"),
                user=os.getenv("PG_OP_USER"),
                password=os.getenv("PG_OP_PASS"),
                port=5432,
            )
            sql = """SELECT EXISTS 
            ( SELECT source_table FROM instructure_dap.table_sync where source_table = %s );
            """
            try:
                conn.autocommit = True
                cursor = conn.cursor()
                cursor.execute(sql, (table_name,))
                # natural result == [(True,)]
                return cursor.fetchall()[0][0]
            except Exception as err:
                print("Oops! An exception has occurred:", err)
                print("Exception TYPE:", type(err))
                return False
            finally:
                conn.close()

        # FUTURE_TODO: analyse future failures to see if any programmatic remediation is needed/feasible
        #         (e.g. manually dropping failed tables and running `dap initdb` or similar with successive failures)
        def on_table_sync_failure(context):
            print(context)
            pass

        @task
        def dap_get_table_list(table_list=None):
            import asyncio
            # import json
            import os
            from dap.api import DAPClient
            from dap.dap_types import Credentials

            # by default this job gets the entire table list from Canvas Data,
            # but if a table_list is passed in, skip the API call and use that instead (for testing)
            if not table_list:
                client_id: str = os.environ["DAP_CLIENT_ID"]
                client_secret: str = os.environ["DAP_CLIENT_SECRET"]

                cd2_cred = Credentials.create(client_id=client_id, client_secret=client_secret)

                async def as_get_table_list() -> list:
                    async with DAPClient(os.environ["DAP_API_URL"],
                                         cd2_cred) as session:
                        tables = await session.get_tables("canvas")
                        return tables

                # DAPClient is async, so build the coroutine and run it to completion here
                get_table_list = as_get_table_list()
                table_list = asyncio.run(get_table_list)


            return table_list

        # noinspection PyTypedDict
        @task.bash(
            max_active_tis_per_dag=1,
            map_index_template="{{ table_name_map_index }}",
            on_failure_callback=on_table_sync_failure,
        )
        def dap_init_or_sync_db(table_name: str):
            """
            Generates a dap command that will be executed as a bash task by Airflow
            :param table_name: str
            :return: cmd: str, the bash command to be executed, e.g.
                     set -e; dap syncdb --namespace canvas --table courses
            """
            # use current_context to label the mapped task instance with table_name, via map_index_template
            from airflow.operators.python import get_current_context
            context = get_current_context()
            context["table_name_map_index"] = table_name

            # build the bash cmd
            switches = f" --namespace canvas --table {table_name}"

            if dap_db_exists():
                sync_row_exists = sync_table_row_exists(table_name)
                table_data_exists = cd_table_exists(table_name)
                print(f"sync_table_row_exists: {sync_row_exists}, cd_table_exists: {table_data_exists})")
                if sync_row_exists and table_data_exists:
                    cmd = f"""set -e; dap syncdb {switches} """
                else:
                    cmd = f"""set -e; dap initdb {switches}  """
            else:
                cmd = f"""set -e; dap initdb {switches} """

            return cmd
        """
        @task function needs to be called (either with 
        f.expand(), f.expand_kwargs(), or just f() like a function) 
        to become a concrete task
        """
        the_tables = dap_get_table_list()
        dap_init_or_sync_db.expand(table_name=the_tables)


        """ end canvas_data_sync() """

    """ the workflow canvas_data_sync() is defined, execute it as the final step of the DAG """
    canvas_data_sync()
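
P.P.S. if you want to smoke-test the mapping without pulling the full table list from the DAP API, the table_list escape hatch in dap_get_table_list() lets you hand it a short list instead (the table names here are just examples): swap the two mapping lines inside canvas_data_sync() for

        the_tables = dap_get_table_list(table_list=["courses", "users"])
        dap_init_or_sync_db.expand(table_name=the_tables)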