From 2761b5b67c46e373ed43d172bd778236e1cd1355 Mon Sep 17 00:00:00 2001 From: Ross Garbutt Date: Mon, 11 Mar 2024 11:18:44 +1100 Subject: [PATCH] Performance improvements for fwma, wma and linreg --- pandas_ta/overlap/fwma.py | 26 ++++---- pandas_ta/overlap/linreg.py | 121 ++++++++++++++++-------------------- pandas_ta/overlap/wma.py | 30 ++++----- 3 files changed, 79 insertions(+), 98 deletions(-) diff --git a/pandas_ta/overlap/fwma.py b/pandas_ta/overlap/fwma.py index 131150b9..1088ad8f 100644 --- a/pandas_ta/overlap/fwma.py +++ b/pandas_ta/overlap/fwma.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import numpy as np from pandas import Series from pandas_ta._typing import DictLike, Int from pandas_ta.utils import ( @@ -6,11 +7,9 @@ v_ascending, v_offset, v_pos_default, - v_series, - weights + v_series ) - def fwma( close: Series, length: Int = None, asc: bool = None, offset: Int = None, **kwargs: DictLike @@ -47,21 +46,26 @@ def fwma( # Calculate fibs = fibonacci(n=length, weighted=True) - fwma = close.rolling(length, min_periods=length) \ - .apply(weights(fibs), raw=True) + # Reverse the weights + fib_weights = fibs[::-1] + # Total weight for normalization + total_weight = fibs.sum() + fwma_values = np.convolve(close, fib_weights, 'valid') / total_weight + _fwma = np.concatenate((np.full(length-1, np.nan), fwma_values)) + _fwma = Series(_fwma, index=close.index) # Offset if offset != 0: - fwma = fwma.shift(offset) + _fwma = _fwma.shift(offset) # Fill if "fillna" in kwargs: - fwma.fillna(kwargs["fillna"], inplace=True) + _fwma.fillna(kwargs["fillna"], inplace=True) if "fill_method" in kwargs: - fwma.fillna(method=kwargs["fill_method"], inplace=True) + _fwma.fillna(method=kwargs["fill_method"], inplace=True) # Name and Category - fwma.name = f"FWMA_{length}" - fwma.category = "overlap" + _fwma.name = f"FWMA_{length}" + _fwma.category = "overlap" - return fwma + return _fwma diff --git a/pandas_ta/overlap/linreg.py b/pandas_ta/overlap/linreg.py index bb5db7e7..8b7d99c2 100644 --- a/pandas_ta/overlap/linreg.py +++ b/pandas_ta/overlap/linreg.py @@ -1,20 +1,16 @@ # -*- coding: utf-8 -*- -from sys import float_info as sflt -from numpy import arctan, nan, pi, zeros_like -from numpy.version import version as np_version +import numpy as np +import pandas as pd from pandas import Series from pandas_ta._typing import DictLike, Int from pandas_ta.maps import Imports from pandas_ta.utils import ( - strided_window, v_offset, v_pos_default, v_series, - v_talib, - zero + v_talib ) - def linreg( close: Series, length: Int = None, talib: bool = None, offset: Int = None, **kwargs: DictLike @@ -69,94 +65,81 @@ def linreg( slope = kwargs.pop("slope", False) tsf = kwargs.pop("tsf", False) - # Calculate - np_close = close.values - if Imports["talib"] and mode_tal and not r: from talib import LINEARREG, LINEARREG_ANGLE, LINEARREG_INTERCEPT, LINEARREG_SLOPE, TSF if tsf: - linreg = TSF(close, timeperiod=length) + _linreg = TSF(close, timeperiod=length) elif slope: - linreg = LINEARREG_SLOPE(close, timeperiod=length) + _linreg = LINEARREG_SLOPE(close, timeperiod=length) elif intercept: - linreg = LINEARREG_INTERCEPT(close, timeperiod=length) + _linreg = LINEARREG_INTERCEPT(close, timeperiod=length) elif angle: - linreg = LINEARREG_ANGLE(close, timeperiod=length) + _linreg = LINEARREG_ANGLE(close, timeperiod=length) else: - linreg = LINEARREG(close, timeperiod=length) + _linreg = LINEARREG(close, timeperiod=length) else: - linreg_ = zeros_like(np_close) - # [1, 2, ..., n] from 1 to n keeps Sum(xy) low - x = range(1, length + 1) + np_close = close.to_numpy() + x = np.arange(1, length + 1) x_sum = 0.5 * length * (length + 1) x2_sum = x_sum * (2 * length + 1) / 3 - divisor = length * x2_sum - x_sum * x_sum - - # Needs to be reworked outside the method - def linear_regression(series): - y_sum = series.sum() - xy_sum = (x * series).sum() - - m = (length * xy_sum - x_sum * y_sum) / divisor - if slope: - return m - b = (y_sum * x2_sum - x_sum * xy_sum) / divisor - if intercept: - return b - - if angle: - theta = arctan(m) - if degrees: - theta *= 180 / pi - return theta - - if r: - y2_sum = (series * series).sum() - rn = length * xy_sum - x_sum * y_sum - rd = (divisor * (length * y2_sum - y_sum * y_sum)) ** 0.5 - if zero(rd) == 0: - rd = sflt.epsilon - return rn / rd - - return m * length + b if not tsf else m * (length - 1) + b - - if np_version >= "1.20.0": + divisor = length * x2_sum - x_sum ** 2 + + if np.__version__ >= "1.20.0": from numpy.lib.stride_tricks import sliding_window_view - linreg_ = [ - linear_regression(_) for _ in sliding_window_view( - np_close, length) - ] + windows = sliding_window_view(np_close, window_shape=length) + else: + windows = np.array([np_close[i:i+length] for i in range(len(np_close)-length+1)]) + y_sums = windows.sum(axis=1) + xy_sums = (x * windows).sum(axis=1) + m_values = (length * xy_sums - x_sum * y_sums) / divisor + b_values = (y_sums * x2_sum - x_sum * xy_sums) / divisor + + if slope: + result = m_values + elif intercept: + result = b_values + elif angle: + theta = np.arctan(m_values) + if degrees: + theta *= 180 / np.pi + result = theta + elif r: + y2_sums = (windows ** 2).sum(axis=1) + rn = length * xy_sums - x_sum * y_sums + rd = np.sqrt(divisor * (length * y2_sums - y_sums ** 2)) + rd[rd == 0] = np.finfo(float).eps # Prevent division by zero + result = rn / rd else: - linreg_ = [ - linear_regression(_) for _ in strided_window( - np_close, length) - ] + result = m_values * length + b_values + + # Match the length of the input series + _linreg = np.concatenate((np.full(length - 1, np.nan), result)) - linreg = Series([nan] * (length - 1) + linreg_, index=close.index) + _linreg = pd.Series(_linreg, index=close.index) # Offset if offset != 0: - linreg = linreg.shift(offset) + _linreg = _linreg.shift(offset) # Fill if "fillna" in kwargs: - linreg.fillna(kwargs["fillna"], inplace=True) + _linreg.fillna(kwargs["fillna"], inplace=True) if "fill_method" in kwargs: - linreg.fillna(method=kwargs["fill_method"], inplace=True) + _linreg.fillna(method=kwargs["fill_method"], inplace=True) # Name and Category - linreg.name = f"LINREG" + _linreg.name = "LINREG" if slope: - linreg.name += "m" + _linreg.name += "m" if intercept: - linreg.name += "b" + _linreg.name += "b" if angle: - linreg.name += "a" + _linreg.name += "a" if r: - linreg.name += "r" + _linreg.name += "r" - linreg.name += f"_{length}" - linreg.category = "overlap" + _linreg.name += f"_{length}" + _linreg.category = "overlap" - return linreg + return _linreg diff --git a/pandas_ta/overlap/wma.py b/pandas_ta/overlap/wma.py index d13be9e7..f06da320 100644 --- a/pandas_ta/overlap/wma.py +++ b/pandas_ta/overlap/wma.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -from numpy import arange, dot +import numpy as np from pandas import Series from pandas_ta._typing import DictLike, Int from pandas_ta.maps import Imports @@ -54,32 +54,26 @@ def wma( # Calculate if Imports["talib"] and mode_tal: from talib import WMA - wma = WMA(close, length) + _wma = WMA(close, length) else: total_weight = 0.5 * length * (length + 1) - weights_ = Series(arange(1, length + 1)) - weights = weights_ if asc else weights_[::-1] - - def linear(w): - def _compute(x): - return dot(x, w) / total_weight - return _compute - - close_ = close.rolling(length, min_periods=length) - wma = close_.apply(linear(weights), raw=True) + weights = np.arange(1, length + 1) if asc else np.arange(length, 0, -1) + _wma = np.convolve(close, weights[::-1], 'valid') / total_weight + _wma = np.concatenate((np.full(length-1, np.nan), _wma)) + _wma = Series(_wma, index=close.index) # Offset if offset != 0: - wma = wma.shift(offset) + _wma = _wma.shift(offset) # Fill if "fillna" in kwargs: - wma.fillna(kwargs["fillna"], inplace=True) + _wma.fillna(kwargs["fillna"], inplace=True) if "fill_method" in kwargs: - wma.fillna(method=kwargs["fill_method"], inplace=True) + _wma.fillna(method=kwargs["fill_method"], inplace=True) # Name and Category - wma.name = f"WMA_{length}" - wma.category = "overlap" + _wma.name = f"WMA_{length}" + _wma.category = "overlap" - return wma + return _wma