Source code for pimmslearn.pandas

import collections.abc
from collections import namedtuple
from types import SimpleNamespace
from typing import Iterable, List, Optional

import numpy as np
import omegaconf
import pandas as pd

from pimmslearn.pandas.calc_errors import calc_errors_per_feat, get_absolute_error

# Names exported by a star-import of this module. The first two entries are
# re-exported from `pimmslearn.pandas.calc_errors`; the remaining names are
# expected to be defined further down in this module.
__all__ = [
    "calc_errors_per_feat",
    "get_absolute_error",
    "unique_cols",
    "get_unique_non_unique_columns",
    "prop_unique_index",
    "replace_with",
    "index_to_dict",
    "get_columns_accessor",
    "get_columns_accessor_from_iterable",
    "select_max_by",
    "get_columns_namedtuple",
    "highlight_min",
    "_add_indices",
    "interpolate",
    "flatten_dict_of_dicts",
    "key_map",
    "parse_query_expression",
    "length",
    "get_last_index_matching_proportion",
    "get_lower_whiskers",
    "get_counts_per_bin",
]


def unique_cols(s: pd.Series) -> bool:
    """Tell whether every entry of a Series equals its first entry.

    Ref: https://stackoverflow.com/a/54405767/968487

    Parameters
    ----------
    s : pandas.Series
        Series to check. The first entry is looked up by position, so the
        Series has to hold at least one value.

    Returns
    -------
    bool
        True if all values compare equal to the first value.
    """
    first_value = s.iloc[0]
    matches_first = first_value == s
    return matches_first.all()
def get_unique_non_unique_columns(df: pd.DataFrame) -> SimpleNamespace:
    """Split the column labels into constant and non-constant columns.

    Each column is checked with `unique_cols`, i.e. a column counts as
    "unique" when all of its values equal its first value.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame whose columns are inspected.

    Returns
    -------
    types.SimpleNamespace
        Namespace with two column indices: `unique` (columns holding a single
        repeated value) and `non_unique` (all other columns).
    """
    is_constant = df.apply(unique_cols)
    return SimpleNamespace(
        unique=df.columns[is_constant],
        non_unique=df.columns[~is_constant],
    )
def prop_unique_index(df: pd.DataFrame) -> float:
    """Return the share of distinct index labels that occur exactly once.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame whose index is inspected.

    Returns
    -------
    float
        One minus the fraction of distinct index labels that appear more
        than once. A value of 1.0 means the index has no repeated labels.
    """
    # One entry per distinct label, holding how often the label occurs.
    counts = df.index.value_counts()
    prop_duplicated = (counts > 1).sum() / len(counts)
    return 1 - prop_duplicated
def replace_with(string_key: str, replace: str = "()/", replace_with: str = "") -> str:
    """Replace every character listed in `replace` within a string.

    The characters are substituted one after another, in the order in which
    they appear in `replace`.

    Parameters
    ----------
    string_key : str
        String to clean.
    replace : str, optional
        Characters to be replaced, by default "()/"
    replace_with : str, optional
        Replacement inserted for each of these characters, by default ""
        (the characters are removed).

    Returns
    -------
    str
        The string after all substitutions.
    """
    cleaned = string_key
    for character in replace:
        cleaned = cleaned.replace(character, replace_with)
    return cleaned
def index_to_dict(index: pd.Index) -> dict:
    """Map cleaned-up keys to the original labels of an index.

    For each label, spaces and hyphens are turned into underscores and the
    result is passed through `replace_with` with its default arguments.

    Parameters
    ----------
    index : pd.Index
        Iterable of string labels, e.g. the columns of a DataFrame.

    Returns
    -------
    dict
        Dictionary of cleaned key -> original label. If two labels produce
        the same key, the later label wins.
    """
    cols = {}
    for label in index:
        underscored = label.replace(" ", "_").replace("-", "_")
        cols[replace_with(underscored)] = label
    return cols
