diff --git a/al2/experiment.py b/al2/experiment.py new file mode 100644 index 0000000..6b06649 --- /dev/null +++ b/al2/experiment.py @@ -0,0 +1,61 @@ +import pathlib as pl +import json +import importlib.machinery as impmach +import multiprocessing +import threading +import concurrent.futures as concfut +import os + +from . import batch +from . import plan + +def execute(exp_file): + dispatcher = load(exp_file) + dispatcher.start() + dispatcher.join() + +def load(exp_file): + exp_plan = plan.Plan(exp_file, multiprocessing.Lock()) + + with open(exp_file) as efile: + exp_obj = json.loads(efile.read()) + exp_obj["load"] = pl.Path(exp_obj["load"]) + + exp_mod = impmach.SourceFileLoader(exp_obj["load"].stem, + str(exp_obj["load"])).load_module() + + return Dispatcher(exp_mod.run, exp_plan, os.cpu_count()) + +class Dispatcher (threading.Thread): + def __init__(self, exp_func, exp_plan, num_workers): + threading.Thread.__init__(self) + + self.__exp_func = exp_func + self.__plan = exp_plan + + self.__num_workers = num_workers + self.__workers = [] + + for i in range(self.__num_workers): + self.__workers.append(multiprocessing.Process(target=self.__run_exp, + args=(self.__exp_func, + self.__plan))) + + def run(self): + for worker in self.__workers: + worker.start() + + for worker in self.__workers: + worker.join() + + @staticmethod + def __run_exp(exp_func, exp_plan): + instance = exp_plan.next() + + while instance != None: + exp_func(instance) + + exp_plan.done_with(instance) + + instance = exp_plan.next() + diff --git a/al2/plan.py b/al2/plan.py new file mode 100644 index 0000000..82a8911 --- /dev/null +++ b/al2/plan.py @@ -0,0 +1,115 @@ +import pathlib as pl +import json +import os +import multiprocessing +import threading + +from . import batch + +class Plan: + def __init__(self, experiment=None, lock=None): + self.experiment = None + self.file = None + self.pending_instances = [] + self.assigned_instances = [] + self.__lock = threading.Lock() if lock == None else lock + + if experiment: + self.create(experiment) + + + def create(self, experiment): + self.experiment = pl.Path(experiment).resolve() + self.__set_file() + + if self.__is_finished(): + self.__create() + else: + self.__load() + + def __create(self): + with open(self.experiment, "r") as expf: + exp_obj = json.loads(expf.read()) + + instances = batch.load(pl.Path(exp_obj["batch"]).resolve()) + + self.pending_instances = instances + self.__update_file() + + + def __set_file(self): + if self.experiment == None: + self.file = None + else: + exp_path = pl.Path(self.experiment) + self.file = exp_path.parent / (exp_path.stem + ".plan") + + def __load(self): + self.pending_instances = [] + self.assigned_instances = [] + + if not self.file.is_file(): + return + + with open(self.file, "r") as pfile: + content = json.loads(pfile.read()) + + if "assigned" in content: + self.assigned_instances = content["assigned"] + + if "pending" in content: + self.pending_instances = content["pending"] + + def __is_finished(self): + return False if self.file.is_file() else True + + def next(self): + + with self.__lock: + self.__load() + + if len(self.pending_instances) == 0: + return None + + next_instance = self.pending_instances.pop() + self.assigned_instances.append(next_instance) + + self.__update_file() + + return next_instance + + def done_with(self, instance): + + with self.__lock: + self.__load() + + if instance in self.assigned_instances: + self.assigned_instances.remove(instance) + + self.__update_file() + + def __update_file(self): + content = {} + + if len(self.assigned_instances) > 0: + content["assigned"] = list(map(str, self.assigned_instances)) + + if len(self.pending_instances) > 0: + content["pending"] = list(map(str, self.pending_instances)) + + if content: + with open(self.file, "w") as pfile: + pfile.write(json.dumps(content)) + + elif self.file.is_file(): + self.file.unlink() + + def __del__(self): + + with self.__lock: + self.__load() + + self.pending_instances.extend(self.assigned_instances) + self.assigned_instances = [] + + self.__update_file()