import logging
import random
from collections import namedtuple
from pathlib import Path
from types import SimpleNamespace
from typing import List, Optional, Tuple, Union
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn
from njab.sklearn import run_pca
from sklearn.impute import SimpleImputer
import pimmslearn
from pimmslearn.analyzers import Analysis
from pimmslearn.io.datasplits import long_format, wide_format
from pimmslearn.io.load import verify_df
from pimmslearn.pandas import _add_indices
logger = logging.getLogger(__name__)
__doc__ = "A collection of Analyzers to perform certain type of analysis."
ALPHA = 0.5
# ! deprecate AnalyzePeptides
[docs]
class AnalyzePeptides(SimpleNamespace):
"""Namespace for current analysis
Attributes
----------
df: pandas.DataFrame
current eagerly loaded data in wide format only: sample index, features in columns
stats: types.SimpleNamespace
Some statistics of certain aspects. Normally each will be a DataFrame.
Many more attributes are set dynamically depending on the concrete analysis.
"""
def __init__(
self,
data: pd.DataFrame,
is_log_transformed: bool = False,
is_wide_format: bool = True,
ind_unstack: str = "",
):
if not is_wide_format:
if not ind_unstack:
raise ValueError(
"Please specify index level for unstacking via "
f"'ind_unstack' from: {data.index.names}"
)
data = data.unstack(ind_unstack)
is_wide_format = True
self.df = data # assume wide
self.N, self.M = self.df.shape
self.stats = SimpleNamespace()
self.is_log_transformed = is_log_transformed
self.is_wide_format = is_wide_format
self.index_col = self.df.index.name
[docs]
@classmethod
def from_csv(
cls,
fname: str,
nrows: int = None,
# could be potentially 0 for the first column
index_col: Union[int, str, List] = "Sample ID",
verify_fname: bool = False,
usecols=None,
**kwargs,
):
df = pd.read_csv(
fname, index_col=index_col, low_memory=False, nrows=nrows, usecols=usecols
).squeeze("columns")
if len(df.shape) == 1:
# unstack all but first column
df = df.unstack(df.index.names[1:])
verify_df(
df=df,
fname=fname,
index_col=index_col,
verify_fname=verify_fname,
usecols=usecols,
)
return cls(data=df, **kwargs) # all __init__ parameters are kwargs
[docs]
@classmethod
# @delegates(from_csv) # does only include parameters with defaults
def from_pickle(
cls,
fname: str,
# could be potentially 0 for the first column
index_col: Union[int, str, List] = "Sample ID",
verify_fname: bool = False,
usecols=None,
**kwargs,
):
df = pd.read_pickle(fname).squeeze()
if len(df.shape) == 1:
df = df.unstack(df.index.names[1:])
verify_df(
df=df,
fname=fname,
index_col=index_col,
verify_fname=verify_fname,
usecols=usecols,
)
return cls(data=df, **kwargs) # all __init__ parameters are kwargs
[docs]
def get_consecutive_dates(self, n_samples, seed=42):
"""Select n consecutive samples using a seed.
Updated the original DataFrame attribute: df
"""
self.df.sort_index(inplace=True)
n_samples = min(len(self.df), n_samples) if n_samples else len(self.df)
print(f"Get {n_samples} samples.")
if seed:
random.seed(42)
_attr_name = f"df_{n_samples}"
setattr(self, _attr_name, get_consecutive_data_indices(self.df, n_samples))
print("Training data referenced unter:", _attr_name)
self.df = getattr(self, _attr_name)
print("Updated attribute: df")
return self.df
@property
def df_long(self):
if hasattr(self, "_df_long"):
return self._df_long
return self.to_long_format(
colname_values="intensity", index_name=self.index_col
)
@property
def df_wide(self):
return self.to_wide_format()
[docs]
def describe_peptides(self, sample_n: int = None):
if sample_n:
df = self.df.sample(n=sample_n, axis=1)
else:
df = self.df
stats = df.describe()
stats.loc["CV"] = stats.loc["std"] / stats.loc["mean"]
self.stats.peptides = stats
return stats
[docs]
def get_prop_not_na(self):
"""Get prop. of not NA values for each sample."""
return self.df.notna().sum(axis=1) / self.df.shape[-1]
[docs]
def get_PCA(self, n_components=2, imputer=SimpleImputer):
self.imputer_ = imputer()
X = self.imputer_.fit_transform(self.df)
X = _add_indices(X, self.df)
assert all(X.notna())
PCs, self.pca_ = run_pca(X, n_components=n_components)
if not hasattr(self, "df_meta"):
logger.warning('No metadata available, please set "df_meta" first.')
try:
PCs["ms_instrument"] = self.df_meta["ms_instrument"].astype("category")
except KeyError:
logger.warning("No MS instrument added.")
except AttributeError:
logger.warning("No metadata available, please set 'df_meta' first.")
logger.warning("No MS instrument added.")
return PCs
[docs]
def calculate_PCs(self, new_df, is_wide=True):
if not is_wide:
new_df = new_df.unstack(new_df.index.names[1:])
X = self.imputer_.transform(new_df)
X = _add_indices(X, new_df)
PCs = self.pca_.transform(X)
PCs = _add_indices(PCs, new_df, index_only=True)
PCs.columns = [f"PC {i+1}" for i in range(PCs.shape[-1])]
return PCs
[docs]
def plot_pca(
self,
):
"""Create principal component plot with three heatmaps showing
instrument, degree of non NA data and sample by date."""
if not self.is_wide_format:
self.df = self.df.unstack(self.df.index.names[1:])
self.is_wide_format = True
if not hasattr(self, "df_meta"):
raise AttributeError('No metadata available, please set "df_meta" first.')
PCs = self.get_PCA()
fig, axes = plt.subplots(
nrows=3, ncols=1, figsize=(15, 20), constrained_layout=True
)
Dim = namedtuple("DimensionsData", "N M")
self.dim = Dim(*self.df.shape)
fig.suptitle(
f"First two Principal Components of {self.dim.M} most abundant peptides \n for {self.dim.N} samples",
fontsize=30,
)
# by instrument
ax = axes[0]
seaborn_scatter(
df=PCs.iloc[:, :2],
fig=fig,
ax=ax,
meta=PCs["ms_instrument"],
title="by MS instrument",
)
ax.legend(loc="center right", bbox_to_anchor=(1.11, 0.5))
# by complettness/missingness
# continues colormap will be a bit trickier using seaborn: https://stackoverflow.com/a/44642014/9684872
ax = axes[1]
plot_scatter(
df=PCs.iloc[:, :2],
fig=fig,
ax=ax,
meta=self.df_meta["prop_not_na"],
title="by number on na",
)
# by dates
ax = axes[2]
plot_date_map(df=PCs.iloc[:, :2], ax=ax, dates=self.df_meta.date)
return fig
[docs]
def get_dectection_limit(self):
"""Compute information on detection limit in dataset.
Returns
-------
str
Information on detection limit
"""
self.detection_limit = (
self.df.min().min()
if self.is_log_transformed
else np.log10(self.df).min().min()
) # all zeros become nan.
return "Detection limit: {:6.3f}, corresponding to intensity value of {:,d}".format(
self.detection_limit, int(10**self.detection_limit)
)
def __repr__(self):
keys = sorted(self.__dict__)
items = ("{}".format(k) for k in keys)
return "{} with attributes: {}".format(type(self).__name__, ", ".join(items))
# def __dir__(self):
# return sorted(self.__dict__)
@property
def fname_stub(self):
assert hasattr(self, "df"), f"Attribute df is missing: {self}"
return "N{:05d}_M{:05d}".format(*self.df.shape)
[docs]
class LatentAnalysis(Analysis):
def __init__(
self,
latent_space: pd.DataFrame,
meta_data: pd.DataFrame,
model_name: str,
fig_size: Tuple[int, int] = (15, 15),
folder: Path = None,
):
self.latent_space, self.meta_data = latent_space, meta_data
self.fig_size, self.folder = fig_size, folder
self.model_name = model_name
self.folder = Path(self.folder) if self.folder else Path(".")
assert (
len(self.latent_space.shape) == 2
), "Expected a two dimensional DataFrame."
self.latent_dim = self.latent_space.shape[-1]
if self.latent_dim > 2:
# pca, add option for different methods
self.latent_reduced, self.pca_ = run_pca(self.latent_space)
else:
self.latent_reduced = self.latent_space
[docs]
def plot_by_date(self, meta_key: str = "date", save: bool = True):
fig, ax = self._plot(fct=plot_date_map, meta_key=meta_key, save=save)
return fig, ax
[docs]
def plot_by_category(self, meta_key: str, save: bool = True):
fig, ax = self._plot(fct=seaborn_scatter, meta_key=meta_key, save=save)
return fig, ax
def _plot(self, fct, meta_key: str, save: bool = True):
try:
meta_data = self.meta_data[meta_key]
except KeyError as e:
raise ValueError(
f"Requested key: '{meta_key}' is not in available,"
f" use: {', '.join(x for x in self.meta_data.columns)}"
) from e
fig, ax = plt.subplots(figsize=self.fig_size)
_ = fct(
df=self.latent_reduced,
ax=ax,
meta=meta_data.loc[self.latent_reduced.index],
title=f"{self.model_name} latent space PCA of {self.latent_dim} dimensions by {meta_key}",
)
if save:
pimmslearn.plotting._savefig(
fig, name=f"{self.model_name}_latent_by_{meta_key}", folder=self.folder
)
return fig, ax
# def read_csv(fname:str, nrows:int, index_col:str=None)-> pd.DataFrame:
# return pd.read_csv(fname, index_col=index_col, low_memory=False, nrows=nrows)
[docs]
def get_consecutive_data_indices(df, n_samples):
index = df.sort_index().index
start_sample = len(index) - n_samples
start_sample = random.randint(0, start_sample)
return df.loc[index[start_sample : start_sample + n_samples]]
[docs]
def corr_lower_triangle(df, **kwargs):
"""Compute the correlation matrix, returning only unique values."""
corr_df = df.corr(**kwargs)
lower_triangle = pd.DataFrame(np.tril(np.ones(corr_df.shape), -1)).astype(bool)
lower_triangle.index, lower_triangle.columns = corr_df.index, corr_df.columns
return corr_df.where(lower_triangle)
[docs]
def plot_corr_histogram(corr_lower_triangle, bins=10):
fig, axes = plt.subplots(
ncols=2, gridspec_kw={"width_ratios": [5, 1], "wspace": 0.2}, figsize=(8, 4)
)
values = pd.Series(corr_lower_triangle.to_numpy().flatten()).dropna()
ax = axes[0]
ax = values.hist(ax=ax, bins=bins)
ax.yaxis.set_major_formatter("{x:,.0f}")
ax = axes[1]
plt.axis("off")
data = values.describe(percentiles=np.linspace(0.1, 1, 10)).round(2)
data.name = ""
_ = pd.plotting.table(ax=ax, data=data, loc="best", edges="open")
return fig, axes
[docs]
def plot_date_map(
df,
ax,
dates: pd.Series = None,
meta: pd.Series = None,
title: str = "by date",
fontsize=8,
size=2,
):
if dates is not None and meta is not None:
raise ValueError("Only set either dates or meta parameters.")
# ToDo: Clean up arguments
if dates is None:
dates = meta
cols = list(df.columns)
assert len(cols) == 2, f"Please provide two dimensons, not {df.columns}"
ax.set_title(title, fontsize=fontsize)
ax.set_xlabel(cols[0])
ax.set_ylabel(cols[1])
path_collection = scatter_plot_w_dates(ax, df, dates=dates, errors="raise")
_ = add_date_colorbar(path_collection, ax=ax)
[docs]
def plot_scatter(
df,
ax,
meta: pd.Series,
feat_name_display: str = "features",
title: Optional[str] = None,
alpha=ALPHA,
fontsize=8,
size=2,
):
cols = list(df.columns)
assert len(cols) == 2, f"Please provide two dimensons, not {df.columns}"
if not title:
title = f"by identified {feat_name_display}"
ax.set_title(title, fontsize=fontsize)
ax.set_xlabel(cols[0])
ax.set_ylabel(cols[1])
path_collection = ax.scatter(
x=cols[0], y=cols[1], s=size, c=meta, data=df, alpha=alpha
)
_ = ax.get_figure().colorbar(
path_collection,
ax=ax,
label=f"Identified {feat_name_display}",
# ticklocation='left', # ignored by matplotlib
location="right", # ! left does not put colobar without overlapping y ticks
format="{x:,.0f}",
)
[docs]
def seaborn_scatter(
df,
ax,
meta: pd.Series,
title: str = "by some metadata",
alpha=ALPHA,
fontsize=5,
size=5,
):
cols = list(df.columns)
assert len(cols) == 2, f"Please provide two dimensons, not {df.columns}"
seaborn.scatterplot(
x=df[cols[0]],
y=df[cols[1]],
hue=meta,
ax=ax,
palette="deep",
s=size,
alpha=alpha,
)
_ = ax.legend(
fontsize=fontsize,
title_fontsize=fontsize,
markerscale=0.4,
title=meta.name,
)
ax.set_title(title, fontsize=fontsize)
return ax
[docs]
def scatter_plot_w_dates(ax, df, dates=None, marker=None, errors="raise", size=2):
"""plot first vs. second column in DataFrame.
Use dates to color data.
errors : {'ignore', 'raise', 'coerce'}, default 'raise'
Passed on to pandas.to_datetime
- If 'raise', then invalid parsing will raise an exception.
- If 'coerce', then invalid parsing will be set as NaT.
- If 'ignore', then invalid parsing will return the input.
"""
# Inspiration: https://stackoverflow.com/a/59685599/9684872
cols = df.columns
if isinstance(dates, str):
dates = df["dates"]
path_collection = ax.scatter(
x=df[cols[0]],
y=df[cols[1]],
c=(
[mdates.date2num(t) for t in pd.to_datetime(dates, errors=errors)]
if dates is not None
else None
),
alpha=ALPHA,
s=size,
marker=marker,
)
return path_collection
[docs]
def add_date_colorbar(mappable, ax):
loc = mdates.AutoDateLocator()
cbar = ax.get_figure().colorbar(
mappable, ax=ax, ticks=loc, format=mdates.AutoDateFormatter(loc)
)
return cbar
[docs]
def cast_object_to_category(df: pd.DataFrame) -> pd.DataFrame:
"""Cast object columns to category dtype.
Parameters
----------
df : pd.DataFrame
DataFrame with columns
Returns
-------
pd.DataFrame
DataFrame with category columns instead of object columns.
"""
_columns = df.select_dtypes(include="object").columns
return df.astype({col: "category" for col in _columns})