IOError: Broken pipe when using multiprocessing.Pool and multiprocessing.Manager after Ctrl C

白鹭 - 2022-03-07

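I'm using multiprocessing.Pool from the Python standard library to run a number of workers. Each worker launches child processes with Python's subprocess library and is responsible for managing those subprocesses and cleaning them up when it is done.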

import multiprocessing as mp
import time


def main():
    processes = 3
    jobs = 6
    pool = mp.Pool(processes)
    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()


def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)


if __name__ == "__main__":
    main()

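The trouble starts when I press Ctrl C to exit the script from the command line and try to catch the KeyboardInterrupt in a sane way. Note: this example is a trimmed-down version of my actual program that illustrates the problem as well as it can. I found these related posts on StackOverflow: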

  • Catch Ctrl+C / SIGINT and exit multiprocesses gracefully in python
  • Keyboard Interrupts with python's multiprocessing Pool

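The former is more applicable than the latter. While investigating, I found that pressing Ctrl C on the command line sends signal.SIGINT to every process in the terminal's foreground process group: the parent (main) process and all of its child and grandchild processes. For reference, I'm using Ubuntu 18.04 in a bash terminal.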

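I took the suggested approach and made the child processes ignore the interrupt signal. For convenience, and for my own sanity, I wrote a context manager for it.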

import multiprocessing as mp
import contextlib # <---
import signal # <---
import time


def main():
    processes = 3
    jobs = 6
    with ignore_interrupt_signals(): # <---
        pool = mp.Pool(processes)

    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()


@contextlib.contextmanager
def ignore_interrupt_signals(): # <---
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        # restore the previous handler even if the body raises
        signal.signal(signal.SIGINT, previous_handler)


def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)


if __name__ == "__main__":
    main()
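The context manager works because the pool's worker processes pick up the ignored SIGINT from the parent when they are created. A variation that does not depend on that inheritance (my substitution, not what the question uses) is Pool's initializer parameter, which runs a function in each worker before it takes any task. A minimal Python 3 sketch; ignore_sigint, sigint_is_ignored and check_workers are hypothetical helpers:

```python
import multiprocessing as mp
import signal


def ignore_sigint():
    # Pool initializer: runs once in each worker process before it takes tasks.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def sigint_is_ignored(_):
    # Task run inside a worker: report that worker's SIGINT disposition.
    return signal.getsignal(signal.SIGINT) == signal.SIG_IGN


def check_workers(processes=2):
    # The parent's own SIGINT handler is never changed here.
    pool = mp.Pool(processes, initializer=ignore_sigint)
    try:
        return pool.map(sigint_is_ignored, range(processes * 2))
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(check_workers())  # expected: [True, True, True, True]
```

Since the parent's handler is untouched, Ctrl C still raises KeyboardInterrupt in the main process. The trade-off is a short window while a worker starts up, before its initializer has run, in which a Ctrl C could still reach it.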

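This works well, except that the workers need a way to know they should shut down and, importantly, clean up all the subprocesses they spawned. For context, each worker takes about 45 minutes to finish.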

I decided the best way to do this was to use a multiprocessing.Event. The event, called stop_event, would be set if any process received a signal.SIGINT. Because the event is shared by the main process and the child processes, it had to be managed by a multiprocessing.Manager and accessed through a proxy.
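Why the manager is needed: as far as I know, a plain multiprocessing.Event can only be shared with child processes by inheritance, so pickling it into an apply_async task fails with a RuntimeError, while an Event proxy from a manager pickles and works. A minimal Python 3 sketch of the difference; read_flag, send_event and compare_events are hypothetical helpers:

```python
import multiprocessing as mp


def read_flag(stop_event):
    # Runs in a pool worker; it only needs to read the shared flag.
    return stop_event.is_set()


def send_event(pool, event):
    """Return the worker's answer, or the error raised while pickling the task."""
    try:
        return pool.apply_async(read_flag, (event,)).get(timeout=30)
    except RuntimeError as exc:
        return exc


def compare_events():
    manager = mp.Manager()
    pool = mp.Pool(1)
    try:
        # A plain Event cannot be pickled into a task, so this returns the error.
        plain = send_event(pool, mp.Event())
        # A manager-backed Event is a proxy, which can be pickled.
        proxy_event = manager.Event()
        proxy_event.set()
        proxied = send_event(pool, proxy_event)
        return plain, proxied
    finally:
        pool.close()
        pool.join()
        manager.shutdown()


if __name__ == "__main__":
    print(compare_events())
```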

import multiprocessing as mp
import contextlib
import signal
import sys
import time


def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)

    manager = mp.Manager() # <---
    stop_event = manager.Event() # <---

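    try:
        for i in range(jobs):
            args = (i, stop_event) # <--- the worker needs the event proxy
            pool.apply_async(worker, args)
        pool.close()
        pool.join() # wait for the workers to finish
    except KeyboardInterrupt: # <---
        stop_event.set() # <---
        pool.close()
        pool.join()
        sys.exit() # <---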


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        # restore the previous handler even if the body raises
        signal.signal(signal.SIGINT, previous_handler)


def worker(i, stop_event):
    # start processes
    while not stop_event.is_set(): # <--- is_set() checks the flag; set() would set it
        # wait for completion
        time.sleep(1)
    # clean up


if __name__ == "__main__":
    main()

Now the workers were protected from being interrupted before they could clean up their subprocesses, and the main process catches the KeyboardInterrupt after the pool is created. When the exception is caught, the event is set, the pool is closed and the worker processes are joined. I thought this would work. However, I got an IOError from the stop_event.set() call:

Termination started due to Ctrl C, cleaning up...
Traceback (most recent call last):
  File "...", line 1109, in <module>
    main()
  File "...", line 42, in main
    args.func(args)
  File "...", line 189, in run_command
    stop_event.set()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 1011, in set
    return self._callmethod('set')
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

I removed several other tracebacks from this output; the one of interest is the broken pipe raised when setting stop_event through the manager proxy.
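The same failure can be reproduced deliberately in Python 3 without pressing Ctrl C: start a manager, kill its server process, then call set() on an Event proxy. This is a sketch under a few assumptions. terminate() (SIGTERM) stands in for the SIGINT that killed the server. As far as I know the manager does not publicly expose its server process, so the hypothetical helper start_manager_and_find_process finds it by diffing mp.active_children(). The exact exception can vary between a BrokenPipeError (an OSError, which IOError aliases in Python 3) and an EOFError, so both are caught.

```python
import multiprocessing as mp


def start_manager_and_find_process():
    # Hypothetical helper: the manager's server is an ordinary child process,
    # so it is the one child that appears after the manager starts.
    before = set(mp.active_children())
    manager = mp.Manager()
    server = [p for p in mp.active_children() if p not in before][0]
    return manager, server


def set_after_manager_dies():
    manager, server = start_manager_and_find_process()
    stop_event = manager.Event()
    stop_event.is_set()   # opens this proxy's connection to the server
    server.terminate()    # stands in for the SIGINT that killed the server
    server.join()
    try:
        stop_event.set()  # the proxy now talks to a dead process
    except (OSError, EOFError) as exc:  # BrokenPipeError is an OSError
        return exc
    return None


if __name__ == "__main__":
    print(repr(set_after_manager_dies()))
```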

Answer:

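multiprocessing.Manager starts a server that runs as a separate process. That manager process therefore also receives the signal.SIGINT from Ctrl C and terminates, which closes the pipe used by the stop_event manager proxy uncleanly. The best way I found to avoid this problem is to start the manager while signal.SIGINT is ignored.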
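A quick check of this fix, assuming Python 3 on a POSIX system (it uses os.kill with SIGINT and must run in the main thread): start the manager inside ignore_interrupt_signals(), send SIGINT straight to its server process the way Ctrl C would, and confirm the proxy still works. The context manager is repeated here, with a try/finally so the handler is restored even if the body raises, to keep the block self-contained; the server process is again located by diffing mp.active_children(), a hypothetical workaround. Python 3.8 and later reportedly make the manager's server ignore SIGINT on its own, so on those versions the check passes either way; the traceback in the question comes from Python 2.7.

```python
import contextlib
import multiprocessing as mp
import os
import signal
import time


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def manager_survives_sigint():
    before = set(mp.active_children())
    with ignore_interrupt_signals():
        manager = mp.Manager()  # the server process starts with SIGINT ignored
    server = [p for p in mp.active_children() if p not in before][0]
    try:
        stop_event = manager.Event()
        os.kill(server.pid, signal.SIGINT)  # what Ctrl C would deliver (POSIX)
        time.sleep(0.5)                     # give a non-ignoring server time to die
        stop_event.set()                    # would raise if the server had died
        return server.is_alive() and stop_event.is_set()
    finally:
        manager.shutdown()


if __name__ == "__main__":
    print(manager_survives_sigint())
```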

import multiprocessing as mp
import contextlib
import signal
import sys
import time


def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)
        manager = mp.Manager() # <---

    stop_event = manager.Event()

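    try:
        for i in range(jobs):
            args = (i, stop_event) # the worker needs the event proxy
            pool.apply_async(worker, args)
        pool.close()
        pool.join() # wait for the workers to finish
    except KeyboardInterrupt:
        stop_event.set()
        pool.close()
        pool.join()
        sys.exit()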

@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        # restore the previous handler even if the body raises
        signal.signal(signal.SIGINT, previous_handler)


def worker(i, stop_event):
    # start processes
    while not stop_event.is_set(): # is_set() checks the flag; set() would set it
        # wait for completion
        time.sleep(1)
    # clean up


if __name__ == "__main__":
    main()
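The worker in both versions is still a placeholder. Below is one way a worker could poll the event while its subprocess runs and clean the subprocess up when told to stop. It is a sketch, not the original program's worker: the default command (a Python child that sleeps for a minute) and the cmd and poll_interval parameters are hypothetical stand-ins for the real 45-minute job. As far as I can tell, subprocesses started by workers that ignore SIGINT inherit that too, so explicit cleanup like this is what actually stops them.

```python
import subprocess
import sys
import threading
import time


def worker(i, stop_event, cmd=None, poll_interval=0.1):
    """Run one child process and clean it up if stop_event is set first.

    `cmd` is a hypothetical stand-in for the real job; by default it is a
    Python child that just sleeps for a minute.
    """
    if cmd is None:
        cmd = [sys.executable, "-c", "import time; time.sleep(60)"]
    child = subprocess.Popen(cmd)
    try:
        # Poll instead of blocking in wait(), so a stop request is noticed.
        while child.poll() is None and not stop_event.is_set():
            time.sleep(poll_interval)
    finally:
        if child.poll() is None:
            child.terminate()  # polite request first
            try:
                child.wait(timeout=5)
            except subprocess.TimeoutExpired:
                child.kill()  # then force it
                child.wait()
    return child.returncode


if __name__ == "__main__":
    # Local demo: a threading.Event stands in for the manager's Event proxy.
    stop = threading.Event()
    threading.Timer(0.5, stop.set).start()
    print(worker(0, stop))  # nonzero, because the sleeping child was terminated
```

Because worker only calls stop_event.is_set(), the same function works with the manager's Event proxy passed through pool.apply_async and with a plain threading.Event for a quick local test. Polling with a short sleep trades a little latency for never blocking in wait() after a stop has been requested.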

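I'm answering my own question in the hope that it helps others: I spent a lot of time tracking socket and pipe errors through my codebase before landing on this fairly simple solution.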
