"""
tensorTRAX: Math on (Hyper-Dual) Tensors with Trailing Axes.
"""
from copy import copy
from functools import wraps
import numpy as np
from joblib import Parallel, cpu_count, delayed
from ._tensor import Tensor, Δδ, broadcast_to, f, δ
from .math.special import from_triu_1d, from_triu_2d, triu_1d
def take(fun, item=0):
"Evaluate the function and take only the selected item."
@wraps(fun)
def evaluate(*args, **kwargs):
return fun(*args, **kwargs)[item]
return evaluate
def add_tensor(
args, kwargs, wrt, ntrax, sym=False, gradient=False, hessian=False, δx=None, Δx=None
):
"Modify the arguments and replace the w.r.t.-argument by a tensor."
kwargs_out = copy(kwargs)
args_out = list(args)
if isinstance(wrt, str):
args_old = kwargs
args_new = kwargs_out
elif isinstance(wrt, int):
args_old = args
args_new = args_out
else:
raise TypeError(
f"Type of wrt not supported. type(wrt) is {type(wrt)} (must be str or int)."
)
x = args_old[wrt]
if sym:
x = triu_1d(x)
tensor = Tensor(x=x, ntrax=ntrax)
trax = tensor.trax
tensor.init(gradient=gradient, hessian=hessian, sym=sym, δx=δx, Δx=Δx)
if sym:
tensor = from_triu_1d(tensor)
args_new[wrt] = tensor
return args_out, kwargs_out, tensor.shape, trax
def partition(args, kwargs, wrt, ntrax, parallel, chunks=None, batch=100, axis=None):
"""Partition function (keyword) arguments into a list of (keyword) arguments. Only
top-level args and kwargs with equal shapes to be splitted are allowed."""
# deactivate parallel evaluation if no trailing axes are present
if ntrax == 0:
parallel = False
# get shape of trailing axes, define axis and chunks
# if size of chosen axis is below batch, deactivate parallel evaluation
if parallel:
# get shape of trailing axes
trax = (kwargs[wrt] if isinstance(wrt, str) else args[wrt]).shape[-ntrax:]
# select axis
if axis is None:
axis = -(1 + np.argmax(trax[::-1]))
# define chunks
if chunks is None:
if (trax[axis] // batch) > 0:
chunks = min(trax[axis] // batch, cpu_count())
else:
parallel = False
if not parallel:
list_of_args_kwargs = [(args, kwargs)]
chunks = 1
axis = -1
else:
# generate list with args and kwargs for chunks
list_of_args_kwargs = [[list(args), {**kwargs}] for chunk in range(chunks)]
# test if object has attribute shape (is tensor or array)
def isactive(x):
return hasattr(x, "shape") and np.all(np.isin(trax, x.shape[-ntrax:]))
# iterate through args and split tensor-like objects
args_partitioned = []
for i, arg in enumerate(args):
if isactive(arg):
args_partitioned.append((i, np.array_split(arg, chunks, axis=axis)))
# replace arguments by chunks
for i, arg in enumerate(list_of_args_kwargs):
for j, argp in args_partitioned:
list_of_args_kwargs[i][0][j] = argp[i]
# iterate through kwargs and split tensor-like objects
kwargs_partitioned = []
for key, value in kwargs.items():
if isactive(value):
kwargs_partitioned.append(
(key, np.array_split(value, chunks, axis=axis))
)
# replace keyword arguments by chunks
for i, kwarg in enumerate(list_of_args_kwargs):
for key, value in kwargs_partitioned:
list_of_args_kwargs[i][1][key] = value[i]
return list_of_args_kwargs, chunks, axis
def concatenate_results(res, axis, full_output):
"Concatenate results (with optional full output) on existing axis."
def concat(arrays, axis):
"Concatenate arrays, fall-back to first item if shape of first array is zero."
if len(arrays[0].shape) == 0 or len(arrays) == 1:
return arrays[0]
else:
return np.concatenate(arrays, axis=axis)
if full_output:
nres = len(res[0])
return [concat([r[a] for r in res], axis=axis) for a in range(nres)]
else:
return concat(res, axis=axis)
[docs]
def function(fun, wrt=0, ntrax=0, parallel=False):
r"""Evaluate a function.
Parameters
----------
fun : callable
The function to be evaluated.
wrt : int or str, optional
The input argument which will be treated as :class:`~tensortrax.Tensor` (default
is 0).
ntrax : int, optional
Number of elementwise-operating trailing axes (batch dimensions). Default is 0.
parallel : bool, optional
Flag to evaluate the function in parallel (threaded).
Returns
-------
ndarray
NumPy array containing the function result.
Examples
--------
>>> import numpy as np
>>> import tensortrax as tr
>>> import tensortrax.math as tm
>>>
>>> def fun(F, mu=1):
>>> C = F.T @ F
>>> I1 = tm.trace(C)
>>> J = tm.linalg.det(F)
>>> return mu / 2 * (J ** (-2 / 3) * I1 - 3)
>>>
>>> np.random.seed(125161)
>>> F = (np.eye(3) + np.random.rand(20, 8, 3, 3) / 10).T
>>>
>>> F.shape
(3, 3, 8, 20)
>>> W = tr.function(fun, wrt=0, ntrax=2)(F)
>>> W = tr.function(fun, wrt="F", ntrax=2)(F=F)
>>>
>>> W.shape
>>> (8, 20)
"""
@wraps(fun)
def evaluate_function(*args, **kwargs):
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, False, False
)
func = fun(*args, **kwargs)
if isinstance(func, Tensor):
func = f(func)
return func
list_of_args_kwargs, chunks, axis = partition(
args, kwargs, wrt, ntrax, parallel
)
res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args_kwargs
)
return concatenate_results(res=res, axis=axis, full_output=False)
return evaluate_function
[docs]
def gradient(fun, wrt=0, ntrax=0, parallel=False, full_output=False, sym=False):
r"""Evaluate the gradient of a scalar-valued function.
Parameters
----------
fun : callable
The function to be evaluated.
wrt : int or str, optional
The input argument which will be treated as :class:`~tensortrax.Tensor` (default
is 0). The gradient is carried out with respect to this argument.
ntrax : int, optional
Number of elementwise-operating trailing axes (batch dimensions). Default is 0.
parallel : bool, optional
Flag to evaluate the gradient in parallel (threaded).
full_output: bool, optional
Return the gradient and the function (default is False).
sym : bool, optional
Apply the variations only on the upper-triangle entries of a symmetric second
order tensor. This is a performance feature and requires no modification of the
callable ``fun`` and the input arguments, including ``wrt``. Default is False.
Returns
-------
ndarray or list of ndarray
NumPy array containing the gradient result. If ``full_output=True``, the
function is also returned.
Examples
--------
>>> import numpy as np
>>> import tensortrax as tr
>>> import tensortrax.math as tm
>>>
>>> def fun(F, mu=1):
>>> C = F.T @ F
>>> I1 = tm.trace(C)
>>> J = tm.linalg.det(F)
>>> return mu / 2 * (J ** (-2 / 3) * I1 - 3)
>>>
>>> np.random.seed(125161)
>>> F = (np.eye(3) + np.random.rand(20, 8, 3, 3) / 10).T
>>>
>>> F.shape
(3, 3, 8, 20)
>>> dWdF = tr.gradient(fun, wrt=0, ntrax=2)(F)
>>> dWdF = tr.gradient(fun, wrt="F", ntrax=2)(F=F)
>>>
>>> dWdF.shape
>>> (3, 3, 8, 20)
"""
@wraps(fun)
def evaluate_gradient(*args, **kwargs):
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, sym, True, False
)
func = fun(*args, **kwargs)
grad = δ(func) if sym is False else from_triu_1d(δ(func))
grad = broadcast_to(grad, (*shape, *trax))
if full_output:
trax = (1,) if len(trax) == 0 else trax
zeros = np.zeros_like(shape) if sym is False else (0,)
funct = f(func)[(*zeros,)]
funct = broadcast_to(funct, (*trax,))
return grad, funct
else:
return grad
list_of_args, chunks, axis = partition(args, kwargs, wrt, ntrax, parallel)
res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args
)
return concatenate_results(res=res, axis=axis, full_output=full_output)
return evaluate_gradient
[docs]
def hessian(fun, wrt=0, ntrax=0, parallel=False, full_output=False, sym=False):
r"""Evaluate the Hessian of a scalar-valued function.
Parameters
----------
fun : callable
The function to be evaluated.
wrt : int or str, optional
The input argument which will be treated as :class:`~tensortrax.Tensor` (default
is 0). The Hessian is carried out with respect to this argument.
ntrax : int, optional
Number of elementwise-operating trailing axes (batch dimensions). Default is 0.
parallel : bool, optional
Flag to evaluate the Hessian in parallel (threaded).
full_output: bool, optional
Return the hessian, the gradient and the function (default is False).
sym : bool, optional
Apply the variations only on the upper-triangle entries of a symmetric second
order tensor. This is a performance feature and requires no modification of the
callable ``fun`` and the input arguments, including ``wrt``. Default is False.
Returns
-------
ndarray or list of ndarray
NumPy array containing the Hessian result. If ``full_output=True``, the
gradient and the function are also returned.
Examples
--------
>>> import numpy as np
>>> import tensortrax as tr
>>> import tensortrax.math as tm
>>>
>>> def fun(F, mu=1):
>>> C = F.T @ F
>>> I1 = tm.trace(C)
>>> J = tm.linalg.det(F)
>>> return mu / 2 * (J ** (-2 / 3) * I1 - 3)
>>>
>>> np.random.seed(125161)
>>> F = (np.eye(3) + np.random.rand(20, 8, 3, 3) / 10).T
>>>
>>> F.shape
(3, 3, 8, 20)
>>> d2WdFdF = tr.hessian(fun, wrt=0, ntrax=2)(F)
>>> d2WdFdF = tr.hessian(fun, wrt="F", ntrax=2)(F=F)
>>>
>>> d2WdFdF.shape
>>> (3, 3, 3, 3, 8, 20)
"""
@wraps(fun)
def evaluate_hessian(*args, **kwargs):
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, sym, False, True
)
func = fun(*args, **kwargs)
hess = Δδ(func) if sym is False else from_triu_2d(Δδ(func))
if full_output:
grad = δ(func) if sym is False else from_triu_1d(δ(func))
zeros = np.zeros_like(shape) if sym is False else (0,)
grad = grad[(*[slice(None) for a in shape], *zeros)]
grad = broadcast_to(grad, (*shape, *trax))
funct = f(func)[
(
*zeros,
*zeros,
)
]
funct = broadcast_to(funct, (*trax,))
trax = (1,) if len(trax) == 0 else trax
return hess, grad, funct
else:
return hess
list_of_args, chunks, axis = partition(args, kwargs, wrt, ntrax, parallel)
res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args
)
return concatenate_results(res=res, axis=axis, full_output=full_output)
return evaluate_hessian
[docs]
def jacobian(fun, wrt=0, ntrax=0, parallel=False, full_output=False):
r"""Evaluate the Jacobian of a tensor-valued function.
Parameters
----------
fun : callable
The function to be evaluated.
wrt : int or str, optional
The input argument which will be treated as :class:`~tensortrax.Tensor` (default
is 0). The Jacobian is carried out with respect to this argument.
ntrax : int, optional
Number of elementwise-operating trailing axes (batch dimensions). Default is 0.
parallel : bool, optional
Flag to evaluate the Jacobian in parallel (threaded).
full_output: bool, optional
Return the Jacobian and the function (default is False).
Returns
-------
ndarray
NumPy array containing the Jacobian result.
Examples
--------
>>> import numpy as np
>>> import tensortrax as tr
>>> import tensortrax.math as tm
>>>
>>> def fun(C, mu=1):
>>> I3 = tm.linalg.det(C)
>>> return mu * tm.special.dev(I3 ** (-1 / 3) * C) @ tm.linalg.inv(C)
>>>
>>> np.random.seed(125161)
>>> F = (np.eye(3) + np.random.rand(20, 8, 3, 3) / 10).T
>>> C = np.einsum("ki...,kj...->ij...", F, F)
>>>
>>> C.shape
(3, 3, 8, 20)
>>> dSdC = tr.jacobian(fun, wrt=0, ntrax=2)(C)
>>> dSdC = tr.jacobian(fun, wrt="C", ntrax=2)(C=C)
>>>
>>> dSdC.shape
>>> (3, 3, 3, 3, 8, 20)
"""
@wraps(fun)
def evaluate_jacobian(*args, **kwargs):
def kernel(args, kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, True, False
)
func = fun(*args, **kwargs)
if full_output:
return δ(func), f(func).reshape(*func.shape, *trax)
else:
return δ(func)
list_of_args, chunks, axis = partition(args, kwargs, wrt, ntrax, parallel)
res = Parallel(n_jobs=chunks, prefer="threads")(
delayed(kernel)(*args_chunk) for args_chunk in list_of_args
)
return concatenate_results(res=res, axis=axis, full_output=full_output)
return evaluate_jacobian
[docs]
def gradient_vector_product(fun, wrt=0, ntrax=0, parallel=False):
r"""Evaluate the gradient-vector-product of a scalar-valued function.
Parameters
----------
fun : callable
The function to be evaluated. Its signature is extended to
:func:`fun(*args, δx, **kwargs)`, where the added ``δx``-argument is the vector
of the gradient-vector product.
wrt : int or str, optional
The input argument which will be treated as :class:`~tensortrax.Tensor` (default
is 0). The gradient-vector-product is carried out with respect to this argument.
ntrax : int, optional
Number of elementwise-operating trailing axes (batch dimensions). Default is 0.
parallel : bool, optional
Flag to evaluate the gradient-vector-product in parallel (threaded).
Returns
-------
ndarray
NumPy array containing the gradient-vector-product result.
Notes
-----
The *vector* :math:`\delta x` and the tensor-argument ``wrt`` must have equal or
broadcast-compatible shapes. This means that the *vector* is not restricted to be a
one-dimensional array but must be an array with compatible shape instead.
Examples
--------
>>> import numpy as np
>>> import tensortrax as tr
>>> import tensortrax.math as tm
>>>
>>> def fun(F, mu=1):
>>> C = F.T @ F
>>> I1 = tm.trace(C)
>>> J = tm.linalg.det(F)
>>> return mu / 2 * (J ** (-2 / 3) * I1 - 3)
>>>
>>> np.random.seed(125161)
>>> F = (np.eye(3) + np.random.rand(20, 8, 3, 3) / 10).T
>>> F.shape
(3, 3, 8, 20)
>>> np.random.seed(63254)
>>> δF = np.random.rand(3, 3, 8, 20) / 10
>>> δF.shape
(3, 3, 8, 20)
>>> dW = tr.gradient_vector_product(fun, wrt=0, ntrax=2)(F, δx=δF)
>>> dW.shape
>>> (8, 20)
"""
@wraps(fun)
def evaluate_gradient_vector_product(*args, δx, **kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, gradient=True, δx=δx
)
δfun = δ(fun(*args, **kwargs))
trim = np.zeros(len(δfun.shape) - ntrax, dtype=int)
return broadcast_to(δfun[(*trim,)], trax)
return evaluate_gradient_vector_product
[docs]
def hessian_vector_product(fun, wrt=0, ntrax=0, parallel=False):
r"""Evaluate the Hessian-vector-product of a scalar-valued function.
Parameters
----------
fun : callable
The function to be evaluated. Its signature is extended to
:func:`fun(*args, δx, **kwargs)`, where the added ``δx``-argument is the vector
of the Hessian-vector product.
wrt : int or str, optional
The input argument which will be treated as :class:`~tensortrax.Tensor` (default
is 0). The Hessian-vector-product is carried out with respect to this argument.
ntrax : int, optional
Number of elementwise-operating trailing axes (batch dimensions). Default is 0.
parallel : bool, optional
Flag to evaluate the gradient-vector-product in parallel (threaded).
Returns
-------
ndarray
NumPy array containing the Hessian-vector-product result.
Notes
-----
The *vector* :math:`\delta x` and the tensor-argument ``wrt`` must have equal or
broadcast-compatible shapes. This means that the *vector* is not restricted to be a
one-dimensional array but must be an array with compatible shape instead.
Examples
--------
>>> import numpy as np
>>> import tensortrax as tr
>>> import tensortrax.math as tm
>>>
>>> def fun(F, mu=1):
>>> C = F.T @ F
>>> I1 = tm.trace(C)
>>> J = tm.linalg.det(F)
>>> return mu / 2 * (J ** (-2 / 3) * I1 - 3)
>>>
>>> np.random.seed(125161)
>>> F = (np.eye(3) + np.random.rand(20, 8, 3, 3) / 10).T
>>> F.shape
(3, 3, 8, 20)
>>> np.random.seed(63254)
>>> δF = np.random.rand(3, 3, 8, 20) / 10
>>> δF.shape
(3, 3, 8, 20)
>>> dP = tr.hessian_vector_product(fun, wrt=0, ntrax=2)(F, δx=δF)
>>> dP.shape
>>> (3, 3, 8, 20)
"""
@wraps(fun)
def evaluate_hessian_vector_product(*args, δx, **kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, hessian=True, δx=δx
)
Δδfun = Δδ(fun(*args, **kwargs))
trim = np.zeros(len(Δδfun.shape) - len(shape) - ntrax, dtype=int)
return broadcast_to(Δδfun[(*trim,)], (*shape, *trax))
return evaluate_hessian_vector_product
[docs]
def hessian_vectors_product(fun, wrt=0, ntrax=0, parallel=False):
r"""Evaluate the vector-Hessian-vector- or Hessian-vectors-product of a scalar-
valued function.
Parameters
----------
fun : callable
The function to be evaluated. Its signature is extended to
:func:`fun(*args, δx, Δx, **kwargs)`, where the added ``δx``- and ``Δx``-
arguments are the vectors of the Hessian-vectors product.
wrt : int or str, optional
The input argument which will be treated as :class:`~tensortrax.Tensor` (default
is 0). The Hessian-vectors-product is carried out with respect to this argument.
ntrax : int, optional
Number of elementwise-operating trailing axes (batch dimensions). Default is 0.
parallel : bool, optional
Flag to evaluate the gradient-vector-product in parallel (threaded).
Returns
-------
ndarray
NumPy array containing the Hessian-vectors-product result.
Notes
-----
The *vectors* :math:`\delta x` and :math:`\Delta x` as well as the tensor-argument
``wrt`` must have equal or broadcast-compatible shapes. This means that the
*vectors* are not restricted to be one-dimensional arrays but must be arrays with
compatible shapes instead.
Examples
--------
>>> import numpy as np
>>> import tensortrax as tr
>>> import tensortrax.math as tm
>>>
>>> def fun(F, mu=1):
>>> C = F.T @ F
>>> I1 = tm.trace(C)
>>> J = tm.linalg.det(F)
>>> return mu / 2 * (J ** (-2 / 3) * I1 - 3)
>>>
>>> np.random.seed(125161)
>>> F = (np.eye(3) + np.random.rand(20, 8, 3, 3) / 10).T
>>> F.shape
(3, 3, 8, 20)
>>> np.random.seed(63254)
>>> δF = np.random.rand(3, 3, 8, 20) / 10
>>> δF.shape
(3, 3, 8, 20)
>>> np.random.seed(85476)
>>> ΔF = np.random.rand(3, 3, 8, 20) / 10
>>> ΔF.shape
(3, 3, 8, 20)
>>> ΔδW = tr.hessian_vectors_product(fun, wrt=0, ntrax=2)(F, δx=δF, Δx=ΔF)
>>> ΔδW.shape
>>> (8, 20)
"""
@wraps(fun)
def evaluate_hessian_vectors_product(*args, δx, Δx, **kwargs):
args, kwargs, shape, trax = add_tensor(
args, kwargs, wrt, ntrax, False, hessian=True, δx=δx, Δx=Δx
)
Δδfun = Δδ(fun(*args, **kwargs))
trim = np.zeros(len(Δδfun.shape) - ntrax, dtype=int)
return broadcast_to(Δδfun[(*trim,)], trax)
return evaluate_hessian_vectors_product