osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: PyArrow & Python Multiprocessing


You're welcome!

On Wed, May 16, 2018 at 6:13 PM Corey Nolet <cjnolet@xxxxxxxxx> wrote:

> I must say, I’m super excited about using Arrow and Plasma.
>
> The code you just posted worked for me at home and I’m sure I’ll figure
> out what I was doing wrong tomorrow at work.
>
> Anyways, thanks so much for your help and fast replies!
>
> Sent from my iPhone
>
> > On May 16, 2018, at 7:42 PM, Robert Nishihara <robertnishihara@xxxxxxxxx>
> wrote:
> >
> > You should be able to do something like the following.
> >
> > # Start the store.
> > plasma_store -s /tmp/store -m 1000000000
> >
> > Then in Python, do the following:
> >
> > import pandas as pd
> > import pyarrow.plasma as plasma
> > import numpy as np
> >
> > client = plasma.connect('/tmp/store', '', 0)
> > series = pd.Series(np.zeros(100))
> > object_id = client.put(series)
> >
> > And yes, I would create a separate Plasma client for each process. I
> don't
> > think you'll be able to pickle a Plasma client object successfully (it
> has
> > a socket connection to the store).
> >
> > On Wed, May 16, 2018 at 3:43 PM Corey Nolet <cjnolet@xxxxxxxxx> wrote:
> >
> >> Robert,
> >>
> >> Thank you for the quick response. I've been playing around for a few
> hours
> >> to get a feel for how this works.
> >>
> >> If I understand correctly, it's better to have the Plasma client objects
> >> instantiated within each separate process? Weird things seemed to happen
> >> when I attempted to share a single one. I was assuming that the pickle
> >> serialization by python multiprocessing would have been serializing the
> >> connection info and re-instantiating on the other side but that didn't
> seem
> >> to be the case.
> >>
> >> I managed to load up a gigantic set of CSV files into Dataframes. Now
> I'm
> >> attempting to read the chunks, perform a groupby-aggregate, and write
> the
> >> results back to the Plasma store. Unless I'm mistaken, there doesn't
> seem
> >> to be a very direct way of accomplishing this. When I tried converting
> the
> >> Series object into a Plasma Array and just doing a client.put(array) I
> get
> >> a pickling error. Unless maybe I'm misunderstanding the architecture
> here,
> >> I believe that error would have been referring to attempts to serialize
> the
> >> object into a file? I would hope that the data isn't all being sent to
> the
> >> single Plasma server (or sent over sockets for that matter).
> >>
> >> What would be the recommended strategy for serializing Pandas Series
> >> objects? I really like the StreamWriter concept here but there does not
> >> seem to be a direct way (or documentation) to accomplish this.
> >>
> >> Thanks again.
> >>
> >> On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <
> >> robertnishihara@xxxxxxxxx
> >>> wrote:
> >>
> >>> Take a look at the Plasma object store
> >>> https://arrow.apache.org/docs/python/plasma.html.
> >>>
> >>> Here's an example using it (along with multiprocessing to sort a pandas
> >>> dataframe)
> >>> https://github.com/apache/arrow/blob/master/python/
> >>> examples/plasma/sorting/sort_df.py.
> >>> It's possible the example is a bit out of date.
> >>>
> >>> You may be interested in taking a look at Ray
> >>> https://github.com/ray-project/ray. We use Plasma/Arrow under the hood
> >> to
> >>> do all of these things but hide a lot of the bookkeeping (like object
> ID
> >>> generation). For your setting, you can think of it as a replacement for
> >>> Python multiprocessing that automatically uses shared memory and Arrow
> >> for
> >>> serialization.
> >>>
> >>>> On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cjnolet@xxxxxxxxx>
> wrote:
> >>>>
> >>>> I've been reading through the PyArrow documentation and trying to
> >>>> understand how to use the tool effectively for IPC (using zero-copy).
> >>>>
> >>>> I'm on a system with 586 cores & 1TB of ram. I'm using Panda's
> >> Dataframes
> >>>> to process several 10's of gigs of data in memory and the pickling
> that
> >>> is
> >>>> done by Python's multiprocessing API is very wasteful.
> >>>>
> >>>> I'm running a little hand-built map-reduce where I chunk the dataframe
> >>> into
> >>>> N_mappers number of chunks, run some processing on them, then run some
> >>>> number N_reducers to finalize the operation. What I'd like to be able
> >> to
> >>> do
> >>>> is chunk up the dataframe into Arrow Buffer objects and just have each
> >>>> mapped task read their respective Buffer object with the guarantee of
> >>>> zero-copy.
> >>>>
> >>>> I see there's a couple Filesystem abstractions for doing memory-mapped
> >>>> files. Durability isn't something I need and I'm willing to forego the
> >>>> expense of putting the files on disk.
> >>>>
> >>>> Is it possible to write the data directly to memory and pass just the
> >>>> reference around to the different processes? What's the recommended
> way
> >>> to
> >>>> accomplish my goal here?
> >>>>
> >>>>
> >>>> Thanks in advance!
> >>>>
> >>>
> >>
>