pattern_utils

  1from curses import window
  2from random import randint
  3import pandas as pd
  4import re
  5import os
  6import csv
  7import dtw_applier
  8import normalize_utils
  9from scipy.interpolate import make_interp_spline, BSpline
 10import numpy as np
 11import matplotlib.pyplot as plt
 12from random import randint
 13
 14PATTERNS_FILE_PATH = 'patterns/'
 15BIG_NUMBER = 99999999
 16
 17def createMorningDataframeFromJson(day, inputData):
 18    """Create a dataset with close prices from 08:00 AM to 13:00 PM"""
 19    regex = rf"^{day} (0(8|9):\w*|1(0|1|2|3):\w*)"
 20
 21    dataframe = pd.DataFrame({})
 22    for day, value in inputData.items():
 23        if re.search(regex, day) :
 24            dataframe = pd.concat([dataframe, pd.Series({day: value['4. close']})])
 25    return dataframe 
 26
 27def loadPatterns(number_of_desired_patterns, pattern_types_set):
 28    """Create a pattern dictionary with pattern type contained in the set as key, 
 29    and n patterns data for each type  
 30
 31    Args:  
 32        number_of_desired_patterns (int): number of patterns desired for each type
 33        pattern_types_set (Set{}): set containing the desired pattern types for the dictionary  
 34    Return:  
 35        pattern_dictionary (Dict{})
 36    """
 37    patterns_dictionary = {
 38        'rest_normalized': []
 39    }
 40
 41    for pattern_type in pattern_types_set:
 42        file_list = os.listdir(PATTERNS_FILE_PATH + pattern_type)
 43        total_results = []
 44        elected_files_indexes_set = set()
 45        while len(elected_files_indexes_set) < number_of_desired_patterns:
 46            elected_files_indexes_set.add(randint(0, len(file_list) - 1))
 47
 48        for index in elected_files_indexes_set:
 49            file = file_list[index]
 50            single_file_results = []
 51            with open(PATTERNS_FILE_PATH + pattern_type + '/' + file) as csvfile:
 52                reader = csv.reader(csvfile)
 53                next(reader, None)
 54                for row in reader:
 55                    single_file_results.append(round(float(row[1]), 3))
 56            total_results.append(single_file_results)
 57        patterns_dictionary[pattern_type] = total_results    
 58    return patterns_dictionary
 59
 60def findCommonPattern(normalized_vector, all_patterns_dictionary):
 61    """Find the type of pattern for a given vector
 62
 63        Args:  
 64            normalized_vector (List[]): previous normalized vector containing prices
 65            all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices  
 66        Return:  
 67            common_pattern_type (str): type of the type for the pattern
 68            minimum_distance (float): minimum distance found between the best match and the vector
 69    """
 70    minimun_distance = BIG_NUMBER
 71    common_pattern_type = 'rest_normalized'
 72    for pattern_type in all_patterns_dictionary.keys():
 73        for single_pattern in all_patterns_dictionary[pattern_type]:
 74            current_distance = dtw_applier.comparePatterns(normalized_vector, single_pattern)
 75            if current_distance < minimun_distance:
 76                common_pattern_type = pattern_type
 77                minimun_distance = current_distance
 78    
 79    return common_pattern_type, minimun_distance
 80
 81def enhanceDataframe(distance_found, pattern_type, sliced_vector, all_patterns_dictionary, window_divisions):
 82    """Given a pattern, find a better match, if possible, inside the vector  
 83
 84        Args:  
 85            distance_found (float): minimum distance found between the best match and the vector at the moment
 86            pattern_type (str): type of the pattern found
 87            sliced_vector (List[]): vector containing the data where the search will take plave
 88            all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
 89            windows_divisions (List[]): list contaning the number that the window is wanted to be fragmented equally  
 90        Return:  
 91            best_segment_i (int): index where the best segment starts
 92            best_segment_j (int): index where the best segment ends
 93    """
 94    minimum_distance = distance_found
 95    best_segment_i = 0
 96    best_segment_j = len(sliced_vector) - 1
 97    for number_of_parts in window_divisions:
 98        window_size = len(sliced_vector) // number_of_parts
 99        left_index = 0
