classification
Title: Threads shutting down in Py 2.7 but not in Py 3.69 while making SSH connection using Paramiko module
Type: Stage: resolved
Components: Interpreter Core Versions: Python 3.6
process
Status: open Resolution: remind
Dependencies: Superseder:
Assigned To: Nosy List: eric.smith, muralidhar.bn
Priority: normal Keywords:

Created on 2021-05-24 18:55 by muralidhar.bn, last changed 2021-06-07 12:57 by eric.smith.

Messages (5)
msg394264 - (view) Author: Muralidhar BN (muralidhar.bn) Date: 2021-05-24 18:55
Threads shutting down in Py 2.7 but not in Py 3.69 while making SSH connection using Paramiko module

Executing code qa-test-execute.py in Py 2.7 (Ubuntu 14.04.6 LTS)

Command 1 :
sudo python ./qa-test-execute.py

Output 1 :
2021-05-24 23:35:59,889[BaseCommandLine __init__ 139916740505872][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
Exception AssertionError: 'attempt to release recursive lock not owned by thread' in <bound method BaseCommandLine1.__del__ of <__main__.BaseCommandLine1 object at 0x7f40e79db110>> ignored

Command 2 :

sudo python ./qa-test-execute.py 2>&1 | tee 24052021_py27_execute_1.log

Output 2 :
2021-05-24 23:50:16,303[BaseCommandLine __init__ 139863250567440][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
Exception AssertionError: 'attempt to release recursive lock not owned by thread' in <bound method BaseCommandLine1.__del__ of <__main__.BaseCommandLine1 object at 0x7f34735e0110>> ignored


Executing code qa-test-execute.py in Py 3.69 (Ubuntu 18.04.5 LTS)

Command 1 :
sudo python ./qa-test-execute.py

Output 1 :
2021-05-24 23:53:49,293[BaseCommandLine __init__ 139973197423840][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
2021-05-24 23:53:49,293[BaseCommandLine __init__ 139973197423840][DEBUG]: Closing internal ssh-client.
Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads

Command 2 :
sudo python3 ./qa-test-execute.py 2>&1 | tee launcher_gt20907_24052021_execute_py369_1.log

Output 2 : Terminal hangs & CTRL C to return prompt

2021-05-24 23:56:31,646[BaseCommandLine __init__ 140516619855072][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
2021-05-24 23:56:31,646[BaseCommandLine __init__ 140516619855072][DEBUG]: Closing internal ssh-client.
^C


Behaviour of same code is different when executed in Py 2.7 & Py 3.69. 

Threads not terminating normally conflicting with paramiko / logging module  


qa-test-execute.py
#!/usr/bin/env python3

import sys
import traceback
import logging
import logging.config
import time
import threading
import multiprocessing
import paramiko
########################################################################


def lock_acquire_with_timeout1(lock, timeout_seconds):
    """
    Try to acquire lock without specified timeout
    @param lock: threading lock to be acquired
    @type lock: threading.Lock
    @param timeout_seconds: maximal time to make lock acquire attempts
    @type timeout_seconds: float
    """
    begin_time = time.time()
    while time.time() - begin_time < timeout_seconds:
        if lambda: lock.acquire(False):
            return True
        else:
            time.sleep(1.0)
    return None


def call_with_timeout1(method_to_call, timeout_seconds):
    """
    Good for potentially "freeze" methods calls. Executes passed method in
    separate thread. Waits for control returns within timeout. If timeout
    exceed - return control to callee thread. Separate execution thread will
    still be active.

    @param method_to_call: method te be called
    @type method_to_call: function
    @param timeout_seconds: maximal time to wait for method call finished
    @type timeout_seconds: float
    """
    stop_thread = threading.Barrier(2)
    thread_name = threading._newname("{}-%d".format(__name__))
    call_thread = threading.Thread(target=method_to_call, name=thread_name)
    call_thread.daemon = True 
    call_thread.start()
    print ("threading.activeCount() : %s",threading.activeCount())
    print ("threading.currentThread() : %s", threading.currentThread())
    print ("threading.enumerate() : %s", threading.enumerate() )
    call_thread.join(timeout=timeout_seconds)
    if call_thread.is_alive():
        stop_thread.abort()
    return not call_thread.is_alive()


def format_all_threads_stacks1():
    """
    @return: formatted stacks for all running threads.
    """
    stacktraces = []
    for thread_id, stack in list(dict(list(sys._current_frames().items())).items()):
        for thread1 in threading.enumerate():
            if thread1.ident == thread_id:
                stacktraces.append('Thread %s (daemon=%r) stacktrace: \n%s' %
                                   (thread1.name, thread1.daemon, ''.join(traceback.format_stack(stack))))
            else:
                thread = None
    return '\n'.join(stacktraces)


class SSHClient_noauth1(paramiko.SSHClient):

    def _auth(self, username, *args):
        self._transport.auth_none(username)
        return

class BaseCommandLine1(object):

    def __init__(self, connection_timeout=None):
        self._connection_timeout = connection_timeout
        self._connection_lock = multiprocessing.RLock()

        self._ssh_client = None
        self.logger = logging.getLogger('BaseCommandLine __init__ {}'.format(id(self)))
        self.reset_connection1()

    def __del__(self):
        self.close1() 

    def _wait_for_connection1(self):
        begin_time = time.time()
        self.logger.debug("Will attempt to connect with device for %s seconds." % self._connection_timeout)
        while time.time() - begin_time < self._connection_timeout:
            try:
                self._connect_ssh1()
                self._ssh_client.get_transport().set_keepalive(5)
                self.logger.debug('BaseCommandLine1 new SSH connection with {}:{} established.'.
                                  format("10.221.42.29", "22"))
                break
            except Exception as e:
                self.logger.debug('BaseCommandLine1 SSH-connection failed after %d seconds passed with exception: %s' %
                                  (time.time() - begin_time, e))
                self.logger.debug('BaseCommandLine1 Next attempt after {} seconds.'.format(5.0))
                time.sleep(5.0)
        else:
            self.logger.debug('BaseCommandLine1 Failed to connect to {}:{} within {} seconds: {}'.format(
                "10.221.42.29","22",self._connection_timeout,traceback.format_exc()))

    def reset_connection1(self):
        with self._connection_lock:
            self.close1()
            self.logger.debug('reset connection begin.')
            self._wait_for_connection1()

    def close1(self):
        if not self._ssh_client:
            return
        close_timeout = 10.0
        begin_time = time.time()
        self.logger.debug("Attempt to close ssh-session within %s seconds." % close_timeout)
        if lock_acquire_with_timeout1(self._connection_lock, close_timeout):
            try:
                if call_with_timeout1(self.__nonblocking_close1, timeout_seconds=close_timeout):
                    self.logger.debug("Successfully closed SSHClient within %d seconds." % (time.time() - begin_time))
                else:
                    self.logger.warning("Failed to close SSHClient within %d seconds." % close_timeout)
                    self.logger.warning("All threads state:\n%s" % format_all_threads_stacks1())
            finally:
                self._connection_lock.release()
        else:
            self.logger.warning("Failed to acquire lock within %s timeout." % close_timeout)
        self._ssh_client = None

    def __nonblocking_close1(self):
        """
        Non-blocking call to close SSH connection. May freeze on internal SSHClient.close().
        """
        self.logger.debug("Closing internal ssh-client.")
        if self._ssh_client:
            self._ssh_client.close1()
        self.logger.debug("Internal ssh-client closed.")
        self._ssh_client = None

    def _connect_ssh1(self, key_filename=None):
        client = SSHClient_noauth1()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            # try using specified password first
            self.logger.debug("BaseCommandLine1 _connect_ssh1 - try using specified password first")
            client.connect("10.221.42.29", port="22", username="root", password="", timeout=60.0,
                           banner_timeout=60.0, key_filename=None, allow_agent=False)
        except paramiko.AuthenticationException as e:
            # in case of authentication error try to connect without password
            self.logger.debug("BaseCommandLine1_connect_ssh1 - in case of authentication error try to connect without password : %s" % str(e))

        self._ssh_client = client
        return client


def config_logging():
    formatter = logging.Formatter("%(asctime)s[%(name)s]"
                                  "[%(levelname)s]: %(message)s")
    console_logger = logging.StreamHandler(sys.stdout)
    console_logger.setFormatter(formatter)

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(logging.NullHandler())

    paramiko_logger = logging.getLogger('paramiko')
    paramiko_logger.setLevel(logging.CRITICAL)
    paramiko_logger.addHandler(console_logger)

    console_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(console_logger)


def main(argv):
    config_logging()
    try:
        result = BaseCommandLine1(300)
    except Exception:
        logging.debug("Exception from BaseCommandLine1")
    logging.debug("Show all threads status before exit attempt:\n %s", format_all_threads_stacks1())

if __name__ == "__main__":
    main(sys.argv)
msg394268 - (view) Author: Eric V. Smith (eric.smith) * (Python committer) Date: 2021-05-24 21:15
This was reported a few hours ago at https://github.com/paramiko/paramiko/issues/1856.

I'm closing this as a third party issue.
msg394381 - (view) Author: Muralidhar BN (muralidhar.bn) Date: 2021-05-25 17:21
We have checked this issue by adapting another 3rd party SSH packages ( replace paramiko with pyssh). Using this we have found same o/p results. Which means it is not 3rd party SSH package ( paramiko or pyssh) issue. 

Issue report in Paramiko github will be removed (https://github.com/paramiko/paramiko/issues/1856)

To our analysis this issue is not observed when "logging" module is disabled. Request to provide your analysis from python perspective ?

Executing code qa-test-execute.py in Py 2.7 (Ubuntu 14.04.6 LTS)

Command 1 :
sudo python ./qa-test-execute.py

Output 1 :

2021-05-25 21:23:44,889[BaseCommandLine __init__ 140392385488400][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
Exception AssertionError: 'attempt to release recursive lock not owned by thread' in <bound method BaseCommandLine1.__del__ of <__main__.BaseCommandLine1 object at 0x7fafa6442610>> ignored

Command 2 :

2021-05-25 21:23:44,889[BaseCommandLine __init__ 140392385488400][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
Exception AssertionError: 'attempt to release recursive lock not owned by thread' in <bound method BaseCommandLine1.__del__ of <__main__.BaseCommandLine1 object at 0x7fafa6442610>> ignored


Executing code qa-test-execute.py in Py 3.69 (Ubuntu 18.04.5 LTS)

Command 1 :
sudo python3 ./qa-test-execute.py

Output 1 :
2021-05-25 22:32:54,945[BaseCommandLine __init__ 140490991571128][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
2021-05-25 22:32:54,945[BaseCommandLine __init__ 140490991571128][DEBUG]: Closing internal ssh-client.
Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads

Thread 0x00007fc69880e700 (most recent call first):
  File "/usr/lib/python3.6/logging/__init__.py", line 997 in emit
  File "/usr/lib/python3.6/logging/__init__.py", line 865 in handle
  File "/usr/lib/python3.6/logging/__init__.py", line 1516 in callHandlers
  File "/usr/lib/python3.6/logging/__init__.py", line 1454 in handle
  File "/usr/lib/python3.6/logging/__init__.py", line 1444 in _log
  File "/usr/lib/python3.6/logging/__init__.py", line 1296 in debug
  File "./qa-test-execute_sajal.py", line 132 in __nonblocking_close1
  File "/usr/lib/python3.6/threading.py", line 864 in run
  File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap

Current thread 0x00007fc69dab5740 (most recent call first):
  File "./qa-test-execute_sajal.py", line 48 in call_with_timeout1
  File "./qa-test-execute_sajal.py", line 117 in close1
  File "./qa-test-execute_sajal.py", line 83 in __del__
Aborted (core dumped)

Command 2 :
sudo python3 ./qa-test-execute.py 2>&1 | tee launcher_gt20907_24052021_execute_py369_1.log

Output 2 :
2021-05-25 22:33:26,393[BaseCommandLine __init__ 140711046725760][DEBUG]: Attempt to close ssh-session within 10.0 seconds.
2021-05-25 22:33:26,393[BaseCommandLine __init__ 140711046725760][DEBUG]: Closing internal ssh-client.
^C



Behaviour of same code is different when executed in Py 2.7 & Py 3.69. 
 
qa-test-execute.py
#!/usr/bin/env python3

import sys
import traceback
import logging
import logging.config
import time
import threading
import multiprocessing
import pyssh
########################################################################


def lock_acquire_with_timeout1(lock, timeout_seconds):
    """
    Try to acquire lock without specified timeout
    @param lock: threading lock to be acquired
    @type lock: threading.Lock
    @param timeout_seconds: maximal time to make lock acquire attempts
    @type timeout_seconds: float
    """
    begin_time = time.time()
    while time.time() - begin_time < timeout_seconds:
        if lambda: lock.acquire(False):
            return True
        else:
            time.sleep(1.0)
    return None


def call_with_timeout1(method_to_call, timeout_seconds):
    """
    Good for potentially "freeze" methods calls. Executes passed method in
    separate thread. Waits for control returns within timeout. If timeout
    exceed - return control to callee thread. Separate execution thread will
    still be active.

    @param method_to_call: method te be called
    @type method_to_call: function
    @param timeout_seconds: maximal time to wait for method call finished
    @type timeout_seconds: float
    """
    stop_thread = threading.Barrier(2)
    thread_name = threading._newname("{}-%d".format(__name__))
    call_thread = threading.Thread(target=method_to_call, name=thread_name)
    call_thread.daemon = True
    call_thread.start()
    print ("threading.activeCount() : %s",threading.activeCount())
    print ("threading.currentThread() : %s", threading.currentThread())
    print ("threading.enumerate() : %s", threading.enumerate() )
    call_thread.join(timeout=timeout_seconds)
    if call_thread.is_alive():
        stop_thread.abort()
    return not call_thread.is_alive()


def format_all_threads_stacks1():
    """
    @return: formatted stacks for all running threads.
    """
    stacktraces = []
    for thread_id, stack in list(dict(list(sys._current_frames().items())).items()):
        for thread1 in threading.enumerate():
            if thread1.ident == thread_id:
                stacktraces.append('Thread %s (daemon=%r) stacktrace: \n%s' %
                                   (thread1.name, thread1.daemon, ''.join(traceback.format_stack(stack))))
            else:
                thread = None
    return '\n'.join(stacktraces)


class BaseCommandLine1(object):

    def __init__(self, connection_timeout=None):
        self._connection_timeout = connection_timeout
        self._connection_lock = multiprocessing.RLock()

        self._ssh_client = None
        self.logger = logging.getLogger('BaseCommandLine __init__ {}'.format(id(self)))
        self.reset_connection1()

    def __del__(self):
        self.close1()

    def _wait_for_connection1(self):
        begin_time = time.time()
        self.logger.debug("Will attempt to connect with device for %s seconds." % self._connection_timeout)
        while time.time() - begin_time < self._connection_timeout:
            try:
                self._connect_ssh1()
                self.logger.debug('BaseCommandLine1 new SSH connection with {}:{} established.'.
                                  format("10.221.42.29", "22"))
                break
            except Exception as e:
                self.logger.debug('BaseCommandLine1 SSH-connection failed after %d seconds passed with exception: %s' %
                                  (time.time() - begin_time, e))
                self.logger.debug('BaseCommandLine1 Next attempt after {} seconds.'.format(5.0))
                time.sleep(5.0)
        else:
            self.logger.debug('BaseCommandLine1 Failed to connect to {}:{} within {} seconds: {}'.format(
                "10.221.42.29","22",self._connection_timeout,traceback.format_exc()))

    def reset_connection1(self):
        with self._connection_lock:
            self.close1()
            self.logger.debug('reset connection begin.')
            self._wait_for_connection1()

    def close1(self):
        if not self._ssh_client:
            return
        close_timeout = 10.0
        begin_time = time.time()
        self.logger.debug("Attempt to close ssh-session within %s seconds." % close_timeout)
        if lock_acquire_with_timeout1(self._connection_lock, close_timeout):
            try:
                if call_with_timeout1(self.__nonblocking_close1, timeout_seconds=close_timeout):
                    self.logger.debug("Successfully closed SSHClient within %d seconds." % (time.time() - begin_time))
                else:
                    self.logger.warning("Failed to close SSHClient within %d seconds." % close_timeout)
                    self.logger.warning("All threads state:\n%s" % format_all_threads_stacks1())
            finally:
                self._connection_lock.release()
        else:
            self.logger.warning("Failed to acquire lock within %s timeout." % close_timeout)
        self._ssh_client = None

    def __nonblocking_close1(self):
        """
        Non-blocking call to close SSH connection. May freeze on internal SSHClient.close().
        """
        self.logger.debug("Closing internal ssh-client.")
        if self._ssh_client:
            self._ssh_client.close1()
        self.logger.debug("Internal ssh-client closed.")
        self._ssh_client = None

    def _connect_ssh1(self, key_filename=None):
        try:
            s = pyssh.new_session(hostname="10.221.42.29", port="22", username="root", password="")
            r = s.execute("uname -a")
            print(r.as_bytes())
        except Exception as e:
            print("Connection Error", str(e))
        self._ssh_client = s
        return s

def config_logging():
    formatter = logging.Formatter("%(asctime)s[%(name)s]"
                                  "[%(levelname)s]: %(message)s")
    console_logger = logging.StreamHandler(sys.stdout)
    console_logger.setFormatter(formatter)

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(logging.NullHandler())

    paramiko_logger = logging.getLogger('paramiko')
    paramiko_logger.setLevel(logging.CRITICAL)
    paramiko_logger.addHandler(console_logger)

    console_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(console_logger)


def main(argv):
    config_logging()
    try:
        result = BaseCommandLine1(300)
    except Exception:
        logging.debug("Exception from BaseCommandLine1")
    logging.debug("Show all threads status before exit attempt:\n %s", format_all_threads_stacks1())

if __name__ == "__main__":
    main(sys.argv)
msg395262 - (view) Author: Muralidhar BN (muralidhar.bn) Date: 2021-06-07 12:38
Dear Eric Smith,
Do we have any analysis for this issue for information shared ? 
Appreciate your quick reply.
PS: Making a simple socket connection instead of paramiko or pyssh ssh connection gives similar error

Thank you
msg395263 - (view) Author: Eric V. Smith (eric.smith) * (Python committer) Date: 2021-06-07 12:57
No, I'm sorry that I don't. The example is too large for me (or probably any of our volunteers) to look through and reason about. I think asking on a forum like Stack Overflow is your best bet, but even there they will ask for a simpler example. Or maybe try the python-list mailing list https://mail.python.org/mailman/listinfo/python-list.

If you could get this down to a 10 to 20 line example, maybe we could determine here if it's a python bug, or a deliberate change.
History
Date User Action Args
2021-06-07 12:57:09eric.smithsetmessages: + msg395263
2021-06-07 12:38:14muralidhar.bnsetmessages: + msg395262
2021-05-25 17:21:07muralidhar.bnsetstatus: closed -> open
resolution: third party -> remind
messages: + msg394381
2021-05-24 21:15:51eric.smithsetstatus: open -> closed

nosy: + eric.smith
messages: + msg394268

resolution: third party
stage: resolved
2021-05-24 18:55:16muralidhar.bncreate