Parabolic SAR v5.0
Hashed Custom Namespace
import time
PAIR = pairs.btc_usd
SAR_PLOT = True # False turns off SAR plot
SAR_AGGREGATION = 3600 # Match to tick = no auto adjust
SAR_SENSITIVITY = 1 # whole, default 2
SAR_RISE_LOOKBACK = 0 # 0 = MAX, else whole # period
SAR_RISE_INITIAL = 0.02 # Inital Rising Acceleration
SAR_RISE_ACCELERATION = 0.02 # Rising Acceleration
SAR_RISE_MAX = 0.2 # Maximum Rising Acceleration
SAR_FALL_LOOKBACK = 0 # 0 = MAX, else whole # period
SAR_FALL_INITIAL = 0.02 # Initial Falling Acceleration
SAR_FALL_ACCELERATION = 0.02 # Falling Acceleration
SAR_FALL_MAX = 0.2 # Maximum Falling Acceleration
def parabolic_sar(pair, sar_plot, aggregation, sensitivity,
rise_lookback, rise_initial, rise_acceleration, rise_max,
fall_lookback, fall_initial, fall_acceleration, fall_max):
import time
import math
start_time = time.time()
''' Seed psuedo random 24 character hash string '''
fall_hash = 2**(1/2.0)*fall_max + 3**(1/2.0)*fall_acceleration + 5**(1/2.0)*(fall_initial+1)
rise_hash = 2**(1/3.0)*rise_max + 3**(1/3.0)*rise_acceleration + 5**(1/3.0)*(rise_initial+1)
lookback_hash = 2**(1/2.0)*fall_lookback + 3**(1/2.0)*rise_lookback + 5**(1/2.0)*sensitivity
pair_agg_hash = int(10**25*pair**(1/2.0) / aggregation**(1/2.0))
sar_hash = str(int(10**25*Decimal((fall_hash+lookback_hash)/rise_hash))+pair_agg_hash)[-24:]
''' Initialize Stored Variables '''
extreme_point = 'parabolic_sar_extreme_point_'+sar_hash
acceleration = 'parabolic_sar_acceleration_'+sar_hash
direction = 'parabolic_sar_direction_'+sar_hash
previous = 'parabolic_sar_previous_'+sar_hash
storage[extreme_point] = storage.get(extreme_point, 0)
storage[acceleration] = storage.get(acceleration, 0)
storage[direction] = storage.get(direction, 0)
storage[previous] = storage.get(previous, 0)
''' Auto Adjust Thresholds Based on Aggregation '''
power = 1.235 # Bigger reduces SAR crosses
aggregation_ratio = aggregation/float(info.interval)
power_ratio = aggregation_ratio**power
sensitivity = int(math.ceil(sensitivity))
rise_lookback = int(rise_lookback*aggregation_ratio)
rise_initial = rise_initial/power_ratio
rise_acceleration = rise_acceleration/power_ratio
rise_max = rise_max*aggregation_ratio
fall_lookback = int(fall_lookback*aggregation_ratio)
fall_initial = fall_initial/power_ratio
fall_acceleration = fall_acceleration/power_ratio
fall_max = fall_max*aggregation_ratio
''' Prevent Rattle on 1m ticks '''
offset = 0
if info.interval == 60:
fall_initial = 0
rise_initial = 0
offset = 0.002
''' Log Adjusted Thresholds and persistent variable names on 1st tick '''
if 1:
if info.tick == 0:
log('storage.' + extreme_point)
log('storage.' + acceleration)
log('storage.' + direction)
log('storage.' + previous)
log('tick size....: %s' % info.interval)
log('aggregation..: %s' % aggregation)
log('agg_ratio....: %s' % aggregation_ratio)
log('power........: %s' % power)
log('power_ratio..: %.2f' % power_ratio)
log('sensitivity..: %s' % sensitivity)
log('rise_lookback: %s' % rise_lookback)
log('rise_initial.: %s' % rise_initial)
log('rise_accel...: %s' % rise_acceleration)
log('rise_max.....: %s' % rise_max)
log('fall_lookback: %s' % fall_lookback)
log('fall_initial.: %s' % fall_initial)
log('fall_accel...: %s' % fall_acceleration)
log('fall_max.....: %s' % fall_max)
log('offset.......: %s' % offset)
''' High, Low, and Close '''
high = data(interval=aggregation)[pair].period(2, 'high')
low = data(interval=aggregation)[pair].period(2, 'low')
close = data(interval=aggregation)[pair].period(2, 'close')
''' Build array of candles to look for SAR cross '''
low_array = []
high_array = []
for z in range(sensitivity, 0, -1):
low_array.append(low[-z])
high_array.append(high[-z])
''' Determine if inital SAR is Rising or Falling '''
if info.tick == 0:
if close[-1] > close[-2]:
storage[direction] = 1
storage[previous] = low[-2]
storage[extreme_point] = max(high_array)
storage[acceleration] = rise_initial
else:
storage[direction] = -1
storage[previous] = high[-2]
storage[extreme_point] = min(low_array)
storage[acceleration] = -fall_initial
''' Calculate Rising SAR '''
# Define New SAR
if storage[direction] == 1:
sar = storage[previous] + storage[acceleration]*(
storage[extreme_point] - storage[previous])
# Update acceleration factor if EP is breached
if high[-1] > storage[extreme_point]:
storage[extreme_point] = high[-1]
storage[acceleration] = storage[acceleration] + rise_acceleration
if storage[acceleration] > rise_max:
storage[acceleration] = rise_max
# Define lookback price based on period
if fall_lookback == 0:
lookback = storage[extreme_point]
else:
lookback = []
for z in range(fall_lookback):
lookback.append(high[-(z+1)])
lookback = max(lookback)
lookback = min(lookback, storage[extreme_point])
# If new SAR cross, then Stop and Reverse
if min(low_array) < sar:
storage[direction] = -2
storage[acceleration] = -fall_initial
sar = lookback * float(1 + offset)
log('***** SAR CROSS *****')
''' Calculate Falling SAR '''
# Define New SAR
if storage[direction] == -1:
sar = storage[previous] + storage[acceleration]*(
storage[previous] - storage[extreme_point])
# note storage.AF is negative in this instance
# Update acceleration factor if EP is breached
if low[-1] < storage[extreme_point]:
storage[extreme_point] = low[-1]
storage[acceleration] = storage[acceleration] - fall_acceleration
if storage[acceleration] < -fall_max:
storage[acceleration] = -fall_max
# Define lookback price based on period
if rise_lookback == 0:
lookback = storage[extreme_point]
else:
lookback = []
for z in range(rise_lookback):
lookback.append(L[-(z+1)])
lookback = min(lookback)
lookback = max(lookback, storage[extreme_point])
# If new SAR cross, then Stop and Reverse
if max(high_array) > sar:
storage[direction] = 1
storage[acceleration] = rise_initial
sar = lookback * float(1 - offset)
log('***** SAR CROSS *****')
''' Update Direction and Prior SAR '''
if storage[direction] == -2:
storage[direction] = -1
storage[previous] = sar
''' Plot Parabolic SAR '''
if sar_plot:
plot('low', low[-1])
plot('high', high[-1])
plot('Parabolic_SAR', sar)
''' Log Tick Time '''
finish_time = time.time()
tick_time = round((finish_time - start_time), 5)
#log(tick_time)
return sar
def tick():
if info.tick == 0:
storage.begin_time = time.time()
z = parabolic_sar(PAIR, SAR_PLOT, SAR_AGGREGATION, SAR_SENSITIVITY,
SAR_RISE_LOOKBACK, SAR_RISE_INITIAL, SAR_RISE_ACCELERATION, SAR_RISE_MAX,
SAR_FALL_LOOKBACK, SAR_FALL_INITIAL, SAR_FALL_ACCELERATION, SAR_FALL_MAX)
#log(('Parabolic SAR: %.2f') % z)
''' Log Total Backtest Time '''
def stop():
end_time = time.time()
run_time = float(end_time - storage.begin_time)
log('Total Run Time: %.2f' % run_time)
'''
RISING SAR
Prior SAR: The SAR value for the previous period.
Extreme Point: The highest high of the current uptrend.
Acceleration Factor: Starting at .02, AF increases by .02 each
time the extreme point makes a new high. AF can reach a maximum
of .20, no matter how long the uptrend extends.
Current RISING SAR = Prior SAR + Prior AF(Prior EP - Prior SAR)
The Acceleration Factor is multiplied by the difference between the
Extreme Point and the prior period's SAR. This is then added to the
prior period's SAR. Note however that Rising SAR can never be above the
prior two periods' lows. Should SAR be above one of those lows, use
the lowest of the two for SAR.
FALLING SAR
Prior SAR: The SAR value for the previous period.
Extreme Point: The lowest low of the current downtrend.
Acceleration Factor (AF): Starting at .02, AF increases by .02 each
time the extreme point makes a new low. AF can reach a maximum
of .20, no matter how long the downtrend extends.
Current FALLING SAR = Prior SAR - Prior AF(Prior SAR - Prior EP)
The Acceleration Factor is multiplied by the difference between the
Prior period's SAR and the Extreme Point. This is then subtracted
from the prior period's SAR. Note however that Falling SAR can never be
below the prior two periods' highs. Should SAR be below one of
those highs, use the highest of the two for SAR.
'''
https://discuss.tradewave.net/t/para...-ta-lib/220/45
Connect With Us