| Viewing file:  test_overrides.py (25.47 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
import inspectimport sys
 import os
 import tempfile
 from io import StringIO
 from unittest import mock
 
 import numpy as np
 from numpy.testing import (
 assert_, assert_equal, assert_raises, assert_raises_regex)
 from numpy.core.overrides import (
 _get_implementing_args, array_function_dispatch,
 verify_matching_signatures)
 from numpy.compat import pickle
 import pytest
 
 
 def _return_not_implemented(self, *args, **kwargs):
 return NotImplemented
 
 
 # need to define this at the top level to test pickling
 @array_function_dispatch(lambda array: (array,))
 def dispatched_one_arg(array):
 """Docstring."""
 return 'original'
 
 
 @array_function_dispatch(lambda array1, array2: (array1, array2))
 def dispatched_two_arg(array1, array2):
 """Docstring."""
 return 'original'
 
 
 class TestGetImplementingArgs:
 
 def test_ndarray(self):
 array = np.array(1)
 
 args = _get_implementing_args([array])
 assert_equal(list(args), [array])
 
 args = _get_implementing_args([array, array])
 assert_equal(list(args), [array])
 
 args = _get_implementing_args([array, 1])
 assert_equal(list(args), [array])
 
 args = _get_implementing_args([1, array])
 assert_equal(list(args), [array])
 
 def test_ndarray_subclasses(self):
 
 class OverrideSub(np.ndarray):
 __array_function__ = _return_not_implemented
 
 class NoOverrideSub(np.ndarray):
 pass
 
 array = np.array(1).view(np.ndarray)
 override_sub = np.array(1).view(OverrideSub)
 no_override_sub = np.array(1).view(NoOverrideSub)
 
 args = _get_implementing_args([array, override_sub])
 assert_equal(list(args), [override_sub, array])
 
 args = _get_implementing_args([array, no_override_sub])
 assert_equal(list(args), [no_override_sub, array])
 
 args = _get_implementing_args(
 [override_sub, no_override_sub])
 assert_equal(list(args), [override_sub, no_override_sub])
 
 def test_ndarray_and_duck_array(self):
 
 class Other:
 __array_function__ = _return_not_implemented
 
 array = np.array(1)
 other = Other()
 
 args = _get_implementing_args([other, array])
 assert_equal(list(args), [other, array])
 
 args = _get_implementing_args([array, other])
 assert_equal(list(args), [array, other])
 
 def test_ndarray_subclass_and_duck_array(self):
 
 class OverrideSub(np.ndarray):
 __array_function__ = _return_not_implemented
 
 class Other:
 __array_function__ = _return_not_implemented
 
 array = np.array(1)
 subarray = np.array(1).view(OverrideSub)
 other = Other()
 
 assert_equal(_get_implementing_args([array, subarray, other]),
 [subarray, array, other])
 assert_equal(_get_implementing_args([array, other, subarray]),
 [subarray, array, other])
 
 def test_many_duck_arrays(self):
 
 class A:
 __array_function__ = _return_not_implemented
 
 class B(A):
 __array_function__ = _return_not_implemented
 
 class C(A):
 __array_function__ = _return_not_implemented
 
 class D:
 __array_function__ = _return_not_implemented
 
 a = A()
 b = B()
 c = C()
 d = D()
 
 assert_equal(_get_implementing_args([1]), [])
 assert_equal(_get_implementing_args([a]), [a])
 assert_equal(_get_implementing_args([a, 1]), [a])
 assert_equal(_get_implementing_args([a, a, a]), [a])
 assert_equal(_get_implementing_args([a, d, a]), [a, d])
 assert_equal(_get_implementing_args([a, b]), [b, a])
 assert_equal(_get_implementing_args([b, a]), [b, a])
 assert_equal(_get_implementing_args([a, b, c]), [b, c, a])
 assert_equal(_get_implementing_args([a, c, b]), [c, b, a])
 
 def test_too_many_duck_arrays(self):
 namespace = dict(__array_function__=_return_not_implemented)
 types = [type('A' + str(i), (object,), namespace) for i in range(33)]
 relevant_args = [t() for t in types]
 
 actual = _get_implementing_args(relevant_args[:32])
 assert_equal(actual, relevant_args[:32])
 
 with assert_raises_regex(TypeError, 'distinct argument types'):
 _get_implementing_args(relevant_args)
 
 
 class TestNDArrayArrayFunction:
 
 def test_method(self):
 
 class Other:
 __array_function__ = _return_not_implemented
 
 class NoOverrideSub(np.ndarray):
 pass
 
 class OverrideSub(np.ndarray):
 __array_function__ = _return_not_implemented
 
 array = np.array([1])
 other = Other()
 no_override_sub = array.view(NoOverrideSub)
 override_sub = array.view(OverrideSub)
 
 result = array.__array_function__(func=dispatched_two_arg,
 types=(np.ndarray,),
 args=(array, 1.), kwargs={})
 assert_equal(result, 'original')
 
 result = array.__array_function__(func=dispatched_two_arg,
 types=(np.ndarray, Other),
 args=(array, other), kwargs={})
 assert_(result is NotImplemented)
 
 result = array.__array_function__(func=dispatched_two_arg,
 types=(np.ndarray, NoOverrideSub),
 args=(array, no_override_sub),
 kwargs={})
 assert_equal(result, 'original')
 
 result = array.__array_function__(func=dispatched_two_arg,
 types=(np.ndarray, OverrideSub),
 args=(array, override_sub),
 kwargs={})
 assert_equal(result, 'original')
 
 with assert_raises_regex(TypeError, 'no implementation found'):
 np.concatenate((array, other))
 
 expected = np.concatenate((array, array))
 result = np.concatenate((array, no_override_sub))
 assert_equal(result, expected.view(NoOverrideSub))
 result = np.concatenate((array, override_sub))
 assert_equal(result, expected.view(OverrideSub))
 
 def test_no_wrapper(self):
 # This shouldn't happen unless a user intentionally calls
 # __array_function__ with invalid arguments, but check that we raise
 # an appropriate error all the same.
 array = np.array(1)
 func = lambda x: x
 with assert_raises_regex(AttributeError, '_implementation'):
 array.__array_function__(func=func, types=(np.ndarray,),
 args=(array,), kwargs={})
 
 
 class TestArrayFunctionDispatch:
 
 def test_pickle(self):
 for proto in range(2, pickle.HIGHEST_PROTOCOL + 1):
 roundtripped = pickle.loads(
 pickle.dumps(dispatched_one_arg, protocol=proto))
 assert_(roundtripped is dispatched_one_arg)
 
 def test_name_and_docstring(self):
 assert_equal(dispatched_one_arg.__name__, 'dispatched_one_arg')
 if sys.flags.optimize < 2:
 assert_equal(dispatched_one_arg.__doc__, 'Docstring.')
 
 def test_interface(self):
 
 class MyArray:
 def __array_function__(self, func, types, args, kwargs):
 return (self, func, types, args, kwargs)
 
 original = MyArray()
 (obj, func, types, args, kwargs) = dispatched_one_arg(original)
 assert_(obj is original)
 assert_(func is dispatched_one_arg)
 assert_equal(set(types), {MyArray})
 # assert_equal uses the overloaded np.iscomplexobj() internally
 assert_(args == (original,))
 assert_equal(kwargs, {})
 
 def test_not_implemented(self):
 
 class MyArray:
 def __array_function__(self, func, types, args, kwargs):
 return NotImplemented
 
 array = MyArray()
 with assert_raises_regex(TypeError, 'no implementation found'):
 dispatched_one_arg(array)
 
 def test_where_dispatch(self):
 
 class DuckArray:
 def __array_function__(self, ufunc, method, *inputs, **kwargs):
 return "overridden"
 
 array = np.array(1)
 duck_array = DuckArray()
 
 result = np.std(array, where=duck_array)
 
 assert_equal(result, "overridden")
 
 
 class TestVerifyMatchingSignatures:
 
 def test_verify_matching_signatures(self):
 
 verify_matching_signatures(lambda x: 0, lambda x: 0)
 verify_matching_signatures(lambda x=None: 0, lambda x=None: 0)
 verify_matching_signatures(lambda x=1: 0, lambda x=None: 0)
 
 with assert_raises(RuntimeError):
 verify_matching_signatures(lambda a: 0, lambda b: 0)
 with assert_raises(RuntimeError):
 verify_matching_signatures(lambda x: 0, lambda x=None: 0)
 with assert_raises(RuntimeError):
 verify_matching_signatures(lambda x=None: 0, lambda y=None: 0)
 with assert_raises(RuntimeError):
 verify_matching_signatures(lambda x=1: 0, lambda y=1: 0)
 
 def test_array_function_dispatch(self):
 
 with assert_raises(RuntimeError):
 @array_function_dispatch(lambda x: (x,))
 def f(y):
 pass
 
 # should not raise
 @array_function_dispatch(lambda x: (x,), verify=False)
 def f(y):
 pass
 
 
 def _new_duck_type_and_implements():
 """Create a duck array type and implements functions."""
 HANDLED_FUNCTIONS = {}
 
 class MyArray:
 def __array_function__(self, func, types, args, kwargs):
 if func not in HANDLED_FUNCTIONS:
 return NotImplemented
 if not all(issubclass(t, MyArray) for t in types):
 return NotImplemented
 return HANDLED_FUNCTIONS[func](*args, **kwargs)
 
 def implements(numpy_function):
 """Register an __array_function__ implementations."""
 def decorator(func):
 HANDLED_FUNCTIONS[numpy_function] = func
 return func
 return decorator
 
 return (MyArray, implements)
 
 
 class TestArrayFunctionImplementation:
 
 def test_one_arg(self):
 MyArray, implements = _new_duck_type_and_implements()
 
 @implements(dispatched_one_arg)
 def _(array):
 return 'myarray'
 
 assert_equal(dispatched_one_arg(1), 'original')
 assert_equal(dispatched_one_arg(MyArray()), 'myarray')
 
 def test_optional_args(self):
 MyArray, implements = _new_duck_type_and_implements()
 
 @array_function_dispatch(lambda array, option=None: (array,))
 def func_with_option(array, option='default'):
 return option
 
 @implements(func_with_option)
 def my_array_func_with_option(array, new_option='myarray'):
 return new_option
 
 # we don't need to implement every option on __array_function__
 # implementations
 assert_equal(func_with_option(1), 'default')
 assert_equal(func_with_option(1, option='extra'), 'extra')
 assert_equal(func_with_option(MyArray()), 'myarray')
 with assert_raises(TypeError):
 func_with_option(MyArray(), option='extra')
 
 # but new options on implementations can't be used
 result = my_array_func_with_option(MyArray(), new_option='yes')
 assert_equal(result, 'yes')
 with assert_raises(TypeError):
 func_with_option(MyArray(), new_option='no')
 
 def test_not_implemented(self):
 MyArray, implements = _new_duck_type_and_implements()
 
 @array_function_dispatch(lambda array: (array,), module='my')
 def func(array):
 return array
 
 array = np.array(1)
 assert_(func(array) is array)
 assert_equal(func.__module__, 'my')
 
 with assert_raises_regex(
 TypeError, "no implementation found for 'my.func'"):
 func(MyArray())
 
 @pytest.mark.parametrize("name", ["concatenate", "mean", "asarray"])
 def test_signature_error_message_simple(self, name):
 func = getattr(np, name)
 try:
 # all of these functions need an argument:
 func()
 except TypeError as e:
 exc = e
 
 assert exc.args[0].startswith(f"{name}()")
 
 def test_signature_error_message(self):
 # The lambda function will be named "<lambda>", but the TypeError
 # should show the name as "func"
 def _dispatcher():
 return ()
 
 @array_function_dispatch(_dispatcher)
 def func():
 pass
 
 try:
 func._implementation(bad_arg=3)
 except TypeError as e:
 expected_exception = e
 
 try:
 func(bad_arg=3)
 raise AssertionError("must fail")
 except TypeError as exc:
 if exc.args[0].startswith("_dispatcher"):
 # We replace the qualname currently, but it used `__name__`
 # (relevant functions have the same name and qualname anyway)
 pytest.skip("Python version is not using __qualname__ for "
 "TypeError formatting.")
 
 assert exc.args == expected_exception.args
 
 @pytest.mark.parametrize("value", [234, "this func is not replaced"])
 def test_dispatcher_error(self, value):
 # If the dispatcher raises an error, we must not attempt to mutate it
 error = TypeError(value)
 
 def dispatcher():
 raise error
 
 @array_function_dispatch(dispatcher)
 def func():
 return 3
 
 try:
 func()
 raise AssertionError("must fail")
 except TypeError as exc:
 assert exc is error  # unmodified exception
 
 def test_properties(self):
 # Check that str and repr are sensible
 func = dispatched_two_arg
 assert str(func) == str(func._implementation)
 repr_no_id = repr(func).split("at ")[0]
 repr_no_id_impl = repr(func._implementation).split("at ")[0]
 assert repr_no_id == repr_no_id_impl
 
 @pytest.mark.parametrize("func", [
 lambda x, y: 0,  # no like argument
 lambda like=None: 0,  # not keyword only
 lambda *, like=None, a=3: 0,  # not last (not that it matters)
 ])
 def test_bad_like_sig(self, func):
 # We sanity check the signature, and these should fail.
 with pytest.raises(RuntimeError):
 array_function_dispatch()(func)
 
 def test_bad_like_passing(self):
 # Cover internal sanity check for passing like as first positional arg
 def func(*, like=None):
 pass
 
 func_with_like = array_function_dispatch()(func)
 with pytest.raises(TypeError):
 func_with_like()
 with pytest.raises(TypeError):
 func_with_like(like=234)
 
 def test_too_many_args(self):
 # Mainly a unit-test to increase coverage
 objs = []
 for i in range(40):
 class MyArr:
 def __array_function__(self, *args, **kwargs):
 return NotImplemented
 
 objs.append(MyArr())
 
 def _dispatch(*args):
 return args
 
 @array_function_dispatch(_dispatch)
 def func(*args):
 pass
 
 with pytest.raises(TypeError, match="maximum number"):
 func(*objs)
 
 
 
 class TestNDArrayMethods:
 
 def test_repr(self):
 # gh-12162: should still be defined even if __array_function__ doesn't
 # implement np.array_repr()
 
 class MyArray(np.ndarray):
 def __array_function__(*args, **kwargs):
 return NotImplemented
 
 array = np.array(1).view(MyArray)
 assert_equal(repr(array), 'MyArray(1)')
 assert_equal(str(array), '1')
 
 
 class TestNumPyFunctions:
 
 def test_set_module(self):
 assert_equal(np.sum.__module__, 'numpy')
 assert_equal(np.char.equal.__module__, 'numpy.char')
 assert_equal(np.fft.fft.__module__, 'numpy.fft')
 assert_equal(np.linalg.solve.__module__, 'numpy.linalg')
 
 def test_inspect_sum(self):
 signature = inspect.signature(np.sum)
 assert_('axis' in signature.parameters)
 
 def test_override_sum(self):
 MyArray, implements = _new_duck_type_and_implements()
 
 @implements(np.sum)
 def _(array):
 return 'yes'
 
 assert_equal(np.sum(MyArray()), 'yes')
 
 def test_sum_on_mock_array(self):
 
 # We need a proxy for mocks because __array_function__ is only looked
 # up in the class dict
 class ArrayProxy:
 def __init__(self, value):
 self.value = value
 def __array_function__(self, *args, **kwargs):
 return self.value.__array_function__(*args, **kwargs)
 def __array__(self, *args, **kwargs):
 return self.value.__array__(*args, **kwargs)
 
 proxy = ArrayProxy(mock.Mock(spec=ArrayProxy))
 proxy.value.__array_function__.return_value = 1
 result = np.sum(proxy)
 assert_equal(result, 1)
 proxy.value.__array_function__.assert_called_once_with(
 np.sum, (ArrayProxy,), (proxy,), {})
 proxy.value.__array__.assert_not_called()
 
 def test_sum_forwarding_implementation(self):
 
 class MyArray(np.ndarray):
 
 def sum(self, axis, out):
 return 'summed'
 
 def __array_function__(self, func, types, args, kwargs):
 return super().__array_function__(func, types, args, kwargs)
 
 # note: the internal implementation of np.sum() calls the .sum() method
 array = np.array(1).view(MyArray)
 assert_equal(np.sum(array), 'summed')
 
 
 class TestArrayLike:
 def setup_method(self):
 class MyArray():
 def __init__(self, function=None):
 self.function = function
 
 def __array_function__(self, func, types, args, kwargs):
 assert func is getattr(np, func.__name__)
 try:
 my_func = getattr(self, func.__name__)
 except AttributeError:
 return NotImplemented
 return my_func(*args, **kwargs)
 
 self.MyArray = MyArray
 
 class MyNoArrayFunctionArray():
 def __init__(self, function=None):
 self.function = function
 
 self.MyNoArrayFunctionArray = MyNoArrayFunctionArray
 
 def add_method(self, name, arr_class, enable_value_error=False):
 def _definition(*args, **kwargs):
 # Check that `like=` isn't propagated downstream
 assert 'like' not in kwargs
 
 if enable_value_error and 'value_error' in kwargs:
 raise ValueError
 
 return arr_class(getattr(arr_class, name))
 setattr(arr_class, name, _definition)
 
 def func_args(*args, **kwargs):
 return args, kwargs
 
 def test_array_like_not_implemented(self):
 self.add_method('array', self.MyArray)
 
 ref = self.MyArray.array()
 
 with assert_raises_regex(TypeError, 'no implementation found'):
 array_like = np.asarray(1, like=ref)
 
 _array_tests = [
 ('array', *func_args((1,))),
 ('asarray', *func_args((1,))),
 ('asanyarray', *func_args((1,))),
 ('ascontiguousarray', *func_args((2, 3))),
 ('asfortranarray', *func_args((2, 3))),
 ('require', *func_args((np.arange(6).reshape(2, 3),),
 requirements=['A', 'F'])),
 ('empty', *func_args((1,))),
 ('full', *func_args((1,), 2)),
 ('ones', *func_args((1,))),
 ('zeros', *func_args((1,))),
 ('arange', *func_args(3)),
 ('frombuffer', *func_args(b'\x00' * 8, dtype=int)),
 ('fromiter', *func_args(range(3), dtype=int)),
 ('fromstring', *func_args('1,2', dtype=int, sep=',')),
 ('loadtxt', *func_args(lambda: StringIO('0 1\n2 3'))),
 ('genfromtxt', *func_args(lambda: StringIO('1,2.1'),
 dtype=[('int', 'i8'), ('float', 'f8')],
 delimiter=',')),
 ]
 
 @pytest.mark.parametrize('function, args, kwargs', _array_tests)
 @pytest.mark.parametrize('numpy_ref', [True, False])
 def test_array_like(self, function, args, kwargs, numpy_ref):
 self.add_method('array', self.MyArray)
 self.add_method(function, self.MyArray)
 np_func = getattr(np, function)
 my_func = getattr(self.MyArray, function)
 
 if numpy_ref is True:
 ref = np.array(1)
 else:
 ref = self.MyArray.array()
 
 like_args = tuple(a() if callable(a) else a for a in args)
 array_like = np_func(*like_args, **kwargs, like=ref)
 
 if numpy_ref is True:
 assert type(array_like) is np.ndarray
 
 np_args = tuple(a() if callable(a) else a for a in args)
 np_arr = np_func(*np_args, **kwargs)
 
 # Special-case np.empty to ensure values match
 if function == "empty":
 np_arr.fill(1)
 array_like.fill(1)
 
 assert_equal(array_like, np_arr)
 else:
 assert type(array_like) is self.MyArray
 assert array_like.function is my_func
 
 @pytest.mark.parametrize('function, args, kwargs', _array_tests)
 @pytest.mark.parametrize('ref', [1, [1], "MyNoArrayFunctionArray"])
 def test_no_array_function_like(self, function, args, kwargs, ref):
 self.add_method('array', self.MyNoArrayFunctionArray)
 self.add_method(function, self.MyNoArrayFunctionArray)
 np_func = getattr(np, function)
 
 # Instantiate ref if it's the MyNoArrayFunctionArray class
 if ref == "MyNoArrayFunctionArray":
 ref = self.MyNoArrayFunctionArray.array()
 
 like_args = tuple(a() if callable(a) else a for a in args)
 
 with assert_raises_regex(TypeError,
 'The `like` argument must be an array-like that implements'):
 np_func(*like_args, **kwargs, like=ref)
 
 @pytest.mark.parametrize('numpy_ref', [True, False])
 def test_array_like_fromfile(self, numpy_ref):
 self.add_method('array', self.MyArray)
 self.add_method("fromfile", self.MyArray)
 
 if numpy_ref is True:
 ref = np.array(1)
 else:
 ref = self.MyArray.array()
 
 data = np.random.random(5)
 
 with tempfile.TemporaryDirectory() as tmpdir:
 fname = os.path.join(tmpdir, "testfile")
 data.tofile(fname)
 
 array_like = np.fromfile(fname, like=ref)
 if numpy_ref is True:
 assert type(array_like) is np.ndarray
 np_res = np.fromfile(fname, like=ref)
 assert_equal(np_res, data)
 assert_equal(array_like, np_res)
 else:
 assert type(array_like) is self.MyArray
 assert array_like.function is self.MyArray.fromfile
 
 def test_exception_handling(self):
 self.add_method('array', self.MyArray, enable_value_error=True)
 
 ref = self.MyArray.array()
 
 with assert_raises(TypeError):
 # Raises the error about `value_error` being invalid first
 np.array(1, value_error=True, like=ref)
 
 @pytest.mark.parametrize('function, args, kwargs', _array_tests)
 def test_like_as_none(self, function, args, kwargs):
 self.add_method('array', self.MyArray)
 self.add_method(function, self.MyArray)
 np_func = getattr(np, function)
 
 like_args = tuple(a() if callable(a) else a for a in args)
 # required for loadtxt and genfromtxt to init w/o error.
 like_args_exp = tuple(a() if callable(a) else a for a in args)
 
 array_like = np_func(*like_args, **kwargs, like=None)
 expected = np_func(*like_args_exp, **kwargs)
 # Special-case np.empty to ensure values match
 if function == "empty":
 array_like.fill(1)
 expected.fill(1)
 assert_equal(array_like, expected)
 
 
 def test_function_like():
 # We provide a `__get__` implementation, make sure it works
 assert type(np.mean) is np.core._multiarray_umath._ArrayFunctionDispatcher
 
 class MyClass:
 def __array__(self):
 # valid argument to mean:
 return np.arange(3)
 
 func1 = staticmethod(np.mean)
 func2 = np.mean
 func3 = classmethod(np.mean)
 
 m = MyClass()
 assert m.func1([10]) == 10
 assert m.func2() == 1  # mean of the arange
 with pytest.raises(TypeError, match="unsupported operand type"):
 # Tries to operate on the class
 m.func3()
 
 # Manual binding also works (the above may shortcut):
 bound = np.mean.__get__(m, MyClass)
 assert bound() == 1
 
 bound = np.mean.__get__(None, MyClass)  # unbound actually
 assert bound([10]) == 10
 
 bound = np.mean.__get__(MyClass)  # classmethod
 with pytest.raises(TypeError, match="unsupported operand type"):
 bound()
 
 
 def test_scipy_trapz_support_shim():
 # SciPy 1.10 and earlier "clone" trapz in this way, so we have a
 # support shim in place: https://github.com/scipy/scipy/issues/17811
 # That should be removed eventually.  This test copies what SciPy does.
 # Hopefully removable 1 year after SciPy 1.11; shim added to NumPy 1.25.
 import types
 import functools
 
 def _copy_func(f):
 # Based on http://stackoverflow.com/a/6528148/190597 (Glenn Maynard)
 g = types.FunctionType(f.__code__, f.__globals__, name=f.__name__,
 argdefs=f.__defaults__, closure=f.__closure__)
 g = functools.update_wrapper(g, f)
 g.__kwdefaults__ = f.__kwdefaults__
 return g
 
 trapezoid = _copy_func(np.trapz)
 
 assert np.trapz([1, 2]) == trapezoid([1, 2])
 
 |