Title: Assertion in run_coroutine_threadsafe
Author: Jochen Kienzle (JoKi) Date: 2020-11-23 15:58

in my implementation I want to call mqtt_client.publish_json from a sync method:

def __on_event(self, ev):
    asyncio.run_coroutine_threadsafe(self.mqtt_client.publish_json(INGESTION_TOPIC, msg), loop=self.publish_loop)

where is an async function and publish_loop is a loop which has been started (run_forever) in a separate thread:

    self.publish_loop = asyncio.new_event_loop()
    t = threading.Thread(target=self.publish_loop.run_forever)

Sometimes (about 1 in 400 calls) I get the following Assertion Error:

?[94;1m== APP == Exception in callback _ProactorBaseWritePipeTransport._loop_writing(<_OverlappedF...ed result=366>)
?[94;1m== APP == handle: <Handle _ProactorBaseWritePipeTransport._loop_writing(<_OverlappedF...ed result=366>)>
[94;1m== APP == Traceback (most recent call last):
[94;1m== APP ==   File "C:\Users\sio7fe\AppData\Local\Programs\Python\Python38-32\lib\asyncio\", line 81, in _run
[94;1m== APP ==, *self._args)
[94;1m== APP ==   File "C:\Users\sio7fe\AppData\Local\Programs\Python\Python38-32\lib\asyncio\", line 375, in _loop_writing
[94;1m== APP ==     assert f is self._write_fut
[94;1m== APP == AssertionError

Run time behavior:
 - publish_loop will be re-used on each event
 - run_coroutine_threadsafe will be called multiple times without any sleep (its in a for-loop with 1-3 elements)

Since this is a thread safe method and its working multiple times, it seems to be some run-time related issue and I can't figure out the root cause.
Author: Andrew Svetlov (asvetlov) Date: 2020-11-23 20:18
Transferring the loop instance between threads is not safe. You should create a loop and after that call run_forever() in the same thread.
Or, even better, call in a thread.
Author: Jochen Kienzle (JoKi) Date: 2020-11-24 08:17
Thanks for your reply.

I already tried 

----------------------------, msg))

but this leads (rarely) to the following error:

got Future <Future pending> attached to a different loop

I want to point out that was called in a thread without an event loop.

So, it seems to be that the issue is either on asyncio or on hbmqtt.client side (loop handling).

Any opinions, ideas or hints?
Author: Andrew Svetlov (asvetlov) Date: 2020-11-24 08:25
"got Future <Future pending> attached to a different loop" means exactly what it says: you create a future object in one loop but awaits it in the different one. This is a programming error.

Most likely the error in your script, not in mqtt and definitely not in asyncio.

Sorry, the mqtt is not a part of CPython and I don't know how to work with it properly. Please contact the library maintainers.

Nothing to do on the asyncio side.
