Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 53 additions & 34 deletions imas/ids_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,7 @@ def get_old_path(path: str, previous_name: str) -> str:
self.version_old,
)
elif self._check_data_type(old_item, new_item):
# use class helper to register simple renames and
# reciprocal mappings
self._add_rename(old_path, new_path)
if old_item.get("data_type") in DDVersionMap.STRUCTURE_TYPES:
# Add entries for common sub-elements
for path in old_paths:
if path.startswith(old_path):
npath = path.replace(old_path, new_path, 1)
if npath in new_path_set:
self._add_rename(path, npath)
elif nbc_description == "type_changed":
pass # We will handle this (if possible) in self._check_data_type
elif nbc_description == "repeat_children_first_point":
Expand Down Expand Up @@ -334,28 +325,40 @@ def get_old_path(path: str, previous_name: str) -> str:
# Additional conversion rules for DDv3 to DDv4
if self.version_old.major == 3 and new_version and new_version.major == 4:
self._apply_3to4_conversion(old, new)
# 3to4 rules may have introduced additional missing items in self.old_to_new
self._map_missing(
False, old_path_set.difference(new_path_set, self.old_to_new)
)

def _add_rename(self, old_path: str, new_path: str) -> None:
def _add_rename(
self, old_path: str, new_path: str, reciprocal: bool = True
) -> None:
"""Register a simple rename from old_path -> new_path using the
path->Element maps stored on the instance (self.old_paths/self.new_paths).
This will also add the reciprocal mapping when possible.
"""
old_item = self.old_paths[old_path]
new_item = self.new_paths[new_path]

# forward mapping
# Forward mapping
self.old_to_new[old_path] = (
new_path,
_get_tbp(new_item, self.new_paths),
_get_ctxpath(new_path, self.new_paths),
)

# reciprocal mapping
self.new_to_old[new_path] = (
old_path,
_get_tbp(old_item, self.old_paths),
_get_ctxpath(old_path, self.old_paths),
)
# Reciprocal mapping
if reciprocal:
self.new_to_old[new_path] = (
old_path,
_get_tbp(old_item, self.old_paths),
_get_ctxpath(old_path, self.old_paths),
)
# Apply to descendent nodes as well if the item is a struct or AoS
for item in old_item.findall("field"):
path = item.get("path")
assert path is not None and path.startswith(old_path)
npath = path.replace(old_path, new_path, 1)
if npath in self.new_paths:
self._add_rename(path, npath, reciprocal)

def _apply_3to4_conversion(self, old: Element, new: Element) -> None:
# Postprocessing for COCOS definition change:
Expand Down Expand Up @@ -421,6 +424,13 @@ def _apply_3to4_conversion(self, old: Element, new: Element) -> None:
to_update[p] = v
self.old_to_new.path.update(to_update)

# Migrate additional obsolescent nodes
# TODO: define migrations in a separate variable (as with the sign flips)?
if self.ids_name == "magnetics":
self._add_rename("bpol_probe", "b_field_pol_probe", reciprocal=False)
self._add_rename("method", "ip", reciprocal=False)
self.old_to_new.type_change["method"] = _magnetics_method_to_ip

# GH#59: To improve further the conversion of DD3 to DD4, especially the
# Machine Description part of the IDSs, we would like to add a 3to4 specific
# rule to convert any siblings name + identifier (that are not part of an
Expand All @@ -431,19 +441,20 @@ def _apply_3to4_conversion(self, old: Element, new: Element) -> None:
# Only perform the mapping if the corresponding target fields exist in the
# new DD and if we don't already have a mapping for the involved paths.
# use self.old_paths and self.new_paths set in _build_map
for p in self.old_paths:
for name_path in self.old_paths:
# look for name children
if not p.endswith("/name"):
if not name_path.endswith("/name"):
continue
parent = p.rsplit("/", 1)[0]
name_path = f"{parent}/name"
parent = name_path.rsplit("/", 1)[0]
id_path = f"{parent}/identifier"
index_path = f"{parent}/index"
desc_path = f"{parent}/description"
new_name_path = name_path
# Follow renames of parent structure
new_parent = self.old_to_new.path.get(parent) or parent
desc_path = f"{new_parent}/description"
new_name_path = f"{new_parent}/name"

# If neither 'name' nor 'identifier' existed in the old DD, skip this parent
if name_path not in self.old_paths or id_path not in self.old_paths:
# If 'identifier' doesn't exist in the old DD, skip this parent
if id_path not in self.old_paths:
continue
# exclude identifier-structure (has index sibling)
if index_path in self.old_paths:
Expand All @@ -454,14 +465,11 @@ def _apply_3to4_conversion(self, old: Element, new: Element) -> None:
continue

