From 7e76b12b65c4616eee41b0a14e2ce37a1f3386d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=20Kr=C3=BCger?= Date: Mon, 7 Dec 2020 01:03:54 +0100 Subject: [PATCH] fixed freezing Dispatcher --- al2/experiment.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/al2/experiment.py b/al2/experiment.py index 91a98fc..f25d355 100644 --- a/al2/experiment.py +++ b/al2/experiment.py @@ -51,21 +51,23 @@ class Dispatcher (threading.Thread): worker.start() def wait_to_continue(workers, stop_called): - any_worker_alive = any(map(lambda w: w.is_alive(), workers)) + any_worker_alive = lambda: any(map(lambda w: w.is_alive(), workers)) - while any_worker_alive and not stop_called.is_set(): + while any_worker_alive() and not stop_called.is_set(): time.sleep(0) + waiter = threading.Thread(target=wait_to_continue, args=(self.__workers, self.__stop_called)) waiter.start() waiter.join() - + if self.__stop_called.is_set(): for worker in self.__workers: worker.terminate() + for worker in self.__workers: worker.join() @@ -73,6 +75,13 @@ class Dispatcher (threading.Thread): def stop(self): self.__stop_called.set() + def num_active_workers(self): + count = 0 + for worker in self.__workers: + count += 1 if worker.is_alive() else 0 + + return count + class Worker (multiprocessing.Process): def __init__(self, exp_mod, exp_plan): multiprocessing.Process.__init__(self)