# https://github.com/awslabs/datawig
# Raw File
# Tip revision: 18099c579c850901a58ccbfdeecc6a7738f62b1c authored by dependabot[bot] on 01 March 2023, 20:33:22 UTC
# Bump mxnet from 1.4.0 to 1.9.1 in /requirements
# Tip revision: 18099c5
# test_simple_imputer.py
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not
# use this file except in compliance with the License. A copy of the License
# is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

"""

DataWig SimpleImputer tests

"""

import os
import warnings

import numpy as np
import pandas as pd
import pytest
from sklearn.metrics import f1_score, mean_squared_error

from datawig.column_encoders import BowEncoder
from datawig.mxnet_input_symbols import BowFeaturizer
from datawig.simple_imputer import SimpleImputer
from datawig.utils import (logger, rand_string, random_split, generate_df_numeric,
                           generate_df_string)
from datawig import column_encoders
from .conftest import synthetic_label_shift_simple

# Ignore every warning category so warnings do not clutter the test output.
warnings.filterwarnings("ignore")

# Let log records of level INFO and above through the logger imported from datawig.utils.
logger.setLevel("INFO")


def test_simple_imputer_no_string_column_name():
    """
    SimpleImputer must raise a ValueError when a column name is not a string.
    """
    invalid_arguments = (
        ([0], '1'),    # non-string entry in the input columns
        (['0'], 1),    # non-string output column
    )

    for input_columns, output_column in invalid_arguments:
        with pytest.raises(ValueError):
            SimpleImputer(input_columns, output_column)


def test_simple_imputer_label_shift(test_dir):
    """
    Checks that a classifier trained with the class weights returned by
    ``check_for_label_shift`` is more accurate on label-shifted data than a
    classifier trained without class weights.
    """

    model_dir = os.path.join(test_dir, "tmp", "label_weighting_experiments")
    fit_settings = dict(num_epochs=15, learning_rate=3e-4, weight_decay=0)

    def accuracy(predictions):
        # fraction of rows whose imputed label equals the true label
        return (predictions.label == predictions['label_imputed']).mean()

    # training data and label-shifted validation data (different label proportions)
    tr = synthetic_label_shift_simple(N=1000, label_proportions=[.2, .8], error_proba=.05,
                                      covariates=['foo', 'bar'])
    val = synthetic_label_shift_simple(N=500, label_proportions=[.9, .1], error_proba=.05,
                                       covariates=['foo', 'bar'])

    # overwrite the covariate of a random third of the validation rows with a
    # value that contains both tokens
    n_val = val.shape[0]
    rand_idxs = np.random.choice(range(n_val), size=int(n_val / 3), replace=False)
    val.loc[rand_idxs, 'covariate'] = 'foo bar'

    tr, te = random_split(tr, [.8, .2])

    # baseline: classifier without class weights
    imputer = SimpleImputer(input_columns=['covariate'],
                            output_column='label',
                            output_path=model_dir)
    imputer.fit(tr, te, **fit_settings)
    pred = imputer.predict(val)

    # class weights estimated from the validation data
    weights = imputer.check_for_label_shift(val)

    # second classifier, trained with those class weights
    imputer_balanced = SimpleImputer(input_columns=['covariate'],
                                     output_column='label',
                                     output_path=model_dir)
    imputer_balanced.fit(tr, te, class_weights=weights, **fit_settings)
    pred_balanced = imputer_balanced.predict(val)

    acc_balanced = accuracy(pred_balanced)
    acc_classic = accuracy(pred)

    # the weighted model has to beat the unweighted one
    assert acc_balanced > acc_classic


def test_label_shift_weight_computation():
    """
    Tests that label shift detection can determine the label marginals of validation data.

    An imputer is fitted on data generated with ``train_proportion``. The
    weights it returns for data generated with ``target_proportion`` are
    multiplied with the training marginals and the product is compared with the
    true target marginals.
    """

    train_proportion = [.7, .3]
    target_proportion = [.3, .7]

    data = synthetic_label_shift_simple(N=2000, label_proportions=train_proportion,
                                        error_proba=.1, covariates=['foo', 'bar'])

    # original train test splits
    tr, te = random_split(data, [.5, .5])

    # train domain classifier
    # NOTE(review): hard-coded absolute path; the other tests in this module
    # build their output path from the ``test_dir`` fixture.
    imputer = SimpleImputer(
        input_columns=['covariate'],
        output_column='label',
        output_path='/tmp/imputer_model')

    # fit the imputer on the training split, using the other split for testing
    imputer.fit(tr, te, num_epochs=15, learning_rate=3e-4, weight_decay=0)

    target_data = synthetic_label_shift_simple(1000, target_proportion,
                                               error_proba=.1, covariates=['foo', 'bar'])

    weights = imputer.check_for_label_shift(target_data)

    # compare the product of weights and training marginals
    # (i.e. estimated target marginals) with the true target marginals.
    # NOTE(review): assumes weights.values() is ordered like the proportion lists - confirm.
    for weight, train_marginal, target_marginal in zip(weights.values(),
                                                       train_proportion,
                                                       target_proportion):
        # abs(): an estimate that is too small must fail the check as well;
        # without it every under-estimate passes trivially
        assert abs(weight * train_marginal - target_marginal) < .1



