Re: (Ab)using parquet files on S3 storage for a huge logging database


Hi Gerlando,

I'm also using remote Parquet files as a pseudo-database for long-term
storage of log-like records. Here's what I do:

# save log files
0. Every (second|minute|hour|whatever), parse new logs and combine them
into 1 pyarrow.Table in RAM on 1 machine.
1. Use pyarrow.parquet.write_to_dataset() to save local files with paths
like /path/to/logs/date=2018-09-19/source=CRM-114/<long unique file
name>.parquet
2. Push those local files to an S3 bucket.

The bucket typically ends up storing many small Parquet files. That's not
optimal, but it allows me to get away with the next part:

# query saved logs
0. Figure out which (date,source) partitions I need. Pull any files in
those partitions from S3 which I don't already have on local disk.
1. For each file, use pyarrow.parquet.read_table().to_batches() to read the
columns I need and slice them into RecordBatches.
2. For each batch, convert to pandas.DataFrame and use pandas to do more
filtering. Concat results and save to_parquet(), to_excel(), to_csv(), etc.
3. Delete old and/or rarely-used partitions before the local disk runs out
of space.

This is my (fairly incomplete and rather badly documented) open-source tool
for query steps 1 and 2: https://github.com/samkennerly/arrowloaf

It's basically a crude DIY version of services Bill & Paul mentioned like
Apache Drill, AWS Athena, etc. It's not clever enough to scan through an S3
object before downloading it, nor to avoid loading every row of each
Parquet file into RAM. But it's dirt cheap and surprisingly fast,
especially when pyarrow can repeatedly re-use the same local files. It's
been much faster than our SQL databases for big, blunt queries like "find
all logs between 08:00-09:00 yesterday from [list of 13 machines which
failed]."

Cheers,
  - Sam Kennerly
    samkennerly@xxxxxxxxx

On Wed, Sep 19, 2018 at 5:07 PM Andreas Heider <andreas@xxxxxxxxx> wrote:

> There's been a bunch of work on adding page indices to parquet:
> https://github.com/apache/parquet-format/blob/master/PageIndex.md
> I haven't followed progress in detail but I think the Java implementation
> supports this now.
>
> Looked pretty similar to what kdb or clickhouse do, so with fitting
> storage underneath it should enable very fast point lookups.
>
> Andreas
>
> > On 19.09.2018 at 20:30, Brian Bowman <Brian.Bowman@xxxxxxx> wrote:
> >
> > Gerlando,
> >
> > AFAIK Parquet does not yet support indexing.  I believe it does store
> > min/max values at the row batch (or maybe it's page) level which may help
> > eliminate large "swaths" of data depending on how actual data values
> > corresponding to a search predicate are distributed across large Parquet
> > files.
> >
> > I have an interest in the future of indexing within the native Parquet
> > structure as well.  It will be interesting to see where this discussion
> > goes from here.
> >
> > -Brian
> >
> > On 9/19/18, 3:21 PM, "Gerlando Falauto" <gerlando.falauto@xxxxxxxxx> wrote:
> >
> >    Thank you all guys, you've been extremely helpful with your ideas.
> >    I'll definitely have a look at all your suggestions to see what others
> >    have been doing in this respect.
> >
> >    What I forgot to mention was that while the service uses the S3 API,
> >    it's not provided by AWS, so any solution should be based on a cloud
> >    offering from a different big player (it's the three-letter big-blue
> >    one, in case you're wondering).
> >
> >    However, I'm still not clear as to how Drill (or pyarrow) would be able
> >    to gather data with random access. In any database, you just build an
> >    index on the fields you're going to run most of your queries over, and
> >    then the database takes care of everything else.
> >
> >    With Parquet, as I understand, you can do folder-based partitioning (is
> >    that called "hive" partitioning?) so that you can get random access
> >    over, let's say,
> >    source=source1/date=20180918/*.parquet.
> >    I assume Drill could be instructed to do this, or could even figure it
> >    out by itself, just by looking at the folder structure.
> >    What I still don't get, though, is how to "index" the parquet file(s)
> >    so that random (rather than sequential) access can be performed over
> >    the whole file.
> >    Brian mentioned metadata. I had a quick look at the parquet
> >    specification and I sort of understand that it somehow resembles an
> >    index.
> >    Yet I fail to understand how such an index could be built (if at all
> >    possible), for instance using pyarrow (or any other tool, for that
> >    matter) for reading and/or writing.
> >
> >    Thank you!
> >    Gerlando
> >
> >    On Wed, Sep 19, 2018 at 7:55 PM Ted Dunning <ted.dunning@xxxxxxxxx> wrote:
> >
> >> The effect of rename can be had by handling a small inventory file that is
> >> updated atomically.
> >>
> >> Having real file semantics is sooo much nicer, though.
> >>
> >>
> >>
> >> On Wed, Sep 19, 2018 at 1:51 PM Bill Glennon <wglennon@xxxxxxxxx> wrote:
> >>
> >>> Also, may want to take a look at https://aws.amazon.com/athena/.
> >>>
> >>> Thanks,
> >>> Bill
> >>>
> >>> On Wed, Sep 19, 2018 at 1:43 PM Paul Rogers <par0328@xxxxxxxxx.invalid> wrote:
> >>>
> >>>> Hi Gerlando,
> >>>>
> >>>> I believe AWS has an entire logging pipeline they offer. If you want
> >>>> something quick, perhaps look into that offering.
> >>>>
> >>>> What you describe is pretty much the classic approach to log
> >>>> aggregation: partition data, gather data incrementally, then later
> >>>> consolidate. A while back, someone invented the term "lambda
> >>>> architecture" for this idea. You should be able to find examples of how
> >>>> others have done something similar.
> >>>>
> >>>> Drill can scan directories of files. So, in your bucket's (source-date)
> >>>> directories, you can have multiple files. If you receive data, say,
> >>>> every 5 or 10 minutes, you can just create a separate file for each new
> >>>> drop of data. You'll end up with many files, but you can query the data
> >>>> as it arrives.
> >>>>
> >>>> Then, later, say once per day, you can consolidate the files into a few
> >>>> big files. The only trick is the race condition of doing the
> >>>> consolidation while running queries. Not sure how to do that on S3,
> >>>> since you can't exploit rename operations as you can on Linux. Anyone
> >>>> have suggestions for this step?
> >>>>
> >>>> Thanks,
> >>>> - Paul
> >>>>
> >>>>
> >>>>
> >>>>    On Wednesday, September 19, 2018, 6:23:13 AM PDT, Gerlando Falauto <gerlando.falauto@xxxxxxxxx> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I'm looking for a way to store huge amounts of logging data in the
> >>>> cloud from about 100 different data sources, each producing about
> >>>> 50MB/day (so it's something like 5GB/day).
> >>>> The target storage would be an S3 object storage for cost-efficiency
> >>>> reasons.
> >>>> I would like to be able to store (i.e. append-like) data in realtime,
> >>>> and retrieve data based on time frame and data source with fast access.
> >>>> I was thinking of partitioning data based on datasource and calendar
> >>>> day, so to have one file per day, per data source, each 50MB.
> >>>>
> >>>> I played around with pyarrow and parquet (using s3fs), and came across
> >>>> the following limitations:
> >>>>
> >>>> 1) I found no way to append to existing files. I believe that's some
> >>>> limitation with S3, but it could be worked around by using datasets
> >>>> instead. In principle, I believe I could also trigger some daily job
> >>>> which coalesces today's data into a single file, if having too much
> >>>> fragmentation causes any disturbance. Would that make any sense?
> >>>>
> >>>> 2) When reading, if I'm only interested in a small portion of the data
> >>>> (for instance, based on a timestamp field), I obviously wouldn't want
> >>>> to have to read (i.e. download) the whole file. I believe Parquet was
> >>>> designed to handle huge amounts of data with relatively fast access.
> >>>> Yet I fail to understand if there's some way to allow for random
> >>>> access, particularly when dealing with a file stored within S3.
> >>>> The following code snippet refers to a 150MB dataset composed of 1000
> >>>> rowgroups of 150KB each. I was expecting it to run very fast, yet it
> >>>> apparently downloads the whole file (pyarrow 0.9.0):
> >>>>
> >>>> import pyarrow.parquet as pq
> >>>> import s3fs
> >>>>
> >>>> fs = s3fs.S3FileSystem(key=access_key, secret=secret_key,
> >>>>                        client_kwargs=client_kwargs)
> >>>> with fs.open(bucket_uri) as f:
> >>>>    pf = pq.ParquetFile(f)
> >>>>    print(pf.num_row_groups) # yields 1000
> >>>>    pf.read_row_group(1)
> >>>>
> >>>> 3) I was also expecting to be able to perform some sort of query, but
> >>>> I'm also failing to see how to specify index columns or such.
> >>>>
> >>>> What am I missing? Did I get it all wrong?
> >>>>
> >>>> Thank you!
> >>>> Gerlando
> >>>>
> >>>
> >>
> >
> >
>
>