# Source code for pimmslearn.sklearn.cf_transformer
"""Scikit-learn style interface for Collaborative Filtering model."""
from __future__ import annotations

import math
import warnings
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
import torch
from fastai import learner
from fastai.callback.tracker import EarlyStoppingCallback
from fastai.collab import *
from fastai.collab import EmbeddingDotBias, TabularCollab
from fastai.data.transforms import IndexSplitter
from fastai.learner import Learner
from fastai.losses import MSELossFlat
from fastai.tabular.all import *
from fastai.tabular.all import TransformBlock
from fastai.tabular.core import Categorify
from fastai.torch_core import default_device
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted

import pimmslearn
import pimmslearn.models as models
# patch plotting function
from pimmslearn.models import collab, plot_loss
# Monkey-patch: replace the `plot_loss` attribute of fastai's `Recorder` class
# with the `plot_loss` function imported from `pimmslearn.models`, so that
# `learn.recorder.plot_loss(...)` calls the pimmslearn implementation.
# This runs at import time and affects every `Recorder` in the process.
learner.Recorder.plot_loss = plot_loss
# [docs]  (link residue from the rendered HTML source page)
class CollaborativeFilteringTransformer(TransformerMixin, BaseEstimator):
    """Collaborative Filtering transformer.

    Collaborative filtering operates on long data specifying two identifiers
    (sample and feature) with a quantitative value to predict. Therefore three
    columns need to be specified. The sample and feature identifiers are
    embedded into a space which is then used to predict the quantitative value.

    The data is expected as a Series with a MultiIndex of the sample and
    feature identifiers, and the quantitative value as its values.

    Parameters
    ----------
    target_column : str
        Target column name to predict, e.g. intensity
    sample_column : str
        Sample column name, e.g. Sample_ID
    item_column : str
        Column name for features (items) to embed, e.g. peptides
    n_factors : int, optional
        Number of dimensions of item and sample embeddings, by default 15
    out_folder : str, optional
        Output folder for model, by default '.'. The folder is created
        (including parents) when the transformer is instantiated.
    batch_size : int, optional
        Batch size for training of data in long format, by default 4096
    """

    def __init__(
        self,
        target_column: str,
        sample_column: str,
        item_column: str,
        n_factors: int = 15,
        out_folder: str = ".",
        batch_size: int = 4096,
    ):
        self.target_column = target_column
        self.item_column = item_column
        self.sample_column = sample_column
        self.n_factors = n_factors
        # NOTE(review): scikit-learn's estimator conventions ask __init__ to
        # store parameters unmodified and without side effects. The Path
        # conversion and folder creation are kept for backward compatibility;
        # confirm whether `sklearn.base.clone` support is required.
        self.out_folder = Path(out_folder)
        self.out_folder.mkdir(exist_ok=True, parents=True)
        self.batch_size = batch_size

    def fit(
        self,
        X: pd.Series,
        y: pd.Series = None,
        epochs_max=20,
        cuda: bool = True,
        patience: int = 1,
    ):
        """Fit the collaborative filtering model to the data provided in long-format.

        Parameters
        ----------
        X : Series, shape (n_values, )
            The training data as a Series with the target_column as its values
            and target_column as its name. The Series has a MultiIndex defined
            by the item_column and sample_column.
        y : Series, shape (n_values, ), optional
            The validation data as a Series with the target_column as its
            values and target_column as its name. The Series has a MultiIndex
            defined by the item_column and sample_column. By default None, in
            which case no validation split is used.
        epochs_max : int, optional
            Maximal number of epochs to train, by default 20
        cuda : bool, optional
            If the model should be trained with an accelerator, by default
            True. If CUDA is requested but not available, a warning is issued
            and training falls back to the CPU.
        patience : Optional[int], optional
            Patience of the early stopping callback, by default 1. Early
            stopping is only added if validation data ``y`` is given and
            ``patience`` is not None.

        Returns
        -------
        CollaborativeFilteringTransformer
            Return itself fitted to the training data.

        Warns
        -----
        RuntimeWarning
            If ``cuda=True`` but no CUDA device is available.
        """
        values = X.squeeze()
        # Floor both bounds (instead of truncating toward zero with `int`) so
        # that the range also encloses negative minima. For non-negative data
        # this yields exactly the same integers as truncation.
        self.model_kwargs = dict(
            n_factors=self.n_factors,
            y_range=(
                int(math.floor(values.min())),
                int(math.floor(values.max())) + 1,
            ),
        )

        use_cuda = cuda
        if cuda and not torch.cuda.is_available():
            warnings.warn(
                "CUDA was requested but is not available; training on CPU instead.",
                RuntimeWarning,
                stacklevel=2,
            )
            use_cuda = False
        if not use_cuda:
            default_device(use=False)  # set to cpu

        splits = None
        if y is not None:
            # Concatenate train and validation observations into one dataframe.
            # The validation rows are addressed by position: everything after
            # the first `n_train` rows (assumes combine_data keeps the training
            # rows first -- as the original positional split did).
            n_train = len(X)
            X, _ = collab.combine_data(X, y)
            idx_splitter = IndexSplitter(list(range(n_train, len(X))))
            splits = idx_splitter(X)
        else:
            X = X.reset_index()

        self.cat_names = [self.sample_column, self.item_column]
        self.to = TabularCollab(
            df=X,
            procs=[Categorify],
            cat_names=self.cat_names,
            y_names=[self.target_column],
            y_block=TransformBlock(),
            splits=splits,
        )
        self.dls = self.to.dataloaders(path=".", bs=self.batch_size)
        self.model = EmbeddingDotBias.from_classes(
            classes=self.dls.classes, **self.model_kwargs
        )
        self.n_params = models.calc_net_weight_count(self.model)

        # Early stopping needs a validation set to monitor and a patience.
        early_stopping = None
        if y is not None and patience is not None:
            early_stopping = EarlyStoppingCallback(patience=patience)
        self.learn = Learner(
            dls=self.dls,
            model=self.model,
            loss_func=MSELossFlat(),
            cbs=early_stopping,
            model_dir=self.out_folder,
        )
        if use_cuda:
            self.learn.model = self.learn.model.cuda()

        suggested_lr = self.learn.lr_find()
        print(f"{suggested_lr.valley = :.5f}")
        self.learn.fit_one_cycle(epochs_max, lr_max=suggested_lr.valley)
        self.plot_loss(y)
        # Fitted-state marker checked by `transform` via `check_is_fitted`.
        self.epochs_trained_ = self.learn.epoch + 1
        self.model_kwargs["suggested_inital_lr"] = suggested_lr.valley
        # ? own method?
        # self.learn.save('collab_model')
        return self

    def transform(self, X):
        """Predict the missing features in the long data based on the index of
        sample_column and item_column.

        Parameters
        ----------
        X : Series, shape (n_values, )
            Data in long format with the target_column as its values and a
            MultiIndex built from sample_column and item_column. Index
            combinations that are absent (or NaN) after unstacking are the
            ones that get predicted.

        Returns
        -------
        X_transformed : pd.Series
            The data in long format: the values of ``X`` followed by the
            predicted values for the missing sample/item combinations.
        """
        # Check if fit had been called
        check_is_fitted(self, "epochs_trained_")
        # ! Input validation
        # X = check_array(X, accept_sparse=True)
        X = X.squeeze()
        # Pivot to wide format to find every sample/item combination that has
        # no value, then go back to long format to get their MultiIndex.
        mask = X.unstack().isna().stack()
        idx_na = mask.loc[mask].index
        if idx_na.empty:
            # Nothing to impute: avoid building a data loader without rows.
            return X.copy()
        dl_real_na = self.dls.test_dl(idx_na.to_frame())
        pred_na, _ = self.learn.get_preds(dl=dl_real_na)
        pred_na = pd.Series(pred_na, idx_na, name=self.target_column)
        return pd.concat([X, pred_na])

    def plot_loss(self, y, figsize=(8, 4), save: bool = False):  # -> Axes:
        """Plot the training and validation loss of the model.

        Parameters
        ----------
        y : Series or None
            The validation data passed to ``fit``. Only used to decide whether
            the validation loss is drawn (``y is not None``).
        figsize : tuple, optional
            Figure size passed to matplotlib, by default (8, 4)
        save : bool, optional
            If True, write the figure and the model parameters (as JSON) to
            ``out_folder``, by default False

        Returns
        -------
        matplotlib.axes.Axes
            The axes the loss curves were drawn on.
        """
        fig, ax = plt.subplots(figsize=figsize)
        ax.set_title("CF loss: Reconstruction loss")
        self.learn.recorder.plot_loss(skip_start=5, ax=ax, with_valid=y is not None)
        # Recorded here so the batch size is part of the dumped parameters.
        self.model_kwargs["batch_size"] = self.batch_size
        if save:
            fig.savefig(self.out_folder / "loss.png")
            pimmslearn.savefig(fig, name="collab_training", folder=self.out_folder)
            pimmslearn.io.dump_json(
                self.model_kwargs, self.out_folder / "model_params_CF.json"
            )
        return ax