def test_simple_imputer_real_data_default_args(test_dir, data_frame):
    """
    Tests SimpleImputer with default options

    Steps: fit on generated text data, check the log file and the encoder /
    featurizer classes, predict on a frame without the label column, rename the
    model directory and load the model from the new location, then fit the
    loaded model again and compare its F1 score with the stored metrics.
    """
    feature_col = "string_feature"
    label_col = "label"

    n_samples = 2000
    num_labels = 3
    seq_len = 100
    vocab_size = int(2 ** 15)

    # generate some random data
    random_data = data_frame(feature_col=feature_col,
                             label_col=label_col,
                             vocab_size=vocab_size,
                             num_labels=num_labels,
                             num_words=seq_len,
                             n_samples=n_samples)

    # NOTE: df_val is not used further down in this test
    df_train, df_test, df_val = random_split(random_data, [.8, .1, .1])

    output_path = os.path.join(test_dir, "tmp", "real_data_experiment_simple")

    # remember the training columns to compare them after fitting
    df_train_cols_before = df_train.columns.tolist()

    input_columns = [feature_col]

    imputer = SimpleImputer(
        input_columns=input_columns,
        output_column=label_col,
        output_path=output_path
    ).fit(
        train_df=df_train,
        num_epochs=10
    )

    # fitting is expected to leave a non-empty log file in the output directory
    logfile = os.path.join(imputer.output_path, 'imputer.log')
    assert os.path.exists(logfile)
    assert os.path.getsize(logfile) > 0

    # with default arguments a string input is expected to get a BowEncoder / BowFeaturizer
    assert imputer.output_path == output_path
    assert imputer.imputer.data_featurizers[0].__class__ == BowFeaturizer
    assert imputer.imputer.data_encoders[0].__class__ == BowEncoder
    assert set(imputer.imputer.data_encoders[0].input_columns) == set(input_columns)
    assert set(imputer.imputer.label_encoders[0].input_columns) == set([label_col])

    # zip() stops at the shorter sequence, so this compares the leading columns only
    assert all([after == before for after, before in zip(df_train.columns, df_train_cols_before)])


    # predict on a copy of the test data that has no label column
    df_no_label_column = df_test.copy()
    true_labels = df_test[label_col]
    del (df_no_label_column[label_col])
    df_test_cols_before = df_no_label_column.columns.tolist()

    # NOTE(review): inplace=True presumably lets predict modify the passed frame - confirm
    df_test_imputed = imputer.predict(df_no_label_column, inplace=True)

    # again only the leading columns are compared (zip truncates)
    assert all(
        [after == before for after, before in zip(df_no_label_column.columns, df_test_cols_before)])

    imputed_columns = df_test_cols_before + [label_col + "_imputed", label_col + "_imputed_proba"]

    # iterating a DataFrame yields its column labels
    assert all([after == before for after, before in zip(df_test_imputed, imputed_columns)])

    f1 = f1_score(true_labels, df_test_imputed[label_col + '_imputed'], average="weighted")

    assert f1 > .9

    # rename the model directory to check that loading works from a new location
    new_path = imputer.output_path + "-" + rand_string()

    os.rename(imputer.output_path, new_path)

    deserialized = SimpleImputer.load(new_path)
    df_test = deserialized.predict(df_test, imputation_suffix="_deserialized_imputed")
    f1 = f1_score(df_test[label_col], df_test[label_col + '_deserialized_imputed'],
                  average="weighted")

    assert f1 > .9

    # fit the loaded imputer again; the training data doubles as test data here
    retrained_simple_imputer = deserialized.fit(df_train, df_train, num_epochs=10)

    df_train_imputed = retrained_simple_imputer.predict(df_train.copy(), inplace=True)
    f1 = f1_score(df_train[label_col], df_train_imputed[label_col + '_imputed'], average="weighted")

    assert f1 > .9

    metrics = retrained_simple_imputer.load_metrics()

    # NOTE(review): exact float equality between the recomputed and the stored F1 score
    assert f1 == metrics['weighted_f1']


