Re: How to append to parquet file periodically and read intermediate data - pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.


Hello,

For my company's use cases, we found that the number of files accounted for
a large share of the time spent building the query execution plan, so writing
small Parquet files very frequently turned out to be rather inefficient.

There are some formats that support `append` semantics (I have tested this
successfully with Avro, but there are a couple of others that could be used
similarly).
So we had a few cases where we were aggregating data in a `current table`
stored as a set of Avro files, and rewriting all of it into a few Parquet
files at the end of the day.
This gave us files that were prepared for query performance (file size,
row group size, per-column sorting), so that queries could benefit as much
as possible from the Parquet statistics.
Our queries then did a UNION between the "optimized for speed" history
tables and the "optimized for latency" current tables whenever the query
timeframe crossed the boundary of the current day.

Regards, Joel

On Wed, Dec 19, 2018 at 2:14 PM Francois Saint-Jacques <
fsaintjacques@xxxxxxxxxxxxxxx> wrote:

> Hello Darren,
>
> What Uwe suggests is usually the way to go: your active process writes to a
> new file every time. Then you have a parallel process/thread that compacts
> the smaller files in the background so that you don't end up with too many
> files.
>
> On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uwelk@xxxxxxxxxx> wrote:
>
> > Hello Darren,
> >
> > you're out of luck here. Parquet files are immutable and meant for batch
> > writes. Once they're written you cannot modify them anymore. To load them,
> > you need to know their metadata, which is in the footer. The footer is
> > always at the end of the file and is written once you call close.
> >
> > Your use case is normally fulfilled by continuously starting new files and
> > reading them back in using the ParquetDataset class
> >
> > Cheers
> > Uwe
> >
> > Am 18.12.2018 um 21:03 schrieb Darren Gallagher <dazzag@xxxxxxxxx>:
> >
> > >> [Cross posted from https://github.com/apache/arrow/issues/3203]
> > >>
> > >> I'm adding new data to a parquet file every 60 seconds using this code:
> > >>
> > >> import os
> > >> import json
> > >> import time
> > >> import requests
> > >> import pandas as pd
> > >> import numpy as np
> > >> import pyarrow as pa
> > >> import pyarrow.parquet as pq
> > >>
> > >> api_url = 'https://opensky-network.org/api/states/all'
> > >>
> > >> cols = ['icao24', 'callsign', 'origin', 'time_position',
> > >>        'last_contact', 'longitude', 'latitude',
> > >>        'baro_altitude', 'on_ground', 'velocity', 'true_track',
> > >>        'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
> > >>        'spi', 'position_source']
> > >>
> > >> def get_new_flight_info(writer):
> > >>    print("Requesting new data")
> > >>    req = requests.get(api_url)
> > >>    content = req.json()
> > >>
> > >>    states = content['states']
> > >>    df = pd.DataFrame(states, columns = cols)
> > >>    df['timestamp'] = content['time']
> > >>    print("Found {} new items".format(len(df)))
> > >>
> > >>    table = pa.Table.from_pandas(df)
> > >>    if writer is None:
> > >>        writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
> > >>    writer.write_table(table=table)
> > >>    return writer
> > >>
> > >> if __name__ == '__main__':
> > >>    writer = None
> > >>    while (not os.path.exists('opensky.STOP')):
> > >>        writer = get_new_flight_info(writer)
> > >>        time.sleep(60)
> > >>
> > >>    if writer:
> > >>        writer.close()
> > >>
> > >> This is working fine and the file grows every 60 seconds.
> > >> However unless I force the loop to exit I am unable to use the parquet
> > >> file. In a separate terminal I try to access the parquet file using this
> > >> code:
> > >>
> > >> import pandas as pd
> > >> import pyarrow.parquet as pq
> > >>
> > >> table = pq.read_table("openskyflights.parquet")
> > >> df = table.to_pandas()
> > >> print(len(df))
> > >>
> > >> which results in this error:
> > >>
> > >> Traceback (most recent call last):
> > >>  File "checkdownloadsize.py", line 7, in <module>
> > >>    table = pq.read_table("openskyflights.parquet")
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
> > >>    use_pandas_metadata=use_pandas_metadata)
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
> > >>    filesystem=self)
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
> > >>    self.validate_schemas()
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
> > >>    self.schema = self.pieces[0].get_metadata(open_file).schema
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
> > >>    return self._open(open_file_func).metadata
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
> > >>    reader = open_file_func(self.path)
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
> > >>    common_metadata=self.common_metadata)
> > >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
> > >>    self.reader.open(source, metadata=metadata)
> > >>  File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
> > >>  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> > >> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
> > >>
> > >> Is there a way to achieve this?
> > >> I'm assuming that if I call writer.close() in the while loop then it will
> > >> prevent any further data being written to the file? Is there some kind of
> > >> "flush" operation that can be used to ensure all data is written to disk
> > >> and available to other processes or threads that want to read the data?
> > >>
> > >> Thanks
> > >>
> >
> >
>
> --
> Sent from my jetpack.
>