Source code for sigtools._signatures

# sigtools - Collection of Python modules for manipulating function signatures
# Copyright (C) 2013-2022 Yann Kaiser
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import __future__
import abc
import sys
import types
from itertools import zip_longest
import itertools
import collections
from functools import partial
import typing
import warnings

import attr

from sigtools import _util


class UpgradedAnnotation(metaclass=abc.ABCMeta):
    """An annotation that is either already evaluated or deferred by
    :pep:`563`.

    Concrete subclasses implement `source_value`; two instances compare equal
    when their source values compare equal.
    """

    @abc.abstractmethod
    def source_value(self):
        """Value of this annotation as would be evaluated at the site of its
        definition."""
        raise NotImplementedError

    @classmethod
    def upgrade(cls, raw_annotation, function, param_name, *, _stacklevel=0) -> 'UpgradedAnnotation':
        """Wraps a ``raw_annotation`` as found on ``function`` in an
        `~sigtools.signatures.UpgradedAnnotation`.

        :param raw_annotation: The annotation as stored on the parameter.
        :param function: The function the annotation was found on. A falsy
            value triggers a `DeprecationWarning` and yields the empty
            annotation.
        :param param_name: Name of the annotated parameter (not used by this
            implementation).
        :returns: The empty annotation, a postponed annotation or a
            pre-evaluated annotation.
        """
        # Nothing to wrap: the parameter carries no annotation at all.
        if raw_annotation is UpgradedParameter.empty:
            return EmptyAnnotation

        # Without the defining function there is no way to tell how the
        # annotation should be interpreted.
        if not function:
            warnings.warn(
                "No function provided when upgrading annotation",
                DeprecationWarning,
                stacklevel=_stacklevel + 1
            )
            return EmptyAnnotation

        postponed = _is_co_flag_enabled(function)
        if postponed is None:
            # The helper could not inspect the function's code flags.
            return EmptyAnnotation
        if postponed:
            return _PostponedAnnotation(raw_annotation, function)
        return _PreEvaluatedAnnotation(raw_annotation)

    @classmethod
    def preevaluated(cls, value) -> 'UpgradedAnnotation':
        """Wraps an already-evaluated annotation value in an
        `~sigtools.signatures.UpgradedAnnotation`"""
        if value is not UpgradedParameter.empty:
            return _PreEvaluatedAnnotation(value)
        return EmptyAnnotation

    def __eq__(self, other):
        # Only other upgraded annotations can be equal; equality is decided
        # by the evaluated values.
        return (
            isinstance(other, UpgradedAnnotation)
            and self.source_value() == other.source_value()
        )
def _is_co_flag_enabled(obj):
    """Tells whether ``obj`` was compiled with postponed annotation
    evaluation (:pep:`563`).

    :returns: `False` if this interpreter has no ``annotations`` future
        feature, `True` if the feature is already mandatory, `None` if
        ``obj`` exposes no code flags to inspect, and otherwise the masked
        compiler flag (truthy when the feature was enabled).
    """
    annotations_feature = getattr(__future__, "annotations", None)
    if not annotations_feature:
        return False

    mandatory_since = annotations_feature.getMandatoryRelease()
    if mandatory_since and sys.version_info >= mandatory_since:
        return True

    try:
        return obj.__code__.co_flags & annotations_feature.compiler_flag
    except AttributeError:
        return None


@attr.define(eq=False)
class _PostponedAnnotation(UpgradedAnnotation):
    """An annotation whose evaluation was postponed per :PEP:`563`"""

    _raw_annotation: typing.Any
    _function: types.FunctionType

    def source_value(self):
        # NOTE: evaluates the annotation text with ``eval`` against the
        # globals of the function it was found on.
        return eval(self._raw_annotation, self._function.__globals__, {})


@attr.define(eq=False)
class _PreEvaluatedAnnotation(UpgradedAnnotation):
    """An annotation that did not go through postponed evaluation"""

    _annotation: typing.Any

    def source_value(self):
        return self._annotation


class _EmptyAnnotation(UpgradedAnnotation):
    """An annotation that was not supplied"""

    def source_value(self):
        return UpgradedParameter.empty

    def __repr__(self):
        return "EmptyAnnotation"


# Shared instance used wherever an annotation is absent.
EmptyAnnotation: UpgradedAnnotation = _EmptyAnnotation()
class UpgradedSignature(_util.funcsigs.Signature):
    """A `~inspect.Signature` augmented with parameter sources and upgraded
    annotations, as returned by `sigtools.signature` or
    `sigtools.signatures.signature`
    """

    __slots__ = _util.funcsigs.Signature.__slots__ + ('sources', 'upgraded_return_annotation')

    def __init__(self, parameters=None, *args,
                 upgraded_return_annotation=EmptyAnnotation,
                 _stacklevel=0, **kwargs):
        """
        :param parameters: Iterable of parameters; non-upgraded parameters
            are upgraded with a `DeprecationWarning`.
        :param upgraded_return_annotation: The return annotation as an
            `~sigtools.signatures.UpgradedAnnotation`.
        :param sources: (keyword) Sources of the signature's parameters.
        """
        self.sources = kwargs.pop('sources', {})
        """
        Sources of the signature's parameters.

        .. warning::

            Interface is likely to change in `sigtools` 5.0.
        """
        self.upgraded_return_annotation = upgraded_return_annotation
        """
        Return annotation.

        :type: sigtools.signatures.UpgradedAnnotation
        """
        parameters = _upgrade_parameters_with_warning(parameters, stacklevel=_stacklevel + 1)
        # Zero-argument super() avoids depending on the ``Signature`` alias,
        # which is only bound after this class body.
        super().__init__(parameters, *args, **kwargs)

    @classmethod
    def _upgrade(cls, inst, function, sources, *, _stacklevel=0):
        """Upgrades an `inspect.Signature` given a function and sources.

        Instances of ``cls`` are returned unchanged.
        """
        if isinstance(inst, cls):
            return inst
        params = [
            UpgradedParameter._upgrade(param, function, sources)
            for param in inst.parameters.values()
        ]
        return cls(
            params,
            return_annotation=inst.return_annotation,
            upgraded_return_annotation=UpgradedAnnotation.upgrade(inst.return_annotation, function, 'return'),
            sources=sources,
            _stacklevel=_stacklevel,
        )

    @classmethod
    def _upgrade_with_warning(cls, inst, *, _stacklevel=0):
        """Returns ``inst`` if it is already upgraded; otherwise emits a
        `DeprecationWarning` and upgrades it without function or sources."""
        if isinstance(inst, cls):
            return inst
        warnings.warn(
            "inspect.Signature instances passed to sigtools "
            "must be upgraded with sigtools.Signature.upgrade",
            DeprecationWarning,
            stacklevel=_stacklevel + 2,
        )
        return cls._upgrade(inst, None, {})

    def replace(self, *args, _stacklevel=0, **kwargs):
        """Returns a copy of this signature with the given fields replaced.

        In addition to the fields accepted by the base class, ``sources``
        and ``upgraded_return_annotation`` may be given; when omitted they
        are carried over from this signature.
        """
        try:
            sources = kwargs.pop('sources')
        except KeyError:
            sources = self.sources
        try:
            parameters = kwargs.pop('parameters')
        except KeyError:
            parameters = self.parameters.values()
        else:
            parameters = _upgrade_parameters_with_warning(parameters, stacklevel=_stacklevel + 1)
        try:
            upgraded_return_annotation = kwargs.pop("upgraded_return_annotation")
        except KeyError:
            upgraded_return_annotation = self.upgraded_return_annotation
        ret = super().replace(*args, parameters=parameters, **kwargs)
        assert isinstance(ret, type(self))
        ret.sources = sources
        ret.upgraded_return_annotation = upgraded_return_annotation
        return ret

    def evaluated(self):
        """Returns a copy of this Signature with annotations replaced by
        their evaluated counterparts"""
        return self.replace(
            parameters=[param.evaluated() for param in self.parameters.values()],
            return_annotation=self.upgraded_return_annotation.source_value(),
        )

    def __eq__(self, other):
        base_result = super().__eq__(other)
        # Propagate NotImplemented (``other`` is not a signature) and plain
        # inequality as-is instead of coercing NotImplemented to a boolean.
        if base_result is NotImplemented or not base_result:
            return base_result
        try:
            other_annotation = other.upgraded_return_annotation
        except AttributeError:
            # ``other`` is a signature without an upgraded return annotation
            # (e.g. a plain inspect.Signature): there is nothing more to
            # compare, so the base-class verdict stands.
            return base_result
        return self.upgraded_return_annotation == other_annotation


# Shorter alias for `UpgradedSignature`.
Signature = UpgradedSignature
class UpgradedParameter(_util.funcsigs.Parameter):
    """A `~inspect.Parameter` augmented with parameter sources and upgraded
    annotations, as found on signatures returned by `sigtools.signature` or
    `sigtools.signatures.signature`.
    """

    __slots__ = _util.funcsigs.Parameter.__slots__ + ('upgraded_annotation', '_function', 'sources', "source_depths")

    @classmethod
    def _upgrade(cls, inst, function, function_sources):
        """Upgrades an `inspect.Parameter` found on ``function``.

        ``function_sources`` maps parameter names to lists of source
        functions, with the ``"+depths"`` key mapping functions to depths.
        Instances of ``cls`` are returned unchanged.
        """
        if isinstance(inst, cls):
            return inst
        sources = function_sources.get(inst.name, [])
        # Keep only the depths of the functions that are sources of this
        # particular parameter.
        source_depths = {
            func: depth
            for func, depth in function_sources.get("+depths", {}).items()
            if func in sources
        }
        return cls(
            name=inst.name,
            kind=inst.kind,
            default=inst.default,
            annotation=inst.annotation,
            upgraded_annotation=UpgradedAnnotation.upgrade(inst.annotation, function, inst.name),
            function=function,
            sources=sources,
            source_depths=source_depths,
        )

    def __init__(self, *args, function=None, sources=None, source_depths=None,
                 upgraded_annotation=EmptyAnnotation, **kwargs):
        """
        :param function: The function this parameter was found on.
        :param sources: List of sources of this parameter; a fresh empty
            list when omitted.
        :param source_depths: Mapping of source to depth; a fresh empty dict
            when omitted.
        :param upgraded_annotation: The parameter's annotation as an
            `~sigtools.signatures.UpgradedAnnotation`.

        Remaining arguments are forwarded to the base `~inspect.Parameter`.
        """
        super().__init__(*args, **kwargs)
        self._function = function
        # Fresh containers per instance: a mutable default argument would be
        # shared by every parameter created without explicit sources.
        self.sources = [] if sources is None else sources
        """
        Sources of this parameter.

        .. warning::

            Interface is likely to change in `sigtools` 5.0.
        """
        self.source_depths = {} if source_depths is None else source_depths
        """
        How deep was each of this parameter's sources found.

        .. warning::

            Interface is likely to change in `sigtools` 5.0.
        """
        self.upgraded_annotation = upgraded_annotation
        """Annotation of this parameter.

        :type: sigtools.signatures.UpgradedAnnotation
        """

    def replace(self, function=_util.UNSET, sources=_util.UNSET,
                source_depths=_util.UNSET, upgraded_annotation=_util.UNSET,
                **kwargs):
        """Returns a copy of this parameter with the given fields replaced.

        ``function``, ``sources``, ``source_depths`` and
        ``upgraded_annotation`` are carried over from this parameter when
        left unset; other keyword arguments go to the base class.
        """
        function = self._function if function is _util.UNSET else function
        sources = self.sources if sources is _util.UNSET else sources
        source_depths = self.source_depths if source_depths is _util.UNSET else source_depths
        upgraded_annotation = self.upgraded_annotation if upgraded_annotation is _util.UNSET else upgraded_annotation
        ret = super().replace(**kwargs)
        assert isinstance(ret, type(self))
        ret._function = function
        ret.sources = sources
        ret.source_depths = source_depths
        ret.upgraded_annotation = upgraded_annotation
        return ret

    def evaluated(self):
        """Returns a copy of this Parameter with annotations replaced by
        their evaluated counterparts"""
        return self.replace(annotation=self.upgraded_annotation.source_value())

    def __eq__(self, other):
        base_result = super().__eq__(other)
        # Propagate NotImplemented (``other`` is not a parameter) and plain
        # inequality as-is instead of coercing NotImplemented to a boolean.
        if base_result is NotImplemented or not base_result:
            return base_result
        try:
            other_annotation = other.upgraded_annotation
        except AttributeError:
            # ``other`` is a parameter without an upgraded annotation (e.g.
            # a plain inspect.Parameter): the base-class verdict stands.
            return base_result
        return self.upgraded_annotation == other_annotation
