pattern_utils
"""Utilities to load stored price patterns and match price vectors against them via DTW."""
from curses import window  # NOTE(review): unused and unavailable on Windows — confirm before removing
from random import randint
import pandas as pd
import re
import os
import csv
import dtw_applier
import normalize_utils
from scipy.interpolate import make_interp_spline, BSpline  # NOTE(review): currently unused
import numpy as np  # NOTE(review): currently unused
import matplotlib.pyplot as plt

# Root directory holding one sub-directory of CSV pattern files per pattern type.
PATTERNS_FILE_PATH = 'patterns/'
# Sentinel standing in for "infinity" when minimising distances/sizes.
BIG_NUMBER = 99999999


def createMorningDataframeFromJson(day, inputData):
    """Create a dataset with close prices from 08:00 to 13:00 of *day*.

    Args:
        day (str): date prefix (e.g. '2021-03-05') used to filter timestamps
        inputData (Dict{}): maps 'YYYY-MM-DD HH:MM' keys to OHLC dicts holding
            at least a '4. close' entry
    Return:
        dataframe (pd.DataFrame): one row per morning timestamp with its close price
    """
    regex = rf"^{day} (0(8|9):\w*|1(0|1|2|3):\w*)"

    # Collect everything first and build the frame once: calling pd.concat
    # inside the loop is quadratic, and the loop variable is renamed so it
    # no longer shadows the `day` parameter used to build the regex.
    morning_closes = {
        timestamp: value['4. close']
        for timestamp, value in inputData.items()
        if re.search(regex, timestamp)
    }
    if not morning_closes:
        return pd.DataFrame({})
    return pd.Series(morning_closes).to_frame()


def loadPatterns(number_of_desired_patterns, pattern_types_set):
    """Create a pattern dictionary with pattern type contained in the set as key,
    and n patterns data for each type

    Args:
        number_of_desired_patterns (int): number of patterns desired for each type
        pattern_types_set (Set{}): set containing the desired pattern types
    Return:
        patterns_dictionary (Dict{}): pattern type -> list of price vectors
    """
    patterns_dictionary = {
        'rest_normalized': []
    }

    for pattern_type in pattern_types_set:
        file_list = os.listdir(PATTERNS_FILE_PATH + pattern_type)
        total_results = []
        elected_files_indexes_set = set()
        # Cap the sample at the number of available files, otherwise the
        # sampling loop below could never terminate.
        wanted = min(number_of_desired_patterns, len(file_list))
        while len(elected_files_indexes_set) < wanted:
            elected_files_indexes_set.add(randint(0, len(file_list) - 1))

        for index in elected_files_indexes_set:
            file = file_list[index]
            single_file_results = []
            with open(PATTERNS_FILE_PATH + pattern_type + '/' + file) as csvfile:
                reader = csv.reader(csvfile)
                next(reader, None)  # skip the header row
                for row in reader:
                    single_file_results.append(round(float(row[1]), 3))
            total_results.append(single_file_results)
        patterns_dictionary[pattern_type] = total_results
    return patterns_dictionary


def findCommonPattern(normalized_vector, all_patterns_dictionary):
    """Find the type of pattern for a given vector

    Args:
        normalized_vector (List[]): previously normalized vector containing prices
        all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
    Return:
        common_pattern_type (str): type of the best-matching pattern
        minimum_distance (float): minimum DTW distance between the best match and the vector
    """
    minimum_distance = BIG_NUMBER
    common_pattern_type = 'rest_normalized'
    for pattern_type, patterns in all_patterns_dictionary.items():
        for single_pattern in patterns:
            current_distance = dtw_applier.comparePatterns(normalized_vector, single_pattern)
            if current_distance < minimum_distance:
                common_pattern_type = pattern_type
                minimum_distance = current_distance

    return common_pattern_type, minimum_distance


