
Dynamic tasks in a dag?


Hi,

I'm using Apache Airflow 1.10.0 and I'm trying to dynamically generate some tasks in my DAG based on files that are in the dags directory. The problem is, I don't see these tasks in the UI; I only see the 'start' dummy operator. If I run 'airflow list_tasks workflow', they are listed. Thoughts?

Here is how I'm generating the tasks:


# imports shown for completeness (module paths as of Airflow 1.10)
from datetime import timedelta
from glob import glob

from airflow import DAG
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from airflow.operators.dummy_operator import DummyOperator


def create_snowflake_operator(file, dag, snowflake_connection):
    file_repl = file.replace('/', '_')
    file_repl = file_repl.replace('.sql', '')
    print("TASK_ID {}".format(file_repl))
    return SnowflakeOperator(
        dag=dag,
        task_id='create_{}'.format(file_repl),
        snowflake_conn_id=snowflake_connection,
        sql=file
    )
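For clarity, here is the same task_id transformation as a standalone sketch, showing what the generated ids look like (path separators become underscores and the '.sql' suffix is dropped):

```python
def to_task_id(path):
    # Mirror the naming logic in create_snowflake_operator above:
    # '/' -> '_' and strip the '.sql' extension, then prefix with 'create_'.
    file_repl = path.replace('/', '_').replace('.sql', '')
    return 'create_{}'.format(file_repl)

print(to_task_id('dags/snowsql/create/udf/my_udf.sql'))
# -> create_dags_snowsql_create_udf_my_udf
```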

DAG_NAME = 'create_objects'
dag = DAG(
    DAG_NAME,
    default_args=args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None,
)

start = DummyOperator(
    dag=dag,
    task_id="start",
)

print("creating snowflake operators")

for subdir in ('udf', 'table', 'view'):
    for file in glob('dags/snowsql/create/{}/*.sql'.format(subdir)):
        print("FILE {}".format(file))
        task = create_snowflake_operator(file, dag, 'snowflake_default')
        task.set_upstream(start)

print("done {}".format(start.downstream_task_ids))
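One thing worth double-checking: the relative pattern 'dags/snowsql/...' resolves against whatever working directory the scheduler and webserver happen to run from, which may differ from the directory the CLI is run from, so the glob can silently return nothing in one process and match in another. A minimal sketch of anchoring the lookup to the DAG file's own location instead (the helper name and the assumption that snowsql/ sits next to the DAG file are mine, not from the original post):

```python
import os
from glob import glob


def sql_files_under(dag_dir, subdir):
    # Resolve the glob against the DAG file's directory rather than the
    # process working directory, so scheduler, webserver, and CLI all
    # see the same set of files.
    pattern = os.path.join(dag_dir, 'snowsql', 'create', subdir, '*.sql')
    return sorted(glob(pattern))

# In the DAG file itself one would then write:
#   here = os.path.dirname(os.path.abspath(__file__))
#   for file in sql_files_under(here, 'udf'):
#       ...
```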

Thanks in advance
--
Frank Maritato