def _upgrade_parameters_with_warning(parameters, stacklevel=1):
    """Ensures every parameter in ``parameters`` is an `UpgradedParameter`.

    :param parameters: An iterable of parameters, or `None`.
    :param int stacklevel: Extra stack levels to skip when warning.
    :returns: `None` if ``parameters`` is `None`, otherwise a list of
        upgraded parameters. A `DeprecationWarning` is emitted when any
        parameter had to be upgraded.
    """
    if parameters is None:
        return None
    # Materialize first: the check below would otherwise exhaust a one-shot
    # iterable (e.g. a generator) before it is returned or iterated again,
    # silently dropping parameters.
    parameters = list(parameters)
    if all(isinstance(param, UpgradedParameter) for param in parameters):
        return parameters
    warnings.warn(
        "inspect.Signature and Parameter instances "
        "passed to sigtools should be upgraded "
        "to sigtools.UpgradedSignature and UpgradedParameter",
        DeprecationWarning,
        stacklevel=2 + stacklevel
    )
    return [
        UpgradedParameter._upgrade(param, function=None, function_sources={})
        for param in parameters
    ]


def default_sources(sig, obj):
    """Builds a sources mapping attributing every parameter of ``sig`` to
    ``obj``, with ``obj`` recorded at depth 0 under ``'+depths'``."""
    srcs = {pname: [obj] for pname in sig.parameters}
    srcs['+depths'] = {obj: 0}
    return srcs


def set_default_sources(sig, obj):
    """Assigns the source of every parameter of sig to obj"""
    return Signature._upgrade(sig, obj, default_sources(sig, obj))
def signature(obj):
    """Retrieves the unmodified signature from ``obj``, without taking
    `sigtools.specifiers` decorators into account or attempting automatic
    signature discovery.

    For these features, use `sigtools.signature`.

    `functools.partial` objects are handled by taking the signature of the
    wrapped function and masking the arguments the partial already supplies.
    """
    if not isinstance(obj, partial):
        return set_default_sources(_util.funcsigs.signature(obj), obj)

    wrapped = obj.func
    wrapped_sig = set_default_sources(
        _util.funcsigs.signature(wrapped), wrapped)
    return _mask(
        wrapped_sig, len(obj.args),
        False, False, False, False,
        obj.keywords or {}, obj)
def copy_sources(src, func_swap={}, increase=False):
    """Returns a copy of the sources mapping ``src``.

    :param dict src: Maps parameter names to lists of source functions, with
        the optional ``'+depths'`` key mapping functions to depths.
    :param dict func_swap: Functions to substitute in the copy, as
        ``{old: new}``.
    :param increase: Amount added to every depth in the copy.
    :returns: A new mapping which always has a ``'+depths'`` entry.
    """
    def swapped(func):
        return func_swap.get(func, func)

    copied = {
        name: [swapped(func) for func in funcs]
        for name, funcs in src.items()
        if name != '+depths'
    }
    copied['+depths'] = {
        swapped(func): depth + increase
        for func, depth in src.get('+depths', {}).items()
    }
    return copied


# Result of `sort_params` when sources are requested.
SortedParameters = collections.namedtuple(
    'SortedParameters',
    'posargs pokargs varargs kwoargs varkwargs sources')
def sort_params(sig, sources=False, _stacklevel=0):
    """Classifies the parameters from sig.

    :param UpgradedSignature sig: The signature to operate on
    :param bool sources: If true, return a `SortedParameters` tuple that
        also carries a copy of the signature's sources.

    :returns: A tuple ``(posargs, pokargs, varargs, kwoargs, varkwas)``
    :rtype: ``(list, list, Parameter or None, dict, Parameter or None)``

    ::

        >>> from sigtools import signatures, support
        >>> from pprint import pprint
        >>> pprint(signatures.sort_params(support.s('a, /, b, *args, c, d')))
        ([<Parameter at 0x7fdda4e89418 'a'>],
         [<Parameter at 0x7fdda4e89470 'b'>],
         <Parameter at 0x7fdda4e89c58 'args'>,
         {'c': <Parameter at 0x7fdda4e89c00 'c'>,
          'd': <Parameter at 0x7fdda4e89db8 'd'>},
         None)

    """
    sig = UpgradedSignature._upgrade_with_warning(sig, _stacklevel=_stacklevel + 1)
    assert isinstance(sig, UpgradedSignature), "signature must be upgraded"

    posargs, pokargs = [], []
    kwoargs = _util.OrderedDict()
    varargs = varkwas = None

    for param in sig.parameters.values():
        kind = param.kind
        if kind == param.POSITIONAL_ONLY:
            posargs.append(param)
        elif kind == param.POSITIONAL_OR_KEYWORD:
            pokargs.append(param)
        elif kind == param.VAR_POSITIONAL:
            varargs = param
        elif kind == param.KEYWORD_ONLY:
            kwoargs[param.name] = param
        elif kind == param.VAR_KEYWORD:
            varkwas = param
        else:
            raise AssertionError('Unknown param kind {0}'.format(param.kind))

    if not sources:
        return posargs, pokargs, varargs, kwoargs, varkwas

    sig_sources = getattr(sig, 'sources', {})
    return SortedParameters(
        posargs, pokargs, varargs, kwoargs, varkwas,
        copy_sources(sig_sources))
