diff -r b3987d758e49 Lib/asyncio/base_events.py --- a/Lib/asyncio/base_events.py Tue Feb 18 00:11:21 2014 +0100 +++ b/Lib/asyncio/base_events.py Tue Feb 18 00:39:25 2014 +0100 @@ -15,7 +15,6 @@ to modify the meaning of the API call it import collections -import concurrent.futures import heapq import logging import socket @@ -27,6 +26,7 @@ import sys from . import events from . import futures from . import tasks +from .executor import get_default_executor from .log import logger @@ -294,7 +294,7 @@ class BaseEventLoop(events.AbstractEvent if executor is None: executor = self._default_executor if executor is None: - executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS) + executor = get_default_executor() self._default_executor = executor return futures.wrap_future(executor.submit(callback, *args), loop=self) diff -r b3987d758e49 Lib/asyncio/executor.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/asyncio/executor.py Tue Feb 18 00:39:25 2014 +0100 @@ -0,0 +1,84 @@ +from .log import logger + +__all__ = ( + 'CancelledError', 'TimeoutError', + 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', + ) + +# Argument for default thread pool executor creation. +_MAX_WORKERS = 5 + +try: + import concurrent.futures + import concurrent.futures._base +except ImportError: + FIRST_COMPLETED = 'FIRST_COMPLETED' + FIRST_EXCEPTION = 'FIRST_EXCEPTION' + ALL_COMPLETED = 'ALL_COMPLETED' + + class Future(object): + def __init__(self, callback, args): + try: + self._result = callback(*args) + self._exception = None + except Exception as err: + self._result = None + self._exception = err + self.callbacks = [] + + def cancelled(self): + return False + + def done(self): + return True + + def exception(self): + return self._exception + + def result(self): + if self._exception is not None: + raise self._exception + else: + return self._result + + def add_done_callback(self, callback): + callback(self) + + class Error(Exception): + """Base class for all future-related exceptions.""" + pass + + class CancelledError(Error): + """The Future was cancelled.""" + pass + + class TimeoutError(Error): + """The operation exceeded the given deadline.""" + pass + + class SynchronousExecutor: + """ + Synchronous executor: submit() blocks until it gets the result. + """ + def submit(self, callback, *args): + return Future(callback, args) + + def shutdown(self, wait): + pass + + def get_default_executor(): + logger.error("concurrent.futures module is missing: " + "use a synchrounous executor as fallback!") + return SynchronousExecutor() +else: + FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED + FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION + ALL_COMPLETED = concurrent.futures.ALL_COMPLETED + + Future = concurrent.futures.Future + Error = concurrent.futures._base.Error + CancelledError = concurrent.futures.CancelledError + TimeoutError = concurrent.futures.TimeoutError + + def get_default_executor(): + return concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS) diff -r b3987d758e49 Lib/asyncio/futures.py --- a/Lib/asyncio/futures.py Tue Feb 18 00:11:21 2014 +0100 +++ b/Lib/asyncio/futures.py Tue Feb 18 00:39:25 2014 +0100 @@ -5,12 +5,12 @@ 'Future', 'wrap_future', ] -import concurrent.futures._base import logging import sys import traceback from . import events +from . import executor from .log import logger # States for Future. @@ -20,10 +20,9 @@ from .log import logger _PY34 = sys.version_info >= (3, 4) -# TODO: Do we really want to depend on concurrent.futures internals? -Error = concurrent.futures._base.Error -CancelledError = concurrent.futures.CancelledError -TimeoutError = concurrent.futures.TimeoutError +Error = executor.Error +CancelledError = executor.CancelledError +TimeoutError = executor.TimeoutError STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging @@ -347,7 +346,7 @@ def wrap_future(fut, *, loop=None): """Wrap concurrent.futures.Future object.""" if isinstance(fut, Future): return fut - assert isinstance(fut, concurrent.futures.Future), \ + assert isinstance(fut, executor.Future), \ 'concurrent.futures.Future is expected, got {!r}'.format(fut) if loop is None: loop = events.get_event_loop() diff -r b3987d758e49 Lib/asyncio/tasks.py --- a/Lib/asyncio/tasks.py Tue Feb 18 00:11:21 2014 +0100 +++ b/Lib/asyncio/tasks.py Tue Feb 18 00:39:25 2014 +0100 @@ -8,7 +8,6 @@ ] import collections -import concurrent.futures import functools import inspect import linecache @@ -16,6 +15,7 @@ import traceback import weakref from . import events +from . import executor from . import futures from .log import logger @@ -338,9 +338,9 @@ class Task(futures.Future): # wait() and as_completed() similar to those in PEP 3148. -FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED -FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION -ALL_COMPLETED = concurrent.futures.ALL_COMPLETED +FIRST_COMPLETED = executor.FIRST_COMPLETED +FIRST_EXCEPTION = executor.FIRST_EXCEPTION +ALL_COMPLETED = executor.ALL_COMPLETED @coroutine diff -r b3987d758e49 Lib/test/test_asyncio/__init__.py --- a/Lib/test/test_asyncio/__init__.py Tue Feb 18 00:11:21 2014 +0100 +++ b/Lib/test/test_asyncio/__init__.py Tue Feb 18 00:39:25 2014 +0100 @@ -5,8 +5,6 @@ from test.support import run_unittest, i # Skip tests if we don't have threading. import_module('threading') -# Skip tests if we don't have concurrent.futures. -import_module('concurrent.futures') def suite(): diff -r b3987d758e49 Lib/test/test_asyncio/test_events.py --- a/Lib/test/test_asyncio/test_events.py Tue Feb 18 00:11:21 2014 +0100 +++ b/Lib/test/test_asyncio/test_events.py Tue Feb 18 00:39:25 2014 +0100 @@ -1,5 +1,9 @@ """Tests for events.py.""" +try: + import concurrent.futures +except ImportError: + concurrent = None import functools import gc import io @@ -311,7 +315,8 @@ class EventLoopTestsMixin: f2 = self.loop.run_in_executor(None, run, 'yo') res, thread_id = self.loop.run_until_complete(f2) self.assertEqual(res, 'yo') - self.assertNotEqual(thread_id, threading.get_ident()) + if concurrent is not None: + self.assertNotEqual(thread_id, threading.get_ident()) def test_reader_callback(self): r, w = test_utils.socketpair() diff -r b3987d758e49 Lib/test/test_asyncio/test_futures.py --- a/Lib/test/test_asyncio/test_futures.py Tue Feb 18 00:11:21 2014 +0100 +++ b/Lib/test/test_asyncio/test_futures.py Tue Feb 18 00:39:25 2014 +0100 @@ -1,6 +1,9 @@ """Tests for futures.py.""" -import concurrent.futures +try: + import concurrent.futures +except ImportError: + concurrent = None import threading import unittest import unittest.mock @@ -219,6 +222,7 @@ class FutureTests(unittest.TestCase): del fut self.assertFalse(m_log.error.called) + @unittest.skipIf(concurrent is None, 'need concurrent.futures modules') def test_wrap_future(self): def run(arg): @@ -236,6 +240,7 @@ class FutureTests(unittest.TestCase): f2 = asyncio.wrap_future(f1) self.assertIs(f1, f2) + @unittest.skipIf(concurrent is None, 'need concurrent.futures modules') @unittest.mock.patch('asyncio.futures.events') def test_wrap_future_use_global_loop(self, m_events): def run(arg): @@ -245,6 +250,7 @@ class FutureTests(unittest.TestCase): f2 = asyncio.wrap_future(f1) self.assertIs(m_events.get_event_loop.return_value, f2._loop) + @unittest.skipIf(concurrent is None, 'need concurrent.futures modules') def test_wrap_future_cancel(self): f1 = concurrent.futures.Future() f2 = asyncio.wrap_future(f1, loop=self.loop) @@ -253,6 +259,7 @@ class FutureTests(unittest.TestCase): self.assertTrue(f1.cancelled()) self.assertTrue(f2.cancelled()) + @unittest.skipIf(concurrent is None, 'need concurrent.futures modules') def test_wrap_future_cancel2(self): f1 = concurrent.futures.Future() f2 = asyncio.wrap_future(f1, loop=self.loop)