Re: Use xcom in task retry


Hi Taylor,

Here is the code showing how I use xcom_pull and xcom_push. Yes, the push
succeeds and the value is written to the database.
However, what I observe is that when execute() is called for a retry, the
XCom values for the combination <dag_id, task_id, execution_date> have
already been deleted from the table.
I searched the Airflow code and found that clear_xcom_data() is called
before execute() starts (I am including that code as well). We are using
Airflow 1.9.

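import logging

def execute(self, context, **kwargs):
    # The task instance is available from the context passed to execute().
    ti = context['ti']

    # On a retry this returns None, because the earlier XCom rows are gone.
    pull_response = ti.xcom_pull(key='computeResponse', task_ids=None)
    logging.info("pull pull_response %s", pull_response)

    ti.xcom_push('computeResponse', self.compute_response)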


################

def clear_xcom_data(self, session=None):
    """
    Clears all XCom data from the database for the task instance
    """
    self.log.info('------------------->>>>> c')
    session.query(XCom).filter(
        XCom.dag_id == self.dag_id,
        XCom.task_id == self.task_id,
        XCom.execution_date == self.execution_date
    ).delete()
    session.commit()
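
To make the sequence concrete, here is a minimal stand-alone sketch of what I
believe is happening. It does not import Airflow: FakeXComStore, run_attempt
and the dag/task names are hypothetical stand-ins I made up for illustration,
and the only assumption taken from the real code is that the XCom rows for the
task instance are cleared before each try.

```python
# Stand-alone simulation; nothing here is Airflow's real API.
class FakeXComStore:
    """In-memory stand-in for the xcom table.

    Rows are grouped by a (dag_id, task_id, execution_date) tuple.
    """

    def __init__(self):
        self.rows = {}

    def push(self, ti_key, key, value):
        self.rows.setdefault(ti_key, {})[key] = value

    def pull(self, ti_key, key):
        return self.rows.get(ti_key, {}).get(key)

    def clear(self, ti_key):
        # Mirrors the effect of clear_xcom_data(): drop every row
        # belonging to this task instance.
        self.rows.pop(ti_key, None)


def run_attempt(store, ti_key, value):
    """Simulate one try: clear first, then pull, then push."""
    store.clear(ti_key)
    pulled = store.pull(ti_key, "computeResponse")
    store.push(ti_key, "computeResponse", value)
    return pulled


store = FakeXComStore()
ti_key = ("my_dag", "my_task", "2018-09-06T00:00:00")

first_try = run_attempt(store, ti_key, "response-from-try-1")
retry = run_attempt(store, ti_key, "response-from-try-2")

print(first_try)  # None: nothing had been pushed yet
print(retry)      # None: the value from the first try was cleared
```

Under that assumption, the value pushed by the first try can never be seen by
the retry of the same task, which matches the None I am getting.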


On Sun, Sep 9, 2018 at 1:20 AM Taylor Edmiston <tedmiston@xxxxxxxxx> wrote:

> XComs push and pull should work as expected when a task is retried.  It
> shouldn't make a difference, but are you using XComs with explicit keys or
> the implicit return-based style? Is the push succeeding to the database?
>
> Also can you show a simplified example of your code for this DAG?
>
> Taylor
>
> On Thu, Sep 6, 2018 at 12:48 PM Ben Gregory <ben@xxxxxxxxxxxxx> wrote:
>
> > Hi Mishika --
> >
> > Posting this question on StackOverflow with some code examples you're
> > using will likely be the fastest way to have this addressed.
> >
> > Just make sure to tag it with "airflow" so people can find it.
> >
> > - Ben
> >
> > On Thu, Sep 6, 2018 at 12:12 AM Mishika Singh <mishikaps@xxxxxxxxx> wrote:
> >
> > > I am pushing some [key,value] using xcom in an operator, which I want to
> > > use when that task fails and comes for retry, for which I am using
> > > xcom_pull in that operator. But it is returning None instead of that
> > > value. Any pointer around this will be helpful.
> > >
> > >
> > > --
> > > Regards
> > > Mishika Singh
> > >
> >
> >
>


-- 
Regards
Mishika Singh