"""
=========
numpy_ext
=========
An extension library for NumPy_ that implements common array operations not present in NumPy.
.. _numpy: https://numpy.org/
Installation
------------
**Regular installation**::
    pip install numpy_ext
**Installation for development**::
    git clone https://github.com/3jane/numpy_ext.git
    cd numpy_ext
    pip install -e .[dev]  # note: make sure you are using pip>=20
Window operations
-----------------
- :func:`numpy_ext.expanding`
- :func:`numpy_ext.expanding_apply`
- :func:`numpy_ext.rolling`
- :func:`numpy_ext.rolling_apply`
Operations with nans
--------------------
- :func:`numpy_ext.nans`
- :func:`numpy_ext.drop_na`
- :func:`numpy_ext.fill_na`
- :func:`numpy_ext.fill_not_finite`
- :func:`numpy_ext.prepend_na`
Others
------
- :func:`numpy_ext.apply_map`
- :func:`numpy_ext.expstep_range`
Functions
---------
"""
from functools import partial
from typing import Callable, Any, Union, Generator, Tuple, List
import numpy as np
from joblib import Parallel, delayed
Number = Union[int, float]
[docs]def expstep_range(
    start: Number,
    end: Number,
    min_step: Number = 1,
    step_mult: Number = 1,
    round_func: Callable = None
) -> np.ndarray:
    """
    Return spaced values within a given interval. Step is increased by a multiplier on each iteration.
    Parameters
    ----------
    start : int or float
        Start of interval, inclusive
    end : int or float
        End of interval, exclusive
    min_step : int or float, optional
        Minimal step between values. Must be bigger than 0. Default is 1.
    step_mult : int or float, optional
        Multiplier by which to increase the step on each iteration. Must be bigger than 0. Default is 1.
    round_func: Callable, optional
        Vectorized rounding function, e.g. np.ceil, np.floor, etc. Default is None.
    Returns
    -------
    np.ndarray
        Array of exponentially spaced values.
    Examples
    --------
    >>> expstep_range(1, 100, min_step=1, step_mult=1.5)
    array([ 1.        ,  2.        ,  3.5       ,  5.75      ,  9.125     ,
           14.1875    , 21.78125   , 33.171875  , 50.2578125 , 75.88671875])
    >>> expstep_range(1, 100, min_step=1, step_mult=1.5, round_func=np.ceil)
    array([ 1.,  2.,  4.,  6., 10., 15., 22., 34., 51., 76.])
    >>> expstep_range(start=-1, end=-100, min_step=1, step_mult=1.5)
    array([ -1.        ,  -2.        ,  -3.5       ,  -5.75      ,
            -9.125     , -14.1875    , -21.78125   , -33.171875  ,
           -50.2578125 , -75.88671875])
    Generate array of ints
    >>> expstep_range(start=100, end=1, min_step=1, step_mult=1.5).astype(int)
    array([100,  99,  97,  95,  91,  86,  79,  67,  50,  25])
    """
    if step_mult <= 0:
        raise ValueError('mult_step should be bigger than 0')
    if min_step <= 0:
        raise ValueError('min_step should be bigger than 0')
    last = start
    values = []
    step = min_step
    sign = 1 if start < end else -1
    while start < end and last < end or start > end and last > end:
        values.append(last)
        last += max(step, min_step) * sign
        step = abs(step * step_mult)
    values = np.array(values)
    if not round_func:
        return values
    values = np.array(round_func(values))
    _, idx = np.unique(values, return_index=True)
    return values[np.sort(idx)] 
[docs]def apply_map(func: Callable[[Any], Any], array: Union[List, np.ndarray]) -> np.ndarray:
    """
    Apply a function element-wise to an array.
    Parameters
    ----------
    func : Callable[[Any], Any]
        Function that accepts one argument and returns a single value.
    array : Union[List, np.ndarray]
        Input array or a list. Any lists will be converted to np.ndarray first.
    Returns
    -------
    np.ndarray
        Resulting array.
    Examples
    --------
    >>> apply_map(lambda x: 0 if x < 3 else 1, [[2, 2], [3, 3]])
    array([[0, 0],
           [1, 1]])
    """
    array = np.array(array)
    array_view = array.flat
    array_view[:] = [func(x) for x in array_view]
    return array 
