Во-первых, удалите выполненные задания из списка результатов
results = [r for r in results if not r.ready()]
Количество незавершенных процессов равно длине списка результатов:
pending = len(results)
И число ожидающих, но не запущенных, является общим ожидающим - pool_size
not_started = pending - pool_size
pool_size будет multiprocessing.cpu_count(), если пул создан с аргументом по умолчанию, как вы сделали
ОБНОВЛЕНИЕ: после первоначального неправильного понимания вопроса, вот способ сделать то, о чем спрашивал ОП.
Я подозреваю, что эту функциональность можно без особых проблем добавить в класс Pool, поскольку AsyncResult реализуется в классе Pool с очередью. Эта очередь также может использоваться внутри, чтобы указать, запущена она или нет.
Но вот способ реализации с использованием Pool и Pipe. ПРИМЕЧАНИЕ: это не работает в Python 2.x - не знаю почему. Протестировано в Python 3.8.
import multiprocessing
import time
import os
def worker_function(pipe):
pipe.send('started')
print('[{}] started pipe={}'.format(os.getpid(), pipe))
time.sleep(3)
pipe.close()
def test():
pool = multiprocessing.Pool(processes=2)
print('[{}] pool={}'.format(os.getpid(), pool))
workers = []
for x in range(1, 4):
parent, child = multiprocessing.Pipe()
pool.apply_async(worker_function, (child,))
worker = {'name': 'worker{}'.format(x), 'pipe': parent, 'started': False}
workers.append(worker)
pool.close()
while True:
for worker in workers:
if worker.get('started'):
continue
pipe = worker.get('pipe')
if pipe.poll(0.1):
message = pipe.recv()
print('[{}] {} says {}'.format(os.getpid(), worker.get('name'), message))
worker['started'] = True
pipe.close()
count_in_queue = len(workers)
for worker in workers:
if worker.get('started'):
count_in_queue -= 1
print('[{}] count_in_queue = {}'.format(os.getpid(), count_in_queue))
if not count_in_queue:
break
time.sleep(0.5)
pool.join()
if __name__ == '__main__':
test()
26.11.2019
results[i]
(для любогоi
) или он стоит в очереди, ожидая запуска. 29.11.2019