From 12c56da43bd7633033775d801e29c788f4305f6f Mon Sep 17 00:00:00 2001 From: Luis Barrancos Date: Fri, 1 Dec 2023 01:42:51 +0000 Subject: [PATCH] Add fix for #742 Add sum_signed_rolling_deltas function and proper NumPy vectorization. Some TMO indicators in TV consider the rolling window to be exclusive of the current day, while others consider it inclusive. Since this series is used to compute the moving averages and momentums for the TMO, there would be a plot mismatch against TV unless we dealt with both cases. Added doctests for both cases, noting that since the series must be cast to float type, fp precision representation issues might arise, which makes the Pandas Series equality tests unsuitable. We use np.allclose so that we can deal with both the fp precision issues if they arise, and with the necessary NaN padding as well. --- pandas_ta/momentum/tmo.py | 142 ++++++++++++++++++++++++++++---------- 1 file changed, 105 insertions(+), 37 deletions(-) diff --git a/pandas_ta/momentum/tmo.py b/pandas_ta/momentum/tmo.py index f8da1e4f..cd76e859 100644 --- a/pandas_ta/momentum/tmo.py +++ b/pandas_ta/momentum/tmo.py @@ -1,59 +1,131 @@ # -*- coding: utf-8 -*- -from numpy import isnan, zeros + +import numpy as np from pandas import DataFrame, Series + from pandas_ta._typing import DictLike, Int from pandas_ta.ma import ma -from pandas_ta.utils import v_bool, v_mamode, v_pos_default, v_offset, v_series +from pandas_ta.utils import v_bool, v_mamode, v_offset, v_pos_default, v_series + + +def sum_signed_rolling_deltas( + close: Series, open_: Series, length: Int, exclusive: bool = True +) -> Series: + """Sum of signed rolling price deltas + + Calculate the sum of signed differences between the current day's closing + price and a rolling window of preceding opening prices. This sum is then + padded to match the original series length. + + The function can operate in two modes: exclusive and inclusive. + In exclusive mode, the rolling window considers the current day is not part + of the lookback period, while in inclusive mode it is. + + Parameters: + close (Series): Series of closing prices. + open_ (Series): Series of opening prices. + length (Int): The window length for the rolling calculation. + exclusive (bool): If True, the rolling window is exclusive of the + current day's opening price, otherwise it's inclusive. + + Returns: + Series: A series with the sum of signed rolling deltas between current + day closing price and preceding days opening prices. + + Example: + >>> close = Series([100, 110, 140, 80, 90, 60, 50, 40, 90, 110]) + >>> open_ = Series([95, 83, 71, 132, 129, 145, 133, 101, 68, 96]) + >>> result = sum_signed_rolling_deltas(close, open_, 4, exclusive=True) + >>> expected_result = Series([np.nan, np.nan, np.nan, np.nan, \ + 0.0, -4.0, -4.0, -4.0, -4.0, 0.0]) + >>> np.allclose(result, expected_result, rtol=1e-6, equal_nan=True) + True + >>> result = sum_signed_rolling_deltas(close, open_, 4, exclusive=False) + >>> expected_result = Series([np.nan, np.nan, np.nan, \ + -1.0, 1.0, -3.0, -3.0, -3.0, -3.0, 1.0]) + >>> np.allclose(result, expected_result, rtol=1e-6, equal_nan=True) + True + """ + + if not exclusive: + length -= 1 + + rolling_open = np.lib.stride_tricks.sliding_window_view( + open_, window_shape=length)[:-1] + + close_broadcasted = np.broadcast_to( + close[length:].to_numpy()[:, np.newaxis], rolling_open.shape + ) + + signed_deltas = np.sign(close_broadcasted - rolling_open) + sum_signed_deltas = np.nansum(signed_deltas, axis=1).astype(float) + + return Series( + np.pad(sum_signed_deltas, (length, 0), + mode="constant", constant_values=np.nan), + index=close.index, + ) def tmo( - open_: Series, close: Series, - tmo_length: Int = None, calc_length: Int =None, smooth_length: Int = None, - mamode: str = None, compute_momentum: bool = False, + open_: Series, + close: Series, + tmo_length: Int = None, + calc_length: Int = None, + smooth_length: Int = None, + mamode: str = None, + compute_momentum: bool = False, normalize_signal: bool = False, - offset: Int = None, **kwargs: DictLike + exclusive_window: bool = True, + offset: Int = None, + **kwargs: DictLike, ) -> DataFrame: """True Momentum Oscillator (TMO) - The True Momentum Oscillator (TMO) aims to capture the true momentum - underlying the price movement of an asset over a specified time frame. It - quantifies the net buying and selling pressure by summing and then - smoothing the signum of the closing and opening price difference over the - given period, and then computing a main and smooth signal with a series - of moving averages. Crossovers between the main and smooth signal generate - potential signals for buying and selling opportunities. + This function computes the True Momentum Oscillator (TMO), a technical + indicator that measures the momentum of an asset's price movement over + a specified time frame. It does this by comparing the most recent closing + price within a rolling window to each opening price in that window, summing + the sign of these differences, and then applying a series of moving + averages to smooth the results. Some platforms present versions of this indicator with an optional momentum calculation for the main TMO signal and its smooth version, as well as the possibility to normalize the signals to the [-100, 100] range, which has the added benefit of allowing the definition of overbought and oversold regions typically between -70 and 70. + Common implementations in TV include indicators where the rolling windows + are exclusive or not, and control over this behaviour is also provided. Sources: https://www.tradingview.com/script/VRwDppqd-True-Momentum-Oscillator/ https://www.tradingview.com/script/65vpO7T5-True-Momentum-Oscillator-Universal-Edition/ - https://www.tradingview.com/script/o9BQyaA4-True-Momentum-Oscillator/ Args: - open_ (pd.Series): Series of 'open' prices. - close (pd.Series): Series of 'close' prices. - tmo_length (int): The period for TMO calculation. Default: 14 - calc_length (int): Initial moving average window. Default: 5 - smooth_length (int): Main and smooth signal MA window. Default: 3 + open_ (Series): Series of 'open' prices. + close (Series): Series of 'close' prices. + tmo_length (Int): The period for TMO calculation. Default: 14 + calc_length (Int): Initial moving average window. Default: 5 + smooth_length (Int): Main and smooth signal MA window. Default: 3 mamode (str): See ``help(ta.ma)``. Default: 'ema' compute_momentum (bool): Compute main and smooth momentum. Default: False normalize_signal (bool): Normalize TMO values to [-100,100]. Default: False - offset (int): How many periods to offset the result. Default: 0 + exclusive_window (bool): Exclusive or inclusive rolling window, where + the lookback is made over n days, or n-1, if we consider the rolling + window period should include the current date. + offset (Int): How many periods to offset the result. Default: 0 Kwargs: - fillna (value, optional): pd.DataFrame.fillna(value) + fillna (value, optional): DataFrame.fillna(value) fill_method (value, optional): Type of fill method Returns: - pd.DataFrame: main, smooth, main momentum, smooth momentum + DataFrame: main, smooth, main momentum, smooth momentum + """ + # Validate tmo_length = v_pos_default(tmo_length, 14) calc_length = v_pos_default(calc_length, 5) @@ -73,31 +145,27 @@ def tmo( if open_ is None or close is None: return None - # Calculate - signed_diff = Series(close - open_) \ - .apply(lambda x: 1 if x > 0 else (-1 if x < 0 else 0)) - signed_diff_sum = signed_diff.rolling(window=tmo_length).sum() - - if normalize_signal: - signed_diff_sum = signed_diff_sum * 100 / tmo_length + signed_diff_sum = sum_signed_rolling_deltas( + close, open_, tmo_length, exclusive=exclusive_window + ) initial_ma = ma(mamode, signed_diff_sum, length=calc_length) - if all(isnan(initial_ma)): - return None # Emergency Break + if all(np.isnan(initial_ma)): + return None # Emergency Break main = ma(mamode, initial_ma, length=smooth_length) - if all(isnan(main)): - return None # Emergency Break + if all(np.isnan(main)): + return None # Emergency Break smooth = ma(mamode, main, length=smooth_length) - if all(isnan(smooth)): - return None # Emergency Break + if all(np.isnan(smooth)): + return None # Emergency Break if compute_momentum: mom_main = main - main.shift(tmo_length) mom_smooth = smooth - smooth.shift(tmo_length) else: - zero_array = zeros(main.size) + zero_array = np.zeros(main.size) mom_main = Series(zero_array, index=main.index) mom_smooth = Series(zero_array, index=smooth.index) @@ -135,7 +203,7 @@ def tmo( main.name: main, smooth.name: smooth, mom_main.name: mom_main, - mom_smooth.name: mom_smooth + mom_smooth.name: mom_smooth, } df = DataFrame(data, index=close.index) df.name = f"TMO{_props}"