[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: H5 potential intermediate solution

Hi Kevin / Cham / all,

Thank you for your comments. I was not aware of the create_disposition option. I am planning to publish the code for working with H5 for the public use and will contribute to the open source project when I have more resources available.

Following are my insights and questions:

Pipeline - writing each sample's feature to BigQuery:

Schema - 
Pandas allows me to create an empty table with list of column names that I can use as well - this will automatically generate the schema (the API is gbq.to_gbq).
Data size - 
Writing bulk data with Pandas BQ API, there is a limit to the size of the dataFrame that is being written. I am looking for another solution to write bulk data to BQ.
- I understand tha bigQuery sink is not the right solution - since it is writing every row separately. Is that the right approach? In addition, to my understanding, Sinking rows might be the most expensive way to work with BQ. Writing columns should be cheaper (I am not expert in BQ, but I think that BQ data structure recommends on using columns as much as possible to reduce cost). Is there a way to work with columns when sinking bulk data to BQ?
Asynchronous writing to BQ:
- This can be done with ParDo OR other BQ API that might be available (I am looking for a different way now). any other suggestions?

Pipeline - writing each sample's feature to a text file:
The values are spread into 3 different files.  How can I identify the order of the values. The file names are  
-00000-of-00003, -00002-of-00003. 
 while -00001-of-00003 has the values at the beginning. Am I missing something here?

Thank you,

On Tue, Apr 3, 2018 at 12:27 PM, Kevin Peterson <kevincp@xxxxxxxxxx> wrote:
https://issues.apache.org/jira/browse/BEAM-3167 might be what you want for this to all work as part of the pipeline, but looks like that is for Java, not Python.

Also, the pandas_gbq library has a function for generating a BQ schema from a DataFrame. Maybe something like that would work for your data?

On Mon, Apr 2, 2018 at 5:32 PM, Chamikara Jayalath <chamikara@xxxxxxxxxx> wrote:

On Mon, Apr 2, 2018 at 5:19 PM Eila Arich-Landkof <eila@xxxxxxxxxxxxxxxxx> wrote:
Hi Cham,

Thanks. I have created a PCollection from the dataset that is available in the H5 file which is provided as numpy array. 
It is very challenging for my use case to describe the schema. The original dimensions of the dataset are 70K x 30K . Any suggestion how to work around that?

Can you write to a pre-created table by using "create_disposition = BigQueryDisposition.CREATE_NEVER" ? You can try to use BigQuery schema auto detection (https://cloud.google.com/bigquery/docs/schema-detect) to create the table before running the Beam pipeline.
I think that it was mentioned at the summit that there will be a way to write to BQ without schema. Is something like that on the roadmap? 

I don't think supporting this is in the immediate road map of Beam but any contributions in this space are welcome. 


Sent from my iPhone

On Apr 2, 2018, at 7:33 PM, Chamikara Jayalath <chamikara@xxxxxxxxxx> wrote:

(moving dev to bcc)

Hi Eila,

On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof <eila@xxxxxxxxxxxxxxxxx> wrote:
Hi All,

I was able to make it work by creating the PCollection with the numpy array. However, writing to BQ was impossible because it requested for the schema.
The code:
(p | "create all" >> beam.Create(_expression_[1:5,1:5])
   | "write all text" >> beam.io.WriteToText('gs://archs4/output/', file_name_suffix='.txt'))

Is there a walk around for providing schema for beam.io.BigQuerySink?

Regarding your earlier question, you do need at least one element in the PCollection that triggers the ParDo to do any work (which can be a create with a single element that you ignore).

Not sure if I fully understood the BigQuery question. You have to specify a schema when writing to a new BigQuery table. See following example,



Many thanks,

On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <eila@xxxxxxxxxxxxxxxxx> wrote:
Hello all,

I would like to try a different way to leverage Apache beam for H5 => BQ (file to table transfer).

For my use case, I would like to read every 10K rows of H5 data (numpy array format), transpose them and write them to BQ 10K columns. 10K is BQ columns limit.

My code is below and fires the following error (I might have missed something basic). I am not using beam.Create and trying to create a PCollection from the ParDo transfer. is this posssible? if not, what is the alternative for creating a PColleciton from numpy array? (if any)

ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f00aad7b7a0>, due to an exception.
 Traceback (most recent call last):
  File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 307, in call
  File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 332, in attempt_call
  File "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 540, in start_bundle
AttributeError: 'PBegin' object has no attribute 'windowing'

ERROR:root:Giving up after 4 attempts.
WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'
WARNING:root:A task failed with exception: 'PBegin' object has no attribute 'windowing'


options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'orielresearch-188115'
google_cloud_options.job_name = 'h5-to-bq-10K'
google_cloud_options.staging_location = 'gs://archs4/staging'
google_cloud_options.temp_location = 'gs://archs4/temp'
options.view_as(StandardOptions).runner = 'DirectRunner'  

p = beam.Pipeline(options=options)

class read10kRowsDoFn(beam.DoFn):
  def process(self, element,index):
    row_start = index
    row_end = index+10000
    # returns numpy array - numpy.ndarray
    d = _expression_[row_start,row_end,:]

#for i in range(0,_expression_.shape[0],10000):
k=210 # allows creating unique labels for the runner
for i in range(0,3,2): # test 
  label_read_row = "read "+bigQuery_dataset_table_name
  label_write_col = "write "+bigQuery_dataset_table_name
# is this possible to generate a PCollection with ParDo without create?
  (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
     | label_write_col >> beam.io.Write(beam.io.BigQuerySink(bigQuery_dataset_table_name)))

p.run().wait_until_finish() #fires an error

Many thanks,