Browse Source

merge pass_data_struct_per_instance

master
Tom Krüger 11 months ago
parent
commit
efe5d19fa0
3 changed files with 33 additions and 21 deletions
  1. +15
    -5
      alma/batch.py
  2. +1
    -1
      alma/experiment.py
  3. +17
    -15
      alma/plan.py

+ 15
- 5
alma/batch.py View File

@ -13,7 +13,7 @@ def load(batch_file):
def __load_batch_obj(batch_obj): def __load_batch_obj(batch_obj):
paths = []
instances = []
instance_dir = None instance_dir = None
@ -27,16 +27,26 @@ def __load_batch_obj(batch_obj):
if "instances" in batch_obj: if "instances" in batch_obj:
for instance in batch_obj["instances"]: for instance in batch_obj["instances"]:
paths.append(pl.Path(instance_dir, instance))
file_path = ""
instance_data = {}
if "file" in instance:
file_path = pl.Path(instance_dir, instance["file"]).resolve()
if "data" in instance:
instance_data = instance["data"]
instances.append({"file": file_path, "data": instance_data})
if "batches" in batch_obj: if "batches" in batch_obj:
for batch in batch_obj["batches"]: for batch in batch_obj["batches"]:
batch["base_path"] = instance_dir batch["base_path"] = instance_dir
paths.extend(__load_batch_obj(batch))
instances.extend(__load_batch_obj(batch))
if "include" in batch_obj: if "include" in batch_obj:
for batch_file in batch_obj["include"]: for batch_file in batch_obj["include"]:
paths.extend(load(pl.Path(instance_dir, batch_file)))
instances.extend(load(pl.Path(instance_dir, batch_file)))
return instances
return paths

+ 1
- 1
alma/experiment.py View File

@ -101,7 +101,7 @@ class Worker (multiprocessing.Process):
def run(self): def run(self):
instance = self.__exp_plan.next() instance = self.__exp_plan.next()
print(instance)
while instance != None: while instance != None:
instance_state = self.__exp_plan.load_instance_state(instance) instance_state = self.__exp_plan.load_instance_state(instance)


+ 17
- 15
alma/plan.py View File

@ -13,7 +13,9 @@ class Plan:
self.pending_instances = [] self.pending_instances = []
self.assigned_instances = [] self.assigned_instances = []
self.instance_states = {} self.instance_states = {}
self.__instance_id_counter = 0
self.__lock = threading.Lock() if lock == None else lock self.__lock = threading.Lock() if lock == None else lock
if experiment: if experiment:
@ -44,7 +46,7 @@ class Plan:
with open(self.experiment, "r") as expf: with open(self.experiment, "r") as expf:
exp_obj = json.loads(expf.read()) exp_obj = json.loads(expf.read())
instances = batch.load(pl.Path(exp_obj["batch"]).resolve())
instances = batch.load(pl.Path(exp_obj["batch"]))
if iterations_left == None: if iterations_left == None:
if "iterations" in exp_obj: if "iterations" in exp_obj:
@ -57,7 +59,7 @@ class Plan:
content["iterations_left"] = iterations_left content["iterations_left"] = iterations_left
return content return content
def __set_file(self): def __set_file(self):
if self.experiment == None: if self.experiment == None:
@ -78,6 +80,8 @@ class Plan:
if "assigned" in content: if "assigned" in content:
self.assigned_instances = content["assigned"] self.assigned_instances = content["assigned"]
self.__instance_id_counter = max(map(lambda i: i["id"], self.assigned_instances)) + 1
if "pending" in content: if "pending" in content:
self.pending_instances = content["pending"] self.pending_instances = content["pending"]
@ -105,6 +109,9 @@ class Plan:
return None return None
next_instance = self.pending_instances.pop() next_instance = self.pending_instances.pop()
next_instance["id"] = self.__instance_id_counter
self.__instance_id_counter += 1
self.assigned_instances.append(next_instance) self.assigned_instances.append(next_instance)
self.__update_file() self.__update_file()
@ -116,11 +123,11 @@ class Plan:
with self.__lock: with self.__lock:
self.__load() self.__load()
if instance in self.assigned_instances:
self.assigned_instances.remove(instance)
self.assigned_instances = list(filter(lambda i: i["id"] != instance["id"],
self.assigned_instances ))
if instance in self.instance_states:
self.instance_states.pop(instance)
if str(instance["id"]) in self.instance_states:
self.instance_states.pop(str(instance["id"]))
self.__update_file() self.__update_file()
@ -158,11 +165,6 @@ class Plan:
self.__write_content(content) self.__write_content(content)
def __write_content(self, content): def __write_content(self, content):
if "assigned" in content:
content["assigned"][:] = map(str, content["assigned"])
if "pending" in content:
content["pending"][:] = map(str, content["pending"])
with open(self.file, "w") as pfile: with open(self.file, "w") as pfile:
pfile.write(json.dumps(content)) pfile.write(json.dumps(content))
@ -172,7 +174,7 @@ class Plan:
with self.__lock: with self.__lock:
self.__load() self.__load()
self.instance_states[instance] = data
self.instance_states[str(instance["id"])] = data
self.__update_file() self.__update_file()
@ -181,8 +183,8 @@ class Plan:
with self.__lock: with self.__lock:
self.__load() self.__load()
if instance in self.instance_states:
return self.instance_states[instance]
if str(instance["id"]) in self.instance_states:
return self.instance_states[str(instance["id"])]
else: else:
return "" return ""


Loading…
Cancel
Save