from typing import Any, Dict, Type # NOQA
import logging
from easydict import EasyDict
from kedro.utils import load_obj
import numpy as np
import pandas as pd
import sklearn # NOQA
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
f1_score,
precision_score,
recall_score,
roc_auc_score,
)
log = logging.getLogger(__name__)
def get_cols_features(
    df,
    non_feature_cols=(
        "Treatment",
        "Outcome",
        "TransformedOutcome",
        "Propensity",
        "Recommendation",
    ),
):
    """List the columns of ``df`` usable as model features.

    Args:
        df: DataFrame whose columns are inspected.
        non_feature_cols: column names to exclude (treatment/outcome/
            bookkeeping columns). Default changed from a mutable list to an
            immutable tuple to avoid the shared-mutable-default pitfall;
            membership tests behave identically.

    Returns:
        list: columns of ``df`` not listed in ``non_feature_cols``,
        in original column order.
    """
    return [column for column in df.columns if column not in non_feature_cols]
def concat_train_test(args, train, test):
    r"""Stack train (and optionally test) data into one multi-indexed Series.

    The outer index level is named ``args.partition_name`` with values
    'train'/'test'; split back out via ``series.xs('train')`` /
    ``series.xs('test')``. When ``test`` is None, only the 'train'
    partition is present.
    """
    if test is None:
        parts, keys = [pd.Series(train)], ["train"]
    else:
        parts, keys = [pd.Series(train), pd.Series(test)], ["train", "test"]
    return pd.concat(parts, keys=keys, names=[args.partition_name, args.index_name])
def concat_train_test_df(args, train, test):
    r"""Stack train and test DataFrames under a 'train'/'test' partition level.

    The resulting frame has a two-level index named after
    ``args.partition_name`` and ``args.index_name``; split back out via
    ``df.xs('train')`` / ``df.xs('test')``.
    """
    level_names = [args.partition_name, args.index_name]
    return pd.concat([train, test], keys=["train", "test"], names=level_names)
def len_t(df, treatment=1.0, col_treatment="Treatment"):
    """Count rows whose treatment column equals ``treatment``."""
    condition = "{}=={}".format(col_treatment, treatment)
    return len(df.query(condition))
def len_o(df, outcome=1.0, col_outcome="Outcome"):
    """Count rows whose outcome column equals ``outcome``."""
    condition = "{}=={}".format(col_outcome, outcome)
    return len(df.query(condition))
def len_to(
    df, treatment=1.0, outcome=1.0, col_treatment="Treatment", col_outcome="Outcome"
):
    """Count rows matching both the given treatment and outcome values."""
    condition = "{}=={} & {}=={}".format(
        col_treatment, treatment, col_outcome, outcome
    )
    return len(df.query(condition))
def treatment_fraction_(df, col_treatment="Treatment"):
    """Return the share of rows that received the treatment."""
    n_total = len(df)
    n_treated = len_t(df, col_treatment=col_treatment)
    return n_treated / n_total
def treatment_fractions_(
    args,  # type: Dict[str, Any]
    df,  # type: Type[pd.DataFrame]
):
    # type: (...) -> Type[EasyDict]
    """Compute the treated fraction for each of the 'train'/'test' partitions.

    Expects ``df`` to carry a partition level accessible via ``df.xs(...)``.
    Returns an EasyDict with ``train`` and ``test`` attributes.
    """
    col_treatment = args.col_treatment
    fractions = EasyDict()
    for partition in ("train", "test"):
        fractions[partition] = treatment_fraction_(
            df.xs(partition), col_treatment=col_treatment
        )
    return fractions
def outcome_fraction_(df, col_outcome="Outcome"):
    """Return the share of rows with a positive outcome."""
    n_total = len(df)
    n_positive = len_o(df, col_outcome=col_outcome)
    return n_positive / n_total