def test_numeric_or_text_imputer(test_dir, data_frame):
    """
    Fits SimpleImputer on a numeric plus a text input column for three targets:
    a noisy linear function of ``x``, a noisy square of ``x`` and the
    categorical label column.
    """

    feature_col = "string_feature"
    label_col = "label"
    linear_col = "*2"
    squared_col = "**2"

    n_samples = 1000

    # random text/label data
    random_data = data_frame(feature_col=feature_col,
                             label_col=label_col,
                             vocab_size=int(2 ** 10),
                             num_labels=3,
                             num_words=30,
                             n_samples=n_samples)

    # numeric input and two noisy numeric targets derived from it
    numeric_data = np.random.uniform(-np.pi, np.pi, (n_samples,))
    linear_target = numeric_data * 2. + np.random.normal(0, .1, (n_samples,))
    squared_target = numeric_data ** 2 + np.random.normal(0, .1, (n_samples,))

    df = pd.DataFrame({
        'x': numeric_data,
        linear_col: linear_target,
        squared_col: squared_target,
        feature_col: random_data[feature_col].values,
        label_col: random_data[label_col].values
    })

    df_train, df_test = random_split(df, [.8, .2])

    # all three imputers write to the same output directory
    output_path = os.path.join(test_dir, "tmp", "real_data_experiment_numeric")

    # linear numeric target
    imputer_numeric_linear = SimpleImputer(input_columns=['x', feature_col],
                                           output_column=linear_col,
                                           output_path=output_path)
    imputer_numeric_linear = imputer_numeric_linear.fit(train_df=df_train,
                                                        learning_rate=1e-3,
                                                        num_epochs=10)
    imputer_numeric_linear.predict(df_test, inplace=True)

    assert mean_squared_error(df_test[linear_col], df_test[linear_col + '_imputed']) < 1.0

    # quadratic numeric target (number of epochs left at its default)
    imputer_numeric = SimpleImputer(input_columns=['x', feature_col],
                                    output_column=squared_col,
                                    output_path=output_path)
    imputer_numeric = imputer_numeric.fit(train_df=df_train, learning_rate=1e-3)
    imputer_numeric.predict(df_test, inplace=True)

    assert mean_squared_error(df_test[squared_col], df_test[squared_col + '_imputed']) < 1.0

    # categorical target
    imputer_string = SimpleImputer(input_columns=[feature_col, 'x'],
                                   output_column=label_col,
                                   output_path=output_path)
    imputer_string = imputer_string.fit(train_df=df_train, num_epochs=10)
    imputer_string.predict(df_test, inplace=True)

    label_f1 = f1_score(df_test[label_col], df_test[label_col + '_imputed'], average="weighted")
    assert label_f1 > .7


def test_imputer_hpo_numeric(test_dir):
    """
    Tests SimpleImputer HPO for numeric data/imputation.

    The lowest MSE among all HPO runs has to stay below 1.5.
    """

    feature_col = 'x'
    target_col = '**2'
    n_rows = 200

    # noisy square of a uniformly sampled input
    x_values = np.random.uniform(-np.pi, np.pi, (n_rows,))
    noisy_squares = x_values ** 2 + np.random.normal(0, .1, (n_rows,))
    df = pd.DataFrame({
        feature_col: x_values,
        target_col: noisy_squares,
    })

    df_train, _ = random_split(df, [.8, .2])

    imputer_numeric = SimpleImputer(
        input_columns=[feature_col],
        output_column=target_col,
        output_path=os.path.join(test_dir, "tmp", "experiment_numeric_hpo"))

    # search space: two learning rates, everything else fixed
    hps = {
        feature_col: {
            'type': ['numeric'],
            'numeric_latent_dim': [30],
            'numeric_hidden_layers': [1],
        },
        'global': {
            'final_fc_hidden_units': [[]],
            'learning_rate': [1e-3, 1e-4],
            'weight_decay': [0],
            'num_epochs': [200],
            'patience': [100],
        },
    }

    imputer_numeric.fit_hpo(df_train, hps=hps)
    results = imputer_numeric.hpo.results

    is_best_run = results['mse'] == min(results['mse'])
    assert results[is_best_run]['mse'].iloc[0] < 1.5