def get_columns_accessor(df: pd.DataFrame, all_lower_case=False) -> omegaconf.OmegaConf:
    """Build an OmegaConf object for looking up the column names of a DataFrame.

    Keys are derived from the column labels via `index_to_dict`; values are
    the original column labels.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with a flat (non-MultiIndex) column index.
    all_lower_case : bool, optional
        If True, the keys are lower-cased, by default False

    Returns
    -------
    omegaconf.OmegaConf
        Object created by `omegaconf.OmegaConf.create` from the key -> label
        mapping.

    Raises
    ------
    ValueError
        If the columns of `df` are a `pd.MultiIndex`.
    """
    columns = df.columns
    if isinstance(columns, pd.MultiIndex):
        raise ValueError("MultiIndex not supported.")
    key_to_label = index_to_dict(columns)
    if not all_lower_case:
        return omegaconf.OmegaConf.create(key_to_label)
    lowered = {key.lower(): label for key, label in key_to_label.items()}
    return omegaconf.OmegaConf.create(lowered)
def get_columns_accessor_from_iterable(
    cols: Iterable[str], all_lower_case=False
) -> omegaconf.OmegaConf:
    """Build an OmegaConf object for looking up names from an iterable of labels.

    Keys are derived from the labels via `index_to_dict`; values are the
    original labels.

    Parameters
    ----------
    cols : Iterable[str]
        Labels to make accessible.
    all_lower_case : bool, optional
        If True, the keys are lower-cased, by default False

    Returns
    -------
    omegaconf.OmegaConf
        Object created by `omegaconf.OmegaConf.create` from the key -> label
        mapping.
    """
    key_to_label = index_to_dict(cols)
    if all_lower_case:
        lowered = {}
        for key, label in key_to_label.items():
            lowered[key.lower()] = label
        key_to_label = lowered
    return omegaconf.OmegaConf.create(key_to_label)
def select_max_by(
    df: pd.DataFrame, grouping_columns: list, selection_column: str
) -> pd.DataFrame:
    """Keep, for each group, the row with the largest `selection_column` value.

    The rows are sorted in descending order by the grouping columns followed
    by the selection column; afterwards only the first row of every
    combination of grouping values is kept.

    Parameters
    ----------
    df : pd.DataFrame
        Data to select from.
    grouping_columns : list
        Column names defining the groups.
    selection_column : str
        Column whose maximum decides which row of a group is kept.

    Returns
    -------
    pd.DataFrame
        One row per group, in the descending sort order described above.
    """
    sort_keys = [*grouping_columns, selection_column]
    ranked = df.sort_values(by=sort_keys, ascending=False)
    return ranked.drop_duplicates(subset=grouping_columns, keep="first")
def get_columns_namedtuple(df: pd.DataFrame) -> namedtuple:
    """Create a namedtuple instance holding the column names of a DataFrame.

    Spaces in column names are replaced with underscores to form the field
    names; the field values are the unmodified column names.

    Parameters
    ----------
    df : pd.DataFrame
        A pandas DataFrame with string column names.

    Returns
    -------
    namedtuple
        Instance of a namedtuple class called "Columns" with one field per
        column.
    """
    labels = df.columns.to_list()
    field_names = [label.replace(" ", "_") for label in labels]
    ColumnsNamedTuple = namedtuple("Columns", field_names)
    # Field names and labels share their order, so positional filling
    # assigns every label to its own field.
    return ColumnsNamedTuple(*labels)
def highlight_min(s: pd.Series) -> list:
    """Mark the minimum of a Series with a yellow background.

    Intended for use as `pandas.DataFrame.style.apply(highlight_min)`.

    Parameters
    ----------
    s : pd.Series
        Pandas Series

    Returns
    -------
    list
        One CSS string per entry: "background-color: yellow" for entries
        equal to the minimum, an empty string otherwise.
    """
    is_minimum = s == s.min()
    highlight = "background-color: yellow"
    return [highlight if flag else "" for flag in is_minimum]
