# Source: https://github.com/JulianeMu/IntegratedDualAnalysisAproach_MDA
# Tip revision: e677a38a563ef5a9eefde04725a247c26424d92d
#   authored by Juliane Müller on 11 February 2021, 08:05:13 UTC ("Update README.md")
# File: compute_descriptive_statistics_MDA.py
import collections
import statistics
from math import sqrt
import numpy
import pandas as pd
from sklearn.neighbors import KernelDensity
import global_variables as gv
# Sentinel strings identifying a column's data type throughout this module.
id_data_type__categorical = "string"  # categorical (string-valued) columns
id_data_type__numerical = "number"    # numeric columns
id_data_type__date = "date"           # date columns
def get_descriptive_statistics_deviations(current_descriptive_statistics, descriptive_statistics_whole_data):
    """Annotate the subset statistics with their deviations from the whole-data statistics.

    Each deviation is ``subset - whole`` for the matching statistic. The
    overall deviation is the mean of the absolute deviations of the
    unalikeability coefficient, the standard deviation and the
    missing-value percentage (varNC and mode count are excluded from the
    aggregate). Mutates and returns ``current_descriptive_statistics``.
    """
    cur = current_descriptive_statistics
    whole = descriptive_statistics_whole_data

    cur.coefficient_of_unalikeability_deviation = (
        cur.coefficient_of_unalikeability - whole.coefficient_of_unalikeability)
    cur.stDev_deviation = cur.stDev - whole.stDev
    cur.varNC_deviation = cur.varNC - whole.varNC
    cur.number_of_modes_deviation = cur.number_of_modes - whole.number_of_modes
    cur.missing_values_percentage_deviation = (
        cur.missing_values_percentage - whole.missing_values_percentage)

    # Only three of the five deviations contribute to the aggregate score.
    cur.overall_deviation = sum(map(abs, (
        cur.coefficient_of_unalikeability_deviation,
        cur.stDev_deviation,
        cur.missing_values_percentage_deviation,
    ))) / 3
    return cur
def compute_stdev(column_values_stdev_varnc, data_type):
    """Return ``(stdev, varnc)`` for one column.

    For categorical columns the "standard deviation" is built from the
    squared pairwise differences of the categories' relative frequencies
    (each frequency is taken w.r.t. ``gv.initial_length_of_data_rows``),
    and ``varnc`` is the frequency variance normalised by its theoretical
    maximum. For any other data type the ordinary sample standard
    deviation and variance of the non-NaN values are returned.

    Returns ``(0, 0)`` whenever fewer than two categories / non-NaN values
    are available, because both estimators divide by ``k - 1`` / ``n - 1``.
    """
    if data_type == id_data_type__categorical:
        # Relative frequency of each distinct category.
        relative_frequencies = [count / gv.initial_length_of_data_rows
                                for count in collections.Counter(column_values_stdev_varnc).values()]
        n_total_size = sum(relative_frequencies)
        k = len(relative_frequencies)
        if k < 2:
            # A single category: the (k - 1) denominators below would be zero.
            return 0, 0

        # Variance normalised by its maximum (VarNC).
        mean_frequency = n_total_size / k
        sum_value_varnc = sum((freq - mean_frequency) ** 2 for freq in relative_frequencies)
        varnc = sum_value_varnc / (((n_total_size * n_total_size) * (k - 1)) / k)

        # Sum of squared differences over every unordered pair of frequencies.
        sum_distances_stdev = 0
        for i, current_frequ in enumerate(relative_frequencies[:-1]):
            sum_distances_stdev += sum((current_frequ - other) ** 2
                                       for other in relative_frequencies[i + 1:])
        stdev = sqrt(sum_distances_stdev / (n_total_size * n_total_size * (k - 1)))
        return stdev, varnc

    # Numeric column: drop NaNs, then use the ordinary sample estimators.
    not_nan_values = [x for x in column_values_stdev_varnc if str(x) != 'nan']
    if len(not_nan_values) < 2:
        # statistics.stdev/variance raise StatisticsError on < 2 data points;
        # mirror the categorical k < 2 case instead of crashing.
        return 0, 0
    return statistics.stdev(not_nan_values), statistics.variance(not_nan_values)
class CategoriesObject(object):
    """One category of a categorical column: its value, relative frequency and count."""

    def __init__(self, unique_value, relative_frequency, count):
        self.unique_value = unique_value  # category value as a string
        self.relativeFrequency = relative_frequency  # occurrences / column length
        self.count = count  # absolute occurrence count

    def __repr__(self):
        # Debug-friendly representation; nothing in this module relies on it.
        return ("{}(unique_value={!r}, relativeFrequency={!r}, count={!r})"
                .format(type(self).__name__, self.unique_value,
                        self.relativeFrequency, self.count))
def get_categories(column_values):
    """Return a ``CategoriesObject`` for every distinct non-NaN value.

    The relative frequency is computed against the full column length,
    i.e. NaN entries count towards the denominator but get no object.
    """
    counts = collections.Counter(column_values)
    total = len(column_values)
    return [CategoriesObject(str(value), occurrences / total, occurrences)
            for value, occurrences in counts.items()
            if str(value) != 'nan']
