
Re: How to get airflow to add thousands of tasks to celery at one time?


Sounds good.  Happy this answer could help.  It seems like concurrency
settings are a commonly confusing topic.

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Thu, Jun 28, 2018 at 2:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <
kevin.christian.pauli@xxxxxxxxxxxx> wrote:

> Thanks.  Through trial and error I learned that I need to set all three of
> these:
>
>      - AIRFLOW__CORE__PARALLELISM=10000
>      - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
>      - AIRFLOW__CORE__DAG_CONCURRENCY=10000
>
> With only these two enabled, I can get to 10K, but it is very slow, adding
> only 100 new tasks in bursts every 30 seconds, in a stair-step fashion:
>      - AIRFLOW__CORE__PARALLELISM=10000
>      - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
>
> If I only enable these two, it is the same "stair-step" pattern, with 128
> added every 30 seconds:
>
>      - AIRFLOW__CORE__PARALLELISM=10000
>      - AIRFLOW__CORE__DAG_CONCURRENCY=10000
>
> But if I set all three, it does add 10K to the queue in one shot.
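For reference, a sketch of the same three overrides in airflow.cfg form; an environment variable named AIRFLOW__CORE__&lt;KEY&gt; overrides the corresponding &lt;key&gt; option in the [core] section:

```ini
[core]
parallelism = 10000
non_pooled_task_slot_count = 10000
dag_concurrency = 10000
```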
>
> --
> Regards,
> Kevin Pauli
>
> On 6/27/18, 4:41 PM, "Taylor Edmiston" <tedmiston@xxxxxxxxx> wrote:
>
>     It sounds like you may be getting bottlenecked by executor concurrency
>     settings.
>
>     Are you using default values for the other concurrency settings,
>     specifically the ones mentioned here
>     <https://stackoverflow.com/questions/50737800/how-many-tasks-can-be-scheduled-in-a-single-airflow-dag/50743825#50743825>?
>     If you increase the other ones to be very high as well, do you still
>     experience the issue?
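A minimal sketch of why all of these limits matter together. This is a simplification for intuition, not Airflow's actual scheduler code: roughly, the number of tasks the scheduler will queue at once is capped by the tightest of the three limits, so raising only two of them leaves the third as the bottleneck.

```python
# Simplified model of the scheduler's queueing cap: the effective
# number of concurrently queueable tasks is bounded by the tightest of
# the global, non-pooled, and per-DAG concurrency limits.
# (An illustrative sketch, not the real Airflow scheduler logic.)

def effective_slots(parallelism, non_pooled_task_slot_count, dag_concurrency):
    return min(parallelism, non_pooled_task_slot_count, dag_concurrency)


# Raising only parallelism and non_pooled_task_slot_count still caps
# the DAG at dag_concurrency's default.
print(effective_slots(10000, 10000, 16))     # bottlenecked at 16
print(effective_slots(10000, 10000, 10000))  # all three raised: 10000
```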
>
>     Taylor
>
>     *Taylor Edmiston*
>     Blog <https://blog.tedmiston.com/> | CV
>     <https://stackoverflow.com/cv/taylor> | LinkedIn
>     <https://www.linkedin.com/in/tedmiston/> | AngelList
>     <https://angel.co/taylor> | Stack Overflow
>     <https://stackoverflow.com/users/149428/taylor-edmiston>
>
>
>     On Mon, Jun 25, 2018 at 1:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <
>     kevin.christian.pauli@xxxxxxxxxxxx> wrote:
>
>     > Greetings Airflowers.  I'm evaluating Airflow 1.9.0 for our
>     > distributed orchestration needs (using CeleryExecutor and RabbitMQ),
>     > and I am seeing something strange.
>     >
>     > I made a dag that has three main stages: 1) start, 2) fan out and
>     > run N tasks concurrently, 3) finish.
>     >
>     > N can be large, maybe up to 10K.  I would expect to see N tasks get
>     > dumped onto the Rabbit queue when stage 2 begins.  Instead I am
>     > seeing only a few hundred added at a time.  As the workers process
>     > the tasks and the queue gets smaller, more get added to
>     > Celery/Rabbit.  Eventually it does finish; however, I would really
>     > prefer that it dump ALL the work (all 10K tasks) into Celery
>     > immediately, for two reasons:
>     >
>     >
>     >   1.  The current way makes the scheduler long-lived and stateful.
>     > The scheduler might die after only 5K have completed, in which case
>     > the remaining 5K tasks would never get added (I verified this).
>     >   2.  I want to use the size of the Rabbit queue as a metric to
>     > trigger autoscaling events to add more workers, so I need a true
>     > picture of how much outstanding work remains (10K, not a few
>     > hundred).
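A hedged sketch of the autoscaling-metric idea: RabbitMQ's management plugin exposes per-queue statistics over HTTP, and the `messages_ready` field reports undelivered messages, i.e. the true outstanding backlog. The host, vhost, and queue names below are placeholders, not values from this thread:

```python
import json
from urllib.parse import quote


def queue_stats_url(host, vhost, queue, port=15672):
    """Build the RabbitMQ management API URL for a queue's statistics."""
    # The vhost must be percent-encoded; the default vhost "/" becomes "%2F".
    return "http://%s:%d/api/queues/%s/%s" % (
        host, port, quote(vhost, safe=""), quote(queue, safe=""))


def outstanding_tasks(api_response_body):
    """Extract the count of ready (undelivered) messages from a response."""
    return json.loads(api_response_body)["messages_ready"]
```

Fetched with urllib.request and basic auth against the management port, `outstanding_tasks` would only reflect the full backlog if the scheduler has enqueued all 10K tasks up front, which is exactly why the concurrency settings above matter for this use case.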
>     >
>     > I assume the scheduler has some kind of throttle that keeps it from
>     > dumping all 10K messages simultaneously?  If so, is this
>     > configurable?
>     >
>     > FYI I have already set “parallelism” to 10K in the airflow.cfg
>     >
>     > Here is my test dag:
>     >
>     > # This dag tests how well airflow fans out
>     >
>     > from airflow import DAG
>     > from datetime import datetime, timedelta
>     >
>     > from airflow.operators.bash_operator import BashOperator
>     >
>     > default_args = {
>     >     'owner': 'airflow',
>     >     'depends_on_past': False,
>     >     'start_date': datetime(2015, 6, 1),
>     >     'email': ['airflow@xxxxxxxxxxx'],
>     >     'email_on_failure': False,
>     >     'email_on_retry': False,
>     >     'retries': 1,
>     >     'retry_delay': timedelta(minutes=5),
>     > }
>     >
>     > dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
>     >
>     > num_tasks = 1000
>     >
>     > starting = BashOperator(
>     >     task_id='starting',
>     >     bash_command='echo starting',
>     >     dag=dag
>     > )
>     >
>     > all_done = BashOperator(
>     >     task_id='all_done',
>     >     bash_command='echo all done',
>     >     dag=dag)
>     >
>     > for i in range(0, num_tasks):
>     >     task = BashOperator(
>     >         task_id='say_hello_' + str(i),
>     >         bash_command='echo hello world',
>     >         dag=dag)
>     >     task.set_upstream(starting)
>     >     task.set_downstream(all_done)
>     >
>     >
>     >
>     > --
>     > Regards,
>     > Kevin Pauli
>     >
>     > This email and any attachments were sent from a Monsanto email
>     > account and may contain confidential and/or privileged information.
>     > If you are not the intended recipient, please contact the sender
>     > and delete this email and any attachments immediately. Any
>     > unauthorized use, including disclosing, printing, storing, copying
>     > or distributing this email, is prohibited. All emails and
>     > attachments sent to or from Monsanto email accounts may be subject
>     > to monitoring, reading, and archiving by Monsanto, including its
>     > affiliates and subsidiaries, as permitted by applicable law. Thank
>     > you.
>     >
>
>