import logging
import pandas as pd
from .utils import * # NOQA
log = logging.getLogger(__name__)
[docs]class ModelForTreatedOrUntreated:
[docs] def __init__(self, treatment_val=1.0):
assert treatment_val in {0.0, 1.0}
self.treatment_val = treatment_val
self.treatment_label = "treated" if treatment_val else "untreated"
[docs] def fit(self, args, df_):
assert isinstance(df_, pd.DataFrame)
treatment_val = self.treatment_val
if args.verbose >= 2:
log.info("\n\n## Model for Treatment = {}".format(treatment_val))
df = df_.query("{}=={}".format(args.col_treatment, treatment_val)).copy()
X_train = df.xs("train")[args.cols_features]
y_train = df.xs("train")[args.col_outcome]
model = initialize_model(args, model_key="uplift_model_params")
if args.enable_ipw and (args.col_propensity in df.xs("train").columns):
propensity = df.xs("train")[args.col_propensity]
# avoid propensity near 0 or 1 which will result in too large weight
if propensity.min() < args.min_propensity and args.verbose >= 2:
log.warning(
"[Warning] Propensity scores below {} were clipped.".format(
args.min_propensity
)
)
if propensity.max() > args.max_propensity and args.verbose >= 2:
log.warning(
"[Warning] Propensity scores above {} were clipped.".format(
args.max_propensity
)
)
propensity.clip(
lower=args.min_propensity, upper=args.max_propensity, inplace=True
)
sample_weight = (
(1 / propensity) if treatment_val == 1.0 else (1 / (1 - propensity))
)
model.fit(X_train, y_train, sample_weight=sample_weight)
elif args.enable_weighting and (args.col_weight in df.xs("train").columns):
sample_weight = df.xs("train")[args.col_weight]
model.fit(X_train, y_train, sample_weight=sample_weight)
else:
model.fit(X_train, y_train)
best_estimator = (
model.best_estimator_ if hasattr(model, "best_estimator_") else model
)
estimator_params = best_estimator.get_params()
if "steps" in estimator_params:
best_estimator = estimator_params["steps"][-1][1]
estimator_params = best_estimator.get_params()
if args.verbose >= 3:
log.info(
"### Best parameters of the model trained using samples "
"with observational Treatment: {} \n {}".format(
treatment_val, estimator_params
)
)
if args.verbose >= 2:
if hasattr(estimator_params, "feature_importances_"):
fi_df = pd.DataFrame(
estimator_params.feature_importances_.reshape(1, -1),
index=["feature importance"],
)
log.info(
"\n### Feature importances of the model trained using samples "
"with observational Treatment: {}".format(treatment_val)
)
apply_method(fi_df, args.df_print)
else:
log.info("## Feature importances not available.")
y_pred_train = model.predict(X_train)
y_test = None
y_pred_test = None
if "test" not in df.index:
log.warn(
"Samples for test not found. Metrics for test dataset will not be computed."
)
else:
test_df = df.xs("test")
if args.col_outcome not in test_df.columns:
log.warn(
"Column for outcome not found. Metrics for test dataset will not be computed."
)
else:
for col in args.cols_features:
if col not in test_df.columns:
error_str = "{} not found in test dataframe columns: {}.".format(
col, test_df.columns
)
log.error(error_str)
raise ValueError(error_str)
X_test = test_df[args.cols_features]
y_test = test_df[args.col_outcome]
y_pred_test = model.predict(X_test)
score_original_treatment_df = score_df(
y_train, y_test, y_pred_train, y_pred_test, average="binary"
)
if args.verbose >= 3:
log.info(
"\n### Outcome estimated by the model trained using samples "
"with observational Treatment: {}".format(treatment_val)
)
apply_method(score_original_treatment_df, args.df_print)
model_dict = dict(model=model, eval_df=score_original_treatment_df)
return model_dict
[docs] def predict_proba(self, args, df_, models_dict):
model = models_dict[self.treatment_label]["model"]
cols_features = args.cols_features
X_train = df_.xs("train")[cols_features]
X_test = df_.xs("test")[cols_features]
y_pred_train = model.predict_proba(X_train)[:, 1]
y_pred_test = model.predict_proba(X_test)[:, 1]
return concat_train_test(args, y_pred_train, y_pred_test)
# X = df_[cols_features]
# y_pred = model.predict_proba(X)[:, 1]
# return y_pred
[docs] def simulate_recommendation(self, args, df_, models_dict):
model = models_dict[self.treatment_label]["model"]
score_original_treatment_df = models_dict[self.treatment_label]["eval_df"]
treatment_val = self.treatment_val
verbose = args.verbose
df = df_.query("{}=={}".format(args.col_recommendation, treatment_val)).copy()
X_train = df.xs("train")[args.cols_features]
y_train = df.xs("train")[args.col_outcome]
y_pred_train = model.predict(X_train)
y_test = None
y_pred_test = None
if "test" not in df.index:
log.warn(
"Samples for test not found. Metrics for test dataset will not be computed."
)
else:
test_df = df.xs("test")
if args.col_outcome not in test_df.columns:
log.warn(
"Column for outcome not found. Metrics for test dataset will not be computed."
)
else:
for col in args.cols_features:
if col not in test_df.columns:
error_str = "{} not found in test dataframe columns: {}.".format(
col, test_df.columns
)
log.error(error_str)
raise ValueError(error_str)
X_test = test_df[args.cols_features]
y_test = test_df[args.col_outcome]
y_pred_test = model.predict(X_test)
score_recommended_treatment_df = score_df(
y_train, y_test, y_pred_train, y_pred_test, average="binary"
)
if verbose >= 3:
log.info(
"\n### Simulated outcome for samples with recommended treatment = {}:".format(
treatment_val
)
)
apply_method(score_recommended_treatment_df, args.df_print)
out_df = pd.DataFrame(index=["train", "test"])
out_df.index.name = "partition"
out_df["# samples chosen"] = score_original_treatment_df[["# samples"]]
out_df["observed CVR"] = score_original_treatment_df[["observed CVR"]]
out_df["# samples recommended"] = score_recommended_treatment_df[["# samples"]]
out_df["predicted CVR"] = score_recommended_treatment_df[["predicted CVR"]]
out_df["pred/obs CVR"] = out_df["predicted CVR"] / out_df["observed CVR"]
if verbose >= 3:
log.info(
"\n### Simulated effect for samples with recommended treatment = {}:".format(
treatment_val
)
)
apply_method(out_df, args.df_print)
return out_df
[docs]class ModelForTreated(ModelForTreatedOrUntreated):
[docs] def __init__(self, *posargs, **kwargs):
kwargs.update(treatment_val=1.0)
super().__init__(*posargs, **kwargs)
[docs]class ModelForUntreated(ModelForTreatedOrUntreated):
[docs] def __init__(self, *posargs, **kwargs):
kwargs.update(treatment_val=0.0)
super().__init__(*posargs, **kwargs)
[docs]def model_for_treated_fit(*posargs, **kwargs):
return ModelForTreated().fit(*posargs, **kwargs)
[docs]def model_for_treated_predict_proba(*posargs, **kwargs):
return ModelForTreated().predict_proba(*posargs, **kwargs)
[docs]def model_for_treated_simulate_recommendation(*posargs, **kwargs):
return ModelForTreated().simulate_recommendation(*posargs, **kwargs)
[docs]def model_for_untreated_fit(*posargs, **kwargs):
return ModelForUntreated().fit(*posargs, **kwargs)
[docs]def model_for_untreated_predict_proba(*posargs, **kwargs):
return ModelForUntreated().predict_proba(*posargs, **kwargs)
[docs]def model_for_untreated_simulate_recommendation(*posargs, **kwargs):
return ModelForUntreated().simulate_recommendation(*posargs, **kwargs)
[docs]def bundle_treated_and_untreated_models(treated_model, untreated_model):
models_dict = dict(treated=treated_model, untreated=untreated_model)
return models_dict