# Source code for causallift.causal_lift

from typing import List, Optional, Tuple, Type  # NOQA

from kedro.io import AbstractDataSet, MemoryDataSet

# Kedro moved/renamed the local dataset classes between releases; alias the
# new names to the old ones so the rest of this module uses a single name.
# NOTE: catch ImportError only — a bare `except:` would also swallow
# SystemExit/KeyboardInterrupt raised during import.
try:
    from kedro.io import CSVLocalDataSet
except ImportError:
    from kedro.extras.datasets.pandas.csv_dataset import CSVDataSet as CSVLocalDataSet

try:
    from kedro.io import PickleLocalDataSet
except ImportError:
    from kedro.extras.datasets.pickle.pickle_dataset import (
        PickleDataSet as PickleLocalDataSet,
    )


import numpy as np
import sklearn  # NOQA
import logging
from IPython.display import display

from causallift.context.flexible_context import *  # NOQA

from .nodes.estimate_propensity import *  # NOQA
from .nodes.model_for_each import *  # NOQA


log = logging.getLogger(__name__)


class CausalLift:
    """Set up datasets for uplift modeling.

    Optionally, propensity scores are estimated based on logistic regression.

    Args:
        train_df: Pandas DataFrame containing samples used for training.
        test_df: Pandas DataFrame containing samples used for testing.
        cols_features: List of column names used as features. If None
            (default), all the columns except for outcome, propensity, CATE,
            and recommendation.
        col_treatment: Name of treatment column. 'Treatment' in default.
        col_outcome: Name of outcome column. 'Outcome' in default.
        col_propensity: Name of propensity column. 'Propensity' in default.
        col_proba_if_treated: Name of the column holding the predicted outcome
            probability if treated. 'Proba_if_Treated' in default.
        col_proba_if_untreated: Name of the column holding the predicted
            outcome probability if untreated. 'Proba_if_Untreated' in default.
        col_cate: Name of CATE (Conditional Average Treatment Effect) column.
            'CATE' in default.
        col_recommendation: Name of recommendation column. 'Recommendation'
            in default.
        col_weight: Name of weight column. 'Weight' in default.
        min_propensity: Minimum propensity score. 0.01 in default.
        max_propensity: Maximum propensity score. 0.99 in default.
        verbose: How much info to show. Valid values are:
            0 to show nothing; 1 to show only warnings; 2 (default) to show
            useful info; 3 to show more info.
        uplift_model_params: Parameters used to fit 2 XGBoost classifier
            models. A dict with an optional 'search_cv' key (Search CV class
            name, e.g. 'sklearn.model_selection.GridSearchCV'), an 'estimator'
            key (estimator class name, e.g. 'xgboost.XGBClassifier'), an
            optional 'const_params' key (constant constructor parameters), and
            Search-CV keyword arguments such as 'param_grid'. If None
            (default), a GridSearchCV over XGBClassifier with a fixed
            single-point param_grid is used. Alternatively, an estimator model
            object is acceptable; the object must provide fit, predict, and
            predict_proba methods compatible with the scikit-learn estimator
            interface.
        enable_ipw: Enable Inverse Probability Weighting based on the
            estimated propensity score. True in default.
        enable_weighting: Enable Weighting. False in default.
        propensity_model_params: Parameters used to fit the logistic
            regression model that estimates the propensity score. Same dict
            format as uplift_model_params (e.g. estimator
            'sklearn.linear_model.LogisticRegression'). If None (default), a
            GridSearchCV over LogisticRegression is used.
        index_name: Index name of the pandas data frame after resetting the
            index. 'index' in default. If None, the index will not be reset.
        partition_name: Additional index name to indicate the partition,
            train or test. 'partition' in default.
        runner: If set to 'SequentialRunner' (default) or 'ParallelRunner',
            the pipeline is run by Kedro sequentially or in parallel,
            respectively. If set to None, the pipeline is run by native
            Python. Refer to
            https://kedro.readthedocs.io/en/latest/04_user_guide/05_nodes_and_pipelines.html#runners
        conditionally_skip: [Effective only if runner is set to either
            'SequentialRunner' or 'ParallelRunner'] Skip running the pipeline
            if the output files already exist. False in default.
        df_print: Callable to use to show output data frames.
            IPython.display.display in default.
        dataset_catalog: [Effective only if runner is set to either
            'SequentialRunner' or 'ParallelRunner'] Specify dataset files to
            save, in Dict[str, kedro.io.AbstractDataSet] format. In default,
            models are saved as pickle under '../data/06_models/' and
            output/report data frames as CSV under
            '../data/07_model_output/' and '../data/08_reporting/'. To find
            available file formats, refer to
            https://kedro.readthedocs.io/en/latest/kedro.io.html#data-sets
        logging_config: Logging configuration passed to
            logging.config.dictConfig. In default, logs at INFO level and
            above go to stdout and rotating files './info.log' and
            './errors.log'. Refer to
            https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
    """