def _add_indices(
    array: np.array, original_df: pd.DataFrame, index_only: bool = False
) -> pd.DataFrame:
    """Wrap an array in a DataFrame labelled like an existing DataFrame.

    Parameters
    ----------
    array : np.array
        Data for the new DataFrame.
    original_df : pd.DataFrame
        DataFrame providing the index and, optionally, the columns.
    index_only : bool, optional
        If True, only the index is taken from `original_df` and the columns
        get pandas' default labels, by default False

    Returns
    -------
    pd.DataFrame
        DataFrame built from `array` with the borrowed labels.
    """
    columns = None if index_only else original_df.columns
    return pd.DataFrame(array, index=original_df.index, columns=columns)
def interpolate(wide_df: pd.DataFrame, name="interpolated") -> pd.Series:
    """Interpolate NA values with the values before and after.

    Missing values in the first row are replaced by the mean of the two
    following rows, missing values in the last row by the mean of the two
    preceding rows. All other missing values are filled by linear
    interpolation along the rows, filling at most one consecutive missing
    value in each direction.

    Parameters
    ----------
    wide_df : pd.DataFrame
        rows are samples, columns are measurements. MultiIndex columns are
        not supported.
    name : str, optional
        name of the returned Series, by default 'interpolated'

    Returns
    -------
    pd.Series
        Long-format Series indexed by (row label, column label) holding the
        interpolated values for the entries that were missing in `wide_df`.
        Entries which could not be interpolated are left out.
    """
    mask = wide_df.isna()

    # The edge rows only have neighbours on one side, so they are filled
    # from the two adjacent rows instead of by linear interpolation.
    first_row = wide_df.iloc[0].copy()
    missing_first = first_row.isna()
    first_row.loc[missing_first] = wide_df.iloc[1:3, missing_first.to_list()].mean()

    last_row = wide_df.iloc[-1].copy()
    missing_last = last_row.isna()
    last_row.loc[missing_last] = wide_df.iloc[-3:-1, missing_last.to_list()].mean()

    ret = wide_df.interpolate(method="linear", limit_direction="both", limit=1, axis=0)
    ret.iloc[0] = first_row
    ret.iloc[-1] = last_row

    # Keep only the entries which were missing in the input.
    ret = ret[mask].stack().dropna()
    if isinstance(ret, pd.DataFrame):
        # Only reached with MultiIndex columns (unsupported); keep the
        # previous reduction for that case.
        ret = ret.squeeze()
    # A Series is deliberately not squeezed: squeezing a one-element Series
    # yields a scalar, which cannot be renamed and broke the single-missing-
    # value case.
    return ret.rename(name)
def flatten_dict_of_dicts(d: dict, parent_key: str = "") -> dict:
    """Build tuples for nested dictionaries for use as `pandas.MultiIndex`.

    Parameters
    ----------
    d : dict
        Nested dictionary for which all keys are flattened to tuples.
    parent_key : str, optional
        Outer key prepended to every flattened key, by default '' (no
        prefix). During recursion a tuple of the outer keys is passed.

    Returns
    -------
    dict
        Flattened dictionary with tuple keys:
        {(outer_key, ..., inner_key) : value}
    """
    # simplified and adapted from: https://stackoverflow.com/a/6027615/9684872
    # Normalise the prefix to a tuple so that a plain string prefix, as
    # advertised by the signature, can be combined with the keys as well.
    if not parent_key:
        prefix = ()
    elif isinstance(parent_key, tuple):
        prefix = parent_key
    else:
        prefix = (parent_key,)

    flat = {}
    for k, v in d.items():
        new_key = prefix + (k,)
        if isinstance(v, collections.abc.MutableMapping):
            flat.update(flatten_dict_of_dicts(v, parent_key=new_key))
        else:
            flat[new_key] = v
    return flat
def key_map(d: dict) -> dict:
    """Build a schema of the keys of nested dictionaries.

    Keys whose value is a dictionary are mapped recursively. Keys with any
    other value are collected in a tuple. A dictionary containing both kinds
    of keys triggers a printed notice, and the non-dictionary keys are added
    to the schema with the value None.

    Parameters
    ----------
    d : dict
        dictionary of dictionaries

    Returns
    -------
    dict
        Key map of dictionaries. For a dictionary without nested
        dictionaries (and at least one key) the tuple of its keys is
        returned instead.
    """
    ret = {}
    _keys = ()
    for k, v in d.items():
        if isinstance(v, dict):
            ret[k] = key_map(v)
            continue
        _keys += (k,)

    if not _keys:
        return ret
    if not ret:
        return _keys

    # Mixed content: report it, then register the leaf keys without a schema.
    print(f"Dictionaries are not of the same length: {_keys = } and {ret = }")
    ret.update(dict.fromkeys(_keys))
    return ret