def test_imputer_hpo_text(test_dir, data_frame):
    """
    Tests SimpleImputer HPO with text data and categorical imputations.

    The best micro F1 score among all HPO runs has to exceed 0.7.
    """
    feature_col = "string_feature"
    label_col = "label"

    # random text/label data
    df = data_frame(feature_col=feature_col,
                    label_col=label_col,
                    num_labels=3,
                    num_words=20,
                    n_samples=1000)

    df_train, _ = random_split(df, [.8, .2])

    imputer_string = SimpleImputer(
        input_columns=[feature_col],
        output_column=label_col,
        output_path=os.path.join(test_dir, "tmp", "experiment_text_hpo")
    )

    # search space: word tokens versus character tokens
    hps = {
        feature_col: {
            'type': ['string'],
            'tokens': [['words'], ['chars']],
        },
        'global': {
            'final_fc_hidden_units': [[]],
            'learning_rate': [1e-3],
            'weight_decay': [0],
            'num_epochs': [30],
        },
    }

    imputer_string.fit_hpo(df_train, hps=hps, num_epochs=10, num_evals=3)

    best_f1_micro = max(imputer_string.hpo.results['f1_micro'])
    assert best_f1_micro > 0.7


def test_hpo_all_input_types(test_dir, data_frame):
    """
    Runs HPO twice over a string, a categorical and a numeric input column with
    user-defined score functions, the second time with a limit on the running
    hours, and compares the micro F1 of the 50-epoch and the 5-epoch setting.
    """
    label_col = "label"
    n_samples = 500

    # random text/label data
    df = data_frame(feature_col="string_feature",
                    label_col=label_col,
                    num_labels=3,
                    num_words=12,
                    n_samples=n_samples)

    # categorical and numeric features drawn independently of the label
    df['categorical_feature'] = ['foo' if r > .5 else 'bar' for r in np.random.rand(n_samples)]
    df['numeric_feature'] = np.random.rand(n_samples)

    df_train, _ = random_split(df, [.8, .2])

    imputer = SimpleImputer(
        input_columns=['string_feature', 'categorical_feature', 'numeric_feature'],
        output_column='label',
        output_path=os.path.join(test_dir, "tmp", "real_data_experiment_text_hpo")
    )

    # hyperparameter candidates: global ones plus one entry per input column
    hps = {
        'global': {
            'learning_rate': [3e-4],
            'weight_decay': [1e-8],
            'num_epochs': [5, 50],
            'patience': [5],
            'batch_size': [16],
            'final_fc_hidden_units': [[]],
        },
        'string_feature': {
            'max_tokens': [2 ** 15],
            'tokens': [['words', 'chars']],
            'ngram_range': {
                'words': [(1, 5)],
                'chars': [(2, 4), (1, 3)],
            },
        },
        'categorical_feature': {
            'type': ['categorical'],
            'max_tokens': [2 ** 15],
            'embed_dim': [10],
        },
        'numeric_feature': {
            'normalize': [True],
            'numeric_latent_dim': [10],
            'numeric_hidden_layers': [1],
        },
    }

    # user defined score functions, called with true, predicted, confidence
    def calibration_check(true, predicted, confidence):
        """
        Accuracy restricted to predictions with confidence above .9 and above .5.
        """
        def accuracy_above(threshold):
            confident = confidence > threshold
            return np.mean(true[confident] == predicted[confident])

        return accuracy_above(.9), accuracy_above(.5)

    def coverage_check(true, predicted, confidence):
        """Fraction of predictions with confidence above .9."""
        return np.mean(confidence > .9)

    uds = [(calibration_check, 'calibration check'),
           (coverage_check, 'coverage at 90')]

    shared_hpo_args = dict(hps=hps, user_defined_scores=uds, num_evals=3, num_epochs=10)

    # first run without a time limit
    imputer.fit_hpo(df_train, hpo_run_name='test1_', **shared_hpo_args)

    # second run, limited to 1/3600 running hours
    imputer.fit_hpo(df_train, hpo_run_name='test2_', max_running_hours=1/3600, **shared_hpo_args)

    results = imputer.hpo.results
    f1_long = results[results['global:num_epochs'] == 50]['f1_micro'].iloc[0]
    f1_short = results[results['global:num_epochs'] == 5]['f1_micro'].iloc[0]

    assert f1_long > f1_short

    
