Source code for mario_addons.plugins.walking
"""Functions for walking a tree."""
import builtins
import collections.abc
import typing as t
import mario.interpret
[docs]def build_new(old, new):
"""Build a new instance of the old type, with new data."""
return type(old)(new)
[docs]def traverse_one_level_json(handle_item, tree):
"""Traverse one level of a json-like tree."""
if isinstance(tree, collections.abc.Sequence) and not isinstance(tree, str):
return build_new(tree, [handle_item(x) for i, x in enumerate(tree)])
if isinstance(tree, collections.abc.Mapping):
return build_new(tree, {k: handle_item(v) for k, v in tree.items()})
return tree
[docs]def build_walker(descend: t.Callable, handle_item: t.Callable) -> t.Callable:
"""Build a walker with specified functions."""
def walk_tree(data, *args, **kwargs):
recursed = descend(walk_tree, data, *args, **kwargs)
return handle_item(recursed, *args, **kwargs)
return walk_tree
[docs]def make_func(code: str, namespace: t.Dict[str, object]) -> t.Callable:
"""Make a transformer function."""
# pylint: disable=fixme
# TODO Use the parser from mario core.
if "x" not in code:
code += "(x)"
# pylint: disable=fixme
# TODO Move this earlier in the process so it only needs to be executed
# once, maybe with click.ParamType.
# pylint: disable=eval-used
return lambda x: eval(code, {**namespace, "x": x})
[docs]def prefixes(multi_attribute_access: str) -> t.List[str]:
"""Get prefixes of a namepsaced name.
>>> prefixes('a.b.c')
['a.b.c', 'a.b', 'a']
"""
split = multi_attribute_access.split(".")
out = []
while split:
out.append(".".join(split))
split.pop(-1)
return out
[docs]def get_type_object(namespaced_type_name: str) -> t.Type:
"""Turns a qualname into the corresponding object."""
if "." not in namespaced_type_name:
return getattr(builtins, namespaced_type_name)
name_to_module = mario.interpret.build_name_to_module(namespaced_type_name)
for module_name in prefixes(namespaced_type_name):
module = name_to_module.get(module_name)
if module is not None:
break
else:
raise ImportError(module_name)
obj = module
# pylint: disable=undefined-loop-variable
remainder = namespaced_type_name[len(module_name) :]
names = [name for name in remainder.split(".") if name]
while names:
obj = getattr(obj, names[0])
names.pop()
return obj
[docs]def build_mapping(pairs: t.Iterable[t.Tuple[str, str]]):
"""Build a type-to-transformer mapping."""
mapping = {}
for input_type, converter in pairs:
converter_namespace = mario.interpret.build_name_to_module(converter)
mapping[get_type_object(input_type)] = make_func(converter, converter_namespace)
return mapping
# class PairParam(click.ParamType):
# def __init__(self, left_type, right_type):
# self.left_type = left_type
# self.right_type = right_type
# def convert(self, value, param, ctx):
# left, right = value.split("=", maxsplit=1)
# return self.left_type.convert(left), self.right_type.convert(right)
# class EvalParam(click.ParamType):
# def convert(self, value, param, ctx):
# return make_func(value, mario.interpret.build_name_to_module(value))