diff --git a/.gitignore b/.gitignore index 1905dcb..d29eef7 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ __pycache__ dist/ *.swp + +*.kdev4 diff --git a/README.md b/README.md index 8396aaa..9cd3b0b 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ The experiment description in `example.experiment` roughly translates to: Perfor ###### `example.py` ```python3 -def run(instance): +def run(instance, save_callback, state): # do some stuff on "instance" ``` @@ -53,7 +53,7 @@ The `run` function is where the magic happens. For every file in our batch the Now that we have specified everything, we can start executing our experiment. -``` +```python3 >>> import alma.experiment >>> dispatcher = alma.experiment.load("example.experiment") @@ -62,13 +62,13 @@ Now that we have specified everything, we can start executing our experiment. The line `dispatcher.start()` starts the concurrent non blocking execution of our experiment. This means the dispatcher stays responsive and we can pause/stop the execution at any given time. -``` +```python3 >>> dispatcher.stop() ``` During the execution the `dispatcher` continuously keeps track of which files he still needs to call `run(...)` on and how many iterations he has left. He does so by saving the current state of the execution in a file. Loading an experiment (`alma.experiment.load(...)`) the framework first looks for such a save file and if one exists, the execution will pick up at the point we've called `dispatcher.stop()`. To pick up the experiment we can perform: -``` +```python3 >>> dispatcher = alma.experiment.load("example.experiment") >>> dispatcher.start() ``` @@ -79,6 +79,6 @@ During the execution the `dispatcher` continuously keeps track of which files he Fist clone the repository and then switch into it's root directory and call ```bash -$> pip install -e . +$ pip install -e . ``` This will locally install the **pyalma** framework on your system. diff --git a/alma/batch.py b/alma/batch.py index 23bc65f..505af96 100644 --- a/alma/batch.py +++ b/alma/batch.py @@ -6,7 +6,7 @@ def load(batch_file): with open(batch_file, "r") as bfile: batch_obj = json.loads(bfile.read()) - + batch_obj["base_path"] = batch_file.parent return __load_batch_obj(batch_obj) diff --git a/alma/experiment.py b/alma/experiment.py index 6e79c67..402eae8 100644 --- a/alma/experiment.py +++ b/alma/experiment.py @@ -32,7 +32,7 @@ def load(exp_file): num_workers = os.cpu_count() else: num_workers = int(exp_obj["workers"]) - + return Dispatcher(exp_mod, exp_plan, num_workers) class Dispatcher (threading.Thread): @@ -75,6 +75,7 @@ class Dispatcher (threading.Thread): worker.join() self.__done() + def stop(self): self.__stop_called.set() @@ -102,11 +103,19 @@ class Worker (multiprocessing.Process): instance = self.__exp_plan.next() while instance != None: - self.__exp_mod.run(instance) + instance_state = self.__exp_plan.load_instance_state(instance) + + self.__exp_mod.run(instance, + lambda data: self.__exp_plan.save_instance_state( + instance, + data + ), + instance_state) + self.__exp_plan.done_with(instance) instance = self.__exp_plan.next() - + def terminate(self): self.__exp_plan.delete() multiprocessing.Process.terminate(self) diff --git a/alma/plan.py b/alma/plan.py index f55fd14..4cbfccf 100644 --- a/alma/plan.py +++ b/alma/plan.py @@ -12,6 +12,8 @@ class Plan: self.file = None self.pending_instances = [] self.assigned_instances = [] + self.instance_states = {} + self.__lock = threading.Lock() if lock == None else lock if experiment: @@ -82,9 +84,14 @@ class Plan: if "iterations_left" in content: self.iterations_left = content["iterations_left"] + + if "instance_states" in content: + self.instance_states = content["instance_states"] + def __is_finished(self): return False if self.file.is_file() else True + def next(self): @@ -111,6 +118,9 @@ class Plan: if instance in self.assigned_instances: self.assigned_instances.remove(instance) + + if instance in self.instance_states: + self.instance_states.pop(instance) self.__update_file() @@ -120,7 +130,9 @@ class Plan: all_done = True content["iterations_left"] = self.iterations_left - + + content["instance_states"] = self.instance_states + if len(self.assigned_instances) > 0: content["assigned"] = self.assigned_instances all_done = False @@ -154,6 +166,25 @@ class Plan: with open(self.file, "w") as pfile: pfile.write(json.dumps(content)) + + def save_instance_state(self, instance, data): + + with self.__lock: + self.__load() + + self.instance_states[instance] = data + + self.__update_file() + + def load_instance_state(self, instance): + + with self.__lock: + self.__load() + + if instance in self.instance_states: + return self.instance_states[instance] + else: + return "" def delete(self): with self.__lock: diff --git a/docs/run_module.md b/docs/run_module.md new file mode 100644 index 0000000..82f480b --- /dev/null +++ b/docs/run_module.md @@ -0,0 +1,2 @@ +# Run Module +