def test_hpo_defaults(test_dir, data_frame):
    """
    Runs HPO without an explicit search space on string, categorical and
    numeric inputs; the best weighted precision has to exceed 0.9.
    """
    label_col = "label"
    n_samples = 500

    # random text/label data
    df = data_frame(feature_col="string_feature",
                    label_col=label_col,
                    num_labels=3,
                    num_words=10,
                    n_samples=n_samples)

    # categorical and numeric features drawn independently of the label
    df['categorical_feature'] = ['foo' if r > .5 else 'bar' for r in np.random.rand(n_samples)]
    df['numeric_feature'] = np.random.rand(n_samples)

    df_train, _ = random_split(df, [.8, .2])

    imputer = SimpleImputer(
        input_columns=['string_feature', 'categorical_feature', 'numeric_feature'],
        output_column='label',
        output_path=os.path.join(test_dir, "tmp", "real_data_experiment_text_hpo")
    )

    imputer.fit_hpo(df_train, num_evals=10, num_epochs=5)

    best_precision = imputer.hpo.results.precision_weighted.max()
    assert best_precision > .9


def test_hpo_num_evals_empty_hps(test_dir, data_frame):
    """
    Without an explicit search space, fit_hpo with num_evals=2 has to produce
    two result rows.
    """
    feature_col, label_col = "feature", "label"

    # random text/label data
    df = data_frame(feature_col=feature_col, label_col=label_col)
    feature_columns = [name for name in df.columns if name != label_col]

    imputer = SimpleImputer(input_columns=feature_columns,
                            output_column=label_col,
                            output_path=test_dir)

    num_evals = 2
    imputer.fit_hpo(df, num_evals=num_evals, num_epochs=10)

    n_runs = imputer.hpo.results.shape[0]
    assert n_runs == 2


def test_hpo_num_evals_given_hps(test_dir, data_frame):
    """
    For num_evals of 1 and 2, the number of HPO result rows has to equal
    num_evals.
    """
    feature_col, label_col = "feature", "label"

    # random text/label data
    df = data_frame(feature_col=feature_col, label_col=label_col)
    feature_columns = [name for name in df.columns if name != label_col]

    for num_evals in (1, 2):
        # a fresh imputer per setting
        imputer = SimpleImputer(input_columns=feature_columns,
                                output_column=label_col,
                                output_path=test_dir)

        imputer.fit_hpo(df, num_evals=num_evals, num_epochs=5)

        n_runs = imputer.hpo.results.shape[0]
        assert n_runs == num_evals


def test_hpo_many_columns(test_dir, data_frame):
    """
    Runs HPO on a frame whose text feature is duplicated into ten additional
    columns; the best weighted precision has to exceed 0.75.
    """
    feature_col, label_col = "feature", "label"
    ncols = 10

    # random text/label data
    df = data_frame(feature_col=feature_col,
                    label_col=label_col,
                    num_labels=3,
                    num_words=4,
                    n_samples=300)

    # copies of the feature column named <feature_col>_0 ... <feature_col>_9
    for copy_index in range(ncols):
        df['{}_{}'.format(feature_col, copy_index)] = df[feature_col]

    feature_columns = [name for name in df.columns if name not in ['label']]

    imputer = SimpleImputer(input_columns=feature_columns,
                            output_column=label_col,
                            output_path=test_dir)

    imputer.fit_hpo(df, num_evals=2, num_epochs=10)

    best_precision = imputer.hpo.results.precision_weighted.max()
    assert best_precision > .75


def test_imputer_categorical_heuristic(data_frame):
    """
    Tests the heuristic used for checking whether a column is categorical

    The generated text feature column must not be classified as categorical,
    the label column must be.

    :param data_frame: fixture that generates a random data frame with a text
        feature column and a label column
    """

    feature_col = "string_feature"
    label_col = "label"

    n_samples = 1000
    num_labels = 3
    seq_len = 20

    # generate some random data
    df = data_frame(feature_col=feature_col,
                    label_col=label_col,
                    num_labels=num_labels,
                    num_words=seq_len,
                    n_samples=n_samples)

    # plain truthiness checks instead of comparing against True / False with ==
    assert not SimpleImputer._is_categorical(df[feature_col])
    assert SimpleImputer._is_categorical(df[label_col])


