osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

array.array()'s memory shared with multiprocessing.Process()


Hi,

I'm writing a multiprocessing program whose behavior I don't understand.
Essentially, the main process collects data and then passes it to a consumer process.
For performance reasons I'm using a "static" circular buffer created through array.array(), and then passing it "as-is" by pushing it onto a queue.

According to:
https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues

I would expect the array to be pickled by the sending process and then unpickled at the other end (i.e. no memory would be shared among the two processes).
Thus, overwriting data on the buffer should be safe in my understanding.

What happens instead is that the consumer thread may indeed receive a corrupted array, in that some elements might have already been overwritten by the producer.
I did somehow overcome this limitation by just passing a copy.copy() of the buffer, but I really don't understand why this would be necessary at all.

Could someone please shed some light on this?
Thank you!

Here's the example code:

---
import multiprocessing as mp
import array
import time

def consumer_process(queue):
  while True:
    ts_buffer = queue.get()
    #time.sleep(.1)
    i = 0
    for idx in range(1, len(ts_buffer)):
      diff = ts_buffer[idx] - ts_buffer[idx-1]
      if diff < 0:
        print("error: idx = ", idx, " diff =", diff, ":", ts_buffer[idx-1], " -> ", ts_buffer[idx])
        
queue = mp.Queue(100)
p = mp.Process(name="consumer", target=consumer_process, args=(queue,))
p.daemon = True  # can't use daemon as kwargs when using multiprocessing.dummy
p.start()
samples_dump = 20000

ts_buffer = array.array('f', bytearray(4 * (samples_dump)))

i = 0
while True:
  for idx in range(0,len(ts_buffer)):
    ts_buffer[idx] = i
    i += 1
  queue.put(ts_buffer)
  # enable this to make the error go away
  #time.sleep(.1)
---
error: idx =  18372  diff = -19999.0 : 38371.0  ->  18372.0
error: idx =  17011  diff = -19999.0 : 97010.0  ->  77011.0
error: idx =  15670  diff = -19999.0 : 135669.0  ->  115670.0
error: idx =  14914  diff = -19999.0 : 154913.0  ->  134914.0
error: idx =  19405  diff = -19999.0 : 179404.0  ->  159405.0
error: idx =  17160  diff = -19999.0 : 197159.0  ->  177160.0
error: idx =  19130  diff = -19999.0 : 219129.0  ->  199130.0
error: idx =  14298  diff = -19999.0 : 254297.0  ->  234298.0
error: idx =  9307  diff = -19999.0 : 289306.0  ->  269307.0
error: idx =  15815  diff = -19999.0 : 315814.0  ->  295815.0
error: idx =  11587  diff = -19999.0 : 331586.0  ->  311587.0
---
$ python3 --version
Python 3.5.2