def apply_params(sig, posargs, pokargs, varargs, kwoargs, varkwargs,
                 sources=None, function=None, *, _stacklevel=0):
    """Reverses `sort_params`'s operation.

    :param sig: The signature the result is based on.
    :param posargs: Positional-only parameters.
    :param pokargs: Positional-or-keyword parameters.
    :param varargs: The ``*args``-like parameter, if any.
    :param kwoargs: Mapping of keyword-only parameters.
    :param varkwargs: The ``**kwargs``-like parameter, if any.
    :param sources: If not `None`, set as the resulting signature's sources.

    :returns: A new `inspect.Signature` object based off sig,
        with the given parameters.
    """
    base_sig = UpgradedSignature._upgrade_with_warning(
        sig, _stacklevel=_stacklevel + 1)

    # Reassemble the parameters in the order a signature requires.
    parameters = [*posargs, *pokargs]
    if varargs:
        parameters.append(varargs)
    parameters.extend(kwoargs.values())
    if varkwargs:
        parameters.append(varkwargs)

    result = base_sig.replace(
        parameters=parameters, _stacklevel=_stacklevel + 1)
    if sources is None:
        return result

    result = Signature._upgrade(result, function, sources, _stacklevel=1)
    result.sources = sources
    return result
class IncompatibleSignatures(ValueError):
    """Raised when two or more signatures are incompatible for the requested
    operation.

    :ivar inspect.Signature sig: The signature at which point the
        incompatibility was discovered
    :ivar others: The signatures up until ``sig``
    """

    def __init__(self, sig, others):
        # Populate ``args`` so the exception can be rebuilt from them, which
        # is what copying and pickling do.
        super().__init__(sig, others)
        self.sig = sig
        self.others = others

    def __str__(self):
        # The preceding signatures, space-separated, then the offending one.
        return '{0} {1}'.format(
            ' '.join(str(sig) for sig in self.others),
            self.sig,
        )
def _add_sources(ret_src, name, *from_sources):
    """Appends to ``ret_src[name]`` (created if missing) the entries found
    under ``name`` in each of the ``from_sources`` mappings."""
    target = ret_src.setdefault(name, [])
    target.extend(itertools.chain.from_iterable(
        src.get(name, ()) for src in from_sources))


def _add_all_sources(ret_src, params, from_source):
    """Adds the sources from from_source of all given parameters into the
    lhs sources multidict"""
    for param in params:
        ret_src.setdefault(param.name, []).extend(
            from_source.get(param.name, ()))


def _exclude_from_seq(seq, el):
    """Replaces, in place, the first item of ``seq`` that is ``el`` (by
    identity) with `None`."""
    for i, x in enumerate(seq):
        if el is x:
            seq[i] = None
            break


def merge_depths(l, r):
    """Merges two ``{function: depth}`` mappings into a new dict, keeping the
    smaller depth for functions present in both."""
    ret = dict(l)
    for func, depth in r.items():
        # An existing, shallower entry wins over a deeper one.
        if func in ret and depth > ret[func]:
            continue
        ret[func] = depth
    return ret