def test_imputer_complete():
    """
    Tests the complete functionality of SimpleImputer

    Builds a frame with a string feature/label pair and a numeric feature/label
    pair, blanks out roughly ``missing_ratio`` of the entries of every column
    and checks which columns ``SimpleImputer.complete`` fills in.
    """

    feature_col = "string_feature"
    label_col = "label"
    feature_col_numeric = "numeric_feature"
    label_col_numeric = "numeric_label"

    num_samples = 1000
    num_labels = 3
    seq_len = 20

    missing_ratio = .1

    df_string = generate_df_string(
        num_labels=num_labels,
        num_words=seq_len,
        num_samples=num_samples,
        label_column_name=label_col,
        data_column_name=feature_col)

    df_numeric = generate_df_numeric(num_samples=num_samples,
                                     label_column_name=label_col_numeric,
                                     data_column_name=feature_col_numeric)

    # ignore_index=True with axis=1 discards the column labels, so they are set again below
    df = pd.concat([
        df_string[[feature_col, label_col]],
        df_numeric[[feature_col_numeric, label_col_numeric]]
    ], ignore_index=True, axis=1)
    df.columns = [feature_col, label_col, feature_col_numeric, label_col_numeric]

    # delete some entries; a single .loc assignment writes to the frame itself,
    # whereas chained indexing (df[col].iloc[...] = ...) may only modify a
    # temporary copy
    for col in df.columns:
        missing = np.random.random(len(df)) < missing_ratio
        df.loc[missing, col] = np.nan

    # remember which string entries were missing before completion
    feature_col_missing = df[feature_col].isnull()
    label_col_missing = df[label_col].isnull()

    df = SimpleImputer.complete(data_frame=df)

    # the text feature keeps its missing values, the label column loses some,
    # and both numeric columns end up without missing values
    assert all(df[feature_col].isnull() == feature_col_missing)
    assert df[label_col].isnull().sum() < label_col_missing.sum()
    assert df[feature_col_numeric].isnull().sum() == 0
    assert df[label_col_numeric].isnull().sum() == 0

    # smoke test: an explicit output_path must be accepted
    # NOTE(review): 'some_path' is relative to the current working directory
    df = SimpleImputer.complete(data_frame=df, output_path='some_path')

def test_default_no_explainable_simple_imputer():
    """
    A SimpleImputer created without ``is_explainable`` must not be explainable.
    """
    input_columns = ['features']
    output_column = 'label'

    imputer = SimpleImputer(input_columns, output_column)

    assert not imputer.is_explainable


def test_explainable_simple_imputer_unfitted():
    """
    An explainable SimpleImputer that has not been fitted must raise a
    ValueError with the message 'Need to call .fit() before' from both
    ``explain`` and ``explain_instance``.
    """
    label_col = 'label'
    expected_message = 'Need to call .fit() before'

    imputer = SimpleImputer(
        ['features'],
        label_col,
        is_explainable=True
    )

    assert imputer.is_explainable

    # pytest.raises fails the test by itself when no ValueError is raised
    with pytest.raises(ValueError) as exc_info:
        imputer.explain('some class')
    assert exc_info.value.args[0] == expected_message

    instance = pd.Series({'features': 'some feature text'})
    with pytest.raises(ValueError) as exc_info:
        imputer.explain_instance(instance)
    assert exc_info.value.args[0] == expected_message


def test_explainable_simple_imputer(test_dir, data_frame):
    """
    A fitted explainable SimpleImputer uses a TfIdfEncoder, and both ``explain``
    and ``explain_instance`` run without raising.
    """
    label_col = 'label'
    df = data_frame(n_samples=100, label_col=label_col)

    imputer = SimpleImputer(['features'],
                            label_col,
                            output_path=os.path.join(test_dir, "tmp"),
                            is_explainable=True)
    imputer = imputer.fit(df)

    assert imputer.is_explainable

    first_encoder = imputer.imputer.data_encoders[0]
    assert isinstance(first_encoder, column_encoders.TfIdfEncoder)

    # explaining one of the observed labels must not raise
    some_label = df[label_col].unique()[0]
    _ = imputer.explain(some_label)

    # explaining a single instance must not raise either
    instance = pd.Series({'features': 'some feature text'})
    _ = imputer.explain_instance(instance)

    # reaching this line means neither call raised
    assert True


def test_hpo_runs(test_dir, data_frame):
    """
    HPO must search only over the parameter ranges given in ``hps``: with two
    ``max_tokens`` candidates and single candidates otherwise, exactly two
    runs with exactly those two ``max_tokens`` values are expected.
    """
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col,
                    label_col=label_col)

    imputer = SimpleImputer(
        input_columns=[col for col in df.columns if col != label_col],
        output_column=label_col,
        output_path=test_dir
    )

    max_tokens = [1024, 2048]
    # 'num_epochs' is set once; the original repeated the same assignment three times
    hps = {
        feature_col: {'max_tokens': max_tokens},
        'global': {
            'concat_columns': [False],
            'num_epochs': [10],
        },
    }

    imputer.fit_hpo(df, hps=hps, num_hash_bucket_candidates=[2**15], tokens_candidates=['words'])

    # only search over specified parameter ranges
    assert set(imputer.hpo.results[feature_col+':'+'max_tokens'].unique().tolist()) == set(max_tokens)
    assert imputer.hpo.results.shape[0] == 2


