Re: How to append to parquet file periodically and read intermediate data - pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.


Hello Darren, 

you're out of luck here. Parquet files are immutable and meant for batch writes; once written, they cannot be modified. To load a file, a reader needs its metadata, which is stored in the footer. The footer is always at the end of the file and is only written when you call close(), which is why reading the file while the writer is still open fails with "Corrupt footer".

Your use case is normally handled by continuously starting new files and reading them back in together using the ParquetDataset class.

Cheers
Uwe

On 18.12.2018 at 21:03, Darren Gallagher <dazzag@xxxxxxxxx> wrote:

>> [Cross posted from https://github.com/apache/arrow/issues/3203]
>> 
>> I'm adding new data to a parquet file every 60 seconds using this code:
>> 
>> import os
>> import json
>> import time
>> import requests
>> import pandas as pd
>> import numpy as np
>> import pyarrow as pa
>> import pyarrow.parquet as pq
>> 
>> api_url = 'https://opensky-network.org/api/states/all'
>> 
>> cols = ['icao24', 'callsign', 'origin', 'time_position',
>>        'last_contact', 'longitude', 'latitude',
>>        'baro_altitude', 'on_ground', 'velocity', 'true_track',
>>        'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
>>        'spi', 'position_source']
>> 
>> def get_new_flight_info(writer):
>>    print("Requesting new data")
>>    req = requests.get(api_url)
>>    content = req.json()
>> 
>>    states = content['states']
>>    df = pd.DataFrame(states, columns = cols)
>>    df['timestamp'] = content['time']
>>    print("Found {} new items".format(len(df)))
>> 
>>    table = pa.Table.from_pandas(df)
>>    if writer is None:
>>        writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
>>    writer.write_table(table=table)
>>    return writer
>> 
>> if __name__ == '__main__':
>>    writer = None
>>    while (not os.path.exists('opensky.STOP')):
>>        writer = get_new_flight_info(writer)
>>        time.sleep(60)
>> 
>>    if writer:
>>        writer.close()
>> 
>> This is working fine and the file grows every 60 seconds.
>> However unless I force the loop to exit I am unable to use the parquet
>> file. In a separate terminal I try to access the parquet file using this
>> code:
>> 
>> import pandas as pd
>> import pyarrow.parquet as pq
>> 
>> table = pq.read_table("openskyflights.parquet")
>> df = table.to_pandas()
>> print(len(df))
>> 
>> which results in this error:
>> 
>> Traceback (most recent call last):
>>  File "checkdownloadsize.py", line 7, in <module>
>>    table = pq.read_table("openskyflights.parquet")
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
>>    use_pandas_metadata=use_pandas_metadata)
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
>>    filesystem=self)
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
>>    self.validate_schemas()
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
>>    self.schema = self.pieces[0].get_metadata(open_file).schema
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
>>    return self._open(open_file_func).metadata
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
>>    reader = open_file_func(self.path)
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
>>    common_metadata=self.common_metadata)
>>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
>>    self.reader.open(source, metadata=metadata)
>>  File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
>>  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
>> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
>> 
>> Is there a way to achieve this?
>> I'm assuming that if I call writer.close() in the while loop then it will
>> prevent any further data being written to the file? Is there some kind of
>> "flush" operation that can be used to ensure all data is written to disk
>> and available to other processes or threads that want to read the data?
>> 
>> Thanks
>>