#############################
# Operations with nans
#############################
[docs]def nans(shape: Union[int, Tuple[int, ...]], dtype=np.float64) -> np.ndarray:
    """
    Return a new array of a given shape and type, filled with np.nan values.
    Parameters
    ----------
    shape : int or tuple of ints
        Shape of the new array, e.g., (2, 3) or 2.
    dtype: data-type, optional
    Returns
    -------
    np.ndarray
        Array of np.nans of the given shape.
    Examples
    --------
    >>> nans(3)
    array([nan, nan, nan])
    >>> nans((2, 2))
    array([[nan, nan],
           [nan, nan]])
    >>> nans(2, np.datetime64)
    array(['NaT', 'NaT'], dtype=datetime64)
    """
    if np.issubdtype(dtype, np.integer):
        dtype = np.float
    arr = np.empty(shape, dtype=dtype)
    arr.fill(np.nan)
    return arr 
[docs]def drop_na(array: np.ndarray) -> np.ndarray:
    """
    Return a given array flattened and with nans dropped.
    Parameters
    ----------
    array : np.ndarray
        Input array.
    Returns
    -------
    np.ndarray
        New array without nans.
    Examples
    --------
    >>> drop_na(np.array([np.nan, 1, 2]))
    array([1., 2.])
    """
    return array[~np.isnan(array)] 
[docs]def fill_na(array: np.ndarray, value: Any) -> np.ndarray:
    """
    Return a copy of array with nans replaced with a given value.
    Parameters
    ----------
    array : np.ndarray
        Input array.
    value : Any
        Value to replace nans with.
    Returns
    -------
    np.ndarray
        A copy of array with nans replaced with the given value.
    Examples
    --------
    >>> fill_na(np.array([np.nan, 1, 2]), -1)
    array([-1.,  1.,  2.])
    """
    ar = array.copy()
    ar[np.isnan(ar)] = value
    return ar 
[docs]def fill_not_finite(array: np.ndarray, value: Any = 0) -> np.ndarray:
    """
    Return a copy of array with nans and infs replaced with a given value.
    Parameters
    ----------
    array : np.ndarray
        Input array.
    value : Any, optional
        Value to replace nans and infs with. Default is 0.
    Returns
    -------
    np.ndarray
        A copy of array with nans and infs replaced with the given value.
    Examples
    --------
    >>> fill_not_finite(np.array([np.nan, np.inf, 1, 2]), 99)
    array([99., 99.,  1.,  2.])
    """
    ar = array.copy()
    ar[~np.isfinite(array)] = value
    return ar 
[docs]def prepend_na(array: np.ndarray, n: int) -> np.ndarray:
    """
    Return a copy of array with nans inserted at the beginning.
    Parameters
    ----------
    array : np.ndarray
        Input array.
    n : int
        Number of elements to insert.
    Returns
    -------
    np.ndarray
        New array with nans added at the beginning.
    Examples
    --------
    >>> prepend_na(np.array([1, 2]), 2)
    array([nan, nan,  1.,  2.])
    """
    if not len(array):  # if empty, simply create empty array
        return np.hstack((nans(n), array))
    elem = array[0]
    dtype = np.float64
    if hasattr(elem, 'dtype'):
        dtype = elem.dtype
    if hasattr(elem, '__len__') and len(elem) > 1:  # if the array has many dimension
        if isinstance(array, np.ndarray):
            array_shape = array.shape
        else:
            array_shape = np.array(array).shape
        return np.vstack((nans((n, *array_shape[1:]), dtype), array))
    else:
        return np.hstack((nans(n, dtype), array)) 
#############################
# window operations
#############################
[docs]def rolling(
    array: np.ndarray,
    window: int,
    skip_na: bool = False,
    as_array: bool = False
) -> Union[Generator[np.ndarray, None, None], np.ndarray]:
    """
    Roll a fixed-width window over an array.
    The result is either a 2-D array or a generator of slices, controlled by `as_array` parameter.
    Parameters
    ----------
    array : np.ndarray
        Input array.
    window : int
        Size of the rolling window.
    skip_na : bool, optional
        If False, the sequence starts with (window-1) windows filled with nans. If True, those are omitted.
        Default is False.
    as_array : bool, optional
        If True, return a 2-D array. Otherwise, return a generator of slices. Default is False.
    Returns
    -------
    np.ndarray or Generator[np.ndarray, None, None]
        Rolling window matrix or generator
    Examples
    --------
    >>> rolling(np.array([1, 2, 3, 4, 5]), 2, as_array=True)
    array([[nan,  1.],
           [ 1.,  2.],
           [ 2.,  3.],
           [ 3.,  4.],
           [ 4.,  5.]])
    Usage with numpy functions
    >>> arr = rolling(np.array([1, 2, 3, 4, 5]), 2, as_array=True)
    >>> np.sum(arr, axis=1)
    array([nan,  3.,  5.,  7.,  9.])
    """
    if not any(isinstance(window, t) for t in [int, np.integer]):
        raise TypeError(f'Wrong window type ({type(window)}) int expected')
    window = int(window)
    if array.size < window:
        raise ValueError('array.size should be bigger than window')
    def rows_gen():
        if not skip_na:
            yield from (prepend_na(array[:i + 1], (window - 1) - i) for i in np.arange(window - 1))
        starts = np.arange(array.size - (window - 1))
        yield from (array[start:end] for start, end in zip(starts, starts + window))
    return np.array([row for row in rows_gen()]) if as_array else rows_gen() 
[docs]def rolling_apply(
    func: Callable,
    window: int,
    *arrays: np.ndarray,
    prepend_nans: bool = True,
    n_jobs: int = 1,
    **kwargs
) -> np.ndarray:
    """
    Roll a fixed-width window over an array or a group of arrays, producing slices.
    Apply a function to each slice / group of slices, transforming them into a value.
    Perform computations in parallel, optionally.
    Return a new np.ndarray with the resulting values.
    Parameters
    ----------
    func : Callable
        The function to apply to each slice or a group of slices.
    window : int
        Window size.
    *arrays : list
        List of input arrays.
    prepend_nans : bool
        Specifies if nans should be prepended to the resulting array
    n_jobs : int, optional
        Parallel tasks count for joblib. If 1, joblib won't be used. Default is 1.
    **kwargs : dict
        Input parameters (passed to func, must be named).
    Returns
    -------
    np.ndarray
    Examples
    --------
    >>> arr = np.array([1, 2, 3, 4, 5])
    >>> rolling_apply(sum, 2, arr)
    array([nan,  3.,  5.,  7.,  9.])
    >>> arr2 = np.array([1.5, 2.5, 3.5, 4.5, 5.5])
    >>> func = lambda a1, a2, k: (sum(a1) + max(a2)) * k
    >>> rolling_apply(func, 2, arr, arr2, k=-1)
    array([  nan,  -5.5,  -8.5, -11.5, -14.5])
    """
    if not any(isinstance(window, t) for t in [int, np.integer]):
        raise TypeError(f'Wrong window type ({type(window)}) int expected')
    window = int(window)
    if max(len(x.shape) for x in arrays) != 1:
        raise ValueError('Wrong array shape. Supported only 1D arrays')
    if len({array.size for array in arrays}) != 1:
        raise ValueError('Arrays must be the same length')
    def _apply_func_to_arrays(idxs):
        return func(*[array[idxs[0]:idxs[-1] + 1] for array in arrays], **kwargs)
    array = arrays[0]
    rolls = rolling(
        array if len(arrays) == n_jobs == 1 else np.arange(len(array)),
        window=window,
        skip_na=True
    )
    if n_jobs == 1:
        if len(arrays) == 1:
            arr = list(map(partial(func, **kwargs), rolls))
        else:
            arr = list(map(_apply_func_to_arrays, rolls))
    else:
        f = delayed(_apply_func_to_arrays)
        arr = Parallel(n_jobs=n_jobs)(f(idxs[[0, -1]]) for idxs in rolls)
    return prepend_na(arr, n=window - 1) if prepend_nans else np.array(arr) 