def test_hpo_single_column_encoder_parameter(test_dir, data_frame):
    """
    With a single ``max_tokens`` candidate for the feature column, HPO yields
    two result rows and the loaded encoder's vectorizer uses that value.
    """
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col, label_col=label_col)
    feature_columns = [name for name in df.columns if name != label_col]

    imputer = SimpleImputer(input_columns=feature_columns,
                            output_column=label_col,
                            output_path=test_dir,
                            is_explainable=True)

    hps = {
        feature_col: {'max_tokens': [1024]},
        'global': {'num_epochs': [10]},
    }

    imputer.fit_hpo(df, hps=hps)

    n_runs = imputer.hpo.results.shape[0]
    vectorizer = imputer.imputer.data_encoders[0].vectorizer

    assert n_runs == 2
    assert vectorizer.max_features == 1024


def test_hpo_multiple_columns_only_one_used(test_dir, data_frame):
    """
    The frame holds a second copy of the feature column, but only the original
    one is an input column; HPO yields one result row and the loaded encoder's
    vectorizer uses the given ``max_tokens``.
    """
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col, label_col=label_col)

    # extra column that is not passed to the imputer
    df.loc[:, feature_col+'_2'] = df.loc[:, feature_col]

    imputer = SimpleImputer(input_columns=[feature_col],
                            output_column=label_col,
                            output_path=test_dir,
                            is_explainable=True)

    hps = {
        'global': {'num_epochs': [10]},
        feature_col: {
            'max_tokens': [1024],
            'tokens': [['chars']],
        },
    }

    imputer.fit_hpo(df, hps=hps)

    n_runs = imputer.hpo.results.shape[0]
    vectorizer = imputer.imputer.data_encoders[0].vectorizer

    assert n_runs == 1
    assert vectorizer.max_features == 1024


def test_hpo_mixed_hps_and_kwargs(test_dir, data_frame):
    """
    A learning rate passed as fit_hpo keyword argument is used when ``hps``
    only specifies column parameters.
    """
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col, label_col=label_col)

    imputer = SimpleImputer(input_columns=[feature_col],
                            output_column=label_col,
                            output_path=test_dir)

    column_only_hps = {feature_col: {'max_tokens': [1024]}}

    imputer.fit_hpo(df, hps=column_only_hps, learning_rate_candidates=[0.1])

    first_learning_rate = imputer.hpo.results['global:learning_rate'].values[0]
    assert first_learning_rate == 0.1


def test_hpo_mixed_hps_and_kwargs_precedence(test_dir, data_frame):
    """
    A learning rate given in ``hps`` takes precedence over the one passed as
    fit_hpo keyword argument.
    """
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col, label_col=label_col)

    imputer = SimpleImputer(input_columns=[feature_col],
                            output_column=label_col,
                            output_path=test_dir)

    hps = {
        feature_col: {'max_tokens': [1024]},
        'global': {'learning_rate': [0.11]},
    }

    imputer.fit_hpo(df, hps=hps, learning_rate_candidates=[0.1])

    # the value from `hps` wins over the keyword argument
    first_learning_rate = imputer.hpo.results['global:learning_rate'].values[0]
    assert first_learning_rate == 0.11


def test_hpo_similar_input_col_mixed_types(test_dir, data_frame):
    """
    Smoke test: fit_hpo has to run on a text, a float and an integer input
    column; nothing is asserted about the results.
    """
    feature_col, label_col = "feature", "label"
    numeric_col = "numeric_feature"
    categorical_col = "categorical_col"

    df = data_frame(feature_col=feature_col, label_col=label_col)
    n_rows = df.shape[0]

    df.loc[:, numeric_col] = np.random.randn(n_rows)
    # NOTE(review): randint without ``size`` returns one scalar, so this column
    # holds the same integer in every row - confirm that this is intended
    df.loc[:, categorical_col] = np.random.randint(n_rows)

    imputer = SimpleImputer(input_columns=[feature_col, numeric_col, categorical_col],
                            output_column=label_col,
                            output_path=test_dir)

    imputer.fit_hpo(df, num_epochs=10)


