osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: PyArrow & Python Multiprocessing


I must say, I’m super excited about using Arrow and Plasma.

The code you just posted worked for me at home and I’m sure I’ll figure out what I was doing wrong tomorrow at work. 

Anyways, thanks so much for your help and fast replies! 

Sent from my iPhone

> On May 16, 2018, at 7:42 PM, Robert Nishihara <robertnishihara@xxxxxxxxx> wrote:
> 
> You should be able to do something like the following.
> 
> # Start the store.
> plasma_store -s /tmp/store -m 1000000000
> 
> Then in Python, do the following:
> 
> import pandas as pd
> import pyarrow.plasma as plasma
> import numpy as np
> 
> client = plasma.connect('/tmp/store', '', 0)
> series = pd.Series(np.zeros(100))
> object_id = client.put(series)
> 
> And yes, I would create a separate Plasma client for each process. I don't
> think you'll be able to pickle a Plasma client object successfully (it has
> a socket connection to the store).
> 
> On Wed, May 16, 2018 at 3:43 PM Corey Nolet <cjnolet@xxxxxxxxx> wrote:
> 
>> Robert,
>> 
>> Thank you for the quick response. I've been playing around for a few hours
>> to get a feel for how this works.
>> 
>> If I understand correctly, it's better to have the Plasma client objects
>> instantiated within each separate process? Weird things seemed to happen
>> when I attempted to share a single one. I was assuming that the pickle
>> serialization by python multiprocessing would have been serializing the
>> connection info and re-instantiating on the other side but that didn't seem
>> to be the case.
>> 
>> I managed to load up a gigantic set of CSV files into Dataframes. Now I'm
>> attempting to read the chunks, perform a groupby-aggregate, and write the
>> results back to the Plasma store. Unless I'm mistaken, there doesn't seem
>> to be a very direct way of accomplishing this. When I tried converting the
>> Series object into a Plasma Array and just doing a client.put(array) I get
>> a pickling error. Unless maybe I'm misunderstanding the architecture here,
>> I believe that error would have been referring to attempts to serialize the
>> object into a file? I would hope that the data isn't all being sent to the
>> single Plasma server (or sent over sockets for that matter).
>> 
>> What would be the recommended strategy for serializing Pandas Series
>> objects? I really like the StreamWriter concept here but there does not
>> seem to be a direct way (or documentation) to accomplish this.
>> 
>> Thanks again.
>> 
>> On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <
>> robertnishihara@xxxxxxxxx
>>> wrote:
>> 
>>> Take a look at the Plasma object store
>>> https://arrow.apache.org/docs/python/plasma.html.
>>> 
>>> Here's an example using it (along with multiprocessing to sort a pandas
>>> dataframe)
>>> https://github.com/apache/arrow/blob/master/python/
>>> examples/plasma/sorting/sort_df.py.
>>> It's possible the example is a bit out of date.
>>> 
>>> You may be interested in taking a look at Ray
>>> https://github.com/ray-project/ray. We use Plasma/Arrow under the hood
>> to
>>> do all of these things but hide a lot of the bookkeeping (like object ID
>>> generation). For your setting, you can think of it as a replacement for
>>> Python multiprocessing that automatically uses shared memory and Arrow
>> for
>>> serialization.
>>> 
>>>> On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cjnolet@xxxxxxxxx> wrote:
>>>> 
>>>> I've been reading through the PyArrow documentation and trying to
>>>> understand how to use the tool effectively for IPC (using zero-copy).
>>>> 
>>>> I'm on a system with 586 cores & 1TB of ram. I'm using Panda's
>> Dataframes
>>>> to process several 10's of gigs of data in memory and the pickling that
>>> is
>>>> done by Python's multiprocessing API is very wasteful.
>>>> 
>>>> I'm running a little hand-built map-reduce where I chunk the dataframe
>>> into
>>>> N_mappers number of chunks, run some processing on them, then run some
>>>> number N_reducers to finalize the operation. What I'd like to be able
>> to
>>> do
>>>> is chunk up the dataframe into Arrow Buffer objects and just have each
>>>> mapped task read their respective Buffer object with the guarantee of
>>>> zero-copy.
>>>> 
>>>> I see there's a couple Filesystem abstractions for doing memory-mapped
>>>> files. Durability isn't something I need and I'm willing to forego the
>>>> expense of putting the files on disk.
>>>> 
>>>> Is it possible to write the data directly to memory and pass just the
>>>> reference around to the different processes? What's the recommended way
>>> to
>>>> accomplish my goal here?
>>>> 
>>>> 
>>>> Thanks in advance!
>>>> 
>>> 
>>