OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Disable Processing of DAG file


Thanks Maxime,
we have 100(s) of dags with schedule set to @once with new DAGs keep on coming in the system. 
Scheduler process each and every DAG inside the local DAG folder. Each Dag file processing takes around 400 millisecond and we have set max_threads to 8(As we have 8 core machine). i.e 8 DAG files would be processed in parallele
In the processing loop Scheduler also sleeps for 1 sec (_processor_poll_interval is set to 1) so effectively it  processes 8 files every 1 seconds. 
so Every DAG file gets processed after ~(Number of Dags/8) seconds. And Scheduler latency of processing  new DAG file increases with increase in the number of DAGs.

-Raman Gupta

On 2018/05/30 06:08:55, Maxime Beauchemin <maximebeauchemin@xxxxxxxxx> wrote: 
> The TLDR of how the processor works is:
> 
> while True:
> * sets a multiprocessing queue with N processes (say 32)
> * main process looks for the list of all .py files in DAGS_FOLDER
> * fills in the queue with all .py
> * each one of the 32 suprocess opens a file and interprets it (it's
> insulated from the main process, a sys.exit() wouldn't affect the main
> process), looks for DAG object in module namespace
> * if it finds a DAG object, it looks for active DAG runs, and creates new
> DAG runs if a new schedule is ready to start
> * for each active DAG Run, it looks at all "runable" tasks and looks to see
> if dependencies have been met
> * returns a list of all tasks ready to get triggered to main process
> * main process wait for a certain specified amount of time, accumulates
> task instance list that are all ready to run
> * the scheduling train leaves the station, prioritize tasks based
> priority_weight and schedules where pool slots are availlable
> * other supervisor-type tasks, like handling zombie tasks and such
> 
> A long long time ago we didn't have subprocesses and things like a DAG with
> a `sys.exit()` would crash the scheduler, and modules imported in DAGs
> files would get cached in `sys.modules` unless you'd force
> `reload(my_submodule)`. There was (and still is) a flag on the scheduler
> CLI command to force it to exit after a certain number of runs so that your
> service would restart it in a loop and flush sys.modules .  But those days
> are long gone, and there's no reason to do this anymore.
> 
> Max
> 
> 
> On Mon, May 28, 2018 at 11:29 PM Ruiqin Yang <yrqls21@xxxxxxxxx> wrote:
> 
> > Hi folks,
> > This config line
> > <
> > https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/default_airflow.cfg#L414
> > >
> > controls how often the scheduler scan the DAG folder and tries to discover/
> > forget DAGs.
> >
> > For doing dag file processing part, scheduler does parse the DAG file
> > everytime before it schedules tasks through DagFileProcessor.
> >
> > Cheers,
> > Kevin Y
> >
> > On Mon, May 28, 2018 at 10:14 PM, Ananth Durai <vananth22@xxxxxxxxx>
> > wrote:
> >
> > > It is an interesting question. On a slightly related note, Correct me if
> > > I'm wrong, AFAIK we require restarting airflow scheduler in order pick
> > any
> > > new DAG file changes by the scheduler. In that case, should the scheduler
> > > do the DAGFileProcessing every time before scheduling the tasks?
> > >
> > > Regards,
> > > Ananth.P,
> > >
> > >
> > >
> > >
> > >
> > >
> > > On 28 May 2018 at 21:46, ramandumcs@xxxxxxxxx <ramandumcs@xxxxxxxxx>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > We have a use case where there would be 100(s) of DAG files with
> > schedule
> > > > set to "@once". Currently it seems that scheduler processes each and
> > > every
> > > > file and creates a Dag Object.
> > > > Is there a way or config to tell scheduler to stop processing certain
> > > > files.
> > > >
> > > > Thanks,
> > > > Raman Gupta
> > > >
> > >
> >
>