Source code for numpy_ext

"""
=========
numpy_ext
=========

An extension library for NumPy_ that implements common array operations not present in NumPy.

.. _numpy: https://numpy.org/

Installation
------------

**Regular installation**::

    pip install numpy_ext


**Installation for development**::

    git clone https://github.com/3jane/numpy_ext.git
    cd numpy_ext
    pip install -e .[dev]  # note: make sure you are using pip>=20


Window operations
-----------------

- :func:`numpy_ext.expanding`
- :func:`numpy_ext.expanding_apply`
- :func:`numpy_ext.rolling`
- :func:`numpy_ext.rolling_apply`

Operations with nans
--------------------

- :func:`numpy_ext.nans`
- :func:`numpy_ext.drop_na`
- :func:`numpy_ext.fill_na`
- :func:`numpy_ext.fill_not_finite`
- :func:`numpy_ext.prepend_na`

Others
------
- :func:`numpy_ext.apply_map`
- :func:`numpy_ext.expstep_range`

Functions
---------

"""
from functools import partial
from typing import Callable, Any, Union, Generator, Tuple, List

import numpy as np
from joblib import Parallel, delayed

Number = Union[int, float]


[docs]def expstep_range( start: Number, end: Number, min_step: Number = 1, step_mult: Number = 1, round_func: Callable = None ) -> np.ndarray: """ Return spaced values within a given interval. Step is increased by a multiplier on each iteration. Parameters ---------- start : int or float Start of interval, inclusive end : int or float End of interval, exclusive min_step : int or float, optional Minimal step between values. Must be bigger than 0. Default is 1. step_mult : int or float, optional Multiplier by which to increase the step on each iteration. Must be bigger than 0. Default is 1. round_func: Callable, optional Vectorized rounding function, e.g. np.ceil, np.floor, etc. Default is None. Returns ------- np.ndarray Array of exponentially spaced values. Examples -------- >>> expstep_range(1, 100, min_step=1, step_mult=1.5) array([ 1. , 2. , 3.5 , 5.75 , 9.125 , 14.1875 , 21.78125 , 33.171875 , 50.2578125 , 75.88671875]) >>> expstep_range(1, 100, min_step=1, step_mult=1.5, round_func=np.ceil) array([ 1., 2., 4., 6., 10., 15., 22., 34., 51., 76.]) >>> expstep_range(start=-1, end=-100, min_step=1, step_mult=1.5) array([ -1. , -2. , -3.5 , -5.75 , -9.125 , -14.1875 , -21.78125 , -33.171875 , -50.2578125 , -75.88671875]) Generate array of ints >>> expstep_range(start=100, end=1, min_step=1, step_mult=1.5).astype(int) array([100, 99, 97, 95, 91, 86, 79, 67, 50, 25]) """ if step_mult <= 0: raise ValueError('mult_step should be bigger than 0') if min_step <= 0: raise ValueError('min_step should be bigger than 0') last = start values = [] step = min_step sign = 1 if start < end else -1 while start < end and last < end or start > end and last > end: values.append(last) last += max(step, min_step) * sign step = abs(step * step_mult) values = np.array(values) if not round_func: return values values = np.array(round_func(values)) _, idx = np.unique(values, return_index=True) return values[np.sort(idx)]
[docs]def apply_map(func: Callable[[Any], Any], array: Union[List, np.ndarray]) -> np.ndarray: """ Apply a function element-wise to an array. Parameters ---------- func : Callable[[Any], Any] Function that accepts one argument and returns a single value. array : Union[List, np.ndarray] Input array or a list. Any lists will be converted to np.ndarray first. Returns ------- np.ndarray Resulting array. Examples -------- >>> apply_map(lambda x: 0 if x < 3 else 1, [[2, 2], [3, 3]]) array([[0, 0], [1, 1]]) """ array = np.array(array) array_view = array.flat array_view[:] = [func(x) for x in array_view] return array
############################# # Operations with nans #############################
[docs]def nans(shape: Union[int, Tuple[int, ...]], dtype=np.float64) -> np.ndarray: """ Return a new array of a given shape and type, filled with np.nan values. Parameters ---------- shape : int or tuple of ints Shape of the new array, e.g., (2, 3) or 2. dtype: data-type, optional Returns ------- np.ndarray Array of np.nans of the given shape. Examples -------- >>> nans(3) array([nan, nan, nan]) >>> nans((2, 2)) array([[nan, nan], [nan, nan]]) >>> nans(2, np.datetime64) array(['NaT', 'NaT'], dtype=datetime64) """ if np.issubdtype(dtype, np.integer): dtype = np.float arr = np.empty(shape, dtype=dtype) arr.fill(np.nan) return arr
[docs]def drop_na(array: np.ndarray) -> np.ndarray: """ Return a given array flattened and with nans dropped. Parameters ---------- array : np.ndarray Input array. Returns ------- np.ndarray New array without nans. Examples -------- >>> drop_na(np.array([np.nan, 1, 2])) array([1., 2.]) """ return array[~np.isnan(array)]
[docs]def fill_na(array: np.ndarray, value: Any) -> np.ndarray: """ Return a copy of array with nans replaced with a given value. Parameters ---------- array : np.ndarray Input array. value : Any Value to replace nans with. Returns ------- np.ndarray A copy of array with nans replaced with the given value. Examples -------- >>> fill_na(np.array([np.nan, 1, 2]), -1) array([-1., 1., 2.]) """ ar = array.copy() ar[np.isnan(ar)] = value return ar
[docs]def fill_not_finite(array: np.ndarray, value: Any = 0) -> np.ndarray: """ Return a copy of array with nans and infs replaced with a given value. Parameters ---------- array : np.ndarray Input array. value : Any, optional Value to replace nans and infs with. Default is 0. Returns ------- np.ndarray A copy of array with nans and infs replaced with the given value. Examples -------- >>> fill_not_finite(np.array([np.nan, np.inf, 1, 2]), 99) array([99., 99., 1., 2.]) """ ar = array.copy() ar[~np.isfinite(array)] = value return ar
[docs]def prepend_na(array: np.ndarray, n: int) -> np.ndarray: """ Return a copy of array with nans inserted at the beginning. Parameters ---------- array : np.ndarray Input array. n : int Number of elements to insert. Returns ------- np.ndarray New array with nans added at the beginning. Examples -------- >>> prepend_na(np.array([1, 2]), 2) array([nan, nan, 1., 2.]) """ if not len(array): # if empty, simply create empty array return np.hstack((nans(n), array)) elem = array[0] dtype = np.float64 if hasattr(elem, 'dtype'): dtype = elem.dtype if hasattr(elem, '__len__') and len(elem) > 1: # if the array has many dimension if isinstance(array, np.ndarray): array_shape = array.shape else: array_shape = np.array(array).shape return np.vstack((nans((n, *array_shape[1:]), dtype), array)) else: return np.hstack((nans(n, dtype), array))
############################# # window operations #############################
[docs]def rolling( array: np.ndarray, window: int, skip_na: bool = False, as_array: bool = False ) -> Union[Generator[np.ndarray, None, None], np.ndarray]: """ Roll a fixed-width window over an array. The result is either a 2-D array or a generator of slices, controlled by `as_array` parameter. Parameters ---------- array : np.ndarray Input array. window : int Size of the rolling window. skip_na : bool, optional If False, the sequence starts with (window-1) windows filled with nans. If True, those are omitted. Default is False. as_array : bool, optional If True, return a 2-D array. Otherwise, return a generator of slices. Default is False. Returns ------- np.ndarray or Generator[np.ndarray, None, None] Rolling window matrix or generator Examples -------- >>> rolling(np.array([1, 2, 3, 4, 5]), 2, as_array=True) array([[nan, 1.], [ 1., 2.], [ 2., 3.], [ 3., 4.], [ 4., 5.]]) Usage with numpy functions >>> arr = rolling(np.array([1, 2, 3, 4, 5]), 2, as_array=True) >>> np.sum(arr, axis=1) array([nan, 3., 5., 7., 9.]) """ if not any(isinstance(window, t) for t in [int, np.integer]): raise TypeError(f'Wrong window type ({type(window)}) int expected') window = int(window) if array.size < window: raise ValueError('array.size should be bigger than window') def rows_gen(): if not skip_na: yield from (prepend_na(array[:i + 1], (window - 1) - i) for i in np.arange(window - 1)) starts = np.arange(array.size - (window - 1)) yield from (array[start:end] for start, end in zip(starts, starts + window)) return np.array([row for row in rows_gen()]) if as_array else rows_gen()
[docs]def rolling_apply( func: Callable, window: int, *arrays: np.ndarray, prepend_nans: bool = True, n_jobs: int = 1, **kwargs ) -> np.ndarray: """ Roll a fixed-width window over an array or a group of arrays, producing slices. Apply a function to each slice / group of slices, transforming them into a value. Perform computations in parallel, optionally. Return a new np.ndarray with the resulting values. Parameters ---------- func : Callable The function to apply to each slice or a group of slices. window : int Window size. *arrays : list List of input arrays. prepend_nans : bool Specifies if nans should be prepended to the resulting array n_jobs : int, optional Parallel tasks count for joblib. If 1, joblib won't be used. Default is 1. **kwargs : dict Input parameters (passed to func, must be named). Returns ------- np.ndarray Examples -------- >>> arr = np.array([1, 2, 3, 4, 5]) >>> rolling_apply(sum, 2, arr) array([nan, 3., 5., 7., 9.]) >>> arr2 = np.array([1.5, 2.5, 3.5, 4.5, 5.5]) >>> func = lambda a1, a2, k: (sum(a1) + max(a2)) * k >>> rolling_apply(func, 2, arr, arr2, k=-1) array([ nan, -5.5, -8.5, -11.5, -14.5]) """ if not any(isinstance(window, t) for t in [int, np.integer]): raise TypeError(f'Wrong window type ({type(window)}) int expected') window = int(window) if max(len(x.shape) for x in arrays) != 1: raise ValueError('Wrong array shape. Supported only 1D arrays') if len({array.size for array in arrays}) != 1: raise ValueError('Arrays must be the same length') def _apply_func_to_arrays(idxs): return func(*[array[idxs[0]:idxs[-1] + 1] for array in arrays], **kwargs) array = arrays[0] rolls = rolling( array if len(arrays) == n_jobs == 1 else np.arange(len(array)), window=window, skip_na=True ) if n_jobs == 1: if len(arrays) == 1: arr = list(map(partial(func, **kwargs), rolls)) else: arr = list(map(_apply_func_to_arrays, rolls)) else: f = delayed(_apply_func_to_arrays) arr = Parallel(n_jobs=n_jobs)(f(idxs[[0, -1]]) for idxs in rolls) return prepend_na(arr, n=window - 1) if prepend_nans else np.array(arr)
[docs]def expanding( array: np.ndarray, min_periods: int = 1, skip_na: bool = True, as_array: bool = False ) -> Union[Generator[np.ndarray, None, None], np.ndarray]: """ Roll an expanding window over an array. The window size starts at min_periods and gets incremented by 1 on each iteration. The result is either a 2-D array or a generator of slices, controlled by `as_array` parameter. Parameters ---------- array : np.ndarray Input array. min_periods : int, optional Minimum size of the window. Default is 1. skip_na : bool, optional If False, the windows of size less than min_periods are filled with nans. If True, they're dropped. Default is True. as_array : bool, optional If True, return a 2-D array. Otherwise, return a generator of slices. Default is False. Returns ------- np.ndarray or Generator[np.ndarray, None, None] Examples -------- >>> expanding(np.array([1, 2, 3, 4, 5]), 3, as_array=True) array([array([1, 2, 3]), array([1, 2, 3, 4]), array([1, 2, 3, 4, 5])], dtype=object) """ if not any(isinstance(min_periods, t) for t in [int, np.integer]): raise TypeError(f'Wrong min_periods type ({type(min_periods)}) int expected') min_periods = int(min_periods) if array.size < min_periods: raise ValueError('array.size should be bigger than min_periods') def rows_gen(): if not skip_na: yield from (nans(i) for i in np.arange(1, min_periods)) yield from (array[:i] for i in np.arange(min_periods, array.size + 1)) return np.array([row for row in rows_gen()]) if as_array else rows_gen()
[docs]def expanding_apply( func: Callable, min_periods: int, *arrays: np.ndarray, prepend_nans: bool = True, n_jobs: int = 1, **kwargs ) -> np.ndarray: """ Roll an expanding window over an array or a group of arrays producing slices. The window size starts at min_periods and gets incremented by 1 on each iteration. Apply a function to each slice / group of slices, transforming them into a value. Perform computations in parallel, optionally. Return a new np.ndarray with the resulting values. Parameters ---------- func : Callable The function to apply to each slice or a group of slices. min_periods : int Minimal size of expanding window. *arrays : list List of input arrays. prepend_nans : bool Specifies if nans should be prepended to the resulting array n_jobs : int, optional Parallel tasks count for joblib. If 1, joblib won't be used. Default is 1. **kwargs : dict Input parameters (passed to func, must be named). Returns ------- np.ndarray Examples -------- >>> arr = np.array([1, 2, 3, 4, 5]) >>> expanding_apply(sum, 2, arr) array([nan, 3., 6., 10., 15.]) >>> arr2 = np.array([1.5, 2.5, 3.5, 4.5, 5.5]) >>> func = lambda a1, a2, k: (sum(a1) + max(a2)) * k >>> expanding_apply(func, 2, arr, arr2, k=-1) array([ nan, -5.5, -9.5, -14.5, -20.5]) """ if not any(isinstance(min_periods, t) for t in [int, np.integer]): raise TypeError(f'Wrong min_periods type ({type(min_periods)}) int expected') min_periods = int(min_periods) if max(len(x.shape) for x in arrays) != 1: raise ValueError('Supported only 1-D arrays') if len({array.size for array in arrays}) != 1: raise ValueError('Arrays must be the same length') def _apply_func_to_arrays(idxs): return func(*[array[idxs.astype(np.int)] for array in arrays], **kwargs) array = arrays[0] rolls = expanding( array if len(arrays) == n_jobs == 1 else np.arange(len(array)), min_periods=min_periods, skip_na=True ) if n_jobs == 1: if len(arrays) == 1: arr = list(map(partial(func, **kwargs), rolls)) else: arr = list(map(_apply_func_to_arrays, rolls)) else: f = delayed(_apply_func_to_arrays) arr = Parallel(n_jobs=n_jobs)(map(f, rolls)) return prepend_na(arr, n=min_periods - 1) if prepend_nans else np.array(arr)