def enhanceDataframe(distance_found, pattern_type, sliced_vector, all_patterns_dictionary, window_divisions):
    """Given a pattern, find a better match, if possible, inside the vector

    Args:
        distance_found (float): minimum distance found so far
        pattern_type (str): type of the pattern found
        sliced_vector (List[]): vector containing the data where the search will take place
        all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
        window_divisions (List[]): numbers of equal parts the window is split into
    Return:
        best_segment_i (int): index where the best segment starts
        best_segment_j (int): index where the best segment ends
    """
    minimum_distance = distance_found
    best_segment_i = 0
    best_segment_j = len(sliced_vector) - 1
    for number_of_parts in window_divisions:
        window_size = len(sliced_vector) // number_of_parts
        left_index = 0
        right_index = window_size
        for i in range(number_of_parts):
            split_vector = sliced_vector[left_index:right_index]
            normalized_split_vector = normalize_utils.normalizeVector(split_vector)
            for single_pattern in all_patterns_dictionary[pattern_type]:
                current_distance = dtw_applier.comparePatterns(normalized_split_vector, single_pattern)
                if current_distance <= minimum_distance:
                    minimum_distance = current_distance
                    best_segment_i = left_index
                    best_segment_j = right_index
            left_index = right_index
            right_index += window_size
            # If the next slice is the last one, extend it to the end of the
            # vector so the remainder of the integer division is not dropped.
            # (The original compared the loop index against the last *value*
            # of window_divisions, which never triggered as intended.)
            if i == number_of_parts - 2:
                right_index = len(sliced_vector)
    return best_segment_i, best_segment_j


def smoothData(dataframe):
    """Smooth the data inside a dataframe using a window-2 rolling average.

    Plots the raw and the smoothed series, shows the figure, and returns the
    smoothed dataframe (previously returned None).
    """
    rolling_mean = dataframe.rolling(window=2).mean()
    dataframe.plot()
    rolling_mean.plot(color='red')
    plt.show()
    return rolling_mean


def minimumAndMaximumPatternSizes(patterns_dict):
    """Find inside patterns_dict the longest and shortest patterns and their sizes.

    The 'rest_normalized' bucket is skipped.  With no patterns present the
    defaults (BIG_NUMBER, 0) are returned.
    """
    min_size = BIG_NUMBER
    max_size = 0
    for key, vector in patterns_dict.items():
        if key == 'rest_normalized':
            continue
        for pattern in vector:
            current_size = len(pattern)
            min_size = min(min_size, current_size)
            max_size = max(max_size, current_size)
    return min_size, max_size


def calculateTendencyProbability(results, pattern_types):
    """Calculate the probability of achieving the expected tendency per pattern type.

    Args:
        results (List[]): result objects carrying .pattern_type and .tendency
        pattern_types (List[]): list of types to calculate the probability for
    Return:
        average_tendency_dict (Dict{}): type -> success percentage, or 'Not found'
    """
    average_tendency_dict = {}
    for key in pattern_types:
        if key == 'rest_normalized':
            continue
        # [0] counts results matching the expected tendency, [1] the total.
        average_tendency_dict[key] = [0, 0]
    for pattern_found in results:
        counters = average_tendency_dict.get(pattern_found.pattern_type)
        if counters is None:
            # Result types outside pattern_types (e.g. 'rest_normalized') are
            # ignored instead of raising KeyError.
            continue
        if pattern_found.tendency is True:
            counters[0] += 1
        counters[1] += 1
    for pattern_type, value in average_tendency_dict.items():
        if value[1] == 0:
            average_tendency_dict[pattern_type] = 'Not found'
        else:
            average_tendency_dict[pattern_type] = value[0] / value[1] * 100
    return average_tendency_dict
def createMorningDataframeFromJson(day, inputData):
    """Create a dataset with close prices from 08:00 to 13:00 of *day*.

    Args:
        day (str): date prefix (e.g. '2021-03-05') used to filter timestamps
        inputData (Dict{}): maps 'YYYY-MM-DD HH:MM' keys to OHLC dicts holding
            at least a '4. close' entry
    Return:
        dataframe (pd.DataFrame): one row per morning timestamp with its close price
    """
    regex = rf"^{day} (0(8|9):\w*|1(0|1|2|3):\w*)"

    # Collect everything first and build the frame once: calling pd.concat
    # inside the loop is quadratic, and the loop variable is renamed so it
    # no longer shadows the `day` parameter used to build the regex.
    morning_closes = {
        timestamp: value['4. close']
        for timestamp, value in inputData.items()
        if re.search(regex, timestamp)
    }
    if not morning_closes:
        return pd.DataFrame({})
    return pd.Series(morning_closes).to_frame()
