Source code for pimmslearn.sklearn.ae_transformer
"""Scikit-learn style interface for Denoising and Variational Autoencoder model."""
from __future__ import annotations
from pathlib import Path
from typing import Optional
import pandas as pd
import sklearn
from fastai import learner
from fastai.basics import *
from fastai.callback.all import *
from fastai.callback.tracker import EarlyStoppingCallback
from fastai.learner import Learner
from fastai.losses import MSELossFlat
from fastai.torch_basics import *
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.utils.validation import check_is_fitted
import pimmslearn.models as models
# patch plotting function
from pimmslearn.models import ae, plot_loss
learner.Recorder.plot_loss = plot_loss
default_pipeline = sklearn.pipeline.Pipeline(
[("normalize", StandardScaler()), ("impute", SimpleImputer(add_indicator=False))]
)
[docs]
class AETransformer(TransformerMixin, BaseEstimator):
"""Autoencoder transformer (Denoising or Variational).
Autoencoder transformer which can be used to impute missing values
in a dataset it is fitted to. The data is standard normalized
for fitting the model, but imputations are provided on the original scale
after internally fitting the model.
The data uses the wide data format with samples as rows and features as columns.
Parameters
----------
hidden_layers : list[int]
Architecture for encoder. Decoder is mirrored.
dim_latent : int, optional
Hidden space dimension, by default 15
out_folder : str, optional
Output folder for model, by default '.'
model : str, optional
Model type ("VAE", "DAE"), by default 'VAE'
batch_size : int, optional
Batch size for training, by default 64
"""
def __init__(
self,
hidden_layers: list[int],
latent_dim: int = 15,
out_folder: str = ".",
model="VAE",
# y_range:Optional[tuple[int]]=None,
batch_size: int = 64,
):
self.hidden_layers = hidden_layers
self.latent_dim = latent_dim
self.batch_size = batch_size
self.out_folder = Path(out_folder)
self.out_folder.mkdir(exist_ok=True, parents=True)
if model == "VAE":
self.model = models.vae.VAE
self.cbs = [ae.ModelAdapterVAE()]
self.loss_fct = models.vae.loss_fct
elif model == "DAE":
self.model = ae.Autoencoder
self.cbs = [ae.ModelAdapter(p=0.2)]
self.loss_fct = MSELossFlat(reduction="sum")
else:
raise ValueError(f'Unknown model {model}, choose either "VAE" or "DAE"')
self.model_name = model
[docs]
def fit(
self,
X: pd.DataFrame,
y: pd.DataFrame = None,
epochs_max: int = 100,
cuda: bool = True,
patience: Optional[int] = None,
):
"""Fit the model to the data.
Parameters
----------
X : pd.DataFrame
training data of dimension N_samples x M_features
y : pd.DataFrame, optional
validation data points which are missing in X of dimension
N_sample x M_features, by default None
epochs_max : int, optional
Maximal number of epochs to train, by default 100
cuda : bool, optional
If the model should be trained with an accelerator, by default True
patience : Optional[int], optional
If added, early stopping is added with specified patience, by default None
Returns
-------
AETransformer
Return itself fitted to the training data.
"""
self.analysis = ae.AutoEncoderAnalysis( # datasplits=data,
train_df=X,
val_df=y,
model=self.model,
model_kwargs=dict(
n_features=X.shape[-1],
n_neurons=self.hidden_layers,
last_decoder_activation=None,
dim_latent=self.latent_dim,
),
transform=default_pipeline,
decode=["normalize"],
bs=self.batch_size,
)
self.n_params = self.analysis.n_params_ae
if cuda:
self.analysis.model = self.analysis.model.cuda()
cbs = self.cbs
if patience is not None:
cbs = [*self.cbs, EarlyStoppingCallback(patience=patience)]
self.analysis.learn = Learner(
dls=self.analysis.dls,
model=self.analysis.model,
loss_func=self.loss_fct,
cbs=cbs,
)
suggested_lr = self.analysis.learn.lr_find()
self.analysis.params["suggested_inital_lr"] = suggested_lr.valley
self.analysis.learn.fit_one_cycle(epochs_max, lr_max=suggested_lr.valley)
self.epochs_trained_ = self.analysis.learn.epoch + 1
N_train_notna = X.notna().sum().sum()
N_val_notna = None
if y is not None:
N_val_notna = y.notna().sum().sum()
self.fig_loss_ = models.plot_training_losses(
self.analysis.learn,
self.model_name,
folder=self.out_folder,
norm_factors=[N_train_notna, N_val_notna],
)
return self
[docs]
def transform(self, X):
"""Impute the data using the trained model.
Parameters
----------
X : pd.DataFrame
The data to be imputed, shape (N_samples, N_features).
Returns
-------
X_transformed : array, shape (N_samples, M_features)
Return the imputed DataFrame using the model.
"""
# Check is fit had been called
check_is_fitted(self, "epochs_trained_")
self.analysis.model.eval()
pred, target = ae.get_preds_from_df(
df=X,
learn=self.analysis.learn,
position_pred_tuple=0,
transformer=self.analysis.transform,
)
return X.fillna(pred)