"""
This module contains the utility functions for the datamodels sub-package. Mainly
the open/factory function for creating datamodels
"""
import warnings
from collections.abc import Mapping
from pathlib import Path
import asdf
import numpy as np
from roman_datamodels import validate
from ._core import MODEL_REGISTRY, DataModel
__all__ = ["FilenameMismatchWarning", "rdm_open"]
[docs]
class FilenameMismatchWarning(UserWarning):
"""
Warning when the filename in the meta attribute does not match the filename
of the file being opened.
"""
def _node_update(to_node, from_node, extras=None, extras_key=None, ignore=None):
"""Copy node contents from an existing node to another existing node
How the copy occurs depends on existence of keys in `to_node`
If key exists in `to_node`, contents are converted from `from_node` stnode type to
the stnode type expected in order to preserve validation of the node.
If key only exists in `from_node`, the contents are copied as-is.
If key exists in the list `extras`, the contents are placed in the dict `["extras"]`.
if `extras_key` is given, then the sub-dictionary `["extras"][extras_key]` is used.
Extra keys are used to avoid collisions between node trees where the underlying structures are
completely different.
Keys in `ignore` are not considered.
Keys are also
Parameters
----------
to_node : stnode
Node to receive the contents.
from_node : stnode, DataModel
Node to copy from
extras : [str[,...]]
Keys that may create collisions between the two node trees. All such keys are placed
in the `extras` key. If `extras_key` is defined, the contents are placed in a subdict
of that name.
extras_key : str or None
See parameter `extras`.
ignore : list-like or None
Keys that should be completely ignored.
"""
# Define utilities functions
def _descend(attributes, key):
next_attributes = list()
for item in attributes:
level, _, name = item.partition(".")
if level == key and name:
next_attributes.append(name)
return next_attributes
def _traverse(to_node, from_node, extras=None, ignore=None):
if extras is None:
extras = tuple()
new_extras = dict()
if ignore is None:
ignore = tuple()
for key in from_node.keys():
if key in ignore:
continue
if key in extras:
new_extras[key] = from_node[key]
continue
if key in to_node:
if isinstance(to_node[key], Mapping):
next_extras = _descend(extras, key)
next_ignores = _descend(ignore, key)
returned_extras = _traverse(
getattr(to_node, key), getattr(from_node, key), extras=next_extras, ignore=next_ignores
)
if returned_extras:
new_extras[key] = returned_extras
else:
if isinstance(to_node[key], list):
value = getattr(from_node, key).data
elif isinstance(to_node[key], np.ndarray):
value = getattr(from_node, key).astype(to_node[key].dtype)
value = getattr(value, "value", value)
else:
value = getattr(from_node, key)
setattr(to_node, key, value)
else:
to_node[key] = from_node[key]
return new_extras
# Now do the copy.
new_extras = _traverse(to_node, from_node, extras=extras, ignore=ignore)
if new_extras:
extras_node = to_node.get("extras", dict())
if extras_key:
extras_node[extras_key] = new_extras
else:
extras_node.update(new_extras)
to_node["extras"] = extras_node
def _open_asdf(init, lazy_tree=True, **kwargs):
"""
Open init with `asdf.open`.
If init is a path-like object the ``roman.meta.filename`` attribute
will be checked against ``Path.name`` and updated if they does not match.
Parameters
----------
init : str, ``Path`` or file-like
An object that can be opened by `asdf.open`
lazy_tree : bool
If we should open the file with a "lazy tree"
**kwargs:
Any additional arguments to pass to asdf.open
Returns
-------
`asdf.AsdfFile`
"""
# asdf defaults to lazy_tree=False, this overwrites it to
# lazy_tree=True for roman_datamodels
kwargs["lazy_tree"] = lazy_tree
if isinstance(init, str):
path = Path(init)
elif isinstance(init, Path):
path = init
else:
path = None
try:
asdf_file = asdf.open(init, **kwargs)
except ValueError as err:
raise TypeError("Open requires a filepath, file-like object, or Roman datamodel") from err
if (
path is not None
and "roman" in asdf_file
and isinstance(asdf_file["roman"], Mapping) # Fix issue for Python 3.10
and "meta" in asdf_file["roman"]
and "filename" in asdf_file["roman"]["meta"]
and asdf_file["roman"]["meta"]["filename"] != path.name
):
warnings.warn(
f"meta.filename: {asdf_file['roman']['meta']['filename']} does not match filename: {path.name}, updating the filename in memory!",
FilenameMismatchWarning,
stacklevel=2,
)
asdf_file["roman"]["meta"]["filename"] = type(asdf_file["roman"]["meta"]["filename"])(path.name)
return asdf_file
def rdm_open(init, memmap=False, **kwargs):
"""
Datamodel open/create function.
This function opens a Roman datamodel from an asdf file or generates
the datamodel from an existing one.
Parameters
----------
init : str, ``Path``, `DataModel`, `asdf.AsdfFile`, file-like
May be any one of the following types:
- `asdf.AsdfFile` instance
- string or ``Path`` indicating the path to an ASDF file
- `DataModel` Roman data model instance
- file-like object compatible with `asdf.open`
memmap : bool
Open ASDF file binary data using memmap (default: False)
Returns
-------
`DataModel`
"""
if isinstance(init, str | Path):
if Path(init).suffix.lower() == ".json":
try:
from romancal.datamodels.library import ModelLibrary
return ModelLibrary(init)
except ImportError as err:
raise ImportError("Please install romancal to allow opening associations with roman_datamodels") from err
with validate.nuke_validation():
if isinstance(init, DataModel):
# Copy the object so it knows not to close here
return init.copy(deepcopy=False)
# Temp fix to catch JWST args before being passed to asdf open
kwargs.pop("asn_n_members", None)
asdf_file = init if isinstance(init, asdf.AsdfFile) else _open_asdf(init, memmap=memmap, **kwargs)
if (model_type := type(asdf_file.tree["roman"])) in MODEL_REGISTRY:
return MODEL_REGISTRY[model_type](asdf_file, **kwargs)
# Check if the datamodel is a GDPS datamodel
try:
import roman_gdps # noqa: F401
except ImportError as err:
asdf_file.close()
raise ImportError("Please install roman-gdps to allow opening GDPS datamodels") from err
# We assume at this point that an asdf file with `roman` key is a GDPS datamodel
if "roman" in asdf_file.tree:
return asdf_file.tree["roman"]
asdf_file.close()
raise TypeError(f"Unknown datamodel type: {model_type}")