def compute_coefficient_of_unalikeability(column_values_coeff_unalikeability, data_type, column_id):
    """Coefficient of unalikeability: fraction of (ordered) pairs of observations that differ.

    Categorical columns count exact mismatches. Numeric columns treat two
    values as "alike" when they fall into the same epsilon-wide bin of the
    normalized [0, 1] range, with bin width taken from
    ``gv.coefficient_of_unalikeability_threshold``. Returns 0 for an empty
    column; otherwise a value normalized by n*n.
    """
    epsilon_percent_for_coefficient_of_unalikeability = gv.coefficient_of_unalikeability_threshold
    sum_unalike = 0
    if data_type == id_data_type__categorical:
        # Each value occurring `count` times is unalike the remaining
        # (n - count) values; summing over categories counts every unordered
        # pair twice, which the final n*n normalization absorbs.
        counter = collections.Counter(column_values_coeff_unalikeability)
        for count in counter.values():
            sum_unalike += count * (len(column_values_coeff_unalikeability) - count)  # (gv.initial_length_of_data_rows - count)
    else:
        column_values_coeff_unalikeability = pd.Series(column_values_coeff_unalikeability)
        column_values_coeff_unalikeability = column_values_coeff_unalikeability.rename(column_id)
        original_column_values = column_values_coeff_unalikeability
        # Prefer the unfiltered original column values when the column is
        # known to the globally formatted data set.
        # NOTE(review): original_column_values is never read afterwards —
        # presumably a leftover from the commented-out min/max computation below.
        if len([x for x in gv.data_initially_formatted if x.id == column_id]) > 0:
            original_column_values = pd.Series([x for x in gv.data_initially_formatted if x.id == column_id][0].column_values)
            original_column_values = original_column_values.rename(column_id)
        # due to normalization, we have fixed min and max values
        min_value = 0  # numpy.amin(original_column_values)
        max_value = 1  # numpy.amax(original_column_values)
        epsilon = epsilon_percent_for_coefficient_of_unalikeability  # (max_value - min_value) / 100 * \
        length_available_data = len([x for x in column_values_coeff_unalikeability if (x is not None and x != 'None')])  # gv.initial_length_of_data_rows
        # very low on performance, not the best implementation
        # better: group the values
        # for i in column_values:
        #     if ~numpy.isnan(i):
        #         sum_unalike += + len(list(x for x in sorted(column_values) if (i - epsilon) <= x <= (i + epsilon)))
        #     else:
        #         length_available_data -= 1
        #
        # sum_unalike += length_available_data * (column_values_length - length_available_data)
        if length_available_data == 0:
            # No usable values at all.
            return 0
        elif numpy.isnan(min_value):
            # NOTE(review): min_value is the constant 0 here, so this branch
            # looks unreachable; it presumably predates the fixed-range
            # normalization above — confirm before removing.
            sum_unalike += length_available_data * (len(column_values_coeff_unalikeability) - length_available_data)  # (gv.initial_length_of_data_rows - length_available_data)
        elif min_value == max_value:
            # Degenerate range: all non-missing values share the single value.
            counter = collections.Counter(column_values_coeff_unalikeability)
            sum_unalike += counter[min_value] * (len(column_values_coeff_unalikeability) - counter[min_value])
            sum_unalike += (len(column_values_coeff_unalikeability) - counter[min_value]) * (len(column_values_coeff_unalikeability) -
                                                                                            (len(column_values_coeff_unalikeability) -
                                                                                             counter[
                                                                                                 min_value]))
        else:
            # Bin the values into epsilon-wide intervals over [min, max);
            # values in the same bin are considered alike.
            grouped_values = column_values_coeff_unalikeability.groupby(pd.cut(column_values_coeff_unalikeability, numpy.arange(min_value,
                                                                                                                               max_value,
                                                                                                                               epsilon))).count().to_frame()
            for count in grouped_values[column_values_coeff_unalikeability.name]:
                if count > 0:
                    length_available_data -= count
                    sum_unalike += count * (len(column_values_coeff_unalikeability) - count)
            # Whatever no bin captured (e.g. NaN / out-of-range entries) is
            # treated as unalike everything else.
            sum_unalike += length_available_data * (len(column_values_coeff_unalikeability) - length_available_data)
    if len(column_values_coeff_unalikeability) == 0:
        return 0
    # Normalize by the number of ordered pairs (n * n).
    sum_unalike = sum_unalike / (len(column_values_coeff_unalikeability) * len(column_values_coeff_unalikeability))
    return sum_unalike