[docs]def expanding(
    array: np.ndarray,
    min_periods: int = 1,
    skip_na: bool = True,
    as_array: bool = False
) -> Union[Generator[np.ndarray, None, None], np.ndarray]:
    """
    Roll an expanding window over an array.
    The window size starts at min_periods and gets incremented by 1 on each iteration.
    The result is either a 2-D array or a generator of slices, controlled by `as_array` parameter.
    Parameters
    ----------
    array : np.ndarray
        Input array.
    min_periods : int, optional
        Minimum size of the window. Default is 1.
    skip_na : bool, optional
        If False, the windows of size less than min_periods are filled with nans. If True, they're dropped.
        Default is True.
    as_array : bool, optional
        If True, return a 2-D array. Otherwise, return a generator of slices. Default is False.
    Returns
    -------
    np.ndarray or Generator[np.ndarray, None, None]
    Examples
    --------
    >>> expanding(np.array([1, 2, 3, 4, 5]), 3, as_array=True)
    array([array([1, 2, 3]), array([1, 2, 3, 4]), array([1, 2, 3, 4, 5])],
          dtype=object)
    """
    if not any(isinstance(min_periods, t) for t in [int, np.integer]):
        raise TypeError(f'Wrong min_periods type ({type(min_periods)}) int expected')
    min_periods = int(min_periods)
    if array.size < min_periods:
        raise ValueError('array.size should be bigger than min_periods')
    def rows_gen():
        if not skip_na:
            yield from (nans(i) for i in np.arange(1, min_periods))
        yield from (array[:i] for i in np.arange(min_periods, array.size + 1))
    return np.array([row for row in rows_gen()]) if as_array else rows_gen() 
[docs]def expanding_apply(
    func: Callable,
    min_periods: int,
    *arrays: np.ndarray,
    prepend_nans: bool = True,
    n_jobs: int = 1,
    **kwargs
) -> np.ndarray:
    """
    Roll an expanding window over an array or a group of arrays producing slices.
    The window size starts at min_periods and gets incremented by 1 on each iteration.
    Apply a function to each slice / group of slices, transforming them into a value.
    Perform computations in parallel, optionally.
    Return a new np.ndarray with the resulting values.
    Parameters
    ----------
    func : Callable
        The function to apply to each slice or a group of slices.
    min_periods : int
        Minimal size of expanding window.
    *arrays : list
        List of input arrays.
    prepend_nans : bool
        Specifies if nans should be prepended to the resulting array
    n_jobs : int, optional
        Parallel tasks count for joblib. If 1, joblib won't be used. Default is 1.
    **kwargs : dict
        Input parameters (passed to func, must be named).
    Returns
    -------
    np.ndarray
    Examples
    --------
    >>> arr = np.array([1, 2, 3, 4, 5])
    >>> expanding_apply(sum, 2, arr)
    array([nan,  3.,  6., 10., 15.])
    >>> arr2 = np.array([1.5, 2.5, 3.5, 4.5, 5.5])
    >>> func = lambda a1, a2, k: (sum(a1) + max(a2)) * k
    >>> expanding_apply(func, 2, arr, arr2, k=-1)
    array([  nan,  -5.5,  -9.5, -14.5, -20.5])
    """
    if not any(isinstance(min_periods, t) for t in [int, np.integer]):
        raise TypeError(f'Wrong min_periods type ({type(min_periods)}) int expected')
    min_periods = int(min_periods)
    if max(len(x.shape) for x in arrays) != 1:
        raise ValueError('Supported only 1-D arrays')
    if len({array.size for array in arrays}) != 1:
        raise ValueError('Arrays must be the same length')
    def _apply_func_to_arrays(idxs):
        return func(*[array[idxs.astype(np.int)] for array in arrays], **kwargs)
    array = arrays[0]
    rolls = expanding(
        array if len(arrays) == n_jobs == 1 else np.arange(len(array)),
        min_periods=min_periods,
        skip_na=True
    )
    if n_jobs == 1:
        if len(arrays) == 1:
            arr = list(map(partial(func, **kwargs), rolls))
        else:
            arr = list(map(_apply_func_to_arrays, rolls))
    else:
        f = delayed(_apply_func_to_arrays)
        arr = Parallel(n_jobs=n_jobs)(map(f, rolls))
    return prepend_na(arr, n=min_periods - 1) if prepend_nans else np.array(arr)