diff --git a/alma/batch.py b/alma/batch.py index 505af96..2e29ac4 100644 --- a/alma/batch.py +++ b/alma/batch.py @@ -13,7 +13,7 @@ def load(batch_file): def __load_batch_obj(batch_obj): - paths = [] + instances = [] instance_dir = None @@ -27,16 +27,26 @@ def __load_batch_obj(batch_obj): if "instances" in batch_obj: for instance in batch_obj["instances"]: - paths.append(pl.Path(instance_dir, instance)) + file_path = "" + instance_data = {} + + if "file" in instance: + file_path = pl.Path(instance_dir, instance["file"]).resolve() + + if "data" in instance: + instance_data = instance["data"] + + instances.append({"file": file_path, "data": instance_data}) if "batches" in batch_obj: for batch in batch_obj["batches"]: batch["base_path"] = instance_dir - paths.extend(__load_batch_obj(batch)) + instances.extend(__load_batch_obj(batch)) if "include" in batch_obj: for batch_file in batch_obj["include"]: - paths.extend(load(pl.Path(instance_dir, batch_file))) + instances.extend(load(pl.Path(instance_dir, batch_file))) + + return instances - return paths diff --git a/alma/experiment.py b/alma/experiment.py index 402eae8..07446ce 100644 --- a/alma/experiment.py +++ b/alma/experiment.py @@ -101,7 +101,7 @@ class Worker (multiprocessing.Process): def run(self): instance = self.__exp_plan.next() - + print(instance) while instance != None: instance_state = self.__exp_plan.load_instance_state(instance) diff --git a/alma/plan.py b/alma/plan.py index 4cbfccf..58af20c 100644 --- a/alma/plan.py +++ b/alma/plan.py @@ -13,7 +13,9 @@ class Plan: self.pending_instances = [] self.assigned_instances = [] self.instance_states = {} - + + self.__instance_id_counter = 0 + self.__lock = threading.Lock() if lock == None else lock if experiment: @@ -44,7 +46,7 @@ class Plan: with open(self.experiment, "r") as expf: exp_obj = json.loads(expf.read()) - instances = batch.load(pl.Path(exp_obj["batch"]).resolve()) + instances = batch.load(pl.Path(exp_obj["batch"])) if iterations_left == None: if "iterations" in exp_obj: @@ -57,7 +59,7 @@ class Plan: content["iterations_left"] = iterations_left return content - + def __set_file(self): if self.experiment == None: @@ -78,6 +80,8 @@ class Plan: if "assigned" in content: self.assigned_instances = content["assigned"] + + self.__instance_id_counter = max(map(lambda i: i["id"], self.assigned_instances)) + 1 if "pending" in content: self.pending_instances = content["pending"] @@ -105,6 +109,9 @@ class Plan: return None next_instance = self.pending_instances.pop() + next_instance["id"] = self.__instance_id_counter + self.__instance_id_counter += 1 + self.assigned_instances.append(next_instance) self.__update_file() @@ -116,11 +123,11 @@ class Plan: with self.__lock: self.__load() - if instance in self.assigned_instances: - self.assigned_instances.remove(instance) + self.assigned_instances = list(filter(lambda i: i["id"] != instance["id"], + self.assigned_instances )) - if instance in self.instance_states: - self.instance_states.pop(instance) + if str(instance["id"]) in self.instance_states: + self.instance_states.pop(str(instance["id"])) self.__update_file() @@ -158,11 +165,6 @@ class Plan: self.__write_content(content) def __write_content(self, content): - if "assigned" in content: - content["assigned"][:] = map(str, content["assigned"]) - - if "pending" in content: - content["pending"][:] = map(str, content["pending"]) with open(self.file, "w") as pfile: pfile.write(json.dumps(content)) @@ -172,7 +174,7 @@ class Plan: with self.__lock: self.__load() - self.instance_states[instance] = data + self.instance_states[str(instance["id"])] = data self.__update_file() @@ -181,8 +183,8 @@ class Plan: with self.__lock: self.__load() - if instance in self.instance_states: - return self.instance_states[instance] + if str(instance["id"]) in self.instance_states: + return self.instance_states[str(instance["id"])] else: return ""