def overall_uplift_gain_(
    df, treatment=1.0, outcome=1.0, col_treatment="Treatment", col_outcome="Outcome"
):
    """Estimate overall uplift: positive-outcome rate of treated minus untreated.

    Args:
        df: DataFrame containing the treatment and outcome columns.
        treatment: value in ``col_treatment`` that marks a treated row.
        outcome: value in ``col_outcome`` that marks a positive outcome.
            Both parameters were previously accepted but silently ignored;
            they are now honored, with defaults preserving the original
            behavior (treated == 1.0, positive outcome == 1.0, control == 0).
        col_treatment: name of the treatment column.
        col_outcome: name of the outcome column.

    Returns:
        float: P(outcome | treated) - P(outcome | control).
    """
    treated_rate = len_to(
        df, treatment, outcome, col_treatment=col_treatment, col_outcome=col_outcome
    ) / len_t(df, treatment, col_treatment=col_treatment)
    # Control group is hard-coded as treatment value 0, as in the original.
    control_rate = len_to(
        df, 0, outcome, col_treatment=col_treatment, col_outcome=col_outcome
    ) / len_t(df, 0, col_treatment=col_treatment)
    return treated_rate - control_rate
def gain_tuple(df_, r_):
    """Summarize uplift-evaluation statistics for one partition.

    Interpolates the cumulative gain curve stored on ``r_`` at the observed
    treatment fraction and relates it to the naive (random targeting)
    baseline.

    Returns a 9-tuple: (treatment fraction, outcome fraction, overall uplift
    gain, cgain, cgain baseline, cgain/baseline factor, Q_cgains, q1_cgains,
    q2_cgains).
    """
    frac_treated = treatment_fraction_(df_)
    frac_outcome = outcome_fraction_(df_)
    gain_overall = overall_uplift_gain_(df_)
    # Cumulative gain at the actual treatment fraction, via linear interpolation.
    cgain = np.interp(frac_treated, r_.cgains_x, r_.cgains_y)
    # Baseline: gain expected from treating the same fraction at random.
    cgain_base = gain_overall * frac_treated
    return (
        frac_treated,
        frac_outcome,
        gain_overall,
        cgain,
        cgain_base,
        cgain / cgain_base,
        r_.Q_cgains,
        r_.q1_cgains,
        r_.q2_cgains,
    )
def score_df(y_train, y_test, y_pred_train, y_pred_test, average="binary"):
    """Return a DataFrame of classification metrics, one row per partition.

    Args:
        y_train, y_test: true labels per partition (either may be None to
            skip that partition).
        y_pred_train, y_pred_test: predicted labels per partition.
        average: averaging mode forwarded to precision/recall/f1
            (sklearn semantics, e.g. 'binary', 'macro').

    Returns:
        pd.DataFrame: rows indexed 'train'/'test' with sample count, class
        count, accuracy, precision, recall and f1; binary problems
        additionally get roc_auc and observed/predicted CVR columns.
        Partitions whose labels fall outside [0, 1] are skipped.

    Raises:
        ValueError: if true and predicted lengths differ for a partition.
    """
    if (
        y_train is not None
        and y_pred_train is not None
        and len(y_train) != len(y_pred_train)
    ):
        raise ValueError("Lengths of true and predicted for train do not match.")
    if (
        y_test is not None
        and y_pred_test is not None
        and len(y_test) != len(y_pred_test)
    ):
        raise ValueError("Lengths of true and predicted for test do not match.")
    # Renamed from `score_df` to avoid shadowing this function's own name.
    out_df = pd.DataFrame()
    for partition_, y_, y_pred_ in [
        ("train", y_train, y_pred_train),
        ("test", y_test, y_pred_test),
    ]:
        # Score only when both vectors exist and all values lie in [0, 1].
        if (
            y_ is None
            or y_pred_ is None
            or not ((0 <= y_).all() and (y_ <= 1).all())
            or not ((0 <= y_pred_).all() and (y_pred_ <= 1).all())
        ):
            continue
        y_series = pd.Series(y_)
        num_classes = y_series.nunique()
        score_list = [
            len(y_),
            num_classes,  # reuse instead of recomputing nunique()
            accuracy_score(y_, y_pred_),
            precision_score(y_, y_pred_, average=average),
            recall_score(y_, y_pred_, average=average),
            f1_score(y_, y_pred_, average=average),
        ]
        column_list = [
            "# samples",
            "# classes",
            "accuracy",
            "precision",
            "recall",
            "f1",
        ]
        if num_classes == 2:
            # Binary-only metrics: AUC and observed/predicted conversion rates.
            score_list += [
                roc_auc_score(y_, y_pred_),
                y_series.mean(),
                pd.Series(y_pred_).mean(),
            ]
            column_list += ["roc_auc", "observed CVR", "predicted CVR"]
        row = pd.DataFrame([score_list], index=[partition_], columns=column_list)
        # DataFrame.append was deprecated in pandas 1.4 and removed in 2.0.
        out_df = pd.concat([out_df, row])
    return out_df
