Browse Source

implemented instance state saving/loading

master
Tom Krüger 4 years ago
parent
commit
0eb376779e
6 changed files with 54 additions and 10 deletions
  1. +2
    -0
      .gitignore
  2. +5
    -5
      README.md
  3. +1
    -1
      alma/batch.py
  4. +12
    -3
      alma/experiment.py
  5. +32
    -1
      alma/plan.py
  6. +2
    -0
      docs/run_module.md

+ 2
- 0
.gitignore View File

@ -3,3 +3,5 @@ __pycache__
dist/ dist/
*.swp *.swp
*.kdev4

+ 5
- 5
README.md View File

@ -45,7 +45,7 @@ The experiment description in `example.experiment` roughly translates to: Perfor
###### `example.py` ###### `example.py`
```python3 ```python3
def run(instance):
def run(instance, save_callback, state):
# do some stuff on "instance" # do some stuff on "instance"
``` ```
@ -53,7 +53,7 @@ The `run` function is where the magic happens. For every file in our batch the
Now that we have specified everything, we can start executing our experiment. Now that we have specified everything, we can start executing our experiment.
```
```python3
>>> import alma.experiment >>> import alma.experiment
>>> dispatcher = alma.experiment.load("example.experiment") >>> dispatcher = alma.experiment.load("example.experiment")
@ -62,13 +62,13 @@ Now that we have specified everything, we can start executing our experiment.
The line `dispatcher.start()` starts the concurrent non blocking execution of our experiment. This means the dispatcher stays responsive and we can pause/stop the execution at any given time. The line `dispatcher.start()` starts the concurrent non blocking execution of our experiment. This means the dispatcher stays responsive and we can pause/stop the execution at any given time.
```
```python3
>>> dispatcher.stop() >>> dispatcher.stop()
``` ```
During the execution the `dispatcher` continuously keeps track of which files he still needs to call `run(...)` on and how many iterations he has left. He does so by saving the current state of the execution in a file. Loading an experiment (`alma.experiment.load(...)`) the framework first looks for such a save file and if one exists, the execution will pick up at the point we've called `dispatcher.stop()`. To pick up the experiment we can perform: During the execution the `dispatcher` continuously keeps track of which files he still needs to call `run(...)` on and how many iterations he has left. He does so by saving the current state of the execution in a file. Loading an experiment (`alma.experiment.load(...)`) the framework first looks for such a save file and if one exists, the execution will pick up at the point we've called `dispatcher.stop()`. To pick up the experiment we can perform:
```
```python3
>>> dispatcher = alma.experiment.load("example.experiment") >>> dispatcher = alma.experiment.load("example.experiment")
>>> dispatcher.start() >>> dispatcher.start()
``` ```
@ -79,6 +79,6 @@ During the execution the `dispatcher` continuously keeps track of which files he
Fist clone the repository and then switch into it's root directory and call Fist clone the repository and then switch into it's root directory and call
```bash ```bash
$> pip install -e .
$ pip install -e .
``` ```
This will locally install the **pyalma** framework on your system. This will locally install the **pyalma** framework on your system.

+ 1
- 1
alma/batch.py View File

@ -6,7 +6,7 @@ def load(batch_file):
with open(batch_file, "r") as bfile: with open(batch_file, "r") as bfile:
batch_obj = json.loads(bfile.read()) batch_obj = json.loads(bfile.read())
batch_obj["base_path"] = batch_file.parent batch_obj["base_path"] = batch_file.parent
return __load_batch_obj(batch_obj) return __load_batch_obj(batch_obj)


+ 12
- 3
alma/experiment.py View File

@ -32,7 +32,7 @@ def load(exp_file):
num_workers = os.cpu_count() num_workers = os.cpu_count()
else: else:
num_workers = int(exp_obj["workers"]) num_workers = int(exp_obj["workers"])
return Dispatcher(exp_mod, exp_plan, num_workers) return Dispatcher(exp_mod, exp_plan, num_workers)
class Dispatcher (threading.Thread): class Dispatcher (threading.Thread):
@ -75,6 +75,7 @@ class Dispatcher (threading.Thread):
worker.join() worker.join()
self.__done() self.__done()
def stop(self): def stop(self):
self.__stop_called.set() self.__stop_called.set()
@ -102,11 +103,19 @@ class Worker (multiprocessing.Process):
instance = self.__exp_plan.next() instance = self.__exp_plan.next()
while instance != None: while instance != None:
self.__exp_mod.run(instance)
instance_state = self.__exp_plan.load_instance_state(instance)
self.__exp_mod.run(instance,
lambda data: self.__exp_plan.save_instance_state(
instance,
data
),
instance_state)
self.__exp_plan.done_with(instance) self.__exp_plan.done_with(instance)
instance = self.__exp_plan.next() instance = self.__exp_plan.next()
def terminate(self): def terminate(self):
self.__exp_plan.delete() self.__exp_plan.delete()
multiprocessing.Process.terminate(self) multiprocessing.Process.terminate(self)


+ 32
- 1
alma/plan.py View File

@ -12,6 +12,8 @@ class Plan:
self.file = None self.file = None
self.pending_instances = [] self.pending_instances = []
self.assigned_instances = [] self.assigned_instances = []
self.instance_states = {}
self.__lock = threading.Lock() if lock == None else lock self.__lock = threading.Lock() if lock == None else lock
if experiment: if experiment:
@ -82,9 +84,14 @@ class Plan:
if "iterations_left" in content: if "iterations_left" in content:
self.iterations_left = content["iterations_left"] self.iterations_left = content["iterations_left"]
if "instance_states" in content:
self.instance_states = content["instance_states"]
def __is_finished(self): def __is_finished(self):
return False if self.file.is_file() else True return False if self.file.is_file() else True
def next(self): def next(self):
@ -111,6 +118,9 @@ class Plan:
if instance in self.assigned_instances: if instance in self.assigned_instances:
self.assigned_instances.remove(instance) self.assigned_instances.remove(instance)
if instance in self.instance_states:
self.instance_states.pop(instance)
self.__update_file() self.__update_file()
@ -120,7 +130,9 @@ class Plan:
all_done = True all_done = True
content["iterations_left"] = self.iterations_left content["iterations_left"] = self.iterations_left
content["instance_states"] = self.instance_states
if len(self.assigned_instances) > 0: if len(self.assigned_instances) > 0:
content["assigned"] = self.assigned_instances content["assigned"] = self.assigned_instances
all_done = False all_done = False
@ -154,6 +166,25 @@ class Plan:
with open(self.file, "w") as pfile: with open(self.file, "w") as pfile:
pfile.write(json.dumps(content)) pfile.write(json.dumps(content))
def save_instance_state(self, instance, data):
with self.__lock:
self.__load()
self.instance_states[instance] = data
self.__update_file()
def load_instance_state(self, instance):
with self.__lock:
self.__load()
if instance in self.instance_states:
return self.instance_states[instance]
else:
return ""
def delete(self): def delete(self):
with self.__lock: with self.__lock:


+ 2
- 0
docs/run_module.md View File

@ -0,0 +1,2 @@
# Run Module

Loading…
Cancel
Save