# compute_descriptive_statistics.py
# Descriptive statistics for data columns: dispersion (stdev / VarNC), category
# frequencies, the coefficient of unalikeability and the number of modes.
import collections
import statistics
from math import sqrt
import numpy
import pandas as pd
from sklearn.neighbors import KernelDensity
import global_variables as gv
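# Identifiers for the supported column data types.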
id_data_type__categorical = "string"
id_data_type__numerical = "number"
id_data_type__date = "date"

def get_descriptive_statistics_deviations(current_descriptive_statistics, descriptive_statistics_whole_data):
    """Store, per statistic, the deviation of the current subset from the whole data set
    and derive an overall deviation score."""
    current = current_descriptive_statistics
    whole = descriptive_statistics_whole_data
    current.coefficient_of_unalikeability_deviation = current.coefficient_of_unalikeability - whole.coefficient_of_unalikeability
    current.stDev_deviation = current.stDev - whole.stDev
    current.varNC_deviation = current.varNC - whole.varNC
    current.number_of_modes_deviation = current.number_of_modes - whole.number_of_modes
    current.missing_values_percentage_deviation = current.missing_values_percentage - whole.missing_values_percentage
    # The overall score is the unweighted mean of three absolute deviations;
    # the varNC and number_of_modes deviations are stored but not included here.
    current.overall_deviation = (abs(current.coefficient_of_unalikeability_deviation)
                                 + abs(current.stDev_deviation)
                                 + abs(current.missing_values_percentage_deviation)) / 3
    return current
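# Illustrative example (hypothetical numbers): if the subset's stDev is 0.30 and the
# whole data set's is 0.25, stDev_deviation is 0.05; with an unalikeability deviation
# of -0.10 and a missing-values deviation of 0.02,
# overall_deviation = (0.10 + 0.05 + 0.02) / 3 ≈ 0.057.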

def compute_stdev(column_values_stdev_varnc, data_type):
    """Return (stdev, varnc) for a column: a pairwise-distance based standard deviation and
    VarNC for categorical columns, sample stdev/variance for numerical columns."""
    if data_type == id_data_type__categorical:
        relative_frequencies = [number / gv.initial_length_of_data_rows
                                for number in collections.Counter(column_values_stdev_varnc).values()]
        n_total_size = sum(relative_frequencies)
        k = len(relative_frequencies)
        # With fewer than two categories both measures would divide by zero, so return (0, 0).
        if k < 2:
            return 0, 0
        sum_distances_stdev = 0
        sum_value_varnc = sum((cat_frequ - n_total_size / k) * (cat_frequ - n_total_size / k)
                              for cat_frequ in relative_frequencies)
        varnc = sum_value_varnc / (((n_total_size * n_total_size) * (k - 1)) / k)
        # Sum of squared pairwise differences between category frequencies.
        for i in range(len(relative_frequencies) - 1):
            current_frequ = relative_frequencies[i]
            sum_distances_stdev += sum((current_frequ - cat_frequ) * (current_frequ - cat_frequ)
                                       for cat_frequ in relative_frequencies[i + 1:])
        stdev = sqrt(sum_distances_stdev / (n_total_size * n_total_size * (k - 1)))
    else:
        not_nan_values = [x for x in column_values_stdev_varnc if str(x) != 'nan']
        # statistics.stdev/variance require at least two data points.
        if len(not_nan_values) < 2:
            return 0, 0
        stdev = statistics.stdev(not_nan_values)
        varnc = statistics.variance(not_nan_values)
    return stdev, varnc
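# Illustrative worked example of the categorical branch (hypothetical data; assumes
# gv.initial_length_of_data_rows == 6):
#   ["a", "a", "b", "c", "c", "c"] -> relative frequencies [2/6, 1/6, 3/6], n_total_size = 1.0, k = 3
#   varnc = sum((f_i - 1/3)^2) / ((1 * (3 - 1)) / 3)
#   stdev = sqrt(sum over pairs (f_i - f_j)^2 / (1 * (3 - 1)))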

class CategoriesObject(object):
    """One category of a categorical column: its value, its relative frequency
    (with respect to the initial number of rows) and its absolute count."""
    def __init__(self, unique_value, relative_frequency, count):
        self.unique_value = unique_value
        self.relativeFrequency = relative_frequency
        self.count = count

def get_categories(column_values):
    """Return a list of CategoriesObject instances for all non-NaN values of the column."""
    counter_elements = collections.Counter(column_values)
    categories_list = []
    for count_el in counter_elements:
        if str(count_el) != 'nan':
            categories_list.append(
                CategoriesObject(str(count_el),
                                 counter_elements[count_el] / gv.initial_length_of_data_rows,
                                 counter_elements[count_el]))
    return categories_list
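# Example (hypothetical data; assumes gv.initial_length_of_data_rows == 4):
#   get_categories(["a", "a", "b", float("nan")]) yields two CategoriesObject entries,
#   ("a", 0.5, 2) and ("b", 0.25, 1); the NaN value is skipped.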

def compute_coefficient_of_unalikeability(column_values_coeff_unalikeability, data_type, column_id):
    """Compute the coefficient of unalikeability: the proportion of value pairs that differ.
    For numerical columns, values falling into the same epsilon-wide bin count as alike."""
    epsilon_percent_for_coefficient_of_unalikeability = 5
    sum_unalike = 0
    column_length = len(column_values_coeff_unalikeability)
    if data_type == id_data_type__categorical:
        counter = collections.Counter(column_values_coeff_unalikeability)
        for count in counter.values():
            sum_unalike += count * (column_length - count)
    else:
        column_values_coeff_unalikeability = pd.Series(column_values_coeff_unalikeability)
        column_values_coeff_unalikeability = column_values_coeff_unalikeability.rename(column_id)
        original_column_values = column_values_coeff_unalikeability
        if len([x for x in gv.data_initially_formatted if x.id == column_id]) > 0:
            original_column_values = pd.Series(
                [x for x in gv.data_initially_formatted if x.id == column_id][0].column_values)
            original_column_values = original_column_values.rename(column_id)
        # Due to normalization the value range is fixed to [0, 1].
        min_value = 0
        max_value = 1
        epsilon = (max_value - min_value) / 100 * epsilon_percent_for_coefficient_of_unalikeability
        length_available_data = column_length
        # A naive pairwise comparison would be quadratic in the number of rows; instead the
        # values are grouped into epsilon-wide bins and compared per bin.
        if numpy.isnan(min_value):
            sum_unalike += length_available_data * (column_length - length_available_data)
        elif min_value == max_value:
            counter = collections.Counter(column_values_coeff_unalikeability)
            sum_unalike += counter[min_value] * (column_length - counter[min_value])
            sum_unalike += (column_length - counter[min_value]) * counter[min_value]
        else:
            grouped_values = column_values_coeff_unalikeability.groupby(
                pd.cut(column_values_coeff_unalikeability,
                       numpy.arange(min_value, max_value, epsilon))).count().to_frame()
            for count in grouped_values[column_values_coeff_unalikeability.name]:
                if count > 0:
                    length_available_data -= count
                    sum_unalike += count * (column_length - count)
            # Values not captured by any bin (e.g. NaN) are treated as one group of their own.
            sum_unalike += length_available_data * (column_length - length_available_data)
    sum_unalike = sum_unalike / (column_length * column_length)
    return sum_unalike
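# Illustrative worked example of the categorical branch (hypothetical data): for
# ["x", "x", "y"] the unalike pair count is 2*(3-2) + 1*(3-1) = 4, normalised by 3*3,
# giving a coefficient of 4/9 ≈ 0.44.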

def get_number_of_modes(current_col, data_type):
    """Estimate the number of modes of a column. Categorical: count categories whose relative
    frequency is within 10% of the most frequent one. Numerical: count the local maxima of a
    Gaussian kernel density estimate."""
    cleaned_list = [x for x in current_col if (str(x) != 'nan' and str(x) != "None")]
    if len(cleaned_list) == 0:
        return 0
    elif data_type == id_data_type__categorical:
        threshold = 0.10
        relative_frequencies = [number / gv.initial_length_of_data_rows
                                for number in collections.Counter(cleaned_list).values()]
        max_mode_freque = numpy.amax(relative_frequencies)
        return len([i for i in relative_frequencies
                    if i >= max_mode_freque - max_mode_freque * threshold])
    else:
        if numpy.amin(cleaned_list) == numpy.amax(cleaned_list):
            return 1
        else:
            # Use the Freedman-Diaconis bin width as the KDE bandwidth.
            _counts, bin_edges = numpy.histogram(cleaned_list, bins='fd')
            best_bandwidth = bin_edges[1] - bin_edges[0]
            # Scale the (normalized) values by 100 and fit a Gaussian kernel density estimate.
            a = numpy.asarray([i * 100 for i in cleaned_list]).reshape(-1, 1)
            kde_sklearn = KernelDensity(kernel='gaussian', bandwidth=best_bandwidth).fit(a)
            # Evaluate the log-density on 50 grid points between 0 and 50 and count its local maxima.
            s = numpy.linspace(0, 50)
            e = kde_sklearn.score_samples(s.reshape(-1, 1))
            maxima = numpy.count_nonzero(numpy.r_[True, e[1:] > e[:-1]] & numpy.r_[e[:-1] > e[1:], True])
            return int(maxima)
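
# Minimal usage sketch (illustrative only, not part of the module's public API). It assumes
# gv.initial_length_of_data_rows and gv.data_initially_formatted are plain module-level
# attributes that may be set here, and uses hypothetical toy data and the made-up column id
# "toy_column".
if __name__ == "__main__":
    gv.initial_length_of_data_rows = 6
    gv.data_initially_formatted = []
    categorical_column = ["red", "red", "blue", "green", "green", "green"]
    numerical_column = [0.1, 0.15, 0.2, 0.8, 0.85, 0.9]
    # Dispersion measures for a categorical and a numerical column.
    print(compute_stdev(categorical_column, id_data_type__categorical))
    print(compute_stdev(numerical_column, id_data_type__numerical))
    # Category frequencies of the categorical column.
    print([(c.unique_value, c.relativeFrequency, c.count) for c in get_categories(categorical_column)])
    # Unalikeability and mode count of the numerical column.
    print(compute_coefficient_of_unalikeability(numerical_column, id_data_type__numerical, "toy_column"))
    print(get_number_of_modes(numerical_column, id_data_type__numerical))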