class _Merger(object):
    """Merges two sorted parameter sets, ``left`` and ``right``.

    Both are read through their ``posargs``, ``pokargs``, ``varargs``,
    ``kwoargs``, ``varkwargs`` and ``sources`` attributes. Iterating over the
    instance performs the merge and yields
    ``(posargs, pokargs, varargs, kwoargs, varkwargs, src)``.

    :raises ValueError: when a parameter without a default cannot be matched
        on the other side.
    """

    def __init__(self, left, right):
        self.l = left
        self.r = right
        self.performed = False

    def perform_once(self):
        """Marks the merge as performed and runs it."""
        # NOTE(review): ``self.performed`` is set but never checked here, so
        # calling this again re-runs the merge -- confirm this is intended.
        self.performed = True
        self._merge()

    def __iter__(self):
        self.perform_once()
        ret = (
            self.posargs, self.pokargs,
            self.varargs, self.kwoargs, self.varkwargs,
            self.src)
        return iter(ret)

    def _merge(self):
        """Computes the merged parameters and sources onto ``self``."""
        self.posargs = []
        self.pokargs = []
        # Slots in these two lists are set to None (by _exclude_from_seq)
        # when the corresponding star-parameter absorbed a parameter of the
        # other side.
        self.varargs_src = [self.l.varargs, self.r.varargs]
        self.kwoargs = _util.OrderedDict()
        self.varkwargs_src = [
            self.l.varkwargs, self.r.varkwargs
            ]
        self.src = {'+depths': self._merge_depths()}

        # Keyword-only parameters present on both sides are merged right
        # away; the others are set aside per side.
        self.l_unmatched_kwoargs = _util.OrderedDict()
        for param in self.l.kwoargs.values():
            name = param.name
            if name in self.r.kwoargs:
                self.kwoargs[name] = self._concile_meta(
                    param, self.r.kwoargs[name])
                self.src[name] = list(itertools.chain(
                    self.l.sources.get(name, ()), self.r.sources.get(name, ())))
            else:
                self.l_unmatched_kwoargs[param.name] = param

        self.r_unmatched_kwoargs = _util.OrderedDict()
        for param in self.r.kwoargs.values():
            if param.name not in self.l.kwoargs:
                self.r_unmatched_kwoargs[param.name] = param

        # Iterators, so that positional-or-keyword parameters consumed while
        # matching positional-only ones are not seen again by the second
        # loop below.
        il_pokargs = iter(self.l.pokargs)
        ir_pokargs = iter(self.r.pokargs)

        # Pair up positional-only parameters by position.
        for l_param, r_param in zip_longest(self.l.posargs, self.r.posargs):
            if l_param and r_param:
                p = self._concile_meta(l_param, r_param)
                self.posargs.append(p)
                if l_param.name == r_param.name:
                    _add_sources(self.src, l_param.name,
                                 self.l.sources, self.r.sources)
                else:
                    # The merged parameter keeps the left name, so only the
                    # left sources are recorded.
                    _add_sources(self.src, l_param.name, self.l.sources)
            else:
                if l_param:
                    self._merge_unbalanced_pos(
                        l_param, self.l.sources,
                        ir_pokargs, self.r.varargs, self.r.sources)
                else:
                    self._merge_unbalanced_pos(
                        r_param, self.r.sources,
                        il_pokargs, self.l.varargs, self.l.sources)

        # Pair up the remaining positional-or-keyword parameters.
        for l_param, r_param in zip_longest(il_pokargs, ir_pokargs):
            if l_param and r_param:
                if l_param.name == r_param.name:
                    self.pokargs.append(self._concile_meta(l_param, r_param))
                    _add_sources(self.src, l_param.name,
                                 self.l.sources, self.r.sources)
                else:
                    # Names differ: this parameter and every
                    # positional-or-keyword parameter collected so far
                    # become positional-only.
                    for i, pokarg in enumerate(self.pokargs):
                        self.pokargs[i] = pokarg.replace(
                            kind=pokarg.POSITIONAL_ONLY)
                    self.pokargs.append(
                        self._concile_meta(l_param, r_param)
                        .replace(kind=l_param.POSITIONAL_ONLY))
                    _add_sources(self.src, l_param.name, self.l.sources)
            else:
                if l_param:
                    self._merge_unbalanced_pok(
                        l_param, self.l.sources,
                        self.r.varargs, self.r.varkwargs,
                        self.r_unmatched_kwoargs, self.r.sources)
                else:
                    self._merge_unbalanced_pok(
                        r_param, self.r.sources,
                        self.l.varargs, self.l.varkwargs,
                        self.l_unmatched_kwoargs, self.l.sources)

        if self.l_unmatched_kwoargs:
            self._merge_unmatched_kwoargs(
                self.l_unmatched_kwoargs, self.r.varkwargs, self.l.sources)
        if self.r_unmatched_kwoargs:
            self._merge_unmatched_kwoargs(
                self.r_unmatched_kwoargs, self.l.varkwargs, self.r.sources)

        self.varargs = self._add_starargs(
            self.varargs_src, self.l.varargs, self.r.varargs)
        self.varkwargs = self._add_starargs(
            self.varkwargs_src, self.l.varkwargs, self.r.varkwargs)

    def _merge_depths(self):
        """Merges the ``'+depths'`` entries of both sides' sources."""
        return merge_depths(self.l.sources.get('+depths', {}),
                            self.r.sources.get('+depths', {}))

    def _add_starargs(self, which, left, right):
        """Returns the merged ``*args``- or ``**kwargs``-like parameter, or
        `None` unless both sides have one.

        ``which`` is the matching ``*_src`` pair; it decides whose parameter
        and sources are kept.
        """
        if not left or not right:
            return None
        if all(which):
            ret = self._concile_meta(left, right)
            if left.name == right.name:
                _add_sources(self.src, ret.name,
                             self.l.sources, self.r.sources)
            else:
                _add_sources(self.src, ret.name, self.l.sources)
        elif which[0]:
            ret = left
            _add_sources(self.src, ret.name, self.l.sources)
        else:
            ret = right
            _add_sources(self.src, ret.name, self.r.sources)
        return ret

    def _merge_unbalanced_pos(self, existing, src,
                              convert_from, o_varargs, o_src):
        """Handles a positional-only parameter with no positional-only
        counterpart on the other side.

        It is matched with the other side's next positional-or-keyword
        parameter (from ``convert_from``) if any, else accepted as-is when
        the other side has ``o_varargs``.

        :raises ValueError: if neither applies and ``existing`` has no
            default.
        """
        try:
            other = next(convert_from)
        except StopIteration:
            if o_varargs:
                self.posargs.append(existing)
                _add_sources(self.src, existing.name, src)
                _exclude_from_seq(self.varargs_src, o_varargs)
            elif existing.default == existing.empty:
                raise ValueError('Unmatched positional parameter: {0}'
                                 .format(existing))
        else:
            self.posargs.append(self._concile_meta(existing, other))
            _add_sources(self.src, existing.name, src)

    def _merge_unbalanced_pok(
            self, existing, src,
            o_varargs, o_varkwargs, o_kwargs_limbo, o_src):
        """tries to insert positional-or-keyword parameters for which there
        were no matched positional parameter

        :raises ValueError: if the other side offers nothing to match
            ``existing`` and it has no default.
        """
        if existing.name in o_kwargs_limbo:
            # The other side has it as a so-far-unmatched keyword-only
            # parameter: the merged parameter is keyword-only.
            self.kwoargs[existing.name] = self._concile_meta(
                existing, o_kwargs_limbo.pop(existing.name)
                ).replace(kind=existing.KEYWORD_ONLY)
            _add_sources(self.src, existing.name, o_src, src)
        elif o_varargs and o_varkwargs:
            self.pokargs.append(existing)
            _add_sources(self.src, existing.name, src)
        elif o_varkwargs:
            # convert to keyword argument
            self.kwoargs[existing.name] = existing.replace(
                kind=existing.KEYWORD_ONLY)
            _add_sources(self.src, existing.name, src)
        elif o_varargs:
            # convert along with all preceding to positional args
            self.posargs.extend(
                a.replace(kind=a.POSITIONAL_ONLY)
                for a in self.pokargs)
            self.pokargs[:] = []
            self.posargs.append(existing.replace(kind=existing.POSITIONAL_ONLY))
            _add_sources(self.src, existing.name, src)
        elif existing.default == existing.empty:
            raise ValueError('Unmatched regular parameter: {0}'
                             .format(existing))

    def _merge_unmatched_kwoargs(self, unmatched_kwoargs, o_varkwargs, from_src):
        """Adds keyword-only parameters that exist on one side only, provided
        the other side has ``o_varkwargs`` to receive them.

        :raises ValueError: if there is no ``o_varkwargs`` and any of the
            parameters lacks a default.
        """
        if o_varkwargs:
            self.kwoargs.update(unmatched_kwoargs)
            _add_all_sources(self.src, unmatched_kwoargs.values(), from_src)
            _exclude_from_seq(self.varkwargs_src, o_varkwargs)
        else:
            non_defaulted = [
                arg
                for arg in unmatched_kwoargs.values()
                if arg.default == arg.empty
                ]
            if non_defaulted:
                raise ValueError(
                    'Unmatched keyword parameters: {0}'.format(
                        ' '.join(str(arg) for arg in non_defaulted)))

    def _concile_meta(self, left, right):
        """Returns a copy of ``left`` whose default and annotation are
        reconciled with those of ``right``."""
        # The default stays empty unless both sides have one.
        default = left.empty
        if left.default != left.empty and right.default != right.empty:
            if left.default == right.default:
                default = left.default
            else:
                # The defaults are different. Short of using an "It's complicated"
                # constant, None is the best replacement available, as a lot of
                # python code already uses None as default then processes an
                # actual default in the function body
                default = None
        # An annotation present on one side only is kept; when both sides
        # have one it is kept only if they are equal.
        annotation = left.empty
        upgraded_annotation = EmptyAnnotation
        if left.annotation != left.empty and right.annotation != right.empty:
            if left.annotation == right.annotation:
                annotation = left.annotation
                upgraded_annotation = left.upgraded_annotation
        elif left.annotation != left.empty:
            annotation = left.annotation
            upgraded_annotation = left.upgraded_annotation
        elif right.annotation != right.empty:
            annotation = right.annotation
            upgraded_annotation = right.upgraded_annotation
        return left.replace(default=default, annotation=annotation, upgraded_annotation=upgraded_annotation)