def get_number_of_modes(current_col, data_type, column_id):
    """Estimate the number of modes of a column.

    Categorical: count the categories whose relative frequency is within
    ``gv.modes_threshold`` (relative) of the most frequent one.
    Numeric: count local maxima of a Gaussian KDE fitted to the values,
    using the Freedman-Diaconis bin width as bandwidth.
    Returns 0 for a column with no usable (non-NaN, non-None) values.
    """
    cleaned_list = [x for x in current_col if (str(x) != 'nan' and str(x) != "None")]
    if len(cleaned_list) == 0:
        return 0
    elif data_type == id_data_type__categorical:
        threshold = gv.modes_threshold
        # Frequencies relative to the original (unfiltered) row count.
        relative_frequencies = [number / gv.initial_length_of_data_rows for number in
                                list(collections.Counter(cleaned_list).values())]
        max_mode_freque = numpy.amax(relative_frequencies)
        # j2 = [i for i in relative_frequencies if i >= max_mode_freque - max_mode_freque * threshold]
        # Every category close enough to the top frequency counts as a mode.
        return len([i for i in relative_frequencies if i >= max_mode_freque - max_mode_freque * threshold])
    else:
        if numpy.amin(cleaned_list) == numpy.amax(cleaned_list):
            # All values identical: exactly one mode.
            return 1
        else:
            # Freedman-Diaconis histogram; the first bin's width serves as
            # the KDE bandwidth.
            bandwidths = list(numpy.histogram(cleaned_list, 'fd'))
            best_bandwidth = (bandwidths[1][1] - bandwidths[1][0])
            # kde = stats.gaussian_kde(cleaned_list, best_bandwidth)
            # NOTE(review): values are scaled by 100 (presumably normalized
            # [0, 1] -> [0, 100]) but the KDE is only sampled on [0, 50] with
            # the default 50 points below — modes above 0.5 would be missed;
            # confirm against the callers.
            a = numpy.asarray([i * 100 for i in cleaned_list]).reshape(-1, 1)  #
            kde_sklearn = KernelDensity(kernel='gaussian', bandwidth=best_bandwidth).fit(a)
            s = numpy.linspace(0,50)
            e = kde_sklearn.score_samples(s.reshape(-1, 1))
            # Count samples that are strictly greater than both neighbours
            # (boundary samples count as maxima on their open side).
            maxima = numpy.count_nonzero(numpy.r_[True, e[1:] > e[:-1]] & numpy.r_[e[:-1] > e[1:], True])
            return int(maxima)  # len(argrelextrema(kde(bandwidths[1]), numpy.greater)[0])
def update_coeff_unalikeability_modes(current_col):
    """Recompute unalikeability coefficient and mode count for a column object in place.

    NaN entries of the normalized values are dropped unless the global
    switch ``gv.include_missing_values`` is set. Returns the mutated column.
    """
    stats = current_col.descriptive_statistics
    values = stats.normalized_values
    if not gv.include_missing_values:
        values = [value for value in values if str(value) != 'nan']
    stats.coefficient_of_unalikeability = compute_coefficient_of_unalikeability(
        values, current_col.data_type, current_col.id)
    stats.number_of_modes = get_number_of_modes(values, current_col.data_type, current_col.id)
    return current_col
def update_coeff_unalikeability_modes_dict(current_col):
    """Dict-based variant of ``update_coeff_unalikeability_modes``.

    Same recomputation, but the column is a nested dict instead of an
    object with attributes. Returns the mutated column dict.
    """
    stats = current_col['descriptive_statistics']
    values = stats['normalized_values']
    if not gv.include_missing_values:
        values = [value for value in values if str(value) != 'nan']
    stats['coefficient_of_unalikeability'] = compute_coefficient_of_unalikeability(
        values, current_col['data_type'], current_col['id'])
    stats['number_of_modes'] = get_number_of_modes(
        values, current_col['data_type'], current_col['id'])
    return current_col
def update_coeff_unalikealibiity_modes_deviations(current_col):
    """Store deviation statistics of a column (dict form) relative to the whole data set.

    Looks up the matching whole-data column in the globally formatted data
    (with or without missing values, per ``gv.include_missing_values``),
    writes the unalikeability and mode-count deviations, and recomputes the
    overall deviation as the mean absolute deviation of unalikeability,
    stDev and missing-value percentage. Returns the mutated column dict.

    Note: the typo in the function name ("unalikealibiity") is kept for
    caller compatibility.
    """
    reference_data = (gv.data_initially_formatted if gv.include_missing_values
                      else gv.data_initially_formatted_no_missing_values)
    # Raises IndexError when the column id is unknown, as before.
    whole_data_col = [col for col in reference_data if col.id == current_col['id']][0]
    whole_stats = whole_data_col.descriptive_statistics

    stats = current_col['descriptive_statistics']
    stats['coefficient_of_unalikeability_deviation'] = (
        stats['coefficient_of_unalikeability'] - whole_stats.coefficient_of_unalikeability)
    stats['number_of_modes_deviation'] = (
        stats['number_of_modes'] - whole_stats.number_of_modes)
    stats['overall_deviation'] = (abs(stats['coefficient_of_unalikeability_deviation'])
                                  + abs(stats['stDev_deviation'])
                                  + abs(stats['missing_values_percentage_deviation'])) / 3
    return current_col