-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature Request: Pivot Points #452
Comments
Hello @drixie, I am aware of all sorts of indicators including Pivot Points as well as the different kinds of Pivot Points will need to be included. Luckily, you do not need to look far for Python code for Pivots as Jesse AI has also implemented them. The only issue I foresee is that Pandas TA does not know what timeframe users are using... it seems many users are using intraday timeframes, so these might need to be excluded from the DataFrame Extension inclusion (Step 2) in #355. If you find time and willingness to contribute, it would be greatly appreciated! 😎 Kind Regards, |
I have an implementation of PIvot Points in Python which resembles the PivotHigh and PivotLow of TradingView I can share here. |
Initial Codefrom collections import deque
import numpy as np
def clean_deque(i, k, deq, df, key, isHigh):
if deq and deq[0] == i - k:
deq.popleft()
if isHigh:
while deq and df.iloc[i][key] > df.iloc[deq[-1]][key]:
deq.pop()
else:
while deq and df.iloc[i][key] < df.iloc[deq[-1]][key]:
deq.pop()
def pivotPoints(pivot=None,data=None):
"""
This function calculates the pivot points based on the pivot lenght.
These can be HH, LH , HL, LL values based on the adjacent pivots
which occur based on the length of the pivot.
"""
data['PH'] = False
data['PHV'] = np.NaN
data['PL'] = False
data['PLV'] = np.NaN
keyHigh = 'high'
keyLow = 'low'
win_size = pivot * 2 + 1
deqHigh = deque()
deqLow = deque()
max_idx = 0
min_idx = 0
i = 0
j = pivot
pivot_low = None
pivot_high = None
for index, row in data.iterrows():
if i < win_size:
clean_deque(i, win_size, deqHigh, data, keyHigh, True)
clean_deque(i, win_size, deqLow, data, keyLow, False)
deqHigh.append(i)
deqLow.append(i)
if data.iloc[i][keyHigh] > data.iloc[max_idx][keyHigh]:
max_idx = i
if data.iloc[i][keyLow] < data.iloc[min_idx][keyLow]:
min_idx = i
if i == win_size-1:
if data.iloc[max_idx][keyHigh] == data.iloc[j][keyHigh]:
data.at[data.index[j], 'PH'] = True
pivot_high = data.iloc[j][keyHigh]
if data.iloc[min_idx][keyLow] == data.iloc[j][keyLow]:
data.at[data.index[j], 'PL'] = True
pivot_low = data.iloc[j][keyLow]
if i >= win_size:
j += 1
clean_deque(i, win_size, deqHigh, data, keyHigh, True)
clean_deque(i, win_size, deqLow, data, keyLow, False)
deqHigh.append(i)
deqLow.append(i)
pivot_val = data.iloc[deqHigh[0]][keyHigh]
if pivot_val == data.iloc[j][keyHigh]:
data.at[data.index[j], 'PH'] = True
pivot_high = data.iloc[j][keyHigh]
if data.iloc[deqLow[0]][keyLow] == data.iloc[j][keyLow]:
data.at[data.index[j], 'PL'] = True
pivot_low = data.iloc[j][keyLow]
data.at[data.index[j], 'PHV'] = pivot_high
data.at[data.index[j], 'PLV'] = pivot_low
i = i + 1
return data Callingif __name__ == "__main__":
from ..data.fetch import *
from .emas import *
import pandas_ta as ta
df = data1m(
symbol = 'SPY',
start_date = '2021-12-10',
end_date = '2021-12-10'
)
pivots = pivotPoints(pivot=1,data=df)
df['PH'] = pivots['PH']
df['PHV'] = pivots['PHV']
df['PL'] = pivots['PL']
df['PLV'] = pivots['PLV']
print(df.loc[df['PH'] == True])
print(df.loc[df['PL'] == True]) |
In the above snippet |
``
It's not necessary, but I think the amount of memory needed could be cut down pretty dramatically by defaulting to have the 'anchor' values stored in a separate ndarray and then accessed using corresponding index positions returned from datetime or some other method. I'm not entirely sure, however a conditional np.where() call parameter could then still be used to return it back to the original timeframe for plotting ect if needed. from numpy import NaN, select
from pandas import DataFrame, to_datetime, infer_freq
from pandas_ta.utils import is_datetime_ordered, verify_series
def pivots(_open, high, low, close, anchor=None, method=None):
_open = verify_series(_open)
high = verify_series(high)
low = verify_series(low)
close = verify_series(close)
anchor = anchor.upper() if anchor and isinstance(anchor, str) and len(anchor) >= 1 else "D"
method_list = ["traditional", "fibonacci", "woodie", "classic", "demark", "camarilla"]
method = method if method in method_list else "traditional"
date = (
to_datetime(close.index, unit="ms")
if not is_datetime_ordered(close) and verify_series(close)
else close.index
)
freq = infer_freq(date)
df = DataFrame(
index=date,
data={"open": _open.values, "high": high.values, "low": low.values, "close": close.values},
)
if freq is not anchor:
a = DataFrame()
a["open"] = df["open"].resample(anchor).first()
a["high"] = df["high"].resample(anchor).max()
a["low"] = df["low"].resample(anchor).min()
a["close"] = df["close"].resample(anchor).last()
else:
a = df
# Calculate the Pivot Points
if method == "traditional":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["s1"] = (2 * a.p.values) - a.high.values
a["s2"] = a.p.values - (a.high.values - a.low.values)
a["s3"] = a.p.values - (a.high.values - a.low.values) * 2
a["r1"] = (2 * a.p.values) - a.low.values
a["r2"] = a.p.values + (a.high.values - a.low.values)
a["r3"] = a.p.values + (a.high.values - a.low.values) * 2
elif method == "fibonacci":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["pivot_range"] = a.high.values - a.low.values
a["s1"] = a.p.values - 0.382 * a.pivot_range.values
a["s2"] = a.p.values - 0.618 * a.pivot_range.values
a["s3"] = a.p.values - 1 * a.pivot_range.values
a["r1"] = a.p.values + 0.382 * a.pivot_range.values
a["r2"] = a.p.values + 0.382 * a.pivot_range.values
a["r3"] = a.p.values + 1 * a.pivot_range.values
a.drop(["pivot_range"], axis=1, inplace=True)
elif method == "woodie":
a["pivot_range"] = a.high.values - a.low.values
a["p"] = (a.high.values + a.low.values + a.open.values * 2) / 4
a["s1"] = a.p.values * 2 - a.high.values
a["s2"] = a.p.values - 1 * a.pivot_range.values
a["s3"] = a.high.values + 2 * (a.p.values - a.low.values)
a["s4"] = a.s3 - a.p.values
a["r1"] = a.p.values * 2 - a.low.values
a["r2"] = a.p.values + 1 * a.pivot_range.values
a["r3"] = a.low.values - 2 * (a.high.values - a.p.values)
a["r4"] = a.r3 + a.p.values
a.drop(["pivot_range"], axis=1, inplace=True)
elif method == "classic":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["pivot_range"] = a.high.values - a.low.values
a["s1"] = a.p.values * 2 - a.high.values
a["s2"] = a.p.values - 1 * a.pivot_range.values
a["s3"] = a.p.values - 2 * a.pivot_range.values
a["s4"] = a.p.values - 3 * a.pivot_range.values
a["r1"] = a.p.values * 2 - a.low.values
a["r2"] = a.p.values + 1 * a.pivot_range.values
a["r3"] = a.p.values + 2 * a.pivot_range.values
a["r4"] = a.p.values + 3 * a.pivot_range.values
a.drop(["pivot_range"], axis=1, inplace=True)
elif method == "demark":
conds = (
a.close.values == a.open.values,
a.close.values > a.open.values,
)
vals = (
a.high.values + a.low.values + a.close.values * 2,
a.high.values * 2 + a.low.values + a.close.values,
)
p = select(conds, vals, default=(a.high.values + a.low.values * 2 + a.close.values))
a["p"] = p / 4
a["s1"] = p / 2 - a.high.values
a["r1"] = p / 2 - a.low.values
elif method == "camarilla":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["pivot_range"] = a.high.values - a.low.values
a["s1"] = a.close.values - a.pivot_range.values * 1.1 / 12
a["s2"] = a.close.values - a.pivot_range.values * 1.1 / 6
a["s3"] = a.close.values - a.pivot_range.values * 1.1 / 4
a["s4"] = a.close.values - a.pivot_range.values * 1.1 / 2
a["r1"] = a.close.values + a.pivot_range.values * 1.1 / 12
a["r2"] = a.close.values + a.pivot_range.values * 1.1 / 6
a["r3"] = a.close.values + a.pivot_range.values * 1.1 / 4
a["r4"] = a.close.values + a.pivot_range.values * 1.1 / 2
a.drop(["pivot_range"], axis=1, inplace=True)
else:
raise ValueError("Invalid method")
if freq is not anchor:
pivots_df = a.reindex(df.index, method="ffill")
else:
pivots_df = a
pivots_df.drop(columns=["open", "high", "low", "close"], inplace=True)
x = pivots_df.loc[lambda x: (x.index.hour == 23) & (x.index.minute == 59)].iloc[0].name
pivots_df.loc[:x] = NaN
return pivots_df |
@W80125m, Looks good. I think there should be an argument/parameter to limit the number of pivots to have or show. Makes no sense to trade off prior pivots. On my charts I use general two pivot periods, the rest is just a distraction (but that's me). 😎 But I suppose there are some data miners out there that require the whole enchilada. 🤷🏼♂️ Kind Regards, |
Added tc bc and range% for pivot from numpy import NaN, select
from pandas import DataFrame, to_datetime, infer_freq
from pandas_ta.utils import is_datetime_ordered, verify_series
def pivots(_open, high, low, close, anchor=None, method=None):
_open = verify_series(_open)
high = verify_series(high)
low = verify_series(low)
close = verify_series(close)
anchor = anchor.upper() if anchor and isinstance(anchor, str) and len(anchor) >= 1 else "D"
method_list = ["traditional", "fibonacci", "woodie", "classic", "demark", "camarilla"]
method = method if method in method_list else "traditional"
date = (
to_datetime(close.index, unit="ms")
if not is_datetime_ordered(close) and verify_series(close)
else close.index
)
freq = infer_freq(date)
df = DataFrame(
index=date,
data={"open": _open.values, "high": high.values, "low": low.values, "close": close.values},
)
if freq is not anchor:
a = DataFrame()
a["open"] = df["open"].resample(anchor).first()
a["high"] = df["high"].resample(anchor).max()
a["low"] = df["low"].resample(anchor).min()
a["close"] = df["close"].resample(anchor).last()
else:
a = df
# Calculate the Pivot Points
if method == "traditional":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["bc"] = (a.high.values + a.low.values ) / 2
a["tc"] = (2 * a.p.values) - a.bc.values
a["rng"] = abs(a.tc.values-a.bc.values)/a.p.values*100
a["s1"] = (2 * a.p.values) - a.high.values
a["s2"] = a.p.values - (a.high.values - a.low.values)
a["s3"] = a.p.values - (a.high.values - a.low.values) * 2
a["r1"] = (2 * a.p.values) - a.low.values
a["r2"] = a.p.values + (a.high.values - a.low.values)
a["r3"] = a.p.values + (a.high.values - a.low.values) * 2
elif method == "fibonacci":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["pivot_range"] = a.high.values - a.low.values
a["s1"] = a.p.values - 0.382 * a.pivot_range.values
a["s2"] = a.p.values - 0.618 * a.pivot_range.values
a["s3"] = a.p.values - 1 * a.pivot_range.values
a["r1"] = a.p.values + 0.382 * a.pivot_range.values
a["r2"] = a.p.values + 0.618 * a.pivot_range.values
a["r3"] = a.p.values + 1 * a.pivot_range.values
a.drop(["pivot_range"], axis=1, inplace=True)
elif method == "woodie":
a["pivot_range"] = a.high.values - a.low.values
a["p"] = (a.high.values + a.low.values + a.open.values * 2) / 4
a["s1"] = a.p.values * 2 - a.high.values
a["s2"] = a.p.values - 1 * a.pivot_range.values
a["s3"] = a.high.values + 2 * (a.p.values - a.low.values)
a["s4"] = a.s3 - a.p.values
a["r1"] = a.p.values * 2 - a.low.values
a["r2"] = a.p.values + 1 * a.pivot_range.values
a["r3"] = a.low.values - 2 * (a.high.values - a.p.values)
a["r4"] = a.r3 + a.p.values
a.drop(["pivot_range"], axis=1, inplace=True)
elif method == "classic":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["pivot_range"] = a.high.values - a.low.values
a["s1"] = a.p.values * 2 - a.high.values
a["s2"] = a.p.values - 1 * a.pivot_range.values
a["s3"] = a.p.values - 2 * a.pivot_range.values
a["s4"] = a.p.values - 3 * a.pivot_range.values
a["r1"] = a.p.values * 2 - a.low.values
a["r2"] = a.p.values + 1 * a.pivot_range.values
a["r3"] = a.p.values + 2 * a.pivot_range.values
a["r4"] = a.p.values + 3 * a.pivot_range.values
a.drop(["pivot_range"], axis=1, inplace=True)
elif method == "demark":
conds = (
a.close.values == a.open.values,
a.close.values > a.open.values,
)
vals = (
a.high.values + a.low.values + a.close.values * 2,
a.high.values * 2 + a.low.values + a.close.values,
)
p = select(conds, vals, default=(a.high.values + a.low.values * 2 + a.close.values))
a["p"] = p / 4
a["s1"] = p / 2 - a.high.values
a["r1"] = p / 2 - a.low.values
elif method == "camarilla":
a["p"] = (a.high.values + a.low.values + a.close.values) / 3
a["pivot_range"] = a.high.values - a.low.values
a["s1"] = a.close.values - a.pivot_range.values * 1.1 / 12
a["s2"] = a.close.values - a.pivot_range.values * 1.1 / 6
a["s3"] = a.close.values - a.pivot_range.values * 1.1 / 4
a["s4"] = a.close.values - a.pivot_range.values * 1.1 / 2
a["r1"] = a.close.values + a.pivot_range.values * 1.1 / 12
a["r2"] = a.close.values + a.pivot_range.values * 1.1 / 6
a["r3"] = a.close.values + a.pivot_range.values * 1.1 / 4
a["r4"] = a.close.values + a.pivot_range.values * 1.1 / 2
a.drop(["pivot_range"], axis=1, inplace=True)
else:
raise ValueError("Invalid method")
if freq is not anchor:
pivots_df = a.reindex(df.index, method="ffill")
else:
pivots_df = a
pivots_df.drop(columns=["open", "high", "low", "close"], inplace=True)
x = pivots_df.loc[lambda x: (x.index.hour == 23) & (x.index.minute == 59)].iloc[0].name
pivots_df.loc[:x] = NaN
return pivots_df |
Hi @bibinvargheset , I'm new here and new use the pandas-ta, hlp = hl.pivots(df["open price"],df["high"], df["low"], df["close"], anchor=23)
print(hlp) The error print: File "/Users/yanlee/PycharmProjects/pythonProject/main.py", line 111, in long
hlp = hl.pivots(df["open price"],df["high"], df["low"], df["close"])
File "/Users/yanlee/PycharmProjects/pythonProject/hl.py", line 119, in pivots
x = pivots_df.loc[lambda x: (x.index.hour == 23) & (x.index.minute == 59)].iloc[0].name
File "/Users/yanlee/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pandas/core/indexing.py", line 967, in __getitem__
return self._getitem_axis(maybe_callable, axis=axis)
File "/Users/yanlee/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pandas/core/indexing.py", line 1520, in _getitem_axis
self._validate_integer(key, axis)
File "/Users/yanlee/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pandas/core/indexing.py", line 1452, in _validate_integer
raise IndexError("single positional indexer is out-of-bounds")
IndexError: single positional indexer is out-of-bounds How can I fix it? Thank you |
Remove this code an try , I just added some part orginal code is from above discussion x = pivots_df.loc[lambda x: (x.index.hour == 23) & (x.index.minute == 59)].iloc[0].name
pivots_df.loc[:x] = NaN |
Big thanks! Everything is working fine |
@bibinvargheset I found that your implementation use Open/Close/High/Low of the current day, not the previous day, which will cause a very serious leak |
Yes, thats right, i had actually added to previous code in the discussion i will correct that and update later |
Can you please confirm the data is only OHLCV ? |
Pivots require time |
When using fibonacci, r1=r2: a["r1"] = a.p.values + 0.382 * a.pivot_range.values
a["r2"] = a.p.values + 0.382 * a.pivot_range.values |
corrected |
Hello all! Great to see the collaborative progress of Pivot Points being worked out here. 😎 Would any of you: @AGDholo, @bibinvargheset, @drixie, @idristarwala, @malakeel, @moewiee, @xfo-0 want to submit "the best" current working version as a PR yet? KJ |
@twopirllc This is issue is still open right? . If so , can i share my code for a PR ? |
please do |
Hello @01xRiverse, @jhmenke There is a PR #574 for it that I am refactoring and hope to get available on the development branch within a week or so. Contributions are always open. What sort of additions did you have in mind? KJ |
Excellent, thanks! |
Great to see i was not checking for some time, is it added to dev branch |
Has the Pivot Point Standards been implemented yet in the main repo? |
To whom it may concern: These are all on the development branch for now. |
Hi, great work about pivots! Im trying to calculate HH, HL, LH, LL. Looks like here is only an example for PH and PL. Anyone has a any documentation where to find a good explanation how is the calculation? and if there is any plan to include it as part of the lib will be great. thanks |
Pivot points are used to notice importance levels during a trading session. I have used them successfully for day trading as they are solid inflection points.
Read more here - https://tradingsim.com/blog/pivot-points/
Since it is typically computed on daily timeframe (and above), yfinance data can be used as a default.
My python skills are limited so I cannot code it as a Pandas TA custom indicator. That said, it seems pretty easy enough that I can compute it in pandas. It would just be super convenient if it was part of Pandas TA. I will update this issue with the pandas code when I get it done.
The text was updated successfully, but these errors were encountered: