Hello,
in my implementation I want to call mqtt_client.publish_json from a sync method:
-----------------------------
def __on_event(self, ev):
...
asyncio.run_coroutine_threadsafe(self.mqtt_client.publish_json(INGESTION_TOPIC, msg), loop=self.publish_loop)
-----------------------------
where self.broker.publish_json is an async function and publish_loop is a loop which has been started (run_forever) in a separate thread:
---
self.publish_loop = asyncio.new_event_loop()
t = threading.Thread(target=self.publish_loop.run_forever)
t.start()
---
Sometimes (about 1 in 400 calls) I get the following Assertion Error:
-----------------------------
?[94;1m== APP == Exception in callback _ProactorBaseWritePipeTransport._loop_writing(<_OverlappedF...ed result=366>)
?[0m
?[94;1m== APP == handle: <Handle _ProactorBaseWritePipeTransport._loop_writing(<_OverlappedF...ed result=366>)>
[0m
[94;1m== APP == Traceback (most recent call last):
[0m
[94;1m== APP == File "C:\Users\sio7fe\AppData\Local\Programs\Python\Python38-32\lib\asyncio\events.py", line 81, in _run
[0m
[94;1m== APP == self._context.run(self._callback, *self._args)
[0m
[94;1m== APP == File "C:\Users\sio7fe\AppData\Local\Programs\Python\Python38-32\lib\asyncio\proactor_events.py", line 375, in _loop_writing
[0m
[94;1m== APP == assert f is self._write_fut
[0m
[94;1m== APP == AssertionError
-----------------------------
Run time behavior:
- publish_loop will be re-used on each event
- run_coroutine_threadsafe will be called multiple times without any sleep (its in a for-loop with 1-3 elements)
Since this is a thread safe method and its working multiple times, it seems to be some run-time related issue and I can't figure out the root cause.
|