# Map DD3 name -> DD4 description
if name_path not in self.old_to_new.path:
self._add_rename(name_path, desc_path)
# GH#114: Also preserve name in DD4 name when identifier is empty
self.old_to_new.type_change[name_path] = _name_identifier_3to4

self._add_rename(name_path, desc_path)
# GH#114: Also preserve name in DD4 name when identifier is empty
self.old_to_new.type_change[name_path] = _name_identifier_3to4
# Map DD3 identifier -> DD4 name
if id_path in self.old_to_new.path:
self._add_rename(id_path, new_name_path)
self._add_rename(id_path, new_name_path)

def _map_missing(self, is_new: bool, missing_paths: Set[str]):
rename_map = self.new_to_old if is_new else self.old_to_new
Expand Down Expand Up @@ -1329,3 +1337,14 @@ def _equilibrium_boundary_3to4(eq3: IDSToplevel, eq4: IDSToplevel, deepcopy: boo
node[2].psi = -ts3.boundary_secondary_separatrix.psi # COCOS change
node[2].levelset.r = copy(ts3.boundary_secondary_separatrix.outline.r)
node[2].levelset.z = copy(ts3.boundary_secondary_separatrix.outline.z)


def _magnetics_method_to_ip(method: IDSBase, ip: IDSBase) -> None:
"""Convert obsolescent method(:) to ip(:) in the magnetics IDS."""
if not len(method):
return
ip.resize(len(method))
for old_item, new_item in zip(method, ip, strict=True):
new_item.method_name.value = old_item.name.value
new_item.data.value = old_item.ip.data.value
new_item.time.value = old_item.ip.time.value
49 changes: 49 additions & 0 deletions imas/test/test_ids_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,55 @@ def test_3to4_cocos_magnetics_workaround(dd4factory):
compare_children(mag, mag3)


def test_3to4_deprecated_magnetics(dd4factory):
# Test migrating deprecated bpol_probe
mag = IDSFactory("3.39.0").magnetics()
mag.bpol_probe.resize(2)
mag.bpol_probe[0].name = "name1"
mag.bpol_probe[0].identifier = "identifier1"
mag.bpol_probe[0].position.r = 1
mag.bpol_probe[0].field.data = [0.1, 0.2, 0.3]
mag.bpol_probe[1].name = "name2"
mag.bpol_probe[1].voltage.data = [0.1, 0.2, 0.3]

mag.method.resize(2)
for i, method in enumerate(mag.method):
method.name = f"name{i}"
method.ip.data = [i, 1.0, 2.0]
method.ip.time = [i + 1, 2.0, 3.0]

mag4 = convert_ids(mag, None, factory=dd4factory)
assert len(mag4.b_field_pol_probe) == 2
assert mag4.b_field_pol_probe[0].name == "identifier1"
assert mag4.b_field_pol_probe[0].description == "name1"
assert mag4.b_field_pol_probe[0].position.r == 1
assert array_equal(mag4.b_field_pol_probe[0].field.data, [0.1, 0.2, 0.3])
assert mag4.b_field_pol_probe[1].name == "name2"
assert mag4.b_field_pol_probe[1].description == "name2"
assert array_equal(mag4.b_field_pol_probe[1].voltage.data, [0.1, 0.2, 0.3])

assert len(mag4.ip) == 2
assert mag4.ip[0].method_name == "name0"
assert array_equal(mag4.ip[0].data, [0.0, 1.0, 2.0])
assert array_equal(mag4.ip[0].time, [1.0, 2.0, 3.0])
assert mag4.ip[1].method_name == "name1"
assert array_equal(mag4.ip[1].data, [1.0, 1.0, 2.0])
assert array_equal(mag4.ip[1].time, [2.0, 2.0, 3.0])

# If both the deprecated and the "correct" quantity exist, we expect only the
# correct one to be converted to DD4:
mag.b_field_pol_probe.resize(1)
mag.b_field_pol_probe[0].name = "test"
mag.ip.resize(1)
mag.ip[0].method_name = "ip"

mag4 = convert_ids(mag, None, factory=dd4factory)
assert len(mag4.b_field_pol_probe) == 1
assert mag4.b_field_pol_probe[0].name == "test"
assert len(mag4.ip) == 1
assert mag4.ip[0].method_name == "ip"


def test_3to4_pulse_schedule():
ps = IDSFactory("3.39.0").pulse_schedule()
ps.ids_properties.homogeneous_time = IDS_TIME_MODE_HETEROGENEOUS
Expand Down
Loading