Source code for freeforestml.plot

import math
import re
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import AutoMinorLocator, LogLocator
from matplotlib.transforms import ScaledTranslation
import pandas as pd
import dask
import dask.dataframe as dd
import seaborn as sns

from atlasify import atlasify
import uhepp

from freeforestml.stack import Stack
from freeforestml.process import Process
from freeforestml.cut import Cut
from freeforestml.variable import Variable, BlindingStrategy
import freeforestml.error as err

# Store built-in range
range_ = range

ATLAS = "Internal"
INFO = "$\\sqrt{s} = 13\\,\\mathrm{TeV}$, $139\\,\\mathrm{fb}^{-1}$\nfreeforestml"

def example_style():
    """Switch the module-level branding to the neutral example style."""
    global ATLAS, INFO
    ATLAS = False
    INFO = "FreeForestML Example"

def fill_labels(label, info):
    """Fall back to the module-level ATLAS and INFO badges if unset."""
    if label is None:
        label = ATLAS
    if info is None:
        info = INFO
    return label, info

def human_readable(label):
    """Convert labels to plain ascii strings"""
    pttrn = "[^a-zA-Z0-9_-]+"
    label = re.sub(f"(^{pttrn}|{pttrn}$)", "", label)
    label = re.sub(pttrn, "_", label)
    return label
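
# Hedged example of human_readable(): it strips decoration from a plot label
# so the label can serve as a yield identifier. The label below is
# hypothetical and this helper is illustrative, not part of the module API.
def _example_human_readable():
    # "$Z\to\tau\tau$ + jets" -> "Z_to_tau_tau_jets"
    print(human_readable("$Z\\to\\tau\\tau$ + jets"))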
class HistogramFactory:
    """
    Short-cut to create multiple histograms with the same set of processes
    or in the same region.
    """
    def __init__(self, *args, **kwds):
        """
        Accepts any number of positional and keyword arguments. The
        arguments are stored internally and used as default arguments for
        hist(). See __call__().
        """
        self.args = args
        self.kwds = kwds
    def __call__(self, *args, **kwds):
        """
        Proxy method for hist(). The positional arguments passed to hist()
        are the positional arguments given to the constructor concatenated
        with the positional arguments given to this method. The keyword
        arguments for hist() are the union of the keyword arguments passed
        to the constructor and to this method. Arguments passed to this
        method take precedence. The method returns the return value of
        hist().
        """
        return hist(*self.args, *args, **dict(self.kwds, **kwds))
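
# Hedged usage sketch for HistogramFactory: the factory pre-binds the
# dataframe, stacks and weight column, so repeated calls only vary the
# variable and binning. The names `df`, `stacks` and "weight" are
# placeholders for user-defined objects and columns.
def _example_histogram_factory(df, stacks):
    sr_hist = HistogramFactory(df, stacks=stacks, weight="weight")
    sr_hist(Variable("$m_{jj}$", "m_jj"), bins=20, range=(0, 2000))
    sr_hist(Variable("$p_{T}^{j1}$", "pt_j1"), bins=25, range=(0, 500))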
def hist(dataframe, variable, bins, stacks, selection=None,
         range=None, blind=None, figure_size=None, weight=None,
         y_log=False, y_min=None, vlines=[], denominator=0,
         numerator=-1, ratio_label=None, diff=False, ratio_range=None,
         atlas=None, info=None, enlarge=1.6, density=False,
         include_outside=False, return_uhepp=False, **kwds):
    """
    Creates a histogram of stacked processes.

    The first argument is the dataframe to operate on. The 'variable'
    argument defines the x-axis. The variable argument can be a Variable
    object or a string naming a column in the dataframe.

    The 'bins' argument can be an integer specifying the number of bins or
    a list with all bin boundaries. If it is an integer, the range argument
    is mandatory. The range argument must be a tuple with the lowest and
    highest bin edge. The properties of a Variable object are used for the
    x- and y-axis labels.

    Stacks must be Stack objects. The plotting style is defined via the
    stack object.

    The optional blind argument controls which stacks should be blinded.
    The argument can be a single stack, a list of stacks or None. By
    default, no stack is blinded.

    This method creates a new figure and axes internally (handled by
    uhepp). The figure size can be changed with the figure_size argument.
    If this argument is not None, it must be a tuple of (width, height).

    The method returns the (figure, axes) used during plotting. If a ratio
    plot is drawn, the axes return value is a list of the main and ratio
    axes.

    The weight is used to weight the entries. Entries have unit weight if
    omitted. The argument can be the string name of a column or a Variable
    object.

    If the y_log argument is set to True, the y-axis will be logarithmic.
    The axis is enlarged on a logarithmic scale to make room for the ATLAS
    labels. The optional y_min argument can be used to set the lower limit
    of the y-axis. The default is 0 for linear scale, and 1 for logarithmic
    scale.

    The option vlines can be used to draw vertical lines onto the
    histogram, e.g., a cut line. The argument should be an array with one
    item per line. If an item is a number, a red line is drawn at that
    x-position. If it is a dict, the item 'x' determines the position and
    all other keywords are passed to matplotlib's plot method.

    The ratio_label option controls the label of the ratio plot.

    The ratio_range argument controls the y-range of the ratio plot. If set
    to None, it scales automatically to include all points. The default is
    None.

    If diff is set to True, the difference between the 'numerator' and the
    'denominator' is drawn instead of their ratio.

    The module constants ATLAS and INFO are passed to atlasify. Overwrite
    them to change the badges.

    If the density argument is True, the area of each stack is normalized
    to unity.

    If return_uhepp is True, the method returns a uhepp.UHeppHist object.
""" ################## # Wrap column string by variable if isinstance(variable, str): variable = Variable(variable, variable) ################## # Handle range/bins if range is not None: # Build bins if not isinstance(bins, int): raise err.InvalidBins("When range is given, bins must be int.") if not isinstance(range, tuple) or len(range) != 2: raise err.InvalidProcessSelection("Range argument must be a " "tuple of two numbers.") bins = np.linspace(range[0], range[1], bins + 1) bin_edges = [float(x) for x in bins] ################## # Create UHepp object uhepp_obj = uhepp.UHeppHist(variable.name, bin_edges) uhepp_obj.producer = "freeforestml" ################## # Unit if variable.unit is not None: uhepp_obj.unit = variable.unit ################## # Branding atlas, info = fill_labels(atlas, info) if atlas is not False: uhepp_obj.brand = "ATLAS" uhepp_obj.brand_label = atlas if info is not False: uhepp_obj.subtext = info ################## # Y-Axis uhepp_obj.y_log = y_log ################## # Weight if weight is None: weight = Variable("unity", lambda d: variable(d) * 0 + 1) elif isinstance(weight, str): weight = Variable(weight, weight) squared_weight = Variable("squared weight", lambda d: weight(d)**2) ################## # Ratio draw_ratio = (denominator is not None) and (numerator is not None) # Disable ratio plots if there is only one stack if len(stacks) == 1 and \ isinstance(denominator, int) and \ isinstance(numerator, int): draw_ratio = False ################## # Handle selection if selection is None: selection = Cut(lambda d: variable(d) * 0 == 0) elif not isinstance(selection, Cut): selection = Cut(selection) ################## # Create separate under- and overflow bin uhepp_obj.include_overflow = include_outside uhepp_obj.include_underflow = include_outside def pad(edges): padded = ['-inf'] + list(edges) + ['inf'] padded = [float(x) for x in padded] return np.asarray(padded) bins = pad(bins) ################## # Blinding if blind is None: blind = [] elif isinstance(blind, Stack): # Wrap scalar blind stack blind = [blind] ################## # Handle stack yields = {} # temporary, potentially delayed for i_stack, stack in enumerate(stacks): stack_items = [] if stack in blind and variable.blinding is not None: c_blind = variable.blinding(variable, bins[1:-1]) else: c_blind = lambda d: d density_norm = 1.0 if density: density_norm = stack.get_total(dataframe, [float('-inf'), float('inf')], variable, weight, False).sum() for i_process, process in enumerate(stack.processes): histogram = stack.get_hist(c_blind(dataframe), i_process, bins, variable, weight, False) histogram = histogram / density_norm uncertainty = stack.get_uncertainty(c_blind(dataframe), i_process, bins, variable, weight, False) uncertainty = uncertainty / density_norm human_readable_label = human_readable(process.label) process_id = f"{human_readable_label}__s{i_stack}_p{i_process}" yields[process_id] = (histogram, uncertainty) style = stack.get_aux(i_process) uhepp_sitem = uhepp.StackItem([process_id], process.label, **style) stack_items.append(uhepp_sitem) bartype = stack.get_histtype(0) uhepp_stack = uhepp.Stack(stack_items, bartype) uhepp_obj.stacks.append(uhepp_stack) # Resolve numerator/denominator indices if isinstance(numerator, int) and stacks[numerator] == stack: numerator = stack if isinstance(denominator, int) and stacks[denominator] == stack: denominator = stack if density: uhepp_obj.y_label = "Events (a.u.)" uhepp_obj.ratio_label = ratio_label if ratio_range: uhepp_obj.ratio_min = ratio_range[0] 
        uhepp_obj.ratio_max = ratio_range[1]

    if draw_ratio:
        ratio = []

        # Get denominator hist
        if denominator in blind and variable.blinding is not None:
            c_blind = variable.blinding(variable, bins[1:-1])
        else:
            c_blind = lambda d: d
        base = denominator.get_total(c_blind(dataframe), bins,
                                     variable, weight, False)
        stat = denominator.get_total_uncertainty(c_blind(dataframe), bins,
                                                 variable, weight, False)
        yields["den"] = (base, stat)

        # Process numerators
        numerators = numerator
        if not isinstance(numerators, (list, tuple)):
            numerators = [numerators]

        numerators_data = []
        for i_numerator, numerator in enumerate(numerators):
            if numerator in blind and variable.blinding is not None:
                c_blind = variable.blinding(variable, bins[1:-1])
            else:
                c_blind = lambda d: d

            process_id = f"num_{i_numerator}"
            base = numerator.get_total(c_blind(dataframe), bins,
                                       variable, weight, False)
            stat = numerator.get_total_uncertainty(c_blind(dataframe),
                                                   bins, variable,
                                                   weight, False)
            yields[process_id] = (base, stat)

            histtype = numerator.get_histtype(0)
            bartype = "points" if histtype == "points" else "step"
            style = {}  # {'markersize': 4, 'fmt': 'o'}
            style.update(numerator.get_aux(0))
            uhepp_ritem = uhepp.RatioItem([process_id], ["den"],
                                          bartype, **style)
            uhepp_obj.ratio.append(uhepp_ritem)

    ##################
    # Compute delayed dask
    yields, = dask.compute(yields)
    for key, item in yields.items():
        uhepp_obj.yields[key] = uhepp.Yield(*item)

    ##################
    # Reorder if y-axis is log
    def total_stackitem(uhepp_obj, stack_item):
        """Compute the total yield of a stack item"""
        process_totals = [sum(uhepp_obj.get_base(yield_name))
                          for yield_name in stack_item.yield_names]
        return sum(process_totals)

    if y_log:
        for stack in uhepp_obj.stacks:
            stack.content.sort(key=lambda x: total_stackitem(uhepp_obj, x))

    ##################
    # Vertical lines
    for vline in vlines:
        if isinstance(vline, (int, float)):
            uhepp_obj.v_lines.append(uhepp.VLine(vline))
        else:
            pos_x = vline.pop("x")
            uhepp_obj.v_lines.append(uhepp.VLine(pos_x, **vline))

    if figure_size is not None:
        uhepp_obj.figure_size = figure_size

    if return_uhepp:
        return uhepp_obj
    else:
        return uhepp_obj.render()
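
# Hedged usage sketch for hist(): `df` and the stack objects are
# placeholders. With return_uhepp=True the uhepp.UHeppHist can be inspected
# or stored before rendering; render() then draws the plot as documented
# above.
def _example_hist(df, data_stack, mc_stack):
    uhepp_obj = hist(df, Variable("$m_{ll}$", "m_ll"), 30,
                     [mc_stack, data_stack], range=(50, 200),
                     weight="weight", ratio_label="Data / SM",
                     return_uhepp=True)
    uhepp_obj.render()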
def _transpose(array):
    """Return a transposed version of the array"""
    if array == [] or array == [[]]:
        return [[]]
    return list(map(list, zip(*array)))


def _dask_compute(array):
    """Call dask.compute() on flattened list and return the structured result"""
    if not array or not array[0]:
        return array
    x_len = len(array)
    y_len = len(array[0])
    flat = [i for s in array for i in s]
    flat = dask.compute(*flat)
    return [[flat[x * y_len + y] for y in range(y_len)]
            for x in range(x_len)]
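
# Hedged example of the helpers above on plain (eager) nested lists:
# _transpose flips rows and columns, and _dask_compute returns concrete
# values unchanged, since dask.compute passes non-delayed values through.
def _example_helpers():
    print(_transpose([[1, 2, 3], [4, 5, 6]]))  # [[1, 4], [2, 5], [3, 6]]
    print(_dask_compute([[1, 2], [3, 4]]))     # [[1, 2], [3, 4]]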
def confusion_matrix(df, x_processes, y_processes, x_label, y_label,
                     weight=None, axes=None, figure=None, atlas=None,
                     info=None, enlarge=1.3, normalize_rows=False, **kwds):
    """
    Creates a confusion matrix.
    """
    atlas, info = fill_labels(atlas, info)

    # Wrap column string by variable
    if weight is None:
        weight = Variable("unity", lambda d: np.ones(len(d)))
    elif isinstance(weight, str):
        weight = Variable(weight, weight)

    # Handle axes, figure
    if figure is None:
        figure, axes = plt.subplots()  # figsize=(5, 5)
    elif axes is None:
        axes = figure.subplots()

    y_processes = y_processes[::-1]

    if normalize_rows:
        # Swap processes
        y_processes, x_processes = x_processes, y_processes

    # Need a nested-list (not numpy array) to store Dask scalars
    data = [[0 for p in x_processes] for q in y_processes]
    for i_x, x_process in enumerate(x_processes):
        x_df = x_process(df)
        total_weight = weight(x_df).sum()
        for i_y, y_process in enumerate(y_processes):
            x_y_df = y_process(x_df)
            data[i_y][i_x] = weight(x_y_df).sum() / total_weight

    if normalize_rows:
        # Swap processes back
        y_processes, x_processes = x_processes, y_processes
        data = _transpose(data)

    data = _dask_compute(data)
    data = pd.DataFrame(data,
                        columns=[p.label for p in x_processes],
                        index=[p.label for p in y_processes])

    if normalize_rows:
        # Swap labels
        y_label, x_label = x_label, y_label

    sns.heatmap(data, **dict(vmin=0, vmax=1, cmap="Greens", ax=axes,
                             cbar_kws={'label': "$P($%s$|$%s$)$"
                                                % (y_label, x_label)}),
                **kwds)

    if normalize_rows:
        # Swap labels back
        y_label, x_label = x_label, y_label

    axes.set_xlabel(x_label)
    axes.set_ylabel(y_label)
    axes.set_ylim(len(y_processes), 0)

    if (atlas is not False) or (info is not False):
        atlasify(*fill_labels(atlas, info), axes=axes, outside=True)
        figure.subplots_adjust(top=1/enlarge)

    return data
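
# Hedged usage sketch for confusion_matrix(): `df`, `true_procs` and
# `tagged_procs` are placeholders; each entry must be a freeforestml
# Process whose call applies its selection to the dataframe.
def _example_confusion_matrix(df, true_procs, tagged_procs):
    return confusion_matrix(df, true_procs, tagged_procs,
                            x_label="True label", y_label="Tagged label",
                            weight="weight", normalize_rows=True)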
def roc(df, signal_process, background_process, discriminant, steps=100,
        selection=None, min=None, max=None, axes=None, weight=None,
        atlas=None, info=None, enlarge=1.3, return_auc=False):
    """
    Creates a ROC curve.

    The method returns a dataframe with the signal efficiency and
    background rejection columns. The length of the dataframe equals the
    steps parameter.

    If return_auc is True, the method returns a tuple with the area under
    the curve and an uncertainty estimate of the area.
    """
    atlas, info = fill_labels(atlas, info)

    # Wrap column string by variable
    if discriminant is None:
        discriminant = Variable("unity", lambda d: np.ones(len(d)))
    elif isinstance(discriminant, str):
        discriminant = Variable(discriminant, discriminant)

    if weight is None:
        weight = Variable("unity", lambda d: np.ones(len(d)))
    elif isinstance(weight, str):
        weight = Variable(weight, weight)

    # Handle selection
    if selection is None:
        selection = lambda df: df  # Identity
    elif not isinstance(selection, Cut):
        selection = Cut(selection)

    # Handle axes
    if axes is None:
        fig, axes = plt.subplots()

    if min is None:
        min = discriminant(df).min()
    if max is None:
        max = discriminant(df).max()

    df = selection(df)

    signal_effs = []
    background_ieffs = []

    n_total_sig = weight(signal_process.selection(df)).sum()
    n_total_bkg = weight(background_process.selection(df)).sum()

    for cut_value in np.linspace(min, max, steps):
        residual_df = df[discriminant(df) >= cut_value]
        n_total = weight(residual_df).sum()
        if n_total == 0:
            continue

        signal_df = signal_process.selection(residual_df)
        background_df = background_process.selection(residual_df)

        n_signal = weight(signal_df).sum()
        n_background = weight(background_df).sum()

        signal_effs.append(n_signal / n_total_sig)
        background_ieffs.append(1 - n_background / n_total_bkg)

    data = pd.DataFrame({"Signal efficiency": signal_effs,
                         "1 - Background efficiency": background_ieffs})

    sns.lineplot(x="Signal efficiency", y="1 - Background efficiency",
                 data=data, ax=axes, errorbar=None,
                 label=discriminant.name)
    axes.plot([0, 1], [1, 0], color='gray', linestyle=':')

    axes.set_xlim((0, 1))
    axes.set_ylim((0, enlarge))
    axes.legend(loc=1, frameon=False)

    axes.tick_params("both", which="both", direction="in")
    axes.tick_params("both", which="major", length=6)
    axes.tick_params("both", which="minor", length=3)
    axes.tick_params("x", which="both", top=True)
    axes.tick_params("y", which="both", right=True)
    axes.xaxis.set_minor_locator(AutoMinorLocator())
    axes.yaxis.set_minor_locator(AutoMinorLocator())

    if (atlas is not False) or (info is not False):
        atlasify(*fill_labels(atlas, info), enlarge=1.0, axes=axes)

    if return_auc:
        # Calculate rectangular approximations of the AUC. One of these
        # will be too high, the other too low; the difference gives a
        # measure of uncertainty.
        auc_approx_a = 0
        auc_approx_b = 0
        for i_bin in range(1, len(signal_effs)):
            bin_width = abs(signal_effs[i_bin] - signal_effs[i_bin - 1])
            auc_approx_a += bin_width * background_ieffs[i_bin]
            auc_approx_b += bin_width * background_ieffs[i_bin - 1]
        auc_mean = (auc_approx_a + auc_approx_b) / 2
        auc_diff = abs(auc_approx_a - auc_approx_b) / 2
        auc_mean = auc_mean if auc_mean > 0.5 else 1 - auc_mean
        return (auc_mean, auc_diff)
    else:
        return data
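
# Hedged usage sketch for roc(): `sig` and `bkg` are placeholder Process
# objects and "nn_score" a placeholder discriminant column. With
# return_auc=True the function returns the mean of the two rectangular AUC
# approximations and half their spread as an uncertainty.
def _example_roc(df, sig, bkg):
    auc, auc_unc = roc(df, sig, bkg, "nn_score", steps=200,
                       weight="weight", return_auc=True)
    print(f"AUC = {auc:.3f} +- {auc_unc:.3f}")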
def correlation_matrix(df, variables, weight=None, axes=None, figure=None,
                       atlas=None, info=None, enlarge=1.3,
                       normalize_rows=False, **kwds):
    """
    Plot the Pearson correlation coefficient matrix. The square matrix is
    returned as a DataFrame.
    """
    atlas, info = fill_labels(atlas, info)

    if weight is None:
        weight = Variable("unity", lambda d: np.ones(len(d)))
    elif isinstance(weight, str):
        weight = Variable(weight, weight)

    kwds.setdefault("annot", True)
    kwds.setdefault("fmt", "2.0f")

    # Handle axes, figure
    if figure is None:
        figure, axes = plt.subplots()  # figsize=(5, 5)
    elif axes is None:
        axes = figure.subplots()

    var_data = [v(df) for v in variables]
    var_data = dask.compute(*var_data)

    data = np.cov(np.array([v.to_numpy() for v in var_data]),
                  aweights=weight(df))
    var = np.diag(data)
    data = data / np.sqrt(var)
    data = data.T / np.sqrt(var)

    data = pd.DataFrame(data,
                        columns=[v.name for v in variables],
                        index=[v.name for v in variables])

    sns.heatmap(data * 100,
                **dict(vmin=-100, vmax=100, cmap="RdBu_r", ax=axes,
                       cbar_kws={'label': "Correlation coefficient "
                                          "$\\rho \\cdot 100$"}),
                **kwds)

    axes.set_ylim(len(variables), 0)

    if (atlas is not False) or (info is not False):
        atlasify(*fill_labels(atlas, info), axes=axes, outside=True)
        figure.subplots_adjust(top=1/enlarge)

    return data
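
# Hedged usage sketch for correlation_matrix(): the Variable definitions
# are placeholders for columns in `df`. The returned DataFrame holds the
# Pearson coefficients; the heatmap annotates them scaled by 100.
def _example_correlation_matrix(df):
    variables = [Variable("$m_{jj}$", "m_jj"),
                 Variable("$\\Delta\\eta_{jj}$", "delta_eta_jj")]
    return correlation_matrix(df, variables, weight="weight")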