diff -r 08223e6cf325 Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/connection.py Sun Oct 02 13:37:11 2011 -0700 @@ -298,8 +298,11 @@ def _send_bytes(self, buf): overlapped = win32.WriteFile(self._handle, buf, overlapped=True) nwritten, complete = overlapped.GetOverlappedResult(True) - assert complete - assert nwritten == len(buf) + if not complete: + raise WindowsError("GetOverlappedResult failed") + if nwritten != len(buf): + raise ValueError("wrote %d bytes, expected to write %d bytes" % + (nwritten, len(buf))) def _recv_bytes(self, maxsize=None, sentinels=()): if sentinels: @@ -319,7 +322,9 @@ overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) lenfirstchunk, complete = overlapped.GetOverlappedResult(True) firstchunk = overlapped.getbuffer() - assert lenfirstchunk == len(firstchunk) + if lenfirstchunk != len(firstchunk): + raise ValueError("length of first chunk should be %d, actual value is %d" % + (lenfirstchunk, len(firstchunk))) except IOError as e: if e.errno == win32.ERROR_BROKEN_PIPE: raise EOFError @@ -333,8 +338,11 @@ if nleft > 0: overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) res, complete = overlapped.GetOverlappedResult(True) - assert res == nleft - assert complete + if not complete: + raise WindowsError("GetOverlappedResult failed") + if res != nleft: + raise ValueError("wrote %d bytes, expected to write %d bytes" % + (res, nleft)) buf.write(overlapped.getbuffer()) return buf @@ -367,7 +375,8 @@ overlapped.GetOverlappedResult(True) self._buffered += overlapped.getbuffer() return True - assert 0 < idx < len(handles) + if 0 >= idx or idx >= len(handles): + raise ValueError("index should be between 0 and %d" % len(handles)) raise SentinelReady([handles[idx]]) @@ -717,7 +726,8 @@ def deliver_challenge(connection, authkey): import hmac - assert isinstance(authkey, bytes) + if not isinstance(authkey, bytes): + raise TypeError("authkey should be a byte string") message = os.urandom(MESSAGE_LENGTH) connection.send_bytes(CHALLENGE + message) digest = hmac.new(authkey, message).digest() @@ -730,9 +740,11 @@ def answer_challenge(connection, authkey): import hmac - assert isinstance(authkey, bytes) + if not isinstance(authkey, bytes): + raise TypeError("authkey should be a byte string") message = connection.recv_bytes(256) # reject large message - assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message + if message[:len(CHALLENGE)] != CHALLENGE: + raise ValueError("invalid message '%r'" % message) message = message[len(CHALLENGE):] digest = hmac.new(authkey, message).digest() connection.send_bytes(digest) diff -r 08223e6cf325 Lib/multiprocessing/forking.py --- a/Lib/multiprocessing/forking.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/forking.py Sun Oct 02 13:37:11 2011 -0700 @@ -152,7 +152,9 @@ if os.WIFSIGNALED(sts): self.returncode = -os.WTERMSIG(sts) else: - assert os.WIFEXITED(sts) + if not os.WIFEXITED(sts): + raise ValueError("unexpected WIFEXITED return value '%r'" % + os.WIFEXITED(sts)) self.returncode = os.WEXITSTATUS(sts) return self.returncode @@ -321,7 +323,9 @@ Return whether commandline indicates we are forking ''' if len(argv) >= 2 and argv[1] == '--multiprocessing-fork': - assert len(argv) == 3 + if len(argv) != 3: + raise ValueError("invalid number of arguments %d, should be 3" % + len(argv)) return True else: return False @@ -366,7 +370,8 @@ ''' Run code specifed by data received over pipe ''' - assert is_forking(sys.argv) + if not is_forking(sys.argv): + raise RuntimeError("process not forking") handle = int(sys.argv[-1]) fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) @@ -493,7 +498,9 @@ else: dirs = [os.path.dirname(main_path)] - assert main_name not in sys.modules, main_name + if main_name in sys.modules: + raise ValueError("'%r' should not be in loaded modules" % + main_name) file, path_name, etc = imp.find_module(main_name, dirs) try: # We would like to do "imp.load_module('__main__', ...)" diff -r 08223e6cf325 Lib/multiprocessing/heap.py --- a/Lib/multiprocessing/heap.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/heap.py Sun Oct 02 13:37:11 2011 -0700 @@ -62,7 +62,8 @@ self.size = size self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter)) self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert win32.GetLastError() == 0, 'tagname already in use' + if win32.GetLastError() != 0: + raise WindowsError("tagname already in use") self._state = (self.size, self.name) def __getstate__(self): @@ -72,7 +73,8 @@ def __setstate__(self, state): self.size, self.name = self._state = state self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS + if win32.GetLastError() != win32.ERROR_ALREADY_EXISTS: + raise WindowsError("mmap failed") else: @@ -197,7 +199,9 @@ # synchronously sometimes later from malloc() or free(), by calling # _free_pending_blocks() (appending and retrieving from a list is not # strictly thread-safe but under cPython it's atomic thanks to the GIL). - assert os.getpid() == self._lastpid + if os.getpid() != self._lastpid: + raise ValueError("unexpected pid number '%d', should be '%d'" % + (os.getpid(), self._lastpid)) if not self._lock.acquire(False): # can't acquire the lock right now, add the block to the list of # pending blocks to free @@ -213,7 +217,9 @@ def malloc(self, size): # return a block of right size (possibly rounded up) - assert 0 <= size < sys.maxsize + if 0 > size or size >= sys.maxsize: + raise ValueError("invalid size %d, should be between 0 and %d" % + (size, sys.maxsize)) if os.getpid() != self._lastpid: self.__init__() # reinitialize after fork self._lock.acquire() @@ -239,7 +245,9 @@ _heap = Heap() def __init__(self, size): - assert 0 <= size < sys.maxsize + if 0 > size or size >= sys.maxsize: + raise ValueError("invalid size %d, should be between 0 and %d" % + (size, sys.maxsize)) block = BufferWrapper._heap.malloc(size) self._state = (block, size) Finalize(self, BufferWrapper._heap.free, args=(block,)) @@ -247,7 +255,9 @@ def get_address(self): (arena, start, stop), size = self._state address, length = _multiprocessing.address_of_buffer(arena.buffer) - assert size <= length + if size > length: + raise ValueError("invalid size %d, should be <= %d" % + (size, length)) return address + start def get_size(self): diff -r 08223e6cf325 Lib/multiprocessing/managers.py --- a/Lib/multiprocessing/managers.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/managers.py Sun Oct 02 13:37:11 2011 -0700 @@ -111,10 +111,12 @@ if kind == '#ERROR': return result elif kind == '#TRACEBACK': - assert type(result) is str + if type(result) is not str: + raise TypeError("result '%r' is not a string" % result) return RemoteError(result) elif kind == '#UNSERIALIZABLE': - assert type(result) is str + if type(result) is not str: + raise TypeError("result '%r' is not a string" % result) return RemoteError('Unserializable message: %s\n' % result) else: return ValueError('Unrecognized message type') @@ -156,7 +158,8 @@ 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] def __init__(self, registry, address, authkey, serializer): - assert isinstance(authkey, bytes) + if not isinstance(authkey, bytes): + raise TypeError("authkey should be a byte string") self.registry = registry self.authkey = AuthenticationString(authkey) Listener, Client = listener_client[serializer] @@ -201,7 +204,8 @@ connection.answer_challenge(c, self.authkey) request = c.recv() ignore, funcname, args, kwds = request - assert funcname in self.public, '%r unrecognized' % funcname + if funcname not in self.public: + raise ValueError("unrecognized function '%r'" % funcname) func = getattr(self, funcname) except Exception: msg = ('#TRACEBACK', format_exc()) @@ -383,7 +387,11 @@ self.registry[typeid] if callable is None: - assert len(args) == 1 and not kwds + if len(args) != 1: + raise ValueError("invalid number of arguments %d, should be 1" % + len(args)) + if kwds: + raise ValueError("unexpected kwds argument") obj = args[0] else: obj = callable(*args, **kwds) @@ -391,7 +399,9 @@ if exposed is None: exposed = public_methods(obj) if method_to_typeid is not None: - assert type(method_to_typeid) is dict + if type(method_to_typeid) is not dict: + raise TypeError("method_to_typeid should be a dict, not '%r'" % + type(method_to_typeid)) exposed = list(exposed) + list(method_to_typeid) ident = '%x' % id(obj) # convert to string because xmlrpclib @@ -435,7 +445,10 @@ def decref(self, c, ident): self.mutex.acquire() try: - assert self.id_to_refcount[ident] >= 1 + if self.id_to_refcount[ident] < 1: + raise ValueError("invalid refcount %d, should be >= 1" % + self.id_to_refcount[ident]) + self.id_to_refcount[ident] -= 1 if self.id_to_refcount[ident] == 0: del self.id_to_obj[ident], self.id_to_refcount[ident] @@ -491,7 +504,9 @@ ''' Return server object with serve_forever() method and address attribute ''' - assert self._state.value == State.INITIAL + if self._state.value != State.INITIAL: + raise ValueError("invalid state '%r', should be '%r'" % + (self._state.value, State.INITIAL)) return Server(self._registry, self._address, self._authkey, self._serializer) @@ -508,7 +523,9 @@ ''' Spawn a server process for this manager object ''' - assert self._state.value == State.INITIAL + if self._state.value != State.INITIAL: + raise ValueError("invalid state '%r', should be '%r'" % + (self._state.value, State.INITIAL)) if initializer is not None and not hasattr(initializer, '__call__'): raise TypeError('initializer must be a callable') @@ -564,7 +581,8 @@ ''' Create a new shared object; return the token and exposed tuple ''' - assert self._state.value == State.STARTED, 'server not yet started' + if self._state.value != State.STARTED: + raise RuntimeError("server not yet started") conn = self._Client(self._address, authkey=self._authkey) try: id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) @@ -657,8 +675,10 @@ if method_to_typeid: for key, value in list(method_to_typeid.items()): - assert type(key) is str, '%r is not a string' % key - assert type(value) is str, '%r is not a string' % value + if type(key) is not str: + raise TypeError("'%r' is not a string" % key) + if type(value) is not str: + raise TypeError("'%r' is not a string" % value) cls._registry[typeid] = ( callable, exposed, method_to_typeid, proxytype diff -r 08223e6cf325 Lib/multiprocessing/pool.py --- a/Lib/multiprocessing/pool.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/pool.py Sun Oct 02 13:37:11 2011 -0700 @@ -86,7 +86,12 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): - assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) + if maxtasks != None: + if type(maxtasks) != int: + raise TypeError("invalid type '%r' for maxtasks, expected 'int'" % + type(maxtasks)) + if maxtasks <= 0: + raise ValueError("invalid maxtasks value %d, should be > 0" % maxtasks) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): @@ -239,7 +244,8 @@ ''' Equivalent of `func(*args, **kwds)`. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("invalid state '%r', should be '%r'" % (self._state, RUN)) return self.apply_async(func, args, kwds).get() def map(self, func, iterable, chunksize=None): @@ -247,21 +253,24 @@ Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("invalid state '%r', should be '%r'" % (self._state, RUN)) return self.map_async(func, iterable, chunksize).get() def imap(self, func, iterable, chunksize=1): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("invalid state '%r', should be '%r'" % (self._state, RUN)) if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) for i, x in enumerate(iterable)), result._set_length)) return result else: - assert chunksize > 1 + if chunksize <= 1: + raise ValueError("invalid chunksize %d, should be > 1" % chunksize) task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, mapstar, (x,), {}) @@ -272,14 +281,16 @@ ''' Like `imap()` method but ordering of results is arbitrary. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("invalid state '%r', should be '%r'" % (self._state, RUN)) if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) for i, x in enumerate(iterable)), result._set_length)) return result else: - assert chunksize > 1 + if chunksize <= 1: + raise ValueError("invalid chunksize %d, should be > 1" % chunksize) task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, mapstar, (x,), {}) @@ -291,7 +302,8 @@ ''' Asynchronous version of `apply()` method. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("invalid state '%r', should be '%r'" % (self._state, RUN)) result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @@ -301,7 +313,8 @@ ''' Asynchronous version of `map()` method. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("invalid state '%r', should be '%r'" % (self._state, RUN)) if not hasattr(iterable, '__len__'): iterable = list(iterable) @@ -379,7 +392,9 @@ return if thread._state: - assert thread._state == TERMINATE + if thread._state != TERMINATE: + raise ValueError("invalid thread state '%r', should be '%r'" % + (thread_state, TERMINATE)) debug('result handler found thread._state=TERMINATE') break @@ -453,7 +468,9 @@ def join(self): debug('joining pool') - assert self._state in (CLOSE, TERMINATE) + if self._state not in (CLOSE, TERMINATE): + raise ValueError("invalid state '%r', should be either " + "CLOSE or TERMINATE" % self._state) self._worker_handler.join() self._task_handler.join() self._result_handler.join() @@ -481,7 +498,8 @@ debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) - assert result_handler.is_alive() or len(cache) == 0 + if not result_handler.is_alive() and len(cache) != 0: + raise RuntimeError("handler not alive with non-empty cache") result_handler._state = TERMINATE outqueue.put(None) # sentinel @@ -531,7 +549,8 @@ return self._ready def successful(self): - assert self._ready + if not self._ready: + raise RuntimeError("not ready") return self._success def wait(self, timeout=None): diff -r 08223e6cf325 Lib/multiprocessing/process.py --- a/Lib/multiprocessing/process.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/process.py Sun Oct 02 13:37:11 2011 -0700 @@ -94,7 +94,8 @@ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): - assert group is None, 'group argument must be None for now' + if group is not None: + raise ValueError("group argument must be None for now") count = next(_current_process._counter) self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey @@ -123,11 +124,12 @@ ''' Start child process ''' - assert self._popen is None, 'cannot start a process twice' - assert self._parent_pid == os.getpid(), \ - 'can only start a process object created by current process' - assert not _current_process._daemonic, \ - 'daemonic processes are not allowed to have children' + if self._popen is not None: + raise RuntimeError("cannot start a process twice") + if self._parent_pid != os.getpid(): + raise RuntimeError("can only start a process object created by current process") + if _current_process._daemonic: + raise RuntimeError("daemonic processes are not allowed to have children") _cleanup() if self._Popen is not None: Popen = self._Popen @@ -147,8 +149,10 @@ ''' Wait until child process terminates ''' - assert self._parent_pid == os.getpid(), 'can only join a child process' - assert self._popen is not None, 'can only join a started process' + if self._parent_pid != os.getpid(): + raise RuntimeError("can only join a child process") + if self._popen is None: + raise RuntimeError("can only join a started process") res = self._popen.wait(timeout) if res is not None: _current_process._children.discard(self) @@ -159,7 +163,8 @@ ''' if self is _current_process: return True - assert self._parent_pid == os.getpid(), 'can only test a child process' + if self._parent_pid != os.getpid(): + raise RuntimeError("can only test a child process") if self._popen is None: return False self._popen.poll() @@ -171,7 +176,8 @@ @name.setter def name(self, name): - assert isinstance(name, str), 'name must be a string' + if not isinstance(name, str): + raise TypeError("name '%r' is not a string" % name) self._name = name @property @@ -186,7 +192,8 @@ ''' Set whether process is a daemon ''' - assert self._popen is None, 'process has already started' + if self._popen is not None: + raise RuntimeError("process has already started") self._daemonic = daemonic @property diff -r 08223e6cf325 Lib/multiprocessing/queues.py --- a/Lib/multiprocessing/queues.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/queues.py Sun Oct 02 13:37:11 2011 -0700 @@ -100,7 +100,8 @@ self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): - assert not self._closed + if self._closed: + raise RuntimeError("queue is closed") if not self._sem.acquire(block, timeout): raise Full @@ -161,7 +162,8 @@ def join_thread(self): debug('Queue.join_thread()') - assert self._closed + if not self._closed: + raise RuntimeError("queue not closed") if self._jointhread: self._jointhread() @@ -314,7 +316,8 @@ self._cond, self._unfinished_tasks = state[-2:] def put(self, obj, block=True, timeout=None): - assert not self._closed + if self._closed: + raise RuntimeError("queue closed") if not self._sem.acquire(block, timeout): raise Full diff -r 08223e6cf325 Lib/multiprocessing/sharedctypes.py --- a/Lib/multiprocessing/sharedctypes.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/sharedctypes.py Sun Oct 02 13:37:11 2011 -0700 @@ -124,7 +124,8 @@ return new_obj def synchronized(obj, lock=None): - assert not isinstance(obj, SynchronizedBase), 'object already synchronized' + if isinstance(obj, SynchronizedBase): + raise RuntimeError("object already synchronized") if isinstance(obj, ctypes._SimpleCData): return Synchronized(obj, lock) diff -r 08223e6cf325 Lib/multiprocessing/synchronize.py --- a/Lib/multiprocessing/synchronize.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/synchronize.py Sun Oct 02 13:37:11 2011 -0700 @@ -230,8 +230,8 @@ return '' % (self._lock, num_waiters) def wait(self, timeout=None): - assert self._lock._semlock._is_mine(), \ - 'must acquire() condition before using wait()' + if not self._lock._semlock._is_mine(): + raise RuntimeError("must acquire() condition before using wait()") # indicate that this thread is going to sleep self._sleeping_count.release() @@ -254,14 +254,17 @@ return ret def notify(self): - assert self._lock._semlock._is_mine(), 'lock is not owned' - assert not self._wait_semaphore.acquire(False) + if not self._lock._semlock._is_mine(): + raise RuntimeError("lock is not owned") + if self._wait_semaphore.acquire(False): + raise RuntimeError("semaphore should have been acquired already") # to take account of timeouts since last notify() we subtract # woken_count from sleeping_count and rezero woken_count while self._woken_count.acquire(False): res = self._sleeping_count.acquire(False) - assert res + if not res: + return RuntimeError("acquire() failed") if self._sleeping_count.acquire(False): # try grabbing a sleeper self._wait_semaphore.release() # wake up one sleeper @@ -271,14 +274,17 @@ self._wait_semaphore.acquire(False) def notify_all(self): - assert self._lock._semlock._is_mine(), 'lock is not owned' - assert not self._wait_semaphore.acquire(False) + if not self._lock._semlock._is_mine(): + raise RuntimeError("lock is not owned") + if self._wait_semaphore.acquire(False): + raise RuntimeError("semaphore should have been acquired already") # to take account of timeouts since last notify*() we subtract # woken_count from sleeping_count and rezero woken_count while self._woken_count.acquire(False): res = self._sleeping_count.acquire(False) - assert res + if not res: + raise RuntimeError("acquire() failed") sleepers = 0 while self._sleeping_count.acquire(False): diff -r 08223e6cf325 Lib/multiprocessing/util.py --- a/Lib/multiprocessing/util.py Sun Oct 02 13:19:30 2011 -0400 +++ b/Lib/multiprocessing/util.py Sun Oct 02 13:37:11 2011 -0700 @@ -174,12 +174,15 @@ Class which supports object finalization using weakrefs ''' def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): - assert exitpriority is None or type(exitpriority) is int + if exitpriority is not None: + if type(exitpriority) is not int: + raise TypeError("exitpriority should be an int") if obj is not None: self._weakref = weakref.ref(obj, self) else: - assert exitpriority is not None + if exitpriority is None: + raise ValueError("exitpriority should not be None") self._callback = callback self._args = args