[docs] def __init__( self, train_df=None, # type: Optional[pd.DataFrame] test_df=None, # type: Optional[pd.DataFrame] cols_features=None, # type: Optional[List[str]] col_treatment="Treatment", # type: str col_outcome="Outcome", # type: str col_propensity="Propensity", # type: str col_proba_if_treated="Proba_if_Treated", # type: str col_proba_if_untreated="Proba_if_Untreated", # type: str col_cate="CATE", # type: str col_recommendation="Recommendation", # type: str col_weight="Weight", # type: str min_propensity=0.01, # type: float max_propensity=0.99, # type: float verbose=2, # type: int uplift_model_params=dict( search_cv="sklearn.model_selection.GridSearchCV", estimator="xgboost.XGBClassifier", scoring=None, cv=3, return_train_score=False, n_jobs=-1, param_grid=dict( random_state=[0], max_depth=[3], learning_rate=[0.1], n_estimators=[100], verbose=[0], objective=["binary:logistic"], booster=["gbtree"], n_jobs=[-1], nthread=[None], gamma=[0], min_child_weight=[1], max_delta_step=[0], subsample=[1], colsample_bytree=[1], colsample_bylevel=[1], reg_alpha=[0], reg_lambda=[1], scale_pos_weight=[1], base_score=[0.5], missing=[None], ), ), # type: Union[Dict[str, List[Any]], Type[sklearn.base.BaseEstimator]] enable_ipw=True, # type: bool enable_weighting=False, # type: bool propensity_model_params=dict( search_cv="sklearn.model_selection.GridSearchCV", estimator="sklearn.linear_model.LogisticRegression", scoring=None, cv=3, return_train_score=False, n_jobs=-1, param_grid=dict( random_state=[0], C=[0.1, 1, 10], class_weight=[None], dual=[False], fit_intercept=[True], intercept_scaling=[1], max_iter=[100], multi_class=["ovr"], n_jobs=[1], penalty=["l1", "l2"], solver=["liblinear"], tol=[0.0001], warm_start=[False], ), ), # type: Dict[str, List[Any]] index_name="index", # type: str partition_name="partition", # type: str runner="SequentialRunner", # type: str conditionally_skip=False, # type: bool df_print=display, dataset_catalog=dict( # args_raw = 
CSVLocalDataSet(filepath='../data/01_raw/args_raw.csv', version=None), # train_df = CSVLocalDataSet(filepath='../data/01_raw/train_df.csv', version=None), # test_df = CSVLocalDataSet(filepath='../data/01_raw/test_df.csv', version=None), propensity_model=PickleLocalDataSet( filepath="../data/06_models/propensity_model.pickle", version=None ), uplift_models_dict=PickleLocalDataSet( filepath="../data/06_models/uplift_models_dict.pickle", version=None ), df_03=CSVLocalDataSet( filepath="../data/07_model_output/df.csv", load_args=dict( index_col=["partition", "index"], float_precision="high" ), save_args=dict(index=True, float_format="%.16e"), version=None, ), treated__sim_eval_df=CSVLocalDataSet( filepath="../data/08_reporting/treated__sim_eval_df.csv", load_args=dict(index_col=["partition"], float_precision="high"), save_args=dict(index=True, float_format="%.16e"), version=None, ), untreated__sim_eval_df=CSVLocalDataSet( filepath="../data/08_reporting/untreated__sim_eval_df.csv", load_args=dict(index_col=["partition"], float_precision="high"), save_args=dict(index=True, float_format="%.16e"), version=None, ), estimated_effect_df=CSVLocalDataSet( filepath="../data/08_reporting/estimated_effect_df.csv", load_args=dict(index_col=["partition"], float_precision="high"), save_args=dict(index=True, float_format="%.16e"), version=None, ), ), # type: Dict[str, AbstractDataSet] logging_config={ "disable_existing_loggers": False, "formatters": { "json_formatter": { "class": "pythonjsonlogger.jsonlogger.JsonFormatter", "format": "[%(asctime)s|%(name)s|%(funcName)s|%(levelname)s] %(message)s", }, "simple": { "format": "[%(asctime)s|%(name)s|%(levelname)s] %(message)s" }, }, "handlers": { "console": { "class": "logging.StreamHandler", "formatter": "simple", "level": "INFO", "stream": "ext://sys.stdout", }, "info_file_handler": { "class": "logging.handlers.RotatingFileHandler", "level": "INFO", "formatter": "simple", "filename": "./info.log", "maxBytes": 10485760, # 10MB 
"backupCount": 20, "encoding": "utf8", "delay": True, }, "error_file_handler": { "class": "logging.handlers.RotatingFileHandler", "level": "ERROR", "formatter": "simple", "filename": "./errors.log", "maxBytes": 10485760, # 10MB "backupCount": 20, "encoding": "utf8", "delay": True, }, }, "loggers": { "anyconfig": { "handlers": ["console", "info_file_handler", "error_file_handler"], "level": "WARNING", "propagate": False, }, "kedro.io": { "handlers": ["console", "info_file_handler", "error_file_handler"], "level": "WARNING", "propagate": False, }, "kedro.pipeline": { "handlers": ["console", "info_file_handler", "error_file_handler"], "level": "INFO", "propagate": False, }, "kedro.runner": { "handlers": ["console", "info_file_handler", "error_file_handler"], "level": "INFO", "propagate": False, }, "causallift": { "handlers": ["console", "info_file_handler", "error_file_handler"], "level": "INFO", "propagate": False, }, }, "root": { "handlers": ["console", "info_file_handler", "error_file_handler"], "level": "INFO", }, "version": 1, }, # type: Optional[Dict[str, Any]] ): # type: (...) 
-> None self.runner = None # type: Optional[str] self.kedro_context = None # type: Optional[Type[FlexibleKedroContext]] self.args = None # type: Optional[Type[EasyDict]] self.train_df = None # type: Optional[Type[pd.DataFrame]] self.test_df = None # type: Optional[Type[pd.DataFrame]] self.df = None # type: Optional[Type[pd.DataFrame]] self.propensity_model = None # type: Optional[Type[sklearn.base.BaseEstimator]] self.uplift_models_dict = None # type: Optional[Type[EasyDict]] self.treatment_fractions = None # type: Optional[Type[EasyDict]] self.treatment_fraction_train = None # type: Optional[float] self.treatment_fraction_test = None # type: Optional[float] self.treated__proba = None # type: Optional[Type[np.array]] self.untreated__proba = None # type: Optional[Type[np.array]] self.cate_estimated = None # type: Optional[Type[pd.Series]] self.treated__sim_eval_df = None # type: Optional[Type[pd.DataFrame]] self.untreated__sim_eval_df = None # type: Optional[Type[pd.DataFrame]] self.estimated_effect_df = None # type: Optional[Type[pd.DataFrame]] # Instance attributes were defined above. 
if logging_config: logging.config.dictConfig(logging_config) args_raw = dict( cols_features=cols_features, col_treatment=col_treatment, col_outcome=col_outcome, col_propensity=col_propensity, col_proba_if_treated=col_proba_if_treated, col_proba_if_untreated=col_proba_if_untreated, col_cate=col_cate, col_recommendation=col_recommendation, col_weight=col_weight, min_propensity=min_propensity, max_propensity=max_propensity, verbose=verbose, uplift_model_params=uplift_model_params, enable_ipw=enable_ipw, enable_weighting=enable_weighting, propensity_model_params=propensity_model_params, index_name=index_name, partition_name=partition_name, runner=runner, conditionally_skip=conditionally_skip, df_print=df_print, ) args_raw = EasyDict(args_raw) args_raw.update(dataset_catalog.get("args_raw", MemoryDataSet({}).load())) assert args_raw.runner in {"SequentialRunner", "ParallelRunner", None} if args_raw.runner is None and args_raw.conditionally_skip: log.warning( "[Warning] conditionally_skip option is ignored since runner is None" ) self.kedro_context = FlexibleKedroContext( runner=args_raw.runner, only_missing=args_raw.conditionally_skip ) self.runner = args_raw.runner if self.runner is None: self.df = bundle_train_and_test_data(args_raw, train_df, test_df) self.args = impute_cols_features(args_raw, self.df) self.args = schedule_propensity_scoring(self.args, self.df) self.treatment_fractions = treatment_fractions_(self.args, self.df) if self.args.need_propensity_scoring: self.propensity_model = fit_propensity(self.args, self.df) self.df = estimate_propensity(self.args, self.df, self.propensity_model) if self.runner: self.kedro_context.catalog.add_feed_dict( { "train_df": MemoryDataSet(train_df), "test_df": MemoryDataSet(test_df), "args_raw": MemoryDataSet(args_raw), }, replace=True, ) self.kedro_context.catalog.add_feed_dict(dataset_catalog, replace=True) self.kedro_context.run(tags=["011_bundle_train_and_test_data"]) self.df = self.kedro_context.catalog.load("df_00") 
self.kedro_context.run( tags=[ "121_prepare_args", "131_treatment_fractions_", "141_initialize_model", ] ) self.args = self.kedro_context.catalog.load("args") self.treatment_fractions = self.kedro_context.catalog.load( "treatment_fractions" ) if self.args.need_propensity_scoring: self.kedro_context.run(tags=["211_fit_propensity"]) self.propensity_model = self.kedro_context.catalog.load( "propensity_model" ) self.kedro_context.run(tags=["221_estimate_propensity"]) self.df = self.kedro_context.catalog.load("df_01") else: self.kedro_context.catalog.add_feed_dict( {"df_01": MemoryDataSet(self.df)}, replace=True ) self.treatment_fraction_train = self.treatment_fractions.train self.treatment_fraction_test = self.treatment_fractions.test if self.args.verbose >= 3: log.info( "### Treatment fraction in train dataset: {}".format( self.treatment_fractions.train ) ) log.info( "### Treatment fraction in test dataset: {}".format( self.treatment_fractions.test ) ) self._separate_train_test()
def _separate_train_test(self): # type: (...) -> Tuple[pd.DataFrame, pd.DataFrame] self.train_df = self.df.xs("train") self.test_df = self.df.xs("test") return self.train_df, self.test_df
    def estimate_cate_by_2_models(self):
        # type: (...) -> Tuple[pd.DataFrame, pd.DataFrame]
        r"""
        Estimate CATE (Conditional Average Treatment Effect) using 2 XGBoost
        classifier models.

        Returns:
            (train_df, test_df) tuple from _separate_train_test(), with the
            CATE-related columns added to self.df.
        """
        # Native-Python path (runner is None): call the node functions
        # directly, in dependency order.
        if self.runner is None:
            treated__model_dict = model_for_treated_fit(self.args, self.df)
            untreated__model_dict = model_for_untreated_fit(self.args, self.df)
            self.uplift_models_dict = bundle_treated_and_untreated_models(
                treated__model_dict, untreated__model_dict
            )
            self.treated__proba = model_for_treated_predict_proba(
                self.args, self.df, self.uplift_models_dict
            )
            self.untreated__proba = model_for_untreated_predict_proba(
                self.args, self.df, self.uplift_models_dict
            )
            self.cate_estimated = compute_cate(
                self.treated__proba, self.untreated__proba
            )
            self.df = add_cate_to_df(
                self.args,
                self.df,
                self.cate_estimated,
                self.treated__proba,
                self.untreated__proba,
            )
        # Kedro path: run the pipeline nodes by tag and reload each output
        # from the catalog. Tag numbers appear to encode execution order —
        # TODO confirm against the pipeline definition.
        if self.runner:
            self.kedro_context.run(tags=["311_fit", "312_bundle_2_models"])
            self.uplift_models_dict = self.kedro_context.catalog.load(
                "uplift_models_dict"
            )

            self.kedro_context.run(tags=["321_predict_proba"])
            self.treated__proba = self.kedro_context.catalog.load("treated__proba")
            self.untreated__proba = self.kedro_context.catalog.load("untreated__proba")

            self.kedro_context.run(tags=["411_compute_cate"])
            self.cate_estimated = self.kedro_context.catalog.load("cate_estimated")

            self.kedro_context.run(tags=["421_add_cate_to_df"])
            self.df = self.kedro_context.catalog.load("df_02")

        return self._separate_train_test()
    def estimate_recommendation_impact(
        self,
        cate_estimated=None,  # type: Optional[Type[pd.Series]]
        treatment_fraction_train=None,  # type: Optional[float]
        treatment_fraction_test=None,  # type: Optional[float]
        verbose=None,  # type: Optional[int]
    ):
        # type: (...) -> Type[pd.DataFrame]
        r"""
        Estimate the impact of recommendation based on uplift modeling.

        Args:
            cate_estimated:
                Pandas series containing the CATE.
                If None (default), use the ones calculated by
                estimate_cate_by_2_models method.
            treatment_fraction_train:
                The fraction of treatment in train dataset.
                If None (default), use the ones calculated by
                estimate_cate_by_2_models method.
            treatment_fraction_test:
                The fraction of treatment in test dataset.
                If None (default), use the ones calculated by
                estimate_cate_by_2_models method.
            verbose:
                How much info to show.
                If None (default), use the value set in the constructor.

        Returns:
            The estimated effect data frame.
        """
        # Optionally override the CATE column with an externally supplied series.
        if cate_estimated is not None:
            self.cate_estimated = cate_estimated
            self.df.loc[:, self.args.col_cate] = cate_estimated.values
        # NOTE(review): `x or default` treats an explicit 0 / 0.0 as "not
        # provided" and falls back to the stored value — confirm whether a
        # zero treatment fraction is a legitimate input here.
        self.treatment_fractions.train = (
            treatment_fraction_train or self.treatment_fractions.train
        )
        self.treatment_fractions.test = (
            treatment_fraction_test or self.treatment_fractions.test
        )

        verbose = verbose or self.args.verbose

        # Native-Python path (runner is None): call the node functions directly.
        if self.runner is None:
            self.df = recommend_by_cate(self.args, self.df, self.treatment_fractions)
            self.treated__sim_eval_df = model_for_treated_simulate_recommendation(
                self.args, self.df, self.uplift_models_dict
            )
            self.untreated__sim_eval_df = model_for_untreated_simulate_recommendation(
                self.args, self.df, self.uplift_models_dict
            )
            self.estimated_effect_df = estimate_effect(
                self.args, self.treated__sim_eval_df, self.untreated__sim_eval_df
            )

        # Kedro path: run the pipeline nodes by tag and reload outputs from
        # the catalog.
        if self.runner:
            # self.kedro_context.catalog.save('args', self.args)
            self.kedro_context.run(tags=["511_recommend_by_cate"])
            self.df = self.kedro_context.catalog.load("df_03")

            self.kedro_context.run(tags=["521_simulate_recommendation"])
            self.treated__sim_eval_df = self.kedro_context.catalog.load(
                "treated__sim_eval_df"
            )
            self.untreated__sim_eval_df = self.kedro_context.catalog.load(
                "untreated__sim_eval_df"
            )

            self.kedro_context.run(tags=["531_estimate_effect"])
            self.estimated_effect_df = self.kedro_context.catalog.load(
                "estimated_effect_df"
            )

        return self.estimated_effect_df