Create a dataset with close prices from 08:00 to 13:00
def loadPatterns(number_of_desired_patterns, pattern_types_set):
    """Create a pattern dictionary with pattern type contained in the set as key,
    and n patterns data for each type

    Args:
        number_of_desired_patterns (int): number of patterns desired for each type
        pattern_types_set (Set{}): set containing the desired pattern types
    Return:
        patterns_dictionary (Dict{}): pattern type -> list of price vectors
    """
    patterns_dictionary = {
        'rest_normalized': []
    }

    for pattern_type in pattern_types_set:
        file_list = os.listdir(PATTERNS_FILE_PATH + pattern_type)
        total_results = []
        elected_files_indexes_set = set()
        # Cap the sample at the number of available files, otherwise this
        # loop could never terminate when more patterns are requested than
        # files exist for the type.
        wanted = min(number_of_desired_patterns, len(file_list))
        while len(elected_files_indexes_set) < wanted:
            elected_files_indexes_set.add(randint(0, len(file_list) - 1))

        for index in elected_files_indexes_set:
            file = file_list[index]
            single_file_results = []
            with open(PATTERNS_FILE_PATH + pattern_type + '/' + file) as csvfile:
                reader = csv.reader(csvfile)
                next(reader, None)  # skip the header row
                for row in reader:
                    # Column 1 holds the (normalized) price; keep 3 decimals.
                    single_file_results.append(round(float(row[1]), 3))
            total_results.append(single_file_results)
        patterns_dictionary[pattern_type] = total_results
    return patterns_dictionary
Create a pattern dictionary with pattern type contained in the set as key, and n patterns data for each type
Args:
number_of_desired_patterns (int): number of patterns desired for each type
pattern_types_set (Set{}): set containing the desired pattern types for the dictionary
Return:
pattern_dictionary (Dict{})
def findCommonPattern(normalized_vector, all_patterns_dictionary):
    """Find the type of pattern for a given vector

    Args:
        normalized_vector (List[]): previously normalized vector containing prices
        all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
    Return:
        common_pattern_type (str): type of the best-matching pattern
        minimum_distance (float): minimum DTW distance between the best match and the vector
    """
    # Fixed the misspelled local `minimun_distance` so it matches the
    # documented return name.
    minimum_distance = BIG_NUMBER
    common_pattern_type = 'rest_normalized'
    for pattern_type, patterns in all_patterns_dictionary.items():
        for single_pattern in patterns:
            current_distance = dtw_applier.comparePatterns(normalized_vector, single_pattern)
            if current_distance < minimum_distance:
                common_pattern_type = pattern_type
                minimum_distance = current_distance

    return common_pattern_type, minimum_distance