100        right_index = window_size
101        for i in range(number_of_parts):
102            split_vector = sliced_vector[left_index:right_index]
103            normalized_split_vector = normalize_utils.normalizeVector(split_vector)
104            for single_pattern in all_patterns_dictionary[pattern_type]:
105                current_distance = dtw_applier.comparePatterns(normalized_split_vector, single_pattern)
106                if current_distance <= minimum_distance:
107                    minimum_distance = current_distance
108                    best_segment_i = left_index
109                    best_segment_j = right_index
110            left_index = right_index
111            right_index += window_size
112        if i == window_divisions[len(window_divisions) - 1]: #Si es la ultima parte, cogemos todo hasta donde termine
113            right_index = len(sliced_vector) - 1
114    return best_segment_i, best_segment_j
115
116def smoothData(dataframe):
117    """Smooth the data inside a dataframe using average smoothing"""
118    rolling = dataframe.rolling(window=2)
119    rolling_mean = rolling.mean()
120    dataframe.plot()
121    random_number = str(randint(0,999))
122    #plt.savefig('images/Results/AAP' + random_number)
123    rolling_mean.plot(color='red')
124    #plt.savefig('images/Results/AAP' + random_number + 'smooth', color='red')
125    plt.show()
126    return None
127
128def minimumAndMaximumPatternSizes(patterns_dict):
129    """Find inside the paterns_dict the longest and shortest patterns and its size"""
130    min_size = BIG_NUMBER
131    max_size = 0
132    for key, vector in patterns_dict.items():
133        if key == 'rest_normalized':
134            continue
135        for pattern in vector:
136            current_size = len(pattern)
137            if current_size < min_size:
138                min_size = current_size
139            if current_size > max_size:
140                max_size = current_size
141    return min_size, max_size
142
143def calculateTendencyProbability(results, pattern_types):
144    """Calculate the probability of achieving the expected tendency for the pattern types contained in pattern_types  
145
146        Args:  
147            results (List[]): list of results
148            pattern_type (List[]): list of types to calculate probability for  
149        Return:  
150            average_tendency_dict (Dict{}): dictionary containing the average probability for each pattern type
151    """
152    average_tendency_dict = {}
153    for key in pattern_types:
154        if key == 'rest_normalized':
155            continue
156        average_tendency_dict[key] = [0, 0, 0] # [0] para decir cuantos cumplen la tendencia y [1] para saber el total de patrones
157    for pattern_found in results:
158        if pattern_found.tendency is True:
159            average_tendency_dict[pattern_found.pattern_type][0] += 1
160        average_tendency_dict[pattern_found.pattern_type][1] += 1
161    for pattern_type, value in average_tendency_dict.items():
162        if value[1] == 0:
163            average_tendency_dict[pattern_type] = 'Not found'
164        else: 
165            average_tendency_dict[pattern_type] = value[0] / value[1] * 100
166    return average_tendency_dict
def createMorningDataframeFromJson(day, inputData)
18def createMorningDataframeFromJson(day, inputData):
19    """Create a dataset with close prices from 08:00 AM to 13:00 PM"""
20    regex = rf"^{day} (0(8|9):\w*|1(0|1|2|3):\w*)"
21
22    dataframe = pd.DataFrame({})
23    for day, value in inputData.items():
24        if re.search(regex, day) :
25            dataframe = pd.concat([dataframe, pd.Series({day: value['4. close']})])
26    return dataframe 

Create a dataset with close prices from 08:00 AM to 13:00 PM

def loadPatterns(number_of_desired_patterns, pattern_types_set)
28def loadPatterns(number_of_desired_patterns, pattern_types_set):
29    """Create a pattern dictionary with pattern type contained in the set as key, 
30    and n patterns data for each type  
31
32    Args:  
33        number_of_desired_patterns (int): number of patterns desired for each type
34        pattern_types_set (Set{}): set containing the desired pattern types for the dictionary  
35    Return:  
36        pattern_dictionary (Dict{})
37    """
38    patterns_dictionary = {
39        'rest_normalized': []
40    }
41
42    for pattern_type in pattern_types_set:
43        file_list = os.listdir(PATTERNS_FILE_PATH + pattern_type)
44        total_results = []
45        elected_files_indexes_set = set()
46        while len(elected_files_indexes_set) < number_of_desired_patterns:
47            elected_files_indexes_set.add(randint(0, len(file_list) - 1))
48
49        for index in elected_files_indexes_set:
50            file = file_list[index]
51            single_file_results = []
52            with open(PATTERNS_FILE_PATH + pattern_type + '/' + file) as csvfile:
53                reader = csv.reader(csvfile)
54                next(reader, None)
55                for row in reader:
56                    single_file_results.append(round(float(row[1]), 3))
57            total_results.append(single_file_results)
58        patterns_dictionary[pattern_type] = total_results    
59    return patterns_dictionary

Create a pattern dictionary with pattern type contained in the set as key, and n patterns data for each type

Args:
number_of_desired_patterns (int): number of patterns desired for each type pattern_types_set (Set{}): set containing the desired pattern types for the dictionary
Return:
pattern_dictionary (Dict{})

def findCommonPattern(normalized_vector, all_patterns_dictionary)
61def findCommonPattern(normalized_vector, all_patterns_dictionary):
62    """Find the type of pattern for a given vector
63
64        Args:  
65            normalized_vector (List[]): previous normalized vector containing prices
66            all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices  
67        Return:  
68            common_pattern_type (str): type of the type for the pattern
69            minimum_distance (float): minimum distance found between the best match and the vector
70    """
71    minimun_distance = BIG_NUMBER
72    common_pattern_type = 'rest_normalized'
73    for pattern_type in all_patterns_dictionary.keys():
74        for single_pattern in all_patterns_dictionary[pattern_type]:
75            current_distance = dtw_applier.comparePatterns(normalized_vector, single_pattern)
76            if current_distance < minimun_distance:
77                common_pattern_type = pattern_type
78                minimun_distance = current_distance
79    
80    return common_pattern_type, minimun_distance

