
Re: Problem with SparkSubmit


Hi Anton,

Which version of Airflow are you running?

Cheers, Fokko
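P.S. An observation that may help while we wait for the version number: this particular TypeError is raised by `subprocess.Popen` when one element of the command list it receives is itself a list rather than a string, which suggests the hook built a malformed spark-submit command. A minimal sketch of the failure (the `echo` command is just a stand-in for spark-submit):

```python
import subprocess

# A well-formed command: every element of the list is a string.
subprocess.run(["echo", "hello"], check=True)

# A malformed command: the second element is a nested list, the kind of
# value a hook can end up producing when a parameter that should be a
# string is assembled as a list. Popen rejects it with a TypeError
# (the exact message wording varies across Python versions).
try:
    subprocess.run(["echo", ["nested", "list"]])
except TypeError as exc:
    print("TypeError:", exc)
```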

2018-04-27 10:24 GMT+02:00 Anton Mushin <Anton_Mushin@xxxxxxxx>:

> Hi all,
> I have problem with spark operator. I get exception
>
> user@host:/# airflow test myDAG myTask 2018-04-26
> [2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python3.5/lib2to3/Grammar.txt
> [2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
> [2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor
> SequentialExecutor
> [2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag
> from /usr/local/airflow/dags
> [2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to:
> sparkhost
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 27, in <module>
>     args.func(args)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line
> 528, in test
>     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line
> 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line
> 1584, in run
>     session=session)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line
> 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line
> 1493, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/
> operators/spark_submit_operator.py", line 145, in execute
>     self._hook.submit(self._application)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py",
> line 231, in submit
>     **kwargs)
>   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
>     restore_signals, start_new_session)
>   File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
>     restore_signals, start_new_session, preexec_fn)
> TypeError: Can't convert 'list' object to str implicitly
>
> My DAG look like:
>
> import os
>
> from airflow import DAG
> from datetime import datetime, timedelta, date
> from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
>
> default_args = {
>     'owner': 'spark',
>     'depends_on_past': False,
>     'start_date': datetime.now(),
>     'retries': 1,
>     'retry_delay': timedelta(minutes=1)
> }
>
> dag = DAG('myDAG', default_args=default_args,)
>
> connection_id = "SPARK"
> os.environ[('AIRFLOW_CONN_%s' % connection_id)] = 'spark://sparkhost:7077'
>
> _config = {
>     'jars': 'spark_job.jar',
>     'executor_memory': '2g',
>     'name': 'myJob',
>     'conn_id': connection_id,
>     'java_class':'org.Job'
> }
>
> operator = SparkSubmitOperator(
>     task_id='myTask',
>     dag=dag,
>     **_config
> )
>
> What is wrong? Could somebody help me with it?
>
>