def conf_mat_df(y_true, y_pred):
    """Return the confusion matrix as a labeled DataFrame.

    Rows are labeled ``True_i`` and columns ``Pred_j`` for each class index.
    """
    matrix = confusion_matrix(y_true, y_pred)
    class_indices = range(len(matrix))
    return pd.DataFrame(
        matrix,
        index=["True_{}".format(i) for i in class_indices],
        columns=["Pred_{}".format(i) for i in class_indices],
    )
def bundle_train_and_test_data(args, train_df, test_df):
    """Combine train/test frames into one partition-indexed DataFrame.

    When ``args.index_name`` is set, both frames are re-indexed from 0 and
    the index renamed; otherwise the two frames must already share the same
    index name. ``test_df`` may be None.
    """
    assert isinstance(train_df, pd.DataFrame)
    assert all(isinstance(col_name, str) for col_name in train_df.columns)
    index_name = args.index_name
    if index_name is None:
        if test_df is not None:
            assert train_df.index.name == test_df.index.name
    else:
        # Reset to a clean 0..n-1 index so the two partitions align.
        train_df = train_df.reset_index(drop=True).copy()
        train_df.index.name = index_name
        if test_df is not None:
            test_df = test_df.reset_index(drop=True).copy()
            test_df.index.name = index_name
    return concat_train_test_df(args, train_df, test_df)
def impute_cols_features(args, df):
    """Populate ``args.cols_features`` if it is empty or unset.

    Defaults to every column of ``df`` that is not one of the reserved
    treatment/outcome/propensity/CATE/recommendation columns.
    Returns ``args`` (mutated in place).
    """
    reserved_cols = [
        args.col_treatment,
        args.col_outcome,
        args.col_propensity,
        args.col_cate,
        args.col_recommendation,
    ]
    if not args.cols_features:
        args.cols_features = get_cols_features(df, non_feature_cols=reserved_cols)
    return args
def compute_cate(proba_treated, proba_untreated):
    """Return the estimated CATE: P(outcome | treated) - P(outcome | untreated)."""
    return proba_treated - proba_untreated
def add_cate_to_df(args, df, cate_estimated, proba_treated, proba_untreated):
    """Attach simulated probabilities and the estimated CATE as columns on ``df``.

    ``cate_estimated`` is expected to be a pandas Series (its ``.values`` are
    used so assignment ignores its index). Returns ``df`` (mutated in place).
    """
    for col, values in (
        (args.col_proba_if_treated, proba_treated),
        (args.col_proba_if_untreated, proba_untreated),
        (args.col_cate, cate_estimated.values),
    ):
        df.loc[:, col] = values
    return df
def recommend_by_cate(args, df, treatment_fractions):
    """Flag the top CATE-ranked fraction of each partition for treatment.

    Within each of the 'train'/'test' partitions, rows whose CATE percentile
    rank (descending) falls within the partition's treatment fraction get
    1.0 in ``args.col_recommendation``; the rest get 0.0. Returns ``df``
    (mutated in place).
    """

    def _top_fraction_flags(scores, fraction):
        # Percentile rank with ties broken by order; smaller rank = higher CATE.
        pct_rank = scores.rank(method="first", ascending=False, pct=True)
        return np.where(pct_rank <= fraction, 1.0, 0.0)

    cate_series = df[args.col_cate]
    flags_train = _top_fraction_flags(
        cate_series.xs("train"), treatment_fractions.train
    )
    flags_test = _top_fraction_flags(cate_series.xs("test"), treatment_fractions.test)
    df.loc[:, args.col_recommendation] = concat_train_test(
        args, flags_train, flags_test
    )
    return df
