Skip to content

Commit

Permalink
Performance improvements for fwma, wma and linreg
Browse files Browse the repository at this point in the history
  • Loading branch information
Ross Garbutt committed Mar 11, 2024
1 parent b8f466c commit 2761b5b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 98 deletions.
26 changes: 15 additions & 11 deletions pandas_ta/overlap/fwma.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# -*- coding: utf-8 -*-
import numpy as np
from pandas import Series
from pandas_ta._typing import DictLike, Int
from pandas_ta.utils import (
fibonacci,
v_ascending,
v_offset,
v_pos_default,
v_series,
weights
v_series
)


def fwma(
close: Series, length: Int = None, asc: bool = None,
offset: Int = None, **kwargs: DictLike
Expand Down Expand Up @@ -47,21 +46,26 @@ def fwma(

# Calculate
fibs = fibonacci(n=length, weighted=True)
fwma = close.rolling(length, min_periods=length) \
.apply(weights(fibs), raw=True)
# Reverse the weights
fib_weights = fibs[::-1]
# Total weight for normalization
total_weight = fibs.sum()
fwma_values = np.convolve(close, fib_weights, 'valid') / total_weight
_fwma = np.concatenate((np.full(length-1, np.nan), fwma_values))
_fwma = Series(_fwma, index=close.index)

# Offset
if offset != 0:
fwma = fwma.shift(offset)
_fwma = _fwma.shift(offset)

# Fill
if "fillna" in kwargs:
fwma.fillna(kwargs["fillna"], inplace=True)
_fwma.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
fwma.fillna(method=kwargs["fill_method"], inplace=True)
_fwma.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Category
fwma.name = f"FWMA_{length}"
fwma.category = "overlap"
_fwma.name = f"FWMA_{length}"
_fwma.category = "overlap"

return fwma
return _fwma
121 changes: 52 additions & 69 deletions pandas_ta/overlap/linreg.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
# -*- coding: utf-8 -*-
from sys import float_info as sflt
from numpy import arctan, nan, pi, zeros_like
from numpy.version import version as np_version
import numpy as np
import pandas as pd
from pandas import Series
from pandas_ta._typing import DictLike, Int
from pandas_ta.maps import Imports
from pandas_ta.utils import (
strided_window,
v_offset,
v_pos_default,
v_series,
v_talib,
zero
v_talib
)


def linreg(
close: Series, length: Int = None, talib: bool = None,
offset: Int = None, **kwargs: DictLike
Expand Down Expand Up @@ -69,94 +65,81 @@ def linreg(
slope = kwargs.pop("slope", False)
tsf = kwargs.pop("tsf", False)

# Calculate
np_close = close.values

if Imports["talib"] and mode_tal and not r:
from talib import LINEARREG, LINEARREG_ANGLE, LINEARREG_INTERCEPT, LINEARREG_SLOPE, TSF
if tsf:
linreg = TSF(close, timeperiod=length)
_linreg = TSF(close, timeperiod=length)
elif slope:
linreg = LINEARREG_SLOPE(close, timeperiod=length)
_linreg = LINEARREG_SLOPE(close, timeperiod=length)
elif intercept:
linreg = LINEARREG_INTERCEPT(close, timeperiod=length)
_linreg = LINEARREG_INTERCEPT(close, timeperiod=length)
elif angle:
linreg = LINEARREG_ANGLE(close, timeperiod=length)
_linreg = LINEARREG_ANGLE(close, timeperiod=length)
else:
linreg = LINEARREG(close, timeperiod=length)
_linreg = LINEARREG(close, timeperiod=length)
else:
linreg_ = zeros_like(np_close)
# [1, 2, ..., n] from 1 to n keeps Sum(xy) low
x = range(1, length + 1)
np_close = close.to_numpy()
x = np.arange(1, length + 1)
x_sum = 0.5 * length * (length + 1)
x2_sum = x_sum * (2 * length + 1) / 3
divisor = length * x2_sum - x_sum * x_sum

# Needs to be reworked outside the method
def linear_regression(series):
y_sum = series.sum()
xy_sum = (x * series).sum()

m = (length * xy_sum - x_sum * y_sum) / divisor
if slope:
return m
b = (y_sum * x2_sum - x_sum * xy_sum) / divisor
if intercept:
return b

if angle:
theta = arctan(m)
if degrees:
theta *= 180 / pi
return theta

if r:
y2_sum = (series * series).sum()
rn = length * xy_sum - x_sum * y_sum
rd = (divisor * (length * y2_sum - y_sum * y_sum)) ** 0.5
if zero(rd) == 0:
rd = sflt.epsilon
return rn / rd

return m * length + b if not tsf else m * (length - 1) + b

if np_version >= "1.20.0":
divisor = length * x2_sum - x_sum ** 2

if np.__version__ >= "1.20.0":
from numpy.lib.stride_tricks import sliding_window_view
linreg_ = [
linear_regression(_) for _ in sliding_window_view(
np_close, length)
]
windows = sliding_window_view(np_close, window_shape=length)
else:
windows = np.array([np_close[i:i+length] for i in range(len(np_close)-length+1)])

y_sums = windows.sum(axis=1)
xy_sums = (x * windows).sum(axis=1)
m_values = (length * xy_sums - x_sum * y_sums) / divisor
b_values = (y_sums * x2_sum - x_sum * xy_sums) / divisor

if slope:
result = m_values
elif intercept:
result = b_values
elif angle:
theta = np.arctan(m_values)
if degrees:
theta *= 180 / np.pi
result = theta
elif r:
y2_sums = (windows ** 2).sum(axis=1)
rn = length * xy_sums - x_sum * y_sums
rd = np.sqrt(divisor * (length * y2_sums - y_sums ** 2))
rd[rd == 0] = np.finfo(float).eps # Prevent division by zero
result = rn / rd
else:
linreg_ = [
linear_regression(_) for _ in strided_window(
np_close, length)
]
result = m_values * length + b_values

# Match the length of the input series
_linreg = np.concatenate((np.full(length - 1, np.nan), result))

linreg = Series([nan] * (length - 1) + linreg_, index=close.index)
_linreg = pd.Series(_linreg, index=close.index)

# Offset
if offset != 0:
linreg = linreg.shift(offset)
_linreg = _linreg.shift(offset)

# Fill
if "fillna" in kwargs:
linreg.fillna(kwargs["fillna"], inplace=True)
_linreg.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
linreg.fillna(method=kwargs["fill_method"], inplace=True)
_linreg.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Category
linreg.name = f"LINREG"
_linreg.name = "LINREG"
if slope:
linreg.name += "m"
_linreg.name += "m"
if intercept:
linreg.name += "b"
_linreg.name += "b"
if angle:
linreg.name += "a"
_linreg.name += "a"
if r:
linreg.name += "r"
_linreg.name += "r"

linreg.name += f"_{length}"
linreg.category = "overlap"
_linreg.name += f"_{length}"
_linreg.category = "overlap"

return linreg
return _linreg
30 changes: 12 additions & 18 deletions pandas_ta/overlap/wma.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from numpy import arange, dot
import numpy as np
from pandas import Series
from pandas_ta._typing import DictLike, Int
from pandas_ta.maps import Imports
Expand Down Expand Up @@ -54,32 +54,26 @@ def wma(
# Calculate
if Imports["talib"] and mode_tal:
from talib import WMA
wma = WMA(close, length)
_wma = WMA(close, length)
else:
total_weight = 0.5 * length * (length + 1)
weights_ = Series(arange(1, length + 1))
weights = weights_ if asc else weights_[::-1]

def linear(w):
def _compute(x):
return dot(x, w) / total_weight
return _compute

close_ = close.rolling(length, min_periods=length)
wma = close_.apply(linear(weights), raw=True)
weights = np.arange(1, length + 1) if asc else np.arange(length, 0, -1)
_wma = np.convolve(close, weights[::-1], 'valid') / total_weight
_wma = np.concatenate((np.full(length-1, np.nan), _wma))
_wma = Series(_wma, index=close.index)

# Offset
if offset != 0:
wma = wma.shift(offset)
_wma = _wma.shift(offset)

# Fill
if "fillna" in kwargs:
wma.fillna(kwargs["fillna"], inplace=True)
_wma.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
wma.fillna(method=kwargs["fill_method"], inplace=True)
_wma.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Category
wma.name = f"WMA_{length}"
wma.category = "overlap"
_wma.name = f"WMA_{length}"
_wma.category = "overlap"

return wma
return _wma

0 comments on commit 2761b5b

Please sign in to comment.