classification
Title: unpickled LZMACompressor is crashy
Type: crash Stage: resolved
Components: Versions: Python 3.4, Python 3.3
process
Status: closed Resolution: fixed
Dependencies: Superseder:
Assigned To: Nosy List: cantor, jnoller, nadeem.vawda, pitrou, python-dev, sbt, tim.peters
Priority: normal Keywords:

Created on 2013-10-25 19:20 by cantor, last changed 2013-10-28 20:53 by nadeem.vawda. This issue is now closed.

Messages (13)
msg201278 - (view) Author: cantor (cantor) Date: 2013-10-25 19:20
import lzma
from functools import partial
import multiprocessing


def run_lzma(data,c):
    return c.compress(data)


def split_len(seq, length):
    return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)]



def lzma_mp(sequence,threads=3):
  lzc = lzma.LZMACompressor()
  blocksize = int(round(len(sequence)/threads))
  strings = split_len(sequence, blocksize)
  lzc_partial = partial(run_lzma,c=lzc)
  pool=multiprocessing.Pool()
  lzc_pool = list(pool.map(lzc_partial,strings))
  pool.close()
  pool.join()
  out_flush = lzc.flush()
  return b"".join(lzc_pool + [out_flush])

sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG'


lzma_mp(sequence,threads=3)
msg201280 - (view) Author: cantor (cantor) Date: 2013-10-25 19:28
lzma
msg201287 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2013-10-25 20:22
The problem is that using an unpickled LZMACompressor crashes:

$ ./python -c "import lzma, pickle; c = pickle.loads(pickle.dumps(lzma.LZMACompressor())); c.compress(b'')"
Erreur de segmentation   ["Segmentation fault", French locale]

Here is the gdb backtrace:

#0  0x00007ffff7bcafc0 in sem_trywait () from /lib/x86_64-linux-gnu/libpthread.so.0
#1  0x0000000000436c15 in PyThread_acquire_lock_timed (lock=0x0, microseconds=0, intr_flag=0) at Python/thread_pthread.h:350
#2  0x0000000000436db8 in PyThread_acquire_lock (lock=0x0, waitflag=0) at Python/thread_pthread.h:556
#3  0x00007ffff64a6538 in Compressor_compress (self=0x7ffff7e129a0, args=0x7ffff7f17468) at /home/antoine/cpython/default/Modules/_lzmamodule.c:533
msg201291 - (view) Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-25 20:49
As far as I can tell, liblzma provides no way to serialize a compressor's
state, so the best we can do is raise a TypeError when attempting to
pickle the LZMACompressor (and likewise for LZMADecompressor).

Also, it's worth pointing out that the provided code wouldn't work even
if you could serialize LZMACompressor objects - each call to compress()
updates the compressor's internal state with information needed by the
final call to flush(), but each compress() call would be made on a
*copy* of the compressor rather than the original object. So flush()
would end up producing bogus data (and most likely all compress()
calls after the first would too).

If you are trying to do this because LZMA compression is too slow, I'd
suggest you try using zlib or bz2 instead - both of these algorithms
can compress faster than LZMA (at the expense of your compression ratio).
zlib is faster on both compression and decompression, while bz2 is slower
than lzma at decompression.

Alternatively, you can do parallel compression by calling lzma.compress()
on each block (instead of creating an LZMACompressor), and then joining
the results. But note that (a) this will give you a worse compression
ratio than serial compression (because it can't exploit redundancy shared
between blocks), and (b) using multiprocessing has a performance overhead
of its own, because you will need to copy the input when sending it to
the worker subprocess, and then copy the result when sending it back to
the main process.
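The per-block alternative described above can be sketched as follows (a hypothetical helper, not from the report; the block size and pool size are arbitrary). Each block becomes an independent .xz stream, and lzma.decompress() on recent Pythons decodes a concatenation of streams transparently:

```python
import lzma
import multiprocessing


def split_len(seq, length):
    """Split a bytes object into chunks of at most `length` bytes."""
    return [seq[i:i + length] for i in range(0, len(seq), length)]


def parallel_lzma(data, threads=3):
    """Compress each block independently with lzma.compress() and join.

    The result is a concatenation of separate .xz streams, so the
    compression ratio is worse than one serial stream, as noted above.
    """
    blocksize = max(1, len(data) // threads)
    with multiprocessing.Pool(threads) as pool:
        return b"".join(pool.map(lzma.compress, split_len(data, blocksize)))


if __name__ == "__main__":
    data = b"AAAAAJKDDDD" * 1000
    assert lzma.decompress(parallel_lzma(data)) == data
```

No compressor object is shared here, so nothing needs to be pickled except the input blocks themselves.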
msg201293 - (view) Author: cantor (cantor) Date: 2013-10-25 21:04
just to mention that map() (i.e. the non-parallel version) works:

import lzma
from functools import partial
import multiprocessing

def run_lzma(data,c):
    return c.compress(data)


def split_len(seq, length):
    return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)]



sequence='AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG'
threads=3
blocksize = int(round(len(sequence)/threads))
strings = split_len(sequence, blocksize)


#map works

lzc = lzma.LZMACompressor()
out = list(map(lzc.compress,strings))
out_flush = lzc.flush()
result = b"".join(out + [out_flush])
lzma.compress(str.encode(sequence))
lzma.compress(str.encode(sequence)) == result
True

# map with the use of partial function works as well 
lzc = lzma.LZMACompressor()
lzc_partial = partial(run_lzma,c=lzc)
out = list(map(lzc_partial,strings))
out_flush = lzc.flush()
result = b"".join(out + [out_flush])
lzma.compress(str.encode(sequence)) == result
msg201294 - (view) Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-25 21:12
Yes, that's because the builtin map function doesn't handle each input
in a separate process, so it uses the same LZMACompressor object
everywhere. Whereas multiprocessing.Pool.map creates a new copy of the
compressor object for each input, which is where the problem comes in.
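The copying behavior can be seen with a toy stateful object in place of the compressor (a hypothetical demo, not from the report): Pool.map sends each argument through pickle, so workers mutate a copy while the original's state is untouched.

```python
import pickle


class Counter:
    """Toy stateful object standing in for LZMACompressor."""
    def __init__(self):
        self.n = 0

    def bump(self):
        self.n += 1
        return self.n


c = Counter()
c.bump()
# multiprocessing effectively does this to every argument sent to a worker:
copy = pickle.loads(pickle.dumps(c))
copy.bump()
print(c.n, copy.n)  # prints "1 2": the original never sees the copy's update
```

This is why the final flush() on the original compressor cannot account for any compress() calls made in the workers.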
msg201295 - (view) Author: cantor (cantor) Date: 2013-10-25 21:23
in Python 2.7.3 this kind of works, however it is less efficient than plain lzma.compress()

from threading import Thread
from backports import lzma
from functools import partial
import multiprocessing


class CompressClass(Thread):
  def __init__ (self,data,c):
    Thread.__init__(self)
    self.exception=False
    self.data=data
    self.datacompressed=""
    self.c=c
  def getException(self):
    return self.exception   
  def getOutput(self):
    return self.datacompressed
  def run(self):
        self.datacompressed=(self.c).compress(self.data)


def split_len(seq, length):
    return [seq[i:i+length] for i in range(0, len(seq), length)]



def launch_multiple_lzma(data,c):
    print 'cores'
    present=CompressClass(data,c) 
    present.start()      
    present.join()
    return present.getOutput()


def threaded_lzma_map(sequence,threads):
  lzc = lzma.LZMACompressor()
  blocksize = int(round(len(sequence)/threads))
  lzc_partial = partial(launch_multiple_lzma,c=lzc)
  lzlist = map(lzc_partial,split_len(sequence, blocksize))
  #pool=multiprocessing.Pool()
  #lzclist = pool.map(lzc_partial,split_len(sequence, blocksize))
  #pool.close()
  #pool.join()
  out_flush = lzc.flush()
  res = "".join(lzlist + [out_flush])
  return res 

sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG'

lzma.compress(sequence) == threaded_lzma_map(sequence,threads=16)

Any way this could be improved?
msg201311 - (view) Author: cantor (cantor) Date: 2013-10-26 03:05
Python 3.3 version - tried this code and got a slightly faster processing time than when running lzma.compress() on its own. Could this be improved upon?

import lzma
from functools import partial
from threading import Thread

def split_len(seq, length):
    return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)]


class CompressClass(Thread):
  def __init__ (self,data,c):
    Thread.__init__(self)
    self.exception=False
    self.data=data
    self.datacompressed=b""
    self.c=c
  def getException(self):
    return self.exception   
  def getOutput(self):
    return self.datacompressed
  def run(self):
        self.datacompressed=(self.c).compress(self.data)


def launch_multiple_lzma(data,c):
    present=CompressClass(data,c) 
    present.start()      
    present.join()
    return present.getOutput()


def threaded_lzma_map(sequence,threads):
  lzc = lzma.LZMACompressor()
  blocksize = int(round(len(sequence)/threads))
  lzc_partial = partial(launch_multiple_lzma,c=lzc)
  lzlist = list(map(lzc_partial,split_len(sequence, blocksize)))
  out_flush = lzc.flush()
  return b"".join(lzlist + [out_flush])

sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG'

threaded_lzma_map(sequence,threads=16)
msg201312 - (view) Author: Tim Peters (tim.peters) * (Python committer) Date: 2013-10-26 03:08
@cantor, this is a Python issue tracker, not a help desk.  If you want advice about Python programming, please use the Python mailing list or any number of "help desk" web sites (e.g., stackoverflow).
msg201373 - (view) Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2013-10-26 17:00
If it's not possible (or easily doable) to recreate the compressor's internal state, I agree it would be helpful for pickling to raise a TypeError.
msg201389 - (view) Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-26 18:11
It looks like there's also a separate problem in the multiprocessing
module. The following code hangs after hitting a TypeError trying to
pickle one of the TextIOWrapper objects:

    import multiprocessing

    def read(f): return f.read()

    files = [open(path) for path in 3 * ['/dev/null']]
    pool = multiprocessing.Pool()
    results = pool.map(read, files)
    print(results)
msg201577 - (view) Author: Roundup Robot (python-dev) Date: 2013-10-28 20:46
New changeset be363c1e94fe by Nadeem Vawda in branch '3.3':
#19395: Raise exception when pickling a (BZ2|LZMA)(Compressor|Decompressor).
http://hg.python.org/cpython/rev/be363c1e94fe

New changeset b9df25608ad0 by Nadeem Vawda in branch 'default':
#19395: Raise exception when pickling a (BZ2|LZMA)(Compressor|Decompressor).
http://hg.python.org/cpython/rev/b9df25608ad0
msg201581 - (view) Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-28 20:53
The part of this issue specific to LZMACompressor should now be fixed;
I've filed issue 19425 for the issue with Pool.map hanging.
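With the changesets above applied (releases from 3.3.3 onward), pickling a compressor should fail loudly instead of producing a broken copy; a quick check:

```python
import lzma
import pickle

# Attempting to pickle an LZMACompressor now raises TypeError up front,
# rather than yielding a copy that crashes on first use.
try:
    pickle.dumps(lzma.LZMACompressor())
except TypeError as exc:
    print("pickling refused:", exc)
```

The same applies to LZMADecompressor and the bz2 equivalents, per the changeset message.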
History
Date                 User          Action  Args
2013-10-28 20:53:56  nadeem.vawda  set     status: open -> closed; resolution: fixed; messages: + msg201581; stage: needs patch -> resolved
2013-10-28 20:46:06  python-dev    set     nosy: + python-dev; messages: + msg201577
2013-10-26 18:11:41  nadeem.vawda  set     nosy: + jnoller, sbt; messages: + msg201389
2013-10-26 17:00:01  pitrou        set     messages: + msg201373
2013-10-26 03:08:44  tim.peters    set     nosy: + tim.peters; messages: + msg201312
2013-10-26 03:05:46  cantor        set     messages: + msg201311
2013-10-25 21:23:18  cantor        set     messages: + msg201295
2013-10-25 21:12:34  nadeem.vawda  set     messages: + msg201294
2013-10-25 21:04:59  cantor        set     messages: + msg201293
2013-10-25 20:49:19  nadeem.vawda  set     messages: + msg201291
2013-10-25 20:22:07  pitrou        set     versions: + Python 3.4; type: behavior -> crash; nosy: + pitrou; title: lzma hangs for a very long time when run in parallel using python's muptiprocessing module? -> unpickled LZMACompressor is crashy; messages: + msg201287; stage: needs patch
2013-10-25 19:36:01  cantor        set     components: - ctypes
2013-10-25 19:28:17  cantor        set     messages: + msg201280
2013-10-25 19:27:55  cantor        set     nosy: + nadeem.vawda
2013-10-25 19:20:43  cantor        create