import os import signal import sys import time import threading def mysleep(delay): start = time.monotonic() time.sleep(delay) dt = time.monotonic() - start if dt < delay: print("sleep was interrupted! thread %s" % threading.current_thread().ident) def sighandler(signum, frame): text = 'Python signal handler, thread %s\n' % threading.current_thread().ident # don't use print() to avoid reentrant lock issue in io.TextIOWrapper() os.write(sys.stdout.fileno(), text.encode()) raise Exception def worker(): print("thread: wait") mysleep(5) print("thread: done") def main(): print("main: main thread %s" % threading.current_thread().ident) signal.signal(signal.SIGUSR1, sighandler) t = threading.Thread(target=worker) t.start() print("main: spawn thread %s" % t.ident) print("main: sleep") mysleep(1) print("main: send signal to thread") signal.pthread_kill(t.ident, signal.SIGUSR1) print("main: sleep again") mysleep(1) print("main: wait thread") t.join() print("main: done") if __name__ == '__main__': main()