Source code for causallift.nodes.utils

from typing import Any, Dict, Type  # NOQA
import logging

from easydict import EasyDict
from kedro.utils import load_obj
import numpy as np
import pandas as pd
import sklearn  # NOQA
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)


log = logging.getLogger(__name__)


def get_cols_features(
    df,
    non_feature_cols=[
        "Treatment",
        "Outcome",
        "TransformedOutcome",
        "Propensity",
        "Recommendation",
    ],
):
    return [column for column in df.columns if column not in non_feature_cols]

def concat_train_test(args, train, test):
    r"""Concatenate train and test series.
    Use series.xs('train') or series.xs('test') to split.
    """
    if test is None:
        series = pd.concat(
            [pd.Series(train)],
            keys=["train"],
            names=[args.partition_name, args.index_name],
        )
    else:
        series = pd.concat(
            [pd.Series(train), pd.Series(test)],
            keys=["train", "test"],
            names=[args.partition_name, args.index_name],
        )
    return series

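# Usage sketch (not part of the original module): ``args`` only needs the two
# index-name attributes here, so a bare EasyDict stands in for the real config.
#
#   >>> args = EasyDict(partition_name="partition", index_name="index")
#   >>> s = concat_train_test(args, train=[1.0, 0.0], test=[1.0])
#   >>> s.xs("train").tolist()
#   [1.0, 0.0]
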
def concat_train_test_df(args, train, test):
    r"""Concatenate train and test data frames.
    Use df.xs('train') or df.xs('test') to split.
    """
    df = pd.concat(
        [train, test],
        keys=["train", "test"],
        names=[args.partition_name, args.index_name],
    )
    return df

def len_t(df, treatment=1.0, col_treatment="Treatment"):
    return df.query("{}=={}".format(col_treatment, treatment)).shape[0]

def len_o(df, outcome=1.0, col_outcome="Outcome"):
    return df.query("{}=={}".format(col_outcome, outcome)).shape[0]

def len_to(
    df, treatment=1.0, outcome=1.0, col_treatment="Treatment", col_outcome="Outcome"
):
    len_ = df.query(
        "{}=={} & {}=={}".format(col_treatment, treatment, col_outcome, outcome)
    ).shape[0]
    return len_

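# Worked example (hypothetical 4-row frame): ``len_t`` counts treated rows,
# ``len_o`` positive outcomes, and ``len_to`` their intersection.
#
#   >>> df = pd.DataFrame({"Treatment": [1, 1, 0, 0], "Outcome": [1, 0, 1, 0]})
#   >>> len_t(df), len_o(df), len_to(df)
#   (2, 2, 1)
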
def treatment_fraction_(df, col_treatment="Treatment"):
    return len_t(df, col_treatment=col_treatment) / len(df)

def treatment_fractions_(
    args,  # type: Dict[str, Any]
    df,  # type: Type[pd.DataFrame]
):
    # type: (...) -> Type[EasyDict]
    col_treatment = args.col_treatment
    treatment_fractions = {
        "train": treatment_fraction_(df.xs("train"), col_treatment=col_treatment),
        "test": treatment_fraction_(df.xs("test"), col_treatment=col_treatment),
    }
    return EasyDict(treatment_fractions)

def outcome_fraction_(df, col_outcome="Outcome"):
    return len_o(df, col_outcome=col_outcome) / len(df)

def overall_uplift_gain_(
    df, treatment=1.0, outcome=1.0, col_treatment="Treatment", col_outcome="Outcome"
):
    overall_uplift_gain = (
        len_to(df, col_treatment=col_treatment, col_outcome=col_outcome)
        / len_t(df, col_treatment=col_treatment)
    ) - (
        len_to(df, 0, 1, col_treatment=col_treatment, col_outcome=col_outcome)
        / len_t(df, 0, col_treatment=col_treatment)
    )
    return overall_uplift_gain

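# Worked example (hypothetical data): both treated rows convert (CVR 1.0) while
# only one of two untreated rows converts (CVR 0.5), so the gain is 0.5.
#
#   >>> df = pd.DataFrame({"Treatment": [1, 1, 0, 0], "Outcome": [1, 1, 1, 0]})
#   >>> overall_uplift_gain_(df)
#   0.5
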
def gain_tuple(df_, r_):
    treatment_fraction = treatment_fraction_(df_)
    outcome_fraction = outcome_fraction_(df_)
    overall_uplift_gain = overall_uplift_gain_(df_)

    cgain = np.interp(treatment_fraction, r_.cgains_x, r_.cgains_y)
    cgain_base = overall_uplift_gain * treatment_fraction
    cgain_factor = cgain / cgain_base

    return (
        treatment_fraction,
        outcome_fraction,
        overall_uplift_gain,
        cgain,
        cgain_base,
        cgain_factor,
        r_.Q_cgains,
        r_.q1_cgains,
        r_.q2_cgains,
    )

def score_df(y_train, y_test, y_pred_train, y_pred_test, average="binary"):
    if (
        y_train is not None
        and y_pred_train is not None
        and len(y_train) != len(y_pred_train)
    ):
        raise Exception("Lengths of true and predicted for train do not match.")
    if (
        y_test is not None
        and y_pred_test is not None
        and len(y_test) != len(y_pred_test)
    ):
        raise Exception("Lengths of true and predicted for test do not match.")

    score_df = pd.DataFrame()
    for (partition_, y_, y_pred_) in [
        ("train", y_train, y_pred_train),
        ("test", y_test, y_pred_test),
    ]:
        # Score a partition only if both series exist and look like 0/1 labels.
        if (
            y_ is not None
            and y_pred_ is not None
            and (0 <= y_).all()
            and (y_ <= 1).all()
            and (0 <= y_pred_).all()
            and (y_pred_ <= 1).all()
        ):
            num_classes = pd.Series(y_).nunique()
            score_list = [
                len(y_),
                num_classes,
                accuracy_score(y_, y_pred_),
                precision_score(y_, y_pred_, average=average),
                recall_score(y_, y_pred_, average=average),
                f1_score(y_, y_pred_, average=average),
            ] + (
                [
                    roc_auc_score(y_, y_pred_),
                    pd.Series(y_).mean(),
                    pd.Series(y_pred_).mean(),
                ]
                if num_classes == 2
                else []
            )
            column_list = [
                "# samples",
                "# classes",
                "accuracy",
                "precision",
                "recall",
                "f1",
            ] + (
                ["roc_auc", "observed CVR", "predicted CVR"]
                if num_classes == 2
                else []
            )
            score_df_ = pd.DataFrame(
                [score_list], index=[partition_], columns=column_list
            )
            # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
            score_df = pd.concat([score_df, score_df_])
    return score_df

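# Usage sketch (assumed inputs): labels and predictions as 0/1 Series; passing
# None for the test pair scores the train partition only.
#
#   >>> y = pd.Series([1, 0, 1, 0])
#   >>> score_df(y, None, y, None)  # one "train" row; accuracy/f1 of 1.0
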
def conf_mat_df(y_true, y_pred):
    conf_mat = confusion_matrix(y_true, y_pred)
    num_class = len(conf_mat)
    true_labels = ["True_{}".format(i) for i in range(num_class)]
    pred_labels = ["Pred_{}".format(i) for i in range(num_class)]
    conf_mat_df = pd.DataFrame(conf_mat, index=true_labels, columns=pred_labels)
    return conf_mat_df

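# Example (hypothetical labels): rows are true classes, columns predictions.
#
#   >>> conf_mat_df(pd.Series([1, 0, 1]), pd.Series([1, 1, 1]))
#           Pred_0  Pred_1
#   True_0       0       1
#   True_1       0       2
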
def bundle_train_and_test_data(args, train_df, test_df):
    assert isinstance(train_df, pd.DataFrame)
    # assert isinstance(test_df, pd.DataFrame)
    # assert set(train_df.columns) == set(test_df.columns)
    assert all([isinstance(col_name, str) for col_name in train_df.columns])

    index_name = args.index_name
    if index_name is not None:
        train_df = train_df.reset_index(drop=True).copy()
        train_df.index.name = index_name
        if test_df is not None:
            test_df = test_df.reset_index(drop=True).copy()
            test_df.index.name = index_name
    elif test_df is not None:
        assert train_df.index.name == test_df.index.name

    df = concat_train_test_df(args, train_df, test_df)
    return df

def impute_cols_features(args, df):
    non_feature_cols = [
        args.col_treatment,
        args.col_outcome,
        args.col_propensity,
        args.col_cate,
        args.col_recommendation,
    ]
    args.cols_features = args.cols_features or get_cols_features(
        df, non_feature_cols=non_feature_cols
    )
    return args

def compute_cate(proba_treated, proba_untreated):
    cate_estimated = proba_treated - proba_untreated
    return cate_estimated

def add_cate_to_df(args, df, cate_estimated, proba_treated, proba_untreated):
    df.loc[:, args.col_proba_if_treated] = proba_treated
    df.loc[:, args.col_proba_if_untreated] = proba_untreated
    df.loc[:, args.col_cate] = cate_estimated.values
    return df

def recommend_by_cate(args, df, treatment_fractions):
    cate_series = df[args.col_cate]

    def recommendation(cate_series, treatment_fraction):
        # Rank by estimated CATE (descending) and recommend treatment for the
        # top fraction matching the partition's observed treatment fraction.
        rank_series = cate_series.rank(method="first", ascending=False, pct=True)
        r = np.where(rank_series <= treatment_fraction, 1.0, 0.0)
        return r

    recommendation_train = recommendation(
        cate_series.xs("train"), treatment_fractions.train
    )
    recommendation_test = recommendation(
        cate_series.xs("test"), treatment_fractions.test
    )

    df.loc[:, args.col_recommendation] = concat_train_test(
        args, recommendation_train, recommendation_test
    )
    return df

def estimate_effect(args, sim_treated_df, sim_untreated_df):
    estimated_effect_df = pd.DataFrame()

    estimated_effect_df["# samples"] = (
        sim_treated_df["# samples chosen"] + sim_untreated_df["# samples chosen"]
    )

    ## Original (without uplift model)
    estimated_effect_df["observed CVR"] = (
        sim_treated_df["# samples chosen"] * sim_treated_df["observed CVR"]
        + sim_untreated_df["# samples chosen"] * sim_untreated_df["observed CVR"]
    ) / (sim_treated_df["# samples chosen"] + sim_untreated_df["# samples chosen"])

    ## Recommended (with uplift model)
    estimated_effect_df["predicted CVR"] = (
        sim_treated_df["# samples recommended"] * sim_treated_df["predicted CVR"]
        + sim_untreated_df["# samples recommended"] * sim_untreated_df["predicted CVR"]
    ) / (
        sim_treated_df["# samples recommended"]
        + sim_untreated_df["# samples recommended"]
    )

    estimated_effect_df["pred/obs CVR"] = (
        estimated_effect_df["predicted CVR"] / estimated_effect_df["observed CVR"]
    )

    verbose = args.verbose
    if verbose >= 2:
        log.info(
            "\n## Overall simulated effect of recommendation based on the uplift modeling:"
        )
        apply_method(estimated_effect_df, args.df_print)

    return estimated_effect_df

def initialize_model(
    args,  # type: Type[EasyDict]
    model_key="uplift_model_params",  # type: str
    default_estimator="sklearn.linear_model.LogisticRegression",  # type: str
):
    # type: (...) -> Type[sklearn.base.BaseEstimator]
    if not isinstance(args[model_key], dict):
        # A ready-made model instance was passed directly; use it as-is.
        model = args[model_key]
        return model

    model_params = args[model_key].copy()

    if not model_params.get("estimator"):
        model_params["estimator"] = default_estimator
    estimator_str = model_params.pop("estimator")
    estimator_obj = load_obj(estimator_str)

    const_params = (
        (model_params.pop("const_params") or dict())
        if "const_params" in model_params
        else dict()
    )

    if not model_params.get("search_cv"):
        # No hyperparameter search requested: construct the estimator directly,
        # treating any remaining parameters as constant keyword arguments.
        const_params.update(model_params)
        model = estimator_obj(**const_params)
        return model

    search_cv_str = model_params.pop("search_cv")
    search_cv_obj = load_obj(search_cv_str)
    model_params["estimator"] = estimator_obj(**const_params)
    model = search_cv_obj(**model_params)
    return model

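# Usage sketch (assumed config shape, mirroring the branches above): a dict with
# an "estimator" dotted path, optional "const_params" for fixed keyword
# arguments, and an optional "search_cv" dotted path whose remaining keys are
# passed to the search object. All values below are illustrative.
#
#   >>> args = EasyDict(uplift_model_params=dict(
#   ...     search_cv="sklearn.model_selection.GridSearchCV",
#   ...     estimator="sklearn.linear_model.LogisticRegression",
#   ...     const_params=dict(random_state=0),
#   ...     param_grid=dict(C=[0.1, 1.0]),
#   ...     cv=3,
#   ... ))
#   >>> model = initialize_model(args)  # GridSearchCV over LogisticRegression
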
def apply_method(obj, method, **kwargs):
    if isinstance(method, str):
        func = getattr(obj, method)
        out = func(**kwargs)
        if isinstance(out, str):
            print(out)
        return out
    else:
        return method(obj, **kwargs)
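
# Example: ``method`` may be an attribute name on ``obj`` or a callable taking
# ``obj`` as its first argument; string results are also printed.
#
#   >>> df = pd.DataFrame({"a": [1, 2]})
#   >>> _ = apply_method(df, "to_string")  # prints and returns df.to_string()
#   >>> apply_method(df, len)
#   2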