# Characters kept by `parse_query_expression`: digits, ASCII letters and space.
printable = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ "


def parse_query_expression(s: str, printable: str = printable) -> str:
    """Parse a query expression for pd.DataFrame.query to a file name.

    Parameters
    ----------
    s : str
        Query expression.
    printable : str, optional
        Characters which are kept, by default digits, ASCII letters and
        the space character.

    Returns
    -------
    str
        `s` without all characters that are not listed in `printable`.
    """
    return "".join(character for character in s if character in printable)
def length(x):
    """Len function which returns 0 if the object has no length.

    Parameters
    ----------
    x : object
        Any object, e.g. a list, pandas.Series, numpy.array, dict or a
        scalar such as np.nan.

    Returns
    -------
    int
        `len(x)` if it can be determined, otherwise 0.
    """
    try:
        return len(x)
    # Exception instead of BaseException: KeyboardInterrupt and SystemExit
    # must not be swallowed and turned into a length of 0.
    except Exception:
        return 0
def get_last_index_matching_proportion(
    df_counts: pd.DataFrame, prop: float = 0.25, prop_col: str = "proportion"
) -> object:
    """Return the index label of the last row reaching a proportion cutoff.

    The last row (in the given row order) whose `prop_col` value is greater
    than or equal to `prop` is selected. `df_counts` is meant to be sorted
    by `prop_col` in descending order, so that this row marks the cutoff.

    Parameters
    ----------
    df_counts : pd.DataFrame
        Counts with a proportion column. Index has to be unique.
    prop : float, optional
        cutoff, inclusive, by default 0.25
    prop_col : str, optional
        column name for proportion, by default 'proportion'

    Returns
    -------
    object
        Index value for cutoff
    """
    assert df_counts.index.is_unique
    proportions = df_counts[prop_col]
    at_or_above_cutoff = proportions[proportions >= prop]
    return at_or_above_cutoff.tail(1).index[0]
def get_lower_whiskers(df: pd.DataFrame, factor: float = 1.5) -> pd.Series:
    """Compute the lower whisker position per column.

    The value is the 25% quantile minus `factor` times the interquartile
    range (75% quantile - 25% quantile), with the quantiles taken from
    `DataFrame.describe`.

    Parameters
    ----------
    df : pd.DataFrame
        Data to summarize.
    factor : float, optional
        Multiplier of the interquartile range, by default 1.5

    Returns
    -------
    pd.Series
        Lower whisker value for each column summarized by `describe`.
    """
    summary = df.describe()
    lower_quartile = summary.loc["25%"]
    upper_quartile = summary.loc["75%"]
    interquartile_range = upper_quartile - lower_quartile
    return lower_quartile - interquartile_range * factor
def get_counts_per_bin(
    df: pd.DataFrame, bins: range, columns: Optional[List[str]] = None
) -> pd.DataFrame:
    """Return counts per bin for selected columns in DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        Data to bin.
    bins : range
        Bin edges passed on to `pandas.cut`.
    columns : Optional[List[str]], optional
        Columns to bin, by default None (all columns of `df`).

    Returns
    -------
    pd.DataFrame
        One row per bin (index named "bin") and one column per selected
        column, holding the number of values falling into each bin.
    """
    if columns is None:
        columns = df.columns.to_list()
    counts_per_bin = {}
    for col in columns:
        binned = pd.cut(df[col], bins=bins).to_frame()
        # observed=False is stated explicitly so that bins without any value
        # are reported with a count of 0 regardless of the pandas default.
        _series = binned.groupby(col, observed=False).size()
        _series.index.name = "bin"
        counts_per_bin[col] = _series
    return pd.DataFrame(counts_per_bin)