Squeak.ru - шаблоны программирования

Как узнать, запущена ли функция apply_async или она все еще находится в очереди с многопроцессорностью. Пул

Я использую python multiprocessing.Pool и apply_async для вызова множества функций.

Как узнать, начала ли функция выполняться членом пула или она находится в очереди?

Например:

import multiprocessing
import time

def func(t):
    #take some time processing
    print 'func({}) started'.format(t)
    time.sleep(t)

pool = multiprocessing.Pool()

results = [pool.apply_async(func, [t]) for t in [100]*50] #adds 50 func calls to the queue

Для каждого AsyncResult в results вы можете вызвать ready() или get(0), чтобы узнать, завершилась ли работа функции. Но как узнать, запущена ли функция, но еще не завершена?

т. е. для данного объекта AsyncResult (т. е. данного элемента результатов) есть ли способ узнать, была ли вызвана функция или она находится в очереди пула?


Ответы:


1

Во-первых, удалите выполненные задания из списка результатов

    results = [r for r in results if not r.ready()]

Количество незавершенных процессов равно длине списка результатов:

    pending = len(results)

И число ожидающих, но не запущенных, является общим ожидающим - pool_size

    not_started = pending - pool_size

pool_size будет multiprocessing.cpu_count(), если пул создан с аргументом по умолчанию, как вы сделали

ОБНОВЛЕНИЕ: после первоначального неправильного понимания вопроса, вот способ сделать то, о чем спрашивал ОП.

Я подозреваю, что эту функциональность можно без особых проблем добавить в класс Pool, поскольку AsyncResult реализуется в классе Pool с очередью. Эта очередь также может использоваться внутри, чтобы указать, запущена она или нет.

Но вот способ реализации с использованием Pool и Pipe. ПРИМЕЧАНИЕ: это не работает в Python 2.x - не знаю почему. Протестировано в Python 3.8.

import multiprocessing
import time
import os

def worker_function(pipe):
    pipe.send('started')
    print('[{}] started pipe={}'.format(os.getpid(), pipe))
    time.sleep(3)
    pipe.close()

def test():
    pool = multiprocessing.Pool(processes=2)
    print('[{}] pool={}'.format(os.getpid(), pool))

    workers = []

    for x in range(1, 4):
        parent, child = multiprocessing.Pipe()
        pool.apply_async(worker_function, (child,))
        worker = {'name': 'worker{}'.format(x), 'pipe': parent, 'started': False}
        workers.append(worker)

    pool.close()

    while True:
        for worker in workers:
            if worker.get('started'):
                continue
            pipe = worker.get('pipe')
            if pipe.poll(0.1):
                message = pipe.recv()
                print('[{}] {} says {}'.format(os.getpid(), worker.get('name'), message))
                worker['started'] = True
                pipe.close()
        count_in_queue = len(workers)
        for worker in workers:
            if worker.get('started'):
                count_in_queue -= 1
        print('[{}] count_in_queue = {}'.format(os.getpid(), count_in_queue))
        if not count_in_queue:
            break
        time.sleep(0.5)

    pool.join()

if __name__ == '__main__':
    test()
26.11.2019
  • Получается общее количество ожидающих заданий. Но я хочу знать, выполняется ли в данный момент results[i] (для любого i) или он стоит в очереди, ожидая запуска. 29.11.2019
  • @ Алекс, верно, я неправильно понял вопрос. Я обновил некоторый код, который будет отслеживать, какие из них запущены, а какие находятся в очереди пула. 16.01.2020
  • Новые материалы

    Угловая структура архитектуры
    Обратите внимание, что эта статья устарела, я решил создать новую с лучшей структурой и с учетом автономных компонентов: https://medium.com/@marekpanti/angular-standalone-architecture-b645edd0d54a..

    «Данные, которые большинство людей используют для обучения своих моделей искусственного интеллекта, поставляются со встроенным…
    Первоначально опубликовано HalkTalks: https://hacktown.com.br/blog/blog/os-dados-que-a-maioria-das-pessoas-usa-para-treinar-seus-modelos-de-inteligencia-artificial- ja-vem-com-um-vies-embutido/..

    Сильный ИИ против слабого ИИ: различия парадигм искусственного интеллекта
    В последние годы изучению и развитию искусственного интеллекта (ИИ) уделяется большое внимание и прогресс. Сильный ИИ и Слабый ИИ — две основные парадигмы в области искусственного интеллекта...

    Правильный способ добавить Firebase в ваш проект React с помощью React Hooks
    React + Firebase - это мощная комбинация для быстрого и безопасного создания приложений, от проверки концепции до массового производства. Раньше (знаете, несколько месяцев назад) добавление..

    Создайте API с помощью Python FastAPI
    Создание API с помощью Python становится очень простым при использовании пакета FastAPI. После установки и импорта вы можете создать приложение FastAPI и указать несколько конечных точек. Каждой..

    Веселье с прокси-сервером JavaScript
    Прокси-серверы JavaScript — это чистый сахар, если вы хотите создать некоторую общую логику в своих приложениях, чтобы облегчить себе жизнь. Вот один пример: Связь клиент-сервер Мы..

    Получить бесплатный хостинг для разработчиков | Разместите свой сайт за несколько шагов 🔥
    Статические веб-сайты — это веб-страницы с фиксированным содержанием и его постоянным содержанием. Но теперь статические сайты также обрабатывают динамические данные с помощью API и запросов...