Browse Source

implemented experiment execution

master
Tom Krüger 4 years ago
parent
commit
23a609a482
2 changed files with 176 additions and 0 deletions
  1. +61
    -0
      al2/experiment.py
  2. +115
    -0
      al2/plan.py

+ 61
- 0
al2/experiment.py View File

@ -0,0 +1,61 @@
import pathlib as pl
import json
import importlib.machinery as impmach
import multiprocessing
import threading
import concurrent.futures as concfut
import os
from . import batch
from . import plan
def execute(exp_file):
dispatcher = load(exp_file)
dispatcher.start()
dispatcher.join()
def load(exp_file):
exp_plan = plan.Plan(exp_file, multiprocessing.Lock())
with open(exp_file) as efile:
exp_obj = json.loads(efile.read())
exp_obj["load"] = pl.Path(exp_obj["load"])
exp_mod = impmach.SourceFileLoader(exp_obj["load"].stem,
str(exp_obj["load"])).load_module()
return Dispatcher(exp_mod.run, exp_plan, os.cpu_count())
class Dispatcher (threading.Thread):
def __init__(self, exp_func, exp_plan, num_workers):
threading.Thread.__init__(self)
self.__exp_func = exp_func
self.__plan = exp_plan
self.__num_workers = num_workers
self.__workers = []
for i in range(self.__num_workers):
self.__workers.append(multiprocessing.Process(target=self.__run_exp,
args=(self.__exp_func,
self.__plan)))
def run(self):
for worker in self.__workers:
worker.start()
for worker in self.__workers:
worker.join()
@staticmethod
def __run_exp(exp_func, exp_plan):
instance = exp_plan.next()
while instance != None:
exp_func(instance)
exp_plan.done_with(instance)
instance = exp_plan.next()

+ 115
- 0
al2/plan.py View File

@ -0,0 +1,115 @@
import pathlib as pl
import json
import os
import multiprocessing
import threading
from . import batch
class Plan:
def __init__(self, experiment=None, lock=None):
self.experiment = None
self.file = None
self.pending_instances = []
self.assigned_instances = []
self.__lock = threading.Lock() if lock == None else lock
if experiment:
self.create(experiment)
def create(self, experiment):
self.experiment = pl.Path(experiment).resolve()
self.__set_file()
if self.__is_finished():
self.__create()
else:
self.__load()
def __create(self):
with open(self.experiment, "r") as expf:
exp_obj = json.loads(expf.read())
instances = batch.load(pl.Path(exp_obj["batch"]).resolve())
self.pending_instances = instances
self.__update_file()
def __set_file(self):
if self.experiment == None:
self.file = None
else:
exp_path = pl.Path(self.experiment)
self.file = exp_path.parent / (exp_path.stem + ".plan")
def __load(self):
self.pending_instances = []
self.assigned_instances = []
if not self.file.is_file():
return
with open(self.file, "r") as pfile:
content = json.loads(pfile.read())
if "assigned" in content:
self.assigned_instances = content["assigned"]
if "pending" in content:
self.pending_instances = content["pending"]
def __is_finished(self):
return False if self.file.is_file() else True
def next(self):
with self.__lock:
self.__load()
if len(self.pending_instances) == 0:
return None
next_instance = self.pending_instances.pop()
self.assigned_instances.append(next_instance)
self.__update_file()
return next_instance
def done_with(self, instance):
with self.__lock:
self.__load()
if instance in self.assigned_instances:
self.assigned_instances.remove(instance)
self.__update_file()
def __update_file(self):
content = {}
if len(self.assigned_instances) > 0:
content["assigned"] = list(map(str, self.assigned_instances))
if len(self.pending_instances) > 0:
content["pending"] = list(map(str, self.pending_instances))
if content:
with open(self.file, "w") as pfile:
pfile.write(json.dumps(content))
elif self.file.is_file():
self.file.unlink()
def __del__(self):
with self.__lock:
self.__load()
self.pending_instances.extend(self.assigned_instances)
self.assigned_instances = []
self.__update_file()

Loading…
Cancel
Save