def merge(*signatures):
    """Tries to compute a signature for which a valid call would also
    validate the given signatures.

    Any call that conforms to the merged signature is guaranteed to conform
    to every given signature. The converse does not hold: some calls
    rejected by the merged signature may still be accepted by all of the
    given ones.

    :param sigtools.UpgradedSignature signatures: The signatures to merge
        together.

    :returns: an `inspect.Signature` object
    :raises: `IncompatibleSignatures`

    ::

        >>> from sigtools import signatures, support
        >>> print(signatures.merge(
        ...     support.s('one, two, *args, **kwargs'),
        ...     support.s('one, two, three, *, alpha, **kwargs'),
        ...     support.s('one, *args, beta, **kwargs')
        ...     ))
        (one, two, three, *, alpha, beta, **kwargs)

    The resulting signature does not necessarily validate all ways of
    conforming to the underlying signatures::

        >>> from sigtools import signatures
        >>> from inspect import signature
        >>>
        >>> def left(alpha, *args, **kwargs):
        ...     return alpha
        ...
        >>> def right(beta, *args, **kwargs):
        ...     return beta
        ...
        >>> sig_left = signature(left)
        >>> sig_right = signature(right)
        >>> sig_merged = signatures.merge(sig_left, sig_right)
        >>>
        >>> print(sig_merged)
        (alpha, /, *args, **kwargs)
        >>>
        >>> kwargs = {'alpha': 'a', 'beta': 'b'}
        >>> left(**kwargs), right(**kwargs) # both functions accept the call
        ('a', 'b')
        >>>
        >>> sig_merged.bind(**kwargs) # the merged signature doesn't
        Traceback (most recent call last):
          File "<input>", line 1, in <module>
          File "/usr/lib64/python3.4/inspect.py", line 2642, in bind
            return args[0]._bind(args[1:], kwargs)
          File "/usr/lib64/python3.4/inspect.py", line 2542, in _bind
            raise TypeError(msg) from None
        TypeError: 'alpha' parameter is positional only, but was passed as a keyword

    """
    assert signatures, "Expected at least one signature"

    # Fold the remaining signatures into the first one, left to right.
    merged = sort_params(signatures[0], sources=True, _stacklevel=1)
    for index, current in enumerate(signatures[1:], 1):
        current_sorted = sort_params(current, sources=True, _stacklevel=1)
        try:
            merged = SortedParameters(*_Merger(merged, current_sorted))
        except ValueError:
            raise IncompatibleSignatures(current, signatures[:index])

    return apply_params(signatures[0], *merged, _stacklevel=1)
def _check_no_dupes(collect, params):
    """Adds the names of ``params`` to the set ``collect``.

    :raises ValueError: if any of those names is already in ``collect``.
    """
    names = [param.name for param in params]
    dupes = collect.intersection(names)
    if dupes:
        raise ValueError('Duplicate parameter names: ' + ' '.join(dupes))
    collect.update(names)


def _clear_defaults(ita):
    """Yields a copy of each parameter in ``ita`` with its default removed."""
    for param in ita:
        yield param.replace(default=param.empty)