Find the type of pattern for a given vector

Args:
normalized_vector (List[]): previous normalized vector containing prices all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
Return:
common_pattern_type (str): type of the type for the pattern minimum_distance (float): minimum distance found between the best match and the vector

def enhanceDataframe( distance_found, pattern_type, sliced_vector, all_patterns_dictionary, window_divisions)
 82def enhanceDataframe(distance_found, pattern_type, sliced_vector, all_patterns_dictionary, window_divisions):
 83    """Given a pattern, find a better match, if possible, inside the vector  
 84
 85        Args:  
 86            distance_found (float): minimum distance found between the best match and the vector at the moment
 87            pattern_type (str): type of the pattern found
 88            sliced_vector (List[]): vector containing the data where the search will take plave
 89            all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices
 90            windows_divisions (List[]): list contaning the number that the window is wanted to be fragmented equally  
 91        Return:  
 92            best_segment_i (int): index where the best segment starts
 93            best_segment_j (int): index where the best segment ends
 94    """
 95    minimum_distance = distance_found
 96    best_segment_i = 0
 97    best_segment_j = len(sliced_vector) - 1
 98    for number_of_parts in window_divisions:
 99        window_size = len(sliced_vector) // number_of_parts
100        left_index = 0
101        right_index = window_size
102        for i in range(number_of_parts):
103            split_vector = sliced_vector[left_index:right_index]
104            normalized_split_vector = normalize_utils.normalizeVector(split_vector)
105            for single_pattern in all_patterns_dictionary[pattern_type]:
106                current_distance = dtw_applier.comparePatterns(normalized_split_vector, single_pattern)
107                if current_distance <= minimum_distance:
108                    minimum_distance = current_distance
109                    best_segment_i = left_index
110                    best_segment_j = right_index
111            left_index = right_index
112            right_index += window_size
113        if i == window_divisions[len(window_divisions) - 1]: #Si es la ultima parte, cogemos todo hasta donde termine
114            right_index = len(sliced_vector) - 1
115    return best_segment_i, best_segment_j

Given a pattern, find a better match, if possible, inside the vector

Args:
distance_found (float): minimum distance found between the best match and the vector at the moment pattern_type (str): type of the pattern found sliced_vector (List[]): vector containing the data where the search will take plave all_patterns_dictionary (Dict{}): dictionary containing pattern types and prices windows_divisions (List[]): list contaning the number that the window is wanted to be fragmented equally
Return:
best_segment_i (int): index where the best segment starts best_segment_j (int): index where the best segment ends

def smoothData(dataframe)
117def smoothData(dataframe):
118    """Smooth the data inside a dataframe using average smoothing"""
119    rolling = dataframe.rolling(window=2)
120    rolling_mean = rolling.mean()
121    dataframe.plot()
122    random_number = str(randint(0,999))
123    #plt.savefig('images/Results/AAP' + random_number)
124    rolling_mean.plot(color='red')
125    #plt.savefig('images/Results/AAP' + random_number + 'smooth', color='red')
126    plt.show()
127    return None

Smooth the data inside a dataframe using average smoothing

def minimumAndMaximumPatternSizes(patterns_dict)
129def minimumAndMaximumPatternSizes(patterns_dict):
130    """Find inside the paterns_dict the longest and shortest patterns and its size"""
131    min_size = BIG_NUMBER
132    max_size = 0
133    for key, vector in patterns_dict.items():
134        if key == 'rest_normalized':
135            continue
136        for pattern in vector:
137            current_size = len(pattern)
138            if current_size < min_size:
139                min_size = current_size
140            if current_size > max_size:
141                max_size = current_size
142    return min_size, max_size

Find inside the paterns_dict the longest and shortest patterns and its size

def calculateTendencyProbability(results, pattern_types)
144def calculateTendencyProbability(results, pattern_types):
145    """Calculate the probability of achieving the expected tendency for the pattern types contained in pattern_types  
146
147        Args:  
148            results (List[]): list of results
149            pattern_type (List[]): list of types to calculate probability for  
150        Return:  
151            average_tendency_dict (Dict{}): dictionary containing the average probability for each pattern type
152    """
153    average_tendency_dict = {}
154    for key in pattern_types:
155        if key == 'rest_normalized':
156            continue
157        average_tendency_dict[key] = [0, 0, 0] # [0] para decir cuantos cumplen la tendencia y [1] para saber el total de patrones
158    for pattern_found in results:
159        if pattern_found.tendency is True:
160            average_tendency_dict[pattern_found.pattern_type][0] += 1
161        average_tendency_dict[pattern_found.pattern_type][1] += 1
162    for pattern_type, value in average_tendency_dict.items():
163        if value[1] == 0:
164            average_tendency_dict[pattern_type] = 'Not found'
165        else: 
166            average_tendency_dict[pattern_type] = value[0] / value[1] * 100
167    return average_tendency_dict

Calculate the probability of achieving the expected tendency for the pattern types contained in pattern_types

Args:
results (List[]): list of results pattern_type (List[]): list of types to calculate probability for
Return:
average_tendency_dict (Dict{}): dictionary containing the average probability for each pattern type