OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Ingest daily data, but delivery is always delayed by two days


S3 Bucket notification that triggers a dag?

Verstuurd vanaf mijn iPad

> Op 12 okt. 2018 om 12:42 heeft Ash Berlin-Taylor <ash@xxxxxxxxxx> het volgende geschreven:
> 
> A lot of our dags are ingesting data (usually daily or weekly) from suppliers, and they are universally late.
> 
> In the case I'm setting up now the delivery lag is about 30hours - data for 2018-10-10 turned up at 2018-10-12 05:43.
> 
> I was going to just set this up with an S3KeySensor and a daily schedule, but I'm wondering if anyone has any other bright ideas for a better way of handling this sort of case:
> 
>    dag = DAG(
>        DAG_ID
>        default_args=args,
>        start_date=args['start_date'],
>        concurrency=1,
>        schedule_interval='@daily',
>        params={'country': cc}
>    )
> 
>    with dag:
>        task = S3KeySensor(
>            task_id="await_files",
>            bucket_key="s3://bucket/raw/table1-{{ params.country }}/{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
>            poke_interval=60 * 60 * 2,
>            timeout=60 * 60 * 72,
>        )
> 
> That S3 key sensor is _going_ to fail the first 18 times or so it runs which just seems silly.
> 
> One option could be to use `ds_add` or similar on the execution date, but I don't like breaking the (obvious) link between execution date and which files it picks up, so I've ruled out this option
> 
> I could use a Time(Delta)Sensor to just delay the start of the checking. I guess with the new change in master to make sensors yield their execution slots that's not a terrible plan.
> 
> Does anyone else have any other idea, including possible things we could add to Airflow itself.
> 
> -ash
>