def test_hpo_kwargs_only_support(test_dir, data_frame):
    """
    When the search space is given through fit_hpo keyword arguments only, the
    first row of the HPO results has to contain exactly those values.
    """
    feature_col, label_col = "feature", "label"
    numeric_col = "numeric_feature"

    df = data_frame(feature_col=feature_col, label_col=label_col)
    df.loc[:, numeric_col] = np.random.randn(df.shape[0])

    imputer = SimpleImputer(input_columns=[feature_col, numeric_col],
                            output_column=label_col,
                            output_path=test_dir)

    imputer.fit_hpo(
        df,
        num_epochs=1,
        patience=1,
        weight_decay=[0.001],
        batch_size=320,
        num_hash_bucket_candidates=[3],
        tokens_candidates=['words'],
        numeric_latent_dim_candidates=[1],
        numeric_hidden_layers_candidates=[1],
        final_fc_hidden_units=[[1]],
        learning_rate_candidates=[0.1],
        normalize_numeric=False
    )

    # result column -> value expected in the first result row
    expected_first_row = [
        ('global:num_epochs', 1),
        ('global:patience', 1),
        ('global:weight_decay', 0.001),
        ('global:batch_size', 320),
        (feature_col + ':max_tokens', 3),
        (feature_col + ':tokens', ['words']),
        (numeric_col + ':numeric_latent_dim', 1),
        (numeric_col + ':numeric_hidden_layers', 1),
        ('global:final_fc_hidden_units', [1]),
        ('global:learning_rate', 0.1),
    ]

    for column, expected in expected_first_row:
        assert imputer.hpo.results[column].values[0] == expected


def test_hpo_numeric_best_pick(test_dir, data_frame):
    """
    For a numeric target, the model available after HPO has to be the one from
    the run with the smallest MSE.
    """
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col, label_col=label_col)

    # replace the label by random numbers to get a numeric target
    df.loc[:, label_col] = np.random.randn(df.shape[0])

    imputer = SimpleImputer(input_columns=[feature_col],
                            output_column=label_col,
                            output_path=test_dir,
                            is_explainable=True)

    hps = {
        feature_col: {
            'max_tokens': [1, 2, 3],
            'tokens': [['chars']],
        },
    }

    imputer.fit_hpo(df, hps=hps)

    results = imputer.hpo.results

    # max_tokens of the encoder that is attached to the imputer after HPO
    loaded_max_tokens = imputer.imputer.data_encoders[0].vectorizer.max_features

    # run with minimal MSE
    best_hpo_run = results['mse'].astype('float').idxmin()

    # first run that was configured with the loaded encoder's max_tokens
    uses_loaded_setting = results[feature_col+':max_tokens'] == loaded_max_tokens
    loaded_hpo_run = results.loc[uses_loaded_setting].index[0]

    assert best_hpo_run == loaded_hpo_run


def test_fit_resumes(test_dir, data_frame):
    """
    A second call to ``fit`` must leave ``imputer.imputer`` equal to the object
    that was there after the first call.
    """
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col, label_col=label_col)

    imputer = SimpleImputer(input_columns=[feature_col],
                            output_column=label_col,
                            output_path=test_dir)

    # nothing is attached before the first fit
    assert imputer.imputer is None

    # fit twice and record the attached model after each call
    models_after_fit = []
    for _ in range(2):
        imputer.fit(df)
        models_after_fit.append(imputer.imputer)

    first_fit_imputer, second_fit_imputer = models_after_fit
    assert first_fit_imputer == second_fit_imputer


def test_hpo_explainable(test_dir, data_frame):
    """
    After HPO, a non-explainable imputer has to use a HashingVectorizer and an
    explainable one a TfidfVectorizer.
    """
    from sklearn.feature_extraction.text import HashingVectorizer, TfidfVectorizer
    feature_col, label_col = "feature", "label"

    df = data_frame(feature_col=feature_col, label_col=label_col)

    # is_explainable flag -> vectorizer class expected on the first data encoder
    expected_vectorizer = {False: HashingVectorizer, True: TfidfVectorizer}

    for explainable in (False, True):
        imputer = SimpleImputer(input_columns=[feature_col],
                                output_column=label_col,
                                output_path=test_dir,
                                is_explainable=explainable)
        imputer = imputer.fit_hpo(df, num_epochs=3)

        vectorizer = imputer.imputer.data_encoders[0].vectorizer
        assert isinstance(vectorizer, expected_vectorizer[explainable])
# back to top