
Re: How to get airflow to add thousands of tasks to celery at one time?


It sounds like you may be getting bottlenecked by executor concurrency
settings.

Are you using the default values for the other concurrency settings,
specifically the ones mentioned here
<https://stackoverflow.com/questions/50737800/how-many-tasks-can-be-scheduled-in-a-single-airflow-dag/50743825#50743825>?
If you raise those to very high values as well, do you still see the
issue?
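
For reference, a rough sketch of what raising them could look like in
airflow.cfg (the option names below are the ones shipped with Airflow 1.9;
the values are purely illustrative, not recommendations):

    [core]
    # global cap on task instances running across the whole installation
    parallelism = 10000
    # cap on concurrently running task instances per DAG
    dag_concurrency = 10000
    # slots for tasks that are not assigned to any pool (default is only 128)
    non_pooled_task_slot_count = 10000
    # how many runs of a single DAG may be active at once
    max_active_runs_per_dag = 16

    [celery]
    # number of task slots per Celery worker
    celeryd_concurrency = 16

Whichever of those is lowest tends to act as the effective ceiling on how
many task instances the scheduler will hand to Celery at once, so it's worth
checking all of them rather than parallelism alone.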

Taylor

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Mon, Jun 25, 2018 at 1:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <
kevin.christian.pauli@xxxxxxxxxxxx> wrote:

> Greetings Airflowers.  I'm evaluating Airflow 1.9.0 for our distributed
> orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing
> something strange.
>
> I made a dag that has three main stages: 1) start, 2) fan out and run N
> tasks concurrently, 3) finish.
>
> N can be large, maybe up to 10K.  I would expect to see N tasks get dumped
> onto the Rabbit queue when stage 2 begins.  Instead I am seeing only a few
> hundred added at a time.  As the workers process the tasks and the queue
> gets smaller, more get added to Celery/Rabbit.  Eventually it does
> finish; however, I would really prefer that it dump ALL the work (all 10K
> tasks) into Celery immediately, for two reasons:
>
>
>   1.  The current way makes the scheduler long-lived and stateful.  The
> scheduler might die after only 5K have completed, in which case the
> remaining 5K tasks would never get added (I verified this).
>   2.  I want to use the size of the Rabbit queue as a metric to trigger
> autoscaling events to add more workers.  So I need a true picture of how
> much outstanding work remains (10K, not a few hundred).
>
> I assume the scheduler has some kind of throttle that keeps it from
> dumping all 10K messages simultaneously?  If so, is it configurable?
>
> FYI I have already set “parallelism” to 10K in the airflow.cfg
>
> Here is my test dag:
>
> # This dag tests how well airflow fans out
>
> from airflow import DAG
> from datetime import datetime, timedelta
>
> from airflow.operators.bash_operator import BashOperator
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2015, 6, 1),
>     'email': ['airflow@xxxxxxxxxxx'],
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=5),
> }
>
> dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
>
> num_tasks = 1000
>
> starting = BashOperator(
>     task_id='starting',
>     bash_command='echo starting',
>     dag=dag
> )
>
> all_done = BashOperator(
>     task_id='all_done',
>     bash_command='echo all done',
>     dag=dag)
>
> for i in range(0, num_tasks):
>     task = BashOperator(
>         task_id='say_hello_' + str(i),
>         bash_command='echo hello world',
>         dag=dag)
>     task.set_upstream(starting)
>     task.set_downstream(all_done)
>
>
>
> --
> Regards,
> Kevin Pauli
>