Source code for genetools.helpers

"""Pandas/Numpy common recipes."""

import os
import scipy
import numpy as np
import pandas as pd


def rename_duplicates(series, delim="-"):
    """Rename duplicate values to be unique. ['a', 'a'] will become ['a', 'a-1'], for example.

    :param series: series with values to rename
    :type series: pandas.Series
    :param delim: delimiter before duplicate-number index, defaults to "-"
    :type delim: str, optional
    :return: series where original duplicates have been renamed to -1, -2, etc.
    :rtype: pandas.Series
    """
    duplicate_suffix = (
        series.groupby(series).cumcount().astype(str).replace("0", "")
    )  # a number for all but the first occurrence
    extra_strs = delim + duplicate_suffix
    # remove entries that are just the delim
    extra_strs = extra_strs.replace(delim, "")
    # add to values
    out = series.astype(str) + extra_strs
    # confirm unique (may fail if a-1 happened to match another element that preexisted!)
    assert out.nunique() == out.shape[0]
    return out
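
# Example usage (illustrative sketch, not part of the library): duplicates pick up
# a numbered suffix, while first occurrences are left untouched.
#   >>> list(rename_duplicates(pd.Series(["a", "a", "b"])))
#   ['a', 'a-1', 'b']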

def merge_into_left(left, right, **kwargs):
    """Defensively merge [right] series or dataframe into [left] by index, preserving [left]'s index exactly. [right] data will be reordered to match [left] index.

    :param left: left data whose index will be preserved
    :type left: pandas.DataFrame or pandas.Series
    :param right: right data which will be reordered based on left index.
    :type right: pandas.DataFrame or pandas.Series
    :param \**kwargs: passed to pandas.merge
    :return: left-merged DataFrame with [left]'s index
    :rtype: pandas.DataFrame
    """
    # defensively cast to dataframe
    df1 = pd.DataFrame(left)
    df2 = pd.DataFrame(right)

    df = pd.merge(
        df1,
        df2,
        how="left",
        left_index=True,
        right_index=True,
        sort=False,
        validate="1:1",
        **kwargs
    )
    # TODO: asserts are stripped away when code is optimized; replace with: if not ..., raise ValueError("message")
    assert df.shape[0] == df1.shape[0]
    assert df.shape[1] == df1.shape[1] + df2.shape[1]
    df.index = df1.index
    return df
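
# Example usage (illustrative sketch): right's values are reordered to follow
# left's index, which is preserved exactly.
#   >>> left = pd.DataFrame({"x": [1, 2]}, index=["b", "a"])
#   >>> right = pd.Series([10, 20], index=["a", "b"], name="y")
#   >>> merge_into_left(left, right)["y"].tolist()
#   [20, 10]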

def horizontal_concat(df_left, df_right):
    """Concatenate df_right horizontally to df_left, with no checks for whether the indexes match, but confirming final shape.

    :param df_left: Left data
    :type df_left: pandas.DataFrame or pandas.Series
    :param df_right: Right data
    :type df_right: pandas.DataFrame or pandas.Series
    :return: Copied dataframe with df_right's columns glued onto the right side of df_left's columns
    :rtype: pandas.DataFrame
    """
    # defensively cast to DataFrame
    df1 = pd.DataFrame(df_left)
    df2 = pd.DataFrame(df_right)

    df = pd.concat([df1, df2], axis=1)
    assert df.shape[0] == df1.shape[0] == df2.shape[0]
    assert df.shape[1] == df1.shape[1] + df2.shape[1]
    return df
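
# Example usage (illustrative sketch; both inputs share the default RangeIndex,
# so the shape assertions pass):
#   >>> horizontal_concat(pd.DataFrame({"a": [1, 2]}), pd.DataFrame({"b": [3, 4]})).shape
#   (2, 2)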

def vertical_concat(df_top, df_bottom, reset_index=False):
    """Concatenate df_bottom vertically to df_top, with no checks for whether the columns match, but confirming final shape.

    :param df_top: Top data
    :type df_top: pandas.DataFrame
    :param df_bottom: Bottom data
    :type df_bottom: pandas.DataFrame
    :param reset_index: Reset index values after concat, defaults to False
    :type reset_index: bool, optional
    :return: Copied dataframe with df_bottom's rows glued onto the bottom of df_top's rows
    :rtype: pandas.DataFrame
    """
    # defensively cast to DataFrame
    df1 = pd.DataFrame(df_top)
    df2 = pd.DataFrame(df_bottom)

    df = pd.concat([df1, df2], axis=0)
    if reset_index:
        # so far indexes have just been glued together
        # we can reset it to be unique values
        df = df.reset_index(drop=True)
    assert df.shape[0] == df1.shape[0] + df2.shape[0]
    assert df.shape[1] == df1.shape[1] == df2.shape[1]
    return df
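
# Example usage (illustrative sketch; stacking a frame on itself doubles the rows,
# and reset_index=True makes the glued-together index unique again):
#   >>> df = pd.DataFrame({"a": [1, 2]})
#   >>> vertical_concat(df, df, reset_index=True).index.tolist()
#   [0, 1, 2, 3]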

def barcode_split(
    obs_names, separator="-", colname_barcode="barcode", colname_library="library_id"
):
    """Split single cell barcodes such as ATGC-1 into a barcode column with value "ATGC" and a library ID column with value 1.

    Recommended usage with scanpy:

        adata.obs = horizontal_concat(adata.obs, barcode_split(adata.obs_names))

    :param obs_names: Cell barcodes with a library ID suffix.
    :type obs_names: pandas.Series or pandas.Index
    :param separator: library ID separator, defaults to '-'
    :type separator: str, optional
    :param colname_barcode: output column name containing barcode without library ID suffix, defaults to 'barcode'
    :type colname_barcode: str, optional
    :param colname_library: output column name containing library ID suffix as an int, defaults to 'library_id'
    :type colname_library: str, optional
    :return: Two-column dataframe containing barcode prefix and library ID suffix.
    :rtype: pandas.DataFrame
    """
    # defensively cast to a string Series in case an Index was passed, such as adata.obs_names or adata.obs.index
    df = pd.Series(obs_names, dtype="str").str.split(separator, expand=True)
    df.columns = [colname_barcode, colname_library]
    df[colname_library] = df[colname_library].astype(int)
    return df
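
# Example usage (illustrative sketch with made-up barcodes):
#   >>> barcode_split(pd.Series(["ATGC-1", "GGTT-2"])).to_dict("list")
#   {'barcode': ['ATGC', 'GGTT'], 'library_id': [1, 2]}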

def get_off_diagonal_values(arr):
    """Get off-diagonal values of a numpy 2d array as a flattened 1d array.

    :param arr: input numpy 2d array
    :type arr: numpy.ndarray
    :return: flattened 1d array of non-diagonal values only
    :rtype: numpy.ndarray
    """
    # See https://stackoverflow.com/a/35746928/130164
    return arr[~np.eye(arr.shape[0], dtype=bool)].flatten()
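
# Example usage (illustrative sketch; the diagonal values 0, 4, 8 are dropped):
#   >>> get_off_diagonal_values(np.arange(9).reshape(3, 3))
#   array([1, 2, 3, 5, 6, 7])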

def make_slurm_command(
    script,
    job_name,
    log_path,
    env=None,
    options={},
    job_group_name="",
    wrap_script=True,
):
    """Generate slurm sbatch command. Should be pipe-able straight to bash.

    Automatic log filenames will take the format:

    - `{{ log_path }}/{{ job_group_name (optional) }}/{{ job_name }}.out` for stdout
    - `{{ log_path }}/{{ job_group_name (optional) }}/{{ job_name }}.err` for stderr

    You can override automatic log filenames by manually supplying "output" and "error" values in the `options` dict.

    :param script: path to an executable script, or inline script (if wrap_script is True)
    :type script: str
    :param job_name: job name, used for naming log files
    :type job_name: str
    :param log_path: destination for log files.
    :type log_path: str
    :param env: any environment variables to pass to script, defaults to None
    :type env: dict, optional
    :param options: any CLI options for sbatch, defaults to {}
    :type options: dict, optional
    :param job_group_name: optional group name for this job and related jobs, used for naming log files, defaults to ""
    :type job_group_name: str, optional
    :param wrap_script: whether the script is inline as opposed to a file on disk, defaults to True
    :type wrap_script: bool, optional
    :return: an sbatch command
    :rtype: str
    """
    # copy first, to avoid mutating the caller's dict and the shared mutable default argument
    # (otherwise the log paths set on a first call would leak into later calls)
    options = dict(options)

    log_fname_prefix = os.path.join(log_path, job_group_name, job_name)
    if "output" not in options:
        options["output"] = log_fname_prefix + ".out"
    if "error" not in options:
        options["error"] = log_fname_prefix + ".err"

    options_items = ['--%s="%s"' % (name, val) for name, val in options.items()]
    options_string = " ".join(options_items)

    variable_string = ""
    if env is not None:
        variable_items = ['"%s"="%s"' % (name, val) for name, val in env.items()]
        variable_string = "--export=" + ",".join(variable_items)

    # Very important to wrap the "wrap script" in single quotes,
    # so that the script will pick up the exported variables during execution.
    script_string = "--wrap='%s'" % script if wrap_script else script

    return "sbatch {options} {variables} {script};".format(
        options=options_string, variables=variable_string, script=script_string
    )
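
# Example usage (illustrative sketch; "time" is just one hypothetical sbatch option,
# option order follows dict insertion order, and POSIX-style paths are assumed):
#   >>> print(
#   ...     make_slurm_command(
#   ...         script="echo hello",
#   ...         job_name="myjob",
#   ...         log_path="logs",
#   ...         env={"FOO": "bar"},
#   ...         options={"time": "01:00:00"},
#   ...     )
#   ... )
#   sbatch --time="01:00:00" --output="logs/myjob.out" --error="logs/myjob.err" --export="FOO"="bar" --wrap='echo hello';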