def estimate_effect(args, sim_treated_df, sim_untreated_df):
    """Combine treated/untreated simulation summaries into overall effect estimates.

    Computes sample-weighted averages of the observed CVR (without the uplift
    model) and the predicted CVR (with the model), plus their ratio.
    Logs and prints the table when ``args.verbose >= 2``.
    """
    chosen_treated = sim_treated_df["# samples chosen"]
    chosen_untreated = sim_untreated_df["# samples chosen"]
    recommended_treated = sim_treated_df["# samples recommended"]
    recommended_untreated = sim_untreated_df["# samples recommended"]

    estimated_effect_df = pd.DataFrame()
    estimated_effect_df["# samples"] = chosen_treated + chosen_untreated
    ## Original (without uplift model): weighted average of observed CVRs
    estimated_effect_df["observed CVR"] = (
        chosen_treated * sim_treated_df["observed CVR"]
        + chosen_untreated * sim_untreated_df["observed CVR"]
    ) / (chosen_treated + chosen_untreated)
    ## Recommended (with uplift model): weighted average of predicted CVRs
    estimated_effect_df["predicted CVR"] = (
        recommended_treated * sim_treated_df["predicted CVR"]
        + recommended_untreated * sim_untreated_df["predicted CVR"]
    ) / (recommended_treated + recommended_untreated)
    estimated_effect_df["pred/obs CVR"] = (
        estimated_effect_df["predicted CVR"] / estimated_effect_df["observed CVR"]
    )
    if args.verbose >= 2:
        log.info(
            "\n## Overall simulated effect of recommendation based on the uplift modeling:"
        )
        apply_method(estimated_effect_df, args.df_print)
    return estimated_effect_df
def initialize_model(
    args,  # type: Type[EasyDict]
    model_key="uplift_model_params",  # type: str
    default_estimator="sklearn.linear_model.LogisticRegression",  # type: str
):
    # type: (...) -> Type[sklearn.base.BaseEstimator]
    """Instantiate the model configured at ``args[model_key]``.

    A non-dict value is treated as an already-built model and returned
    unchanged. Otherwise the dict may contain:
      - 'estimator': dotted path of the estimator class (default used if
        missing or falsy),
      - 'const_params': kwargs for the estimator constructor,
      - 'search_cv': dotted path of a hyper-parameter search class,
      - any remaining keys: estimator kwargs (no search) or search kwargs.
    """
    config = args[model_key]
    if not isinstance(config, dict):
        # Already a model instance; nothing to build.
        return config
    config = config.copy()
    if not config.get("estimator"):
        config["estimator"] = default_estimator
    estimator_cls = load_obj(config.pop("estimator"))
    const_params = (
        (config.pop("const_params") or dict()) if "const_params" in config else dict()
    )
    if not config.get("search_cv"):
        # No hyper-parameter search requested: leftover keys go to the estimator.
        const_params.update(config)
        return estimator_cls(**const_params)
    search_cls = load_obj(config.pop("search_cv"))
    config["estimator"] = estimator_cls(**const_params)
    return search_cls(**config)
def apply_method(obj, method, **kwargs):
    """Invoke ``method`` on ``obj``.

    A string names a bound attribute of ``obj`` to call with ``kwargs``;
    string results are also printed. Any other ``method`` is treated as a
    callable and invoked as ``method(obj, **kwargs)``.
    """
    if not isinstance(method, str):
        return method(obj, **kwargs)
    result = getattr(obj, method)(**kwargs)
    if isinstance(result, str):
        print(result)
    return result