Find the type of pattern for a given vector
Args:
normalized_vector (List[]): previous normalized vector containing prices
all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
Return:
common_pattern_type (str): type of the best-matching pattern
minimum_distance (float): minimum distance found between the best match and the vector
def enhanceDataframe(distance_found, pattern_type, sliced_vector, all_patterns_dictionary, window_divisions):
    """Given a pattern, find a better match, if possible, inside the vector

    Args:
        distance_found (float): minimum distance found so far
        pattern_type (str): type of the pattern found
        sliced_vector (List[]): vector containing the data where the search will take place
        all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
        window_divisions (List[]): numbers of equal parts the window is split into
    Return:
        best_segment_i (int): index where the best segment starts
        best_segment_j (int): index where the best segment ends
    """
    minimum_distance = distance_found
    best_segment_i = 0
    best_segment_j = len(sliced_vector) - 1
    for number_of_parts in window_divisions:
        window_size = len(sliced_vector) // number_of_parts
        left_index = 0
        right_index = window_size
        for i in range(number_of_parts):
            split_vector = sliced_vector[left_index:right_index]
            normalized_split_vector = normalize_utils.normalizeVector(split_vector)
            for single_pattern in all_patterns_dictionary[pattern_type]:
                current_distance = dtw_applier.comparePatterns(normalized_split_vector, single_pattern)
                if current_distance <= minimum_distance:
                    minimum_distance = current_distance
                    best_segment_i = left_index
                    best_segment_j = right_index
            left_index = right_index
            right_index += window_size
            # If the next slice is the last one, extend it to the end of the
            # vector so the remainder of the integer division is not dropped.
            # (The original compared the loop index against the last *value*
            # of window_divisions rather than the last part number, and used
            # len - 1, which excluded the final element from the slice.)
            if i == number_of_parts - 2:
                right_index = len(sliced_vector)
    return best_segment_i, best_segment_j
Given a pattern, find a better match, if possible, inside the vector
Args:
distance_found (float): minimum distance found between the best match and the vector at the moment
pattern_type (str): type of the pattern found
sliced_vector (List[]): vector containing the data where the search will take place
all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
window_divisions (List[]): list containing the numbers of equal parts the window should be split into
Return:
best_segment_i (int): index where the best segment starts
best_segment_j (int): index where the best segment ends
def smoothData(dataframe):
    """Smooth the data inside a dataframe using a window-2 rolling average.

    Plots the raw series and the smoothed series (in red), shows the figure,
    and returns the smoothed dataframe (previously returned None, which made
    the computed result unusable by callers).
    """
    rolling_mean = dataframe.rolling(window=2).mean()
    dataframe.plot()
    rolling_mean.plot(color='red')
    plt.show()
    return rolling_mean
Smooth the data inside a dataframe using average smoothing
def minimumAndMaximumPatternSizes(patterns_dict):
    """Return the shortest and longest pattern lengths found in patterns_dict.

    The 'rest_normalized' bucket is skipped.  When no patterns are present the
    initial defaults (BIG_NUMBER, 0) are returned unchanged.
    """
    min_size, max_size = BIG_NUMBER, 0
    for pattern_type, pattern_list in patterns_dict.items():
        if pattern_type == 'rest_normalized':
            continue
        for single_pattern in pattern_list:
            length = len(single_pattern)
            min_size = min(min_size, length)
            max_size = max(max_size, length)
    return min_size, max_size
Find inside the patterns_dict the longest and shortest patterns and their sizes
def calculateTendencyProbability(results, pattern_types):
    """Calculate the probability of achieving the expected tendency per pattern type.

    Args:
        results (List[]): result objects carrying .pattern_type and .tendency
        pattern_types (List[]): list of types to calculate the probability for
    Return:
        average_tendency_dict (Dict{}): type -> success percentage, or 'Not found'
            when no result of that type was seen
    """
    average_tendency_dict = {}
    for key in pattern_types:
        if key == 'rest_normalized':
            continue
        # [0] counts results matching the expected tendency, [1] the total.
        # (The original allocated a third slot that was never used.)
        average_tendency_dict[key] = [0, 0]
    for pattern_found in results:
        counters = average_tendency_dict.get(pattern_found.pattern_type)
        if counters is None:
            # Result types outside pattern_types (e.g. 'rest_normalized') are
            # ignored instead of raising KeyError.
            continue
        if pattern_found.tendency is True:
            counters[0] += 1
        counters[1] += 1
    for pattern_type, value in average_tendency_dict.items():
        if value[1] == 0:
            average_tendency_dict[pattern_type] = 'Not found'
        else:
            average_tendency_dict[pattern_type] = value[0] / value[1] * 100
    return average_tendency_dict
Calculate the probability of achieving the expected tendency for the pattern types contained in pattern_types
Args:
results (List[]): list of results
pattern_types (List[]): list of types to calculate probability for
Return:
average_tendency_dict (Dict{}): dictionary containing the average probability for each pattern type