def _embed(outer, inner, use_varargs=True, use_varkwargs=True, depth=1):
    """Embeds the sorted parameters ``inner`` into those of ``outer``.

    :param outer: ``(posargs, pokargs, varargs, kwoargs, varkwargs,
        sources)`` of the outer signature.
    :param inner: Sorted parameters of the inner signature.
    :param use_varargs: Whether the outer ``*args`` receives inner
        parameters.
    :param use_varkwargs: Whether the outer ``**kwargs`` receives inner
        parameters.
    :param depth: Amount added to the depths of the inner sources.
    :returns: ``(posargs, pokargs, varargs, kwoargs, varkwargs, sources)``
    :raises ValueError: on duplicate parameter names, or when the inner
        parameters cannot be merged with the outer star-parameters.
    """
    o_posargs, o_pokargs, o_varargs, o_kwoargs, o_varkwargs, o_src = outer

    # A signature made only of the outer star-parameters that are in use;
    # merging the inner signature with it yields the inner parameters that
    # can actually be reached through them.
    stars_sig = SortedParameters(
        [], [], use_varargs and o_varargs,
        {}, use_varkwargs and o_varkwargs, {})

    i_posargs, i_pokargs, i_varargs, i_kwoargs, i_varkwargs, i_src = \
        _Merger(inner, stars_sig)

    names = set()

    e_posargs = []
    e_pokargs = []
    e_kwoargs = _util.OrderedDict()

    e_posargs.extend(o_posargs)
    _check_no_dupes(names, o_posargs)
    if i_posargs:
        # Inner positional-only parameters follow: the outer
        # positional-or-keyword parameters are made positional-only.
        _check_no_dupes(names, o_pokargs)
        e_posargs.extend(arg.replace(kind=arg.POSITIONAL_ONLY) for arg in o_pokargs)
        # A first inner parameter without a default removes the defaults of
        # everything collected before it.
        if i_posargs[0].default is i_posargs[0].empty:
            e_posargs = list(_clear_defaults(e_posargs))
        _check_no_dupes(names, i_posargs)
        e_posargs.extend(i_posargs)
    else:
        _check_no_dupes(names, o_pokargs)
        if i_pokargs and i_pokargs[0].default == i_pokargs[0].empty:
            e_posargs = list(_clear_defaults(e_posargs))
            e_pokargs.extend(_clear_defaults(o_pokargs))
        else:
            e_pokargs.extend(o_pokargs)
    _check_no_dupes(names, i_pokargs)
    e_pokargs.extend(i_pokargs)

    _check_no_dupes(names, o_kwoargs.values())
    e_kwoargs.update(o_kwoargs)
    _check_no_dupes(names, i_kwoargs.values())
    e_kwoargs.update(i_kwoargs)

    # Outer entries take precedence over inner ones with the same key.
    src = dict(i_src, **o_src)
    # The outer star-parameters that were used no longer appear as such.
    if o_varargs and use_varargs:
        src.pop(o_varargs.name, None)
    if o_varkwargs and use_varkwargs:
        src.pop(o_varkwargs.name, None)

    src['+depths'] = merge_depths(
        o_src.get('+depths', {}),
        dict((f, v+depth) for f, v in i_src.get('+depths', {}).items()))

    return (
        e_posargs, e_pokargs, i_varargs if use_varargs else o_varargs,
        e_kwoargs, i_varkwargs if use_varkwargs else o_varkwargs,
        src
        )
def embed(*signatures, use_varargs=True, use_varkwargs=True, _stacklevel=0):
    """Embeds a signature within another's ``*args`` and ``**kwargs``
    parameters, as if a function with the outer signature called a function
    with the inner signature with just ``f(*args, **kwargs)``.

    :param inspect.Signature signatures: The signatures to embed within
        one-another, outermost first.
    :param bool use_varargs: Make use of the ``*args``-like parameter.
    :param bool use_varkwargs: Make use of the ``**kwargs``-like parameter.

    :returns: an `inspect.Signature` object
    :raises: `IncompatibleSignatures`

    ::

        >>> from sigtools import signatures, support
        >>> print(signatures.embed(
        ...     support.s('one, *args, **kwargs'),
        ...     support.s('two, *args, kw, **kwargs'),
        ...     support.s('last'),
        ...     ))
        (one, two, last, *, kw)
        >>> # use signatures.mask() to remove self-like parameters
        >>> print(signatures.embed(
        ...     support.s('self, *args, **kwargs'),
        ...     signatures.mask(
        ...         support.s('self, *args, keyword, **kwargs'), 1),
        ...     ))
        (self, *args, keyword, **kwargs)
    """
    assert signatures

    outermost = signatures[0]
    embedded = sort_params(outermost, sources=True, _stacklevel=_stacklevel + 1)
    # Each further signature is embedded one level deeper than the last.
    for level, inner_sig in enumerate(signatures[1:], 1):
        try:
            embedded = _embed(
                embedded,
                sort_params(inner_sig, sources=True, _stacklevel=_stacklevel + 1),
                use_varargs, use_varkwargs, level)
        except ValueError:
            raise IncompatibleSignatures(inner_sig, signatures[:level])
    return apply_params(outermost, *embedded, _stacklevel=_stacklevel + 1)
def _pop_chain(*sequences):
    """Yields the items of each sequence in turn, removing each item from
    the front of its sequence as it is yielded."""
    for sequence in sequences:
        while sequence:
            yield sequence.pop(0)


def _remove_from_src(src, ita):
    """Removes every name in ``ita`` from the mapping ``src``, ignoring
    names that are absent."""
    for name in ita:
        src.pop(name, None)


def _pnames(ita):
    """Yields the name of each parameter in ``ita``."""
    for p in ita:
        yield p.name


def _mask(sig, num_args, hide_args, hide_kwargs,
          hide_varargs, hide_varkwargs, named_args, partial_obj, *, _stacklevel=0):
    """Implementation shared by `mask` and by `signature` for
    `functools.partial` objects.

    :param sig: The signature to operate on.
    :param num_args: Number of positional parameters to consume.
    :param hide_args: If true, drop all positional parameters and ``*args``.
    :param hide_kwargs: If true, drop all keyword parameters and
        ``**kwargs``.
    :param hide_varargs: If true, drop the ``*args``-like parameter.
    :param hide_varkwargs: If true, drop the ``**kwargs``-like parameter.
    :param named_args: Names of the arguments passed by name; indexed by
        name to obtain defaults when ``partial_obj`` is given.
    :param partial_obj: If not `None`, named parameters are kept as
        keyword-only parameters with a default instead of being removed,
        and this object is recorded in the sources.
    :raises ValueError: if the signature cannot receive the given arguments.
    """
    posargs, pokargs, varargs, kwoargs, varkwargs, src \
        = sort_params(sig, sources=True, _stacklevel=_stacklevel + 1)

    # Snapshot taken before any positional-or-keyword parameter is consumed.
    pokargs_by_name = dict((p.name, p) for p in pokargs)

    consumed_names = set()

    if hide_args:
        consumed_names.update(p.name for p in posargs)
        consumed_names.update(p.name for p in pokargs)
        posargs = []
        pokargs = []
    elif num_args:
        consume = num_args
        # _pop_chain removes the parameters from posargs/pokargs as they
        # are consumed.
        for param in _pop_chain(posargs, pokargs):
            consume -= 1
            consumed_names.add(param.name)
            if not consume:
                break
        else:
            # Ran out of parameters: only acceptable if *args can take the
            # rest.
            if not varargs:
                raise ValueError(
                    'Signature cannot be passed {0} arguments: {1}'
                    .format(num_args, sig))

    _remove_from_src(src, consumed_names)

    if hide_args or hide_varargs:
        if varargs:
            src.pop(varargs.name, None)
        varargs = None

    partial_mode = partial_obj is not None
    if hide_kwargs:
        _remove_from_src(src, _pnames(pokargs))
        _remove_from_src(src, kwoargs)
        pokargs = []
        kwoargs = {}
        # Nothing is left to match by name.
        named_args = []

    for kwarg_name in named_args:
        if kwarg_name in consumed_names:
            raise ValueError('Duplicate argument: {0!r}'.format(kwarg_name))
        elif kwarg_name in pokargs_by_name:
            # The parameters following the named one become keyword-only.
            i = pokargs.index(pokargs_by_name[kwarg_name])
            pokargs, param, conv_kwoargs = (
                pokargs[:i], pokargs[i], pokargs[i+1:])
            kwoargs.update(
                (p.name, p.replace(kind=p.KEYWORD_ONLY))
                for p in conv_kwoargs)
            if partial_mode:
                kwoargs[param.name] = param.replace(
                    kind=param.KEYWORD_ONLY, default=named_args[param.name])
            else:
                src.pop(kwarg_name, None)
            if varargs:
                src.pop(varargs.name, None)
            varargs = None
            # The parameters that followed are now found in kwoargs.
            pokargs_by_name.clear()
        elif kwarg_name in kwoargs:
            if partial_mode:
                param = kwoargs[kwarg_name]
                kwoargs[kwarg_name] = param.replace(
                    kind=param.KEYWORD_ONLY, default=named_args[kwarg_name])
            else:
                src.pop(kwarg_name, None)
                kwoargs.pop(kwarg_name)
        elif not varkwargs:
            raise ValueError(
                'Named parameter {0!r} not found in signature: {1}'
                .format(kwarg_name, sig))
        elif partial_mode:
            # Received by **kwargs: shown as a new keyword-only parameter
            # whose source is the partial object.
            kwoargs[kwarg_name] = UpgradedParameter(
                kwarg_name, _util.funcsigs.Parameter.KEYWORD_ONLY,
                default=named_args[kwarg_name])
            src[kwarg_name] = [partial_obj]
        consumed_names.add(kwarg_name)

    if hide_kwargs or hide_varkwargs:
        if varkwargs:
            src.pop(varkwargs.name, None)
        varkwargs = None

    if partial_mode:
        # Existing sources move one level deeper; the partial object itself
        # sits at depth 0.
        src = copy_sources(src, increase=True)
        src['+depths'][partial_obj] = 0

    ret = apply_params(sig, posargs, pokargs, varargs, kwoargs, varkwargs, src, _stacklevel=_stacklevel + 1)
    return ret
def mask(sig, num_args=0, *named_args,
         hide_args=False, hide_kwargs=False,
         hide_varargs=False, hide_varkwargs=False,
         _stacklevel=0):
    """Removes the given amount of positional parameters and the given named
    parameters from ``sig``.

    :param inspect.Signature sig: The signature to operate on
    :param int num_args: The amount of positional arguments passed
    :param str named_args: The names of named arguments passed
    :param hide_args: If true, mask all positional parameters
    :param hide_kwargs: If true, mask all keyword parameters
    :param hide_varargs: If true, mask the ``*args``-like parameter
        completely if present.
    :param hide_varkwargs: If true, mask the ``**kwargs``-like parameter
        completely if present.
    :return: an `inspect.Signature` object
    :raises: `ValueError` if the signature cannot handle the arguments
        to be passed.

    ::

        >>> from sigtools import signatures, support
        >>> print(signatures.mask(support.s('a, b, *, c, d'), 1, 'd'))
        (b, *, c)
        >>> print(signatures.mask(support.s('a, b, *args, c, d'), 3, 'd'))
        (*args, c)
        >>> print(signatures.mask(support.s('*args, c, d'), 2, 'd', hide_varargs=True))
        (*, c)

    """
    # Not a partial application: no partial object is supplied.
    return _mask(
        sig,
        num_args,
        hide_args=hide_args,
        hide_kwargs=hide_kwargs,
        hide_varargs=hide_varargs,
        hide_varkwargs=hide_varkwargs,
        named_args=named_args,
        partial_obj=None,
        _stacklevel=_stacklevel + 1,
    )
def forwards(outer, inner, num_args=0, *named_args,
             hide_args=False, hide_kwargs=False,
             use_varargs=True, use_varkwargs=True,
             partial=False):
    """Calls `mask` on ``inner``, then returns the result of calling
    `embed` with ``outer`` and the result of `mask`.

    :param inspect.Signature outer: The outermost signature.
    :param inspect.Signature inner: The inner signature.
    :param bool partial: Set to `True` if the arguments are passed to
        ``partial(func_with_inner, *args, **kwargs)`` rather than
        ``func_with_inner``.

    ``use_varargs`` and ``use_varkwargs`` are the same parameters as in
    `.embed`, and ``num_args``, ``named_args``, ``hide_args`` and
    ``hide_kwargs`` are parameters of `.mask`.

    :return: the resulting `inspect.Signature` object
    :raises: `IncompatibleSignatures`

    ::

        >>> from sigtools import support, signatures
        >>> print(signatures.forwards(
        ...     support.s('a, *args, x, **kwargs'),
        ...     support.s('b, c, *, y, z'),
        ...     1, 'y'))
        (a, c, *, x, z)

    .. seealso::
        :ref:`forwards-pick`

    """
    if partial:
        # Every parameter other than the star-parameters is given a default
        # of None before masking.
        inner = inner.replace(parameters=[
            param
            if param.kind in [param.VAR_POSITIONAL, param.VAR_KEYWORD]
            else param.replace(default=None)
            for param in inner.parameters.values()
        ])

    masked_inner = mask(
        inner, num_args, *named_args,
        hide_args=hide_args, hide_kwargs=hide_kwargs,
        hide_varargs=False, hide_varkwargs=False,
        _stacklevel=1,
    )
    return embed(
        outer, masked_inner,
        use_varargs=use_varargs, use_varkwargs=use_varkwargs,
        _stacklevel=1,
    )