Source code for freeforestml.model

from abc import ABC, abstractmethod

import os
import sys
import h5py
import json
import socket
import warnings
import importlib
import multiprocessing as mp

import numpy as np
import pandas as pd
import tensorflow as tf
import keras_tuner

from freeforestml.variable import Variable
from freeforestml.helpers import python_to_str, str_to_python

class CrossValidator(ABC):
    """
    Abstract class of a cross validation method.
    """
    def __init__(self, k, mod_var=None, frac_var=None):
        """
        Creates a new cross validator. The argument k determines the number
        of folds. The mod_var specifies a variable whose 'mod k' value
        defines the set. The frac_var specifies a variable whose fractional
        part defines the set. Only one of the two can be used. Both options
        can be either a string naming the column in the dataframe or a
        Variable object.
        """
        self.k = k

        if (mod_var is None) == (frac_var is None):
            raise TypeError("Exactly one of mod_var or frac_var must be "
                            "used.")
        elif mod_var is not None:
            self.variable = mod_var
            self.mod_mode = True
        else:
            self.variable = frac_var
            self.mod_mode = False

        # Handle variable given as a plain column name
        if isinstance(self.variable, str):
            self.variable = Variable(self.variable, self.variable)

    def __eq__(self, other):
        """
        Compare if two cross validators are the same.
        """
        if not isinstance(other, self.__class__):
            return False
        if self.k != other.k:
            return False
        if self.mod_mode != other.mod_mode:
            return False
        if self.variable != other.variable:
            return False
        return True

    @abstractmethod
    def select_slice(self, df, slice_id):
        """
        Returns the index array to select all events from the dataset of a
        given slice.

        NB: This method is for internal usage only. There might be more
        than k slices.
        """

    @abstractmethod
    def select_training(self, df, fold_i):
        """
        Returns the index array to select all training events from the
        dataset for the given fold.
        """

    @abstractmethod
    def select_validation(self, df, fold_i):
        """
        Returns the index array to select all validation events from the
        dataset for the given fold.
        """

    @abstractmethod
    def select_test(self, df, fold_i):
        """
        Returns the index array to select all test events from the dataset
        for the given fold.
        """

    def select_cv_set(self, df, cv, fold_i):
        """
        Returns the index array to select all events from the cross
        validator set specified with cv ('train', 'val', 'test') for the
        given fold.
        """
        if cv not in ['train', 'val', 'test']:
            raise ValueError("Argument 'cv' must be one of 'train', "
                             "'val', 'test'; but was %s." % repr(cv))
        if cv == "train":
            selected = self.select_training(df, fold_i)
        elif cv == "val":
            selected = self.select_validation(df, fold_i)
        else:
            selected = self.select_test(df, fold_i)
        return selected

    def retrieve_fold_info(self, df, cv):
        """
        Returns an array of integers specifying in which fold each event
        was used for train/val/test. Events that are never selected are
        marked with -1.
        """
        fold_info = np.full(len(df), -1, dtype='int')
        for fold_i in range(self.k):
            selected = self.select_cv_set(df, cv, fold_i)
            fold_info[selected] = fold_i
        return fold_info

    def save_to_h5(self, path, key, overwrite=False):
        """
        Save the cross validator definition to an hdf5 file. 'path' is the
        file path and 'key' is the path inside the hdf5 file. If overwrite
        is true then already existing file contents are overwritten.
        """
        if overwrite:
            open_mode = "w"
        else:
            open_mode = "a"
        with h5py.File(path, open_mode) as output_file:
            group = output_file.create_group(key)
            group.attrs["class_name"] = np.string_(self.__class__.__name__)
            group.attrs["k"] = self.k
            group.attrs["mod_mode"] = self.mod_mode
        self.variable.save_to_h5(path, os.path.join(key, "variable"))

    @classmethod
    def load_from_h5(cls, path, key):
        """
        Create a new cross validator instance from an hdf5 file. 'path' is
        the file path and 'key' is the path inside the hdf5 file.
        """
        with h5py.File(path, "r") as input_file:
            class_name = input_file[key].attrs["class_name"].decode()
            class_object = getattr(sys.modules[__name__], class_name)
            k = input_file[key].attrs["k"]
            mod_mode = input_file[key].attrs["mod_mode"]
        variable = Variable.load_from_h5(path, os.path.join(key, "variable"))
        if mod_mode:
            return class_object(k=k, mod_var=variable)
        else:
            return class_object(k=k, frac_var=variable)


class ClassicalCV(CrossValidator):
    """
    Performs the k-fold cross validation on half of the dataset. The other
    half is designated as the test set.

    fold 0: | Tr | Tr | Tr | Tr | Va |   Test    |
    fold 1: | Tr | Tr | Tr | Va | Tr |   Test    |
    fold 2: | Tr | Tr | Va | Tr | Tr |   Test    |
    fold 3: | Tr | Va | Tr | Tr | Tr |   Test    |
    fold 4: | Va | Tr | Tr | Tr | Tr |   Test    |

    Va=Validation, Tr=Training
    """

    def select_slice(self, df, slice_id):
        """
        Returns the index array to select all events from the dataset of a
        given slice.

        NB: This method is for internal usage only. There might be more
        than k slices.
        """
        if self.mod_mode:
            return (self.variable(df) % (self.k * 2) == slice_id)
        else:
            variable = self.variable(df) % 1
            return (slice_id / (self.k * 2.0) <= variable) \
                   & (variable < (slice_id + 1.0) / (self.k * 2))

    def select_training(self, df, fold_i):
        """
        Returns the index array to select all training events from the
        dataset for the given fold.
        """
        selected = np.zeros(len(df), dtype='bool')
        for slice_i in range(self.k):
            if (slice_i + fold_i) % self.k == self.k - 1:
                continue
            selected = selected | self.select_slice(df, slice_i)
        return selected

    def select_validation(self, df, fold_i):
        """
        Returns the index array to select all validation events from the
        dataset for the given fold.
        """
        return self.select_slice(df, (self.k - fold_i - 1) % self.k)

    def select_test(self, df, fold_i):
        """
        Returns the index array to select all test events from the dataset
        for the given fold.
        """
        selected = np.zeros(len(df), dtype='bool')
        for slice_i in range(self.k, self.k * 2):
            selected = selected | self.select_slice(df, slice_i)
        return selected

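
# Illustrative usage sketch, not part of the original module: it shows how
# the ClassicalCV selectors are typically combined. The toy dataframe, the
# "event_number" column and the file name are assumptions for illustration.
def _example_classical_cv():
    df = pd.DataFrame({"event_number": np.arange(100)})
    cv = ClassicalCV(5, mod_var="event_number")

    train = df[cv.select_training(df, fold_i=0)]  # slices 0-3
    val = df[cv.select_validation(df, fold_i=0)]  # slice 4
    test = df[cv.select_test(df, fold_i=0)]       # slices 5-9

    # The three sets are disjoint and together cover the full dataset.
    assert len(train) + len(val) + len(test) == len(df)

    # The definition can be persisted and restored via the base class.
    cv.save_to_h5("classical_cv.h5", "cross_validator", overwrite=True)
    cv_restored = CrossValidator.load_from_h5("classical_cv.h5",
                                              "cross_validator")
    return cv_restored
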

class NoTestCV(CrossValidator):
    """
    Uses the whole dataset for training and validation with a single fold.
    The test set is empty.

    fold 0: |          Training           | Val |

    The NoTestCV can be useful if the test dataset is provided
    independently from the training and validation, for example if a
    different generator is used for the training or if real-time (non-hep)
    data is used as a "test" set.
    """

    def __init__(self, mod_var=None, frac_var=None, k=10):
        """
        The parameter k defines the inverse fraction of the validation
        set. For example, k=5 will allocate 1/5 = 20% of the dataset for
        validation.
        """
        super().__init__(k, mod_var=mod_var, frac_var=frac_var)

    def select_slice(self, df, slice_id):
        """
        Returns the index array to select all events from the dataset of a
        given slice.

        NB: This method is for internal usage only. There might be more
        than k slices.
        """
        if self.mod_mode:
            return (self.variable(df) % self.k == slice_id)
        else:
            variable = self.variable(df) % 1
            return (slice_id / self.k <= variable) \
                   & (variable < (slice_id + 1.0) / self.k)

    def select_training(self, df, fold_i):
        """
        Returns the index array to select all training events from the
        dataset. The fold_i parameter has no effect.
        """
        selected = np.zeros(len(df), dtype='bool')
        for slice_i in range(1, self.k):
            selected = selected | self.select_slice(df, slice_i)
        return selected

    def select_validation(self, df, fold_i):
        """
        Returns the index array to select all validation events from the
        dataset for the given fold.
        """
        return self.select_slice(df, 0)

    def select_test(self, df, fold_i):
        """
        Returns the index array to select all test events from the dataset
        for the given fold. The test set is empty.
        """
        selected = np.zeros(len(df), dtype='bool')
        return selected


class BinaryCV(CrossValidator):
    """
    Defines a training set and a test set using a binary split. There is
    no independent validation set in this case. The BinaryCV should not be
    used for parameter optimization.

    fold 0: |   Training   |  Test & Val  |
    fold 1: |  Test & Val  |   Training   |

    The BinaryCV can be used after parameter optimization with ClassicalCV
    to retrain the model on the full half. The validation performance
    contained in HepNet.history is then the test performance.
    """

    def __init__(self, mod_var=None, frac_var=None, k=None):
        """
        k is always set to 2. The argument k has no effect.
        """
        super().__init__(2, mod_var=mod_var, frac_var=frac_var)

    def select_slice(self, df, slice_id):
        """
        Returns the index array to select all events from the dataset of a
        given slice.

        NB: This method is for internal usage only. There might be more
        than k slices.
        """
        if self.mod_mode:
            return (self.variable(df) % self.k == slice_id)
        else:
            variable = self.variable(df) % 1
            return (slice_id / self.k <= variable) \
                   & (variable < (slice_id + 1.0) / self.k)

    def select_training(self, df, fold_i):
        """
        Returns the index array to select all training events from the
        dataset for the given fold.
        """
        return self.select_slice(df, fold_i)

    def select_validation(self, df, fold_i):
        """
        Returns the index array to select all validation events from the
        dataset for the given fold.
        """
        return self.select_slice(df, (1 + fold_i) % self.k)

    def select_test(self, df, fold_i):
        """
        Returns the index array to select all test events from the dataset
        for the given fold.
        """
        return self.select_slice(df, (1 + fold_i) % self.k)

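
# Illustrative sketch, not part of the original module: with NoTestCV and
# k=10, one tenth of the events is used for validation, the rest for
# training, and the test set is empty. The toy dataframe is an assumption.
def _example_no_test_cv():
    df = pd.DataFrame({"event_number": np.arange(100)})
    cv = NoTestCV(mod_var="event_number", k=10)
    assert len(df[cv.select_validation(df, 0)]) == 10
    assert len(df[cv.select_training(df, 0)]) == 90
    assert len(df[cv.select_test(df, 0)]) == 0
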

class MixedCV(CrossValidator):
    """
    Performs the k-fold cross validation where validation and test sets
    are both interleaved.

    fold 0: | Tr | Tr | Tr | Te | Va |
    fold 1: | Tr | Tr | Te | Va | Tr |
    fold 2: | Tr | Te | Va | Tr | Tr |
    fold 3: | Te | Va | Tr | Tr | Tr |
    fold 4: | Va | Tr | Tr | Tr | Te |

    Va=Validation, Tr=Training, Te=Test
    """

    def select_slice(self, df, slice_id):
        """
        Returns the index array to select all events from the dataset of a
        given slice.

        NB: This method is for internal usage only. There might be more
        than k slices.
        """
        if self.mod_mode:
            return (self.variable(df) % self.k == slice_id)
        else:
            variable = self.variable(df) % 1
            return (slice_id / self.k <= variable) \
                   & (variable < (slice_id + 1.0) / self.k)

    def select_training(self, df, fold_i):
        """
        Returns the index array to select all training events from the
        dataset for the given fold.
        """
        selected = np.zeros(len(df), dtype='bool')
        for slice_i in range(self.k):
            if (slice_i + fold_i) % self.k == self.k - 1:
                continue
            if (slice_i + fold_i) % self.k == self.k - 2:
                continue
            selected = selected | self.select_slice(df, slice_i)
        return selected

    def select_validation(self, df, fold_i):
        """
        Returns the index array to select all validation events from the
        dataset for the given fold.
        """
        return self.select_slice(df, (self.k - fold_i - 1) % self.k)

    def select_test(self, df, fold_i):
        """
        Returns the index array to select all test events from the dataset
        for the given fold.
        """
        return self.select_slice(df, (self.k - fold_i - 2) % self.k)

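
# Illustrative sketch, not part of the original module: in a MixedCV each
# fold uses two interleaved slices for validation and test, and the three
# sets of a fold are disjoint while covering the full dataset. The toy
# dataframe is an assumption.
def _example_mixed_cv():
    df = pd.DataFrame({"event_number": np.arange(100)})
    cv = MixedCV(5, mod_var="event_number")
    for fold_i in range(cv.k):
        train = cv.select_training(df, fold_i)
        val = cv.select_validation(df, fold_i)
        test = cv.select_test(df, fold_i)
        assert not (train & val).any()
        assert not (train & test).any()
        assert not (val & test).any()
        assert (train | val | test).all()
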

class Normalizer(ABC):
    """
    Abstract normalizer which shifts and scales the distribution such that
    it has zero mean and unit width.
    """

    @abstractmethod
    def __init__(self, df, input_list=None):
        """
        Returns a normalizer object with the normalization moments stored
        internally. The input_list argument specifies which inputs should
        be normalized. All other columns are left untouched.
        """

    @abstractmethod
    def __call__(self, df):
        """
        Applies the normalization of the input columns to the given
        dataframe and returns a normalized copy.
        """

    @abstractmethod
    def __eq__(self, other):
        """
        Check if two normalizers are the same.
        """

    @property
    @abstractmethod
    def scales(self):
        """
        Every normalizer must reduce to a simple (offset + scale * x)
        normalization to be used with lwtnn. This property returns the
        scale parameters for all variables.
        """

    @property
    @abstractmethod
    def offsets(self):
        """
        Every normalizer must reduce to a simple (offset + scale * x)
        normalization to be used with lwtnn. This property returns the
        offset parameters for all variables.
        """

    def save_to_h5(self, path, key, overwrite=False):
        """
        Save the normalizer definition to an hdf5 file. 'path' is the file
        path and 'key' is the path inside the hdf5 file. If overwrite is
        true then already existing file contents are overwritten.
        """
        if overwrite:
            open_mode = "w"
        else:
            open_mode = "a"
        with h5py.File(path, open_mode) as output_file:
            group = output_file.create_group(key)
            group.attrs["class_name"] = np.string_(self.__class__.__name__)
        self._save_to_h5(path, key)

    @abstractmethod
    def _save_to_h5(self, path, key):
        """
        Save child class specific definitions to an hdf5 file. 'path' is
        the file path and 'key' is the path inside the hdf5 file.
        """

    @classmethod
    def load_from_h5(cls, path, key):
        """
        Create a new normalizer instance from an hdf5 file. 'path' is the
        file path and 'key' is the path inside the hdf5 file.
        """
        with h5py.File(path, "r") as input_file:
            if key not in input_file:
                return None
            class_name = input_file[key].attrs["class_name"].decode()
            class_object = getattr(sys.modules[__name__], class_name)
        return class_object._load_from_h5(path, key)

    @classmethod
    @abstractmethod
    def _load_from_h5(cls, path, key):
        """
        Load child class specific definitions from an hdf5 file.
        """


class EstimatorNormalizer(Normalizer):
    """
    Normalizer which uses estimators to compute the normalization moments.
    This method might lead to sub-optimal results if there are outliers.
    """

    def __init__(self, df, input_list=None, center=None, width=None):
        """
        See base class.
        """
        if center is not None and width is not None:
            self.center = center
            self.width = width
        else:
            if input_list is not None:
                if isinstance(input_list[0], list):
                    # flatten nested input lists and remove duplicates
                    # while keeping the original order
                    input_list = sum(input_list, [])
                    input_list = sorted(set(input_list),
                                        key=input_list.index)
                    df = df[input_list]
                else:
                    df = df[input_list]
            self.center = df.mean()
            self.width = df.std()
            self.width[self.width == 0] = 1

    def __call__(self, df):
        """
        See base class.
        """
        input_list = list(self.center.index)
        normed = (df[input_list] - self.center) / self.width
        aux_list = [c for c in df.columns if c not in input_list]
        normed[aux_list] = df[aux_list]
        return normed

    def __eq__(self, other):
        """
        See base class.
        """
        if not isinstance(other, self.__class__):
            return False
        if not self.center.equals(other.center):
            return False
        if not self.width.equals(other.width):
            return False
        return True

    def _save_to_h5(self, path, key):
        """
        See base class.
        """
        self.center.to_hdf(path, key=os.path.join(key, "center"))
        self.width.to_hdf(path, key=os.path.join(key, "width"))

    @classmethod
    def _load_from_h5(cls, path, key):
        """
        See base class.
        """
        center = pd.read_hdf(path, os.path.join(key, "center"))
        width = pd.read_hdf(path, os.path.join(key, "width"))
        return cls(None, center=center, width=width)

    @property
    def scales(self):
        return 1 / self.width

    @property
    def offsets(self):
        return -self.center / self.width

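
# Illustrative sketch, not part of the original module: normalized columns
# get zero mean and unit width, while columns outside the input list pass
# through unchanged. The toy dataframe is an assumption.
def _example_estimator_normalizer():
    df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0],
                       "weight": [1.0, 1.0, 1.0, 1.0]})
    norm = EstimatorNormalizer(df, input_list=["x"])
    normed = norm(df)
    assert abs(normed["x"].mean()) < 1e-7
    assert abs(normed["x"].std() - 1) < 1e-7
    assert (normed["weight"] == df["weight"]).all()
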

class IdentityNormalizer(Normalizer):
    """
    Has no effect on the dataframe: applying the normalizer returns the
    dataframe unchanged.
    """

    def __init__(self, df, input_list=None, center=None, width=None):
        if center is not None and width is not None:
            self.center = center
            self.width = width
        else:
            if input_list is not None:
                if isinstance(input_list[0], list):
                    input_list = sum(input_list, [])
                    input_list = sorted(set(input_list),
                                        key=input_list.index)
                    df = df[input_list]
                else:
                    df = df[input_list]
            keys = df.keys()
            self.center = pd.Series({name: 0.0 for name in keys})
            self.width = pd.Series({name: 1.0 for name in keys})

    def __call__(self, df):
        return df

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        if not self.center.equals(other.center):
            return False
        if not self.width.equals(other.width):
            return False
        return True

    def _save_to_h5(self, path, key):
        self.center.to_hdf(path, key=os.path.join(key, "center"))
        self.width.to_hdf(path, key=os.path.join(key, "width"))

    @classmethod
    def _load_from_h5(cls, path, key):
        center = pd.read_hdf(path, os.path.join(key, "center"))
        width = pd.read_hdf(path, os.path.join(key, "width"))
        return cls(None, center=center, width=width)

    @property
    def scales(self):
        return 1.0

    @property
    def offsets(self):
        return 0.0


def normalize_category_weights(df, categories, weight='weight'):
    """
    The categorical weight normalizer acts on the weight variable only.
    The returned dataframe will satisfy the following conditions:

      - The sum of weights of all events is equal to the total number of
        entries.
      - The sum of weights of a category is equal to the total number of
        entries divided by the number of categories. Therefore the sums
        of weights of any two categories are equal.
      - The relative weights within a category are unchanged.
    """
    df_out = df[:]
    w_norm = np.empty(len(df))
    for category in categories:
        idx = category.idx_array(df)
        w_norm[idx] = df[idx][weight].sum()
    df_out[weight] = df_out[weight] / w_norm * len(df) / len(categories)
    return df_out

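
# Illustrative sketch, not part of the original module. The stand-in
# _ToyCategory class below only mimics the idx_array() interface that
# normalize_category_weights() relies on; it is an assumption made for
# this example.
def _example_normalize_category_weights():
    class _ToyCategory:
        def __init__(self, label):
            self.label = label

        def idx_array(self, df):
            return df["label"] == self.label

    df = pd.DataFrame({"label": [0, 0, 0, 1],
                       "weight": [1.0, 1.0, 2.0, 5.0]})
    df_out = normalize_category_weights(df, [_ToyCategory(0),
                                             _ToyCategory(1)])

    # Each category now carries half of the total weight: 4 entries / 2.
    assert abs(df_out[df_out.label == 0].weight.sum() - 2.0) < 1e-7
    assert abs(df_out[df_out.label == 1].weight.sum() - 2.0) < 1e-7
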

class HepNet:
    """
    Meta model of a concrete neural network around the underlying Keras
    model. The HEP net handles cross validation, normalization of the
    input variables, the input weights, and the actual Keras model. A HEP
    net has no free parameters.
    """
    def __init__(self, keras_model, cross_validator, normalizer,
                 input_list, output_list):
        """
        Creates a new HEP model. The keras_model parameter must be a
        callable that returns a new instance of the compiled model (the
        HEP net needs to be able to create multiple models, one for each
        cross validation fold).

        The cross_validator must be a CrossValidator object.

        The normalizer must be a Normalizer class that returns a
        normalizer. Each cross validation fold uses a separate normalizer
        with independent normalization weights.

        The input and output lists are lists of variables or column names
        used as input and target of the keras model. The input is
        normalized.
        """
        self.model_cls = keras_model
        self.cv = cross_validator
        self.norm_cls = IdentityNormalizer if normalizer is None else normalizer
        self.input_list = input_list
        self.output_list = output_list
        self.norms = []
        self.models = []
        self.history = pd.DataFrame()

    def __eq__(self, other):
        """
        Check if two models have the same configuration.
        """
        if not isinstance(other, self.__class__):
            return False
        if python_to_str(self.model_cls) != python_to_str(other.model_cls):
            return False
        if self.cv != other.cv:
            return False
        if python_to_str(self.norm_cls) != python_to_str(other.norm_cls):
            return False
        if self.input_list != other.input_list:
            return False
        if self.output_list != other.output_list:
            return False
        if self.norms != other.norms:
            return False
        if not self.history.equals(other.history):
            return False
        return True

    def _get_model_path(self, path, fold_i):
        """
        Returns the path of the fold_i model.
        """
        path_token = list(os.path.splitext(path))
        path_token.insert(1, f".fold_{fold_i}")
        return "".join(path_token)

    def fit(self, df, weight=None, **kwds):
        """
        Calls fit() on all folds. All kwds are passed to fit().
        """
        if weight is None:
            weight = Variable("unity", lambda d: np.ones(len(d)))
        elif isinstance(weight, str):
            weight = Variable(weight, weight)

        ### Loop over folds:
        self.norms = []
        self.models = []
        self.history = pd.DataFrame()
        for fold_i in range(self.cv.k):
            # select training set
            selected = self.cv.select_training(df, fold_i)
            training_df = df[selected]

            # select validation set
            selected = self.cv.select_validation(df, fold_i)
            validation_df = df[selected]

            # seed normalizers
            norm = self.norm_cls(training_df, self.input_list)
            self.norms.append(norm)
            training_df = norm(training_df)
            validation_df = norm(validation_df)

            # fit folds
            model = self.model_cls()
            self.models.append(model)

            # input_list training and validation dataframes
            if isinstance(self.input_list[0], list):
                input_training_df = [training_df[l] for l in self.input_list]
                input_validation_df = [validation_df[l] for l in self.input_list]
            else:
                input_training_df = training_df[self.input_list]
                input_validation_df = validation_df[self.input_list]

            # output_list training and validation dataframes
            if isinstance(self.output_list[0], list):
                output_training_df = [training_df[l] for l in self.output_list]
                output_validation_df = [validation_df[l] for l in self.output_list]
            else:
                output_training_df = training_df[self.output_list]
                output_validation_df = validation_df[self.output_list]

            history = model.fit(input_training_df,
                                output_training_df,
                                validation_data=(
                                    input_validation_df,
                                    output_validation_df,
                                    np.array(weight(validation_df)),
                                ),
                                sample_weight=np.array(weight(training_df)),
                                **kwds)

            history = history.history
            history['fold'] = np.ones(len(history['loss']), dtype='int') * fold_i
            history['epoch'] = np.arange(len(history['loss']))
            self.history = pd.concat([self.history, pd.DataFrame(history)])

    def predict(self, df, cv='val', retrieve_fold_info=False, **kwds):
        """
        Calls predict() on the Keras model. The argument cv specifies the
        cross validation set to select: 'train', 'val', 'test'. Default is
        'val'. All other keywords are passed to predict().
        """
        if cv not in ['train', 'val', 'test']:
            raise ValueError("Argument 'cv' must be one of 'train', "
                             "'val', 'test'; but was %s." % repr(cv))

        try:
            out = np.zeros((len(df),
                            len(list(np.concatenate(self.output_list)))))
        except ValueError:
            out = np.zeros((len(df), len(self.output_list)))

        if isinstance(self.input_list[0], list):
            flat_input_list = sum(self.input_list, [])
            flat_input_list = sorted(set(flat_input_list),
                                     key=flat_input_list.index)
        else:
            flat_input_list = self.input_list

        test_set = np.zeros(len(df), dtype='bool')
        for fold_i in range(self.cv.k):
            model = self.models[fold_i]
            norm = self.norms[fold_i]

            # identify fold
            selected = self.cv.select_cv_set(df, cv, fold_i)
            test_set |= selected

            df_inputs = norm(df[selected][flat_input_list])
            if isinstance(self.input_list[0], list):
                df_inputs = [df_inputs[i] for i in self.input_list]

            if isinstance(self.output_list[0], list):
                out[selected] = np.concatenate(
                    model.predict(df_inputs, **kwds), axis=1)
            else:
                out[selected] = model.predict(df_inputs, **kwds)

        test_df = df[test_set]
        out = out[test_set].transpose()
        try:
            out = dict(zip(["pred_" + s for s in
                            list(np.concatenate(self.output_list))], out))
        except ValueError:
            out = dict(zip(["pred_" + s for s in self.output_list], out))
        test_df = test_df.assign(**out)

        if retrieve_fold_info:
            fold = {cv + "_fold": self.cv.retrieve_fold_info(df, cv)}
            test_df = test_df.assign(**fold)

        return test_df

    def save(self, path):
        """
        Save the model and all associated components to an hdf5 file.
        """
        # save model architecture and weights (only if already trained)
        if len(self.models) == self.cv.k:
            for fold_i in range(self.cv.k):
                path_token = self._get_model_path(path, fold_i)
                # this is the built-in save function from keras
                self.models[fold_i].save(path_token)

        with h5py.File(path, "w") as output_file:
            # save default model class; since this is an arbitrary piece
            # of python code we need to use the python_to_str function
            group = output_file.create_group("models/default")
            group.attrs["model_cls"] = np.string_(python_to_str(self.model_cls))

            # save class name of the default normalizer as a string
            group = output_file.create_group("normalizers/default")
            group.attrs["norm_cls"] = np.string_(self.norm_cls.__name__)

        # save cross_validator
        self.cv.save_to_h5(path, "cross_validator")

        # save normalizers (only if already trained)
        if len(self.norms) == self.cv.k:
            for fold_i in range(self.cv.k):
                self.norms[fold_i].save_to_h5(
                    path, "normalizers/fold_{}".format(fold_i))

        # save input/output lists
        if isinstance(self.input_list[0], list):
            for i in range(len(self.input_list)):
                pd.DataFrame(self.input_list[i]).to_hdf(
                    path, "input_list" + str(i))
        else:
            pd.DataFrame(self.input_list).to_hdf(path, "input_list0")

        if isinstance(self.output_list[0], list):
            for i in range(len(self.output_list)):
                pd.DataFrame(self.output_list[i]).to_hdf(
                    path, "output_list" + str(i))
        else:
            pd.DataFrame(self.output_list).to_hdf(path, "output_list0")

        # save training history
        self.history.to_hdf(path, "history")

    @classmethod
    def load(cls, path, **kwds):
        """
        Restore a model from an hdf5 file.
        """
        # load default model and normalizer
        with h5py.File(path, "r") as input_file:
            model = str_to_python(
                input_file["models/default"].attrs["model_cls"].decode())

            normalizer_class_name = \
                input_file["normalizers/default"].attrs["norm_cls"].decode()
            normalizer = getattr(sys.modules[__name__], normalizer_class_name)

        # load cross validator
        cv = CrossValidator.load_from_h5(path, "cross_validator")

        # load input/output lists
        input_list = []
        output_list = []
        for i in range(100):
            try:
                output_list.append(
                    list(pd.read_hdf(path, "output_list" + str(i))[0]))
            except KeyError:
                break
        if len(output_list) == 1:
            output_list = output_list[0]

        for i in range(100):
            try:
                input_list.append(
                    list(pd.read_hdf(path, "input_list" + str(i))[0]))
            except KeyError:
                break
        if len(input_list) == 1:
            input_list = input_list[0]

        # create instance
        instance = cls(model, cv, normalizer, input_list, output_list)

        # load history
        history = pd.read_hdf(path, "history")
        instance.history = history

        # load trained models (if existing)
        with h5py.File(path, "r") as input_file:
            for fold_i in range(cv.k):
                path_token = instance._get_model_path(path, fold_i)
                model = tf.keras.models.load_model(path_token, **kwds)
                instance.models.append(model)

        # load normalizers
        for fold_i in range(cv.k):
            norm = Normalizer.load_from_h5(
                path, "normalizers/fold_{}".format(fold_i))
            if norm is not None:
                instance.norms.append(norm)

        return instance

    def export(self, path_base, command="converters/keras2json.py",
               expression={}):
        """
        Exports the network such that it can be converted to lwtnn's json
        format. The method generates a set of files for each cross
        validation fold. For every fold, the architecture, the weights,
        the input variables and their normalization are exported. To
        simplify the conversion to lwtnn's json format, the method also
        creates a bash script which converts all folds.

        The path_base argument should be a path or a name of the network.
        The names of the generated files are created by appending to
        path_base.

        The optional expression can be used to inject the CAF expression
        when the NN is used. The final json file will contain an entry
        KEY=VALUE if a variable matches the dict key.
        """
        for fold_i in range(self.cv.k):
            # get the architecture as a json string
            arch = self.models[fold_i].to_json()
            # save the architecture string to a file
            with open('%s_arch_%d.json' % (path_base, fold_i), 'w') as arch_file:
                arch_file.write(arch)

            # now save the weights as an HDF5 file
            self.models[fold_i].save_weights('%s_wght_%d.h5' % (path_base, fold_i))

            with open("%s_vars_%d.json" % (path_base, fold_i), "w") \
                    as variable_file:
                if isinstance(self.models[fold_i], tf.keras.Sequential):
                    scales = self.norms[fold_i].scales
                    offsets = self.norms[fold_i].offsets
                    offsets = [o / s for o, s in zip(offsets, scales)]
                    variables = [("%s=%s" % (v, expression[v]))
                                 if v in expression else v
                                 for v in self.input_list]
                    inputs = [dict(name=v, offset=o, scale=s)
                              for v, o, s in zip(variables, offsets, scales)]
                    json.dump(dict(inputs=inputs,
                                   class_labels=self.output_list),
                              variable_file)
                else:
                    output_layers = [
                        layer[0] for layer in
                        json.loads(arch)['config']['output_layers']]
                    scales_flat = self.norms[fold_i].scales.tolist()
                    offsets_flat = self.norms[fold_i].offsets
                    offsets_flat = [o / s for o, s in
                                    zip(offsets_flat, scales_flat)]

                    if isinstance(self.input_list[0], list):
                        lengths_acc = 0
                        lengths = [0]
                        for entry in self.input_list:
                            lengths_acc += len(entry)
                            lengths += [lengths_acc]
                        scales = [scales_flat[i:j] for i, j in
                                  zip(lengths[:-1], lengths[1:])]
                        offsets = [offsets_flat[i:j] for i, j in
                                   zip(lengths[:-1], lengths[1:])]
                        variables = [[("%s=%s" % (v, expression[v]))
                                      if v in expression else v
                                      for v in inner_list]
                                     for inner_list in self.input_list]
                        inputs = []
                        for i in range(len(variables)):
                            _variables = [dict(name=v, offset=o, scale=s)
                                          for v, o, s in zip(variables[i],
                                                             offsets[i],
                                                             scales[i])]
                            inputs += [dict(name='node_' + str(i),
                                            variables=_variables)]
                    else:
                        scales = scales_flat
                        offsets = offsets_flat
                        variables = [("%s=%s" % (v, expression[v]))
                                     if v in expression else v
                                     for v in self.input_list]
                        inputs = [dict(name=v, offset=o, scale=s)
                                  for v, o, s in zip(variables, offsets,
                                                     scales)]

                    outputs = []
                    if isinstance(self.output_list[0], list):
                        for i in range(len(self.output_list)):
                            outputs += [dict(name=output_layers[i],
                                             labels=self.output_list[i])]
                    else:
                        outputs = [dict(name=output_layers[0],
                                        labels=self.output_list)]

                    json.dump(dict(input_sequences=[], inputs=inputs,
                                   outputs=outputs),
                              variable_file)

            mode = "w" if fold_i == 0 else "a"
            if isinstance(self.models[fold_i], tf.keras.Sequential):
                with open("%s.sh" % path_base, mode) as script_file:
                    print(f"{command} {path_base}_arch_{fold_i}.json "
                          f"{path_base}_vars_{fold_i}.json "
                          f"{path_base}_wght_{fold_i}.h5 "
                          f"> {path_base}_{fold_i}.json", file=script_file)
            else:
                with open("%s.sh" % path_base, mode) as script_file:
                    print(f"{command} {path_base}_arch_{fold_i}.json "
                          f"{path_base}_wght_{fold_i}.h5 "
                          f"{path_base}_vars_{fold_i}.json "
                          f"> {path_base}_{fold_i}.json", file=script_file)


class HepNetSearch:
    """
    A hyperparameter tuner/search for HepNet, based on keras-tuner. The
    class supports multi-processing and distributed hyperparameter tuning
    on demand.
    """

    def __init__(self, keras_model, tuner_name, cross_validator, normalizer,
                 input_list, output_list, ETH_IP=None):
        """
        HepNetSearch arguments tightly follow those of HepNet, except for
        the following:

        tuner_name: Name of the keras-tuner class {RandomSearch,
            BayesianOptimization, Hyperband}.
        ETH_IP: In case distributed training is planned in the form of a
            chief-worker model, the Ethernet IP has to be passed.
        """
        self.model = keras_model
        self.cv = cross_validator
        self.norm_cls = normalizer
        self.input_list = input_list
        self.output_list = output_list
        self.norms = []
        self.tuner_settings = None
        self.ETH_IP = ETH_IP

        if tuner_name not in ['Hyperband', 'RandomSearch',
                              'BayesianOptimization', 'GridSearch']:
            warnings.warn("%s is not a tested tuner; the program might "
                          "break. Tested tuners: Hyperband, RandomSearch, "
                          "BayesianOptimization, GridSearch" % tuner_name)
        self.tuner_name = tuner_name

    def _get_eth_ip(self):
        """
        Fetch the ethernet IP.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.settimeout(0)
        try:
            # doesn't have to be reachable
            s.connect(('10.254.254.254', 1))
            IP = s.getsockname()[0]
        except Exception:
            IP = '127.0.0.1'
        finally:
            s.close()
        return IP

    def _get_model(self, trial_index=None, fold=0):
        """
        Get the model of interest by trial_index. By default, the function
        returns the model of the best trial with no tolerance.
        """
        if trial_index is None:
            trial_index = self.book(sort=True, filter_nan=True).index[0]
        hps_dict = self.search_book.iloc[trial_index][self.hps_str].to_dict()
        oracle_trials = len(self.oracles[fold].get_state()['tried_so_far'])
        hps_trials = self.tuners[fold].get_best_hyperparameters(
            num_trials=oracle_trials)

        trial_position = None
        for i in range(oracle_trials):
            # remove non-hps items before comparison
            hps_trial = hps_trials[i].values
            hps_trial = {key: hps_trial[key] for key, value in hps_dict.items()}
            if hps_dict == hps_trial:
                trial_position = i
                break
        if trial_position is None:
            raise ValueError("No trained model in the (%d) fold matches "
                             "the hyperparameter set indexed at (%d):\n%s"
                             % (fold, trial_index, hps_dict))

        hps = hps_trials[trial_position]
        model = self.tuners[fold].hypermodel.build(hps)
        return model

    def set_tuner(self, **kwds):
        """
        Set the keras-tuner kwds.
        """
        self.tuner_settings = kwds

    def search(self, df, weight=None, Nfmp=False, distribute=False,
               tuner_id=None, **kwds):
        """
        Perform the hyperparameter search.

        weight: Training weights.
        Nfmp: Execute the N fold fits in parallel as multiple processes
            (multi-processing).
        distribute: Allow distributed training.
        tuner_id: Chief-worker model ID. This is either 'chief' or
            'tunerX', where X is a unique tuner number.
        **kwds: Passed directly to fit().
        """
        # Meant for internal use only
        self.tuner_id = tuner_id

        if self.tuner_settings is None:
            raise ValueError("The tuner is not yet set. You need to set up "
                             "tuner_settings as a dictionary of kwds for "
                             "the Keras tuner.")

        if weight is None:
            weight = Variable("unity", lambda d: np.ones(len(d)))
        elif isinstance(weight, str):
            weight = Variable(weight, weight)

        # Perform the tuning/search on each fold and collect the oracles
        # and tuners
        self.norms = []
        oracles = []
        tuners = []
        multi_tuner = tuner_mp(self)

        if Nfmp:
            # Run multiprocessing jobs
            q = mp.Queue()
            procs = []
            for fold_i in range(self.cv.k):
                p = mp.Process(target=multi_tuner.search_body,
                               args=(fold_i, df, weight, None, None,
                                     distribute, kwds, tuner_id))
                procs.append(p)
                p.start()
            for p in procs:
                p.join()
        else:
            for fold_i in range(self.cv.k):
                multi_tuner.search_body(fold_i, df, weight, None, None,
                                        distribute, kwds, tuner_id)

        # On search completion, register the combined Nfold searches
        if tuner_id == "chief" or tuner_id is None:
            for fold_i in range(self.cv.k):
                multi_tuner.search_body(fold_i, df, weight, tuners, oracles,
                                        distribute, kwds, tuner_id,
                                        noTraining=True)

            # Fetch hyperparameter names
            hps_str = []
            space = oracles[0].get_best_trials()[0].get_state() \
                    ['hyperparameters']['space']
            for i in range(len(space)):
                hps_str.append(space[i]['config']['name'])
            self.hps_str = hps_str

            # Evaluate validation mean and std score across folds
            search_book = []
            fold_0_Ntrials = len(list(oracles[0].trials.values()))
            for j in range(fold_0_Ntrials):
                # fold_0 oracle, trial_j
                fold_0_trial_j = oracles[0].get_best_trials(
                    num_trials=fold_0_Ntrials)[j]
                # dictionary of ALL hyperparameter values of fold_0
                # oracle trial_j ...
                fold_0_hps_j = fold_0_trial_j.get_state()['hyperparameters']['values']
                # ... reduced to the model hyperparameters only
                fold_0_hps_j = {p: fold_0_hps_j[p] for p in hps_str}
                # Register the fold_0 trial_j score
                scores = {'fold_0_score': fold_0_trial_j.get_state()['score']}

                # search the fold_i oracle for the matching trial_m using
                # the hyperparameter values, then append the score
                for i in range(1, self.cv.k):
                    fold_i_Ntrials = len(list(oracles[i].trials.values()))
                    fold_i_trials = oracles[i].get_best_trials(
                        num_trials=fold_i_Ntrials)
                    for m in range(len(fold_i_trials)):
                        fold_i_trial_m = fold_i_trials[m]
                        fold_i_hps_m = fold_i_trial_m.get_state() \
                                       ['hyperparameters']['values']
                        if fold_0_hps_j.items() <= fold_i_hps_m.items():
                            scores['fold_' + str(i) + '_score'] = \
                                fold_i_trial_m.get_state()['score']
                            break

                search_book.append({**fold_0_hps_j, **scores})

            # Convert search_book to a dataframe, then evaluate mean and std
            search_book = pd.DataFrame(search_book)
            scores_book = search_book.drop(hps_str, axis=1)
            hps_book = search_book.drop(scores_book.columns, axis=1)
            if len(scores_book.columns) > 1:
                search_book = search_book.assign(mean=np.mean(scores_book, axis=1))
                search_book = search_book.assign(std=np.std(scores_book, axis=1))
            else:
                search_book = search_book.assign(mean=np.mean(scores_book, axis=1))
                search_book = search_book.assign(std=[0] * len(scores_book))

            self.tuners = tuners
            self.oracles = oracles
            self.search_book = search_book
            self.hps_book = hps_book
            self.scores_book = scores_book

    def book(self, tolerance=np.inf, sort=False, filter_nan=False):
        """
        Return the search_book of the HPO.

        tolerance: A positive number defining the tolerance level for the
            std of the tracked validation metric. Models with
            Nfold_std(val_metric) > tolerance are discarded from the
            search_book.
        sort: Sort by the best score mean.
        filter_nan: Remove trials with an std of nan.
        """
        if tolerance <= 0:
            raise ValueError("Tolerance must be a positive number.")

        # The following if statement is to support custom objective methods
        if isinstance(self.tuner_settings['objective'], str):
            objective_name = self.tuner_settings['objective']
        elif isinstance(self.tuner_settings['objective'], keras_tuner.Objective):
            objective_name = self.tuner_settings['objective'].name
        else:
            raise TypeError("The objective must be a string or a "
                            "keras_tuner.Objective")

        score_direction = self.oracles[0].get_best_trials()[0].get_state() \
                          ['metrics']['metrics'][objective_name]['direction']
        if score_direction == 'min':
            ascending = True
        elif score_direction == 'max':
            ascending = False
        else:
            ascending = False
            print('Warning: Objective direction is neither max nor min! '
                  'Defaulted to descending order for the trial means.')

        ### TODO: The following is a workaround for dropped trials. It is
        ### indeed a performance issue: 1- it affects the run time, 2- it
        ### affects the optimal hyperparameter set. Currently, the best
        ### course of action is to seed the models identically.
        # Remove tuner dropped trials
        if filter_nan:
            dropped_trial_indices = \
                self.search_book[self.search_book.isnull().any(axis=1)].index.tolist()
            search_book = self.search_book.drop(dropped_trial_indices)
        else:
            search_book = self.search_book

        # Get the score direction method then sort accordingly
        if sort:
            score_direction = getattr(np, score_direction)
            search_book = search_book.sort_values(by=['mean'], axis=0,
                                                  ascending=ascending)

        # Filter by tolerance level
        search_book = search_book[search_book['std'] <= tolerance]

        return search_book

    def trial_summary(self, trial_index=None, detailed=False, **kwds):
        """
        Summary of the model at a specific trial_index. By default, the
        function returns the best trial summary with no tolerance.
        """
        if trial_index is None:
            trial_index = self.book(sort=True, filter_nan=True).index[0]
        trial = dict(self.search_book.loc[trial_index])

        print("Index: %s" % trial_index)
        print("Mean score: %s \nstd: %s" % (trial['mean'], trial['std']))
        print("Hyperparameters:")
        for key in self.hps_str:
            print("\t%s: %s" % (key, trial[key]))
        print('\nTrial model summary:')

        model = self._get_model(trial_index=trial_index)
        model.summary(**kwds)

        if detailed:
            print('\n')
            print(json.dumps(model.get_config(), indent=1))


class tuner_mp(object):
    def __init__(self, class_object):
        self.__dict__ = class_object.__dict__.copy()

    def search_body(self, fold_i, df, weight, tuners, oracles, distribute,
                    kwds, tuner_id, noTraining=False):
        # Set chief/worker environment communication ports
        if distribute and not noTraining:
            if tuner_id is None:
                raise ValueError("tuner_id was not passed.")
            os.environ["KERASTUNER_TUNER_ID"] = tuner_id
            # tuner_mp only copies instance attributes, so the helper is
            # called through the HepNetSearch class explicitly
            os.environ["KERASTUNER_ORACLE_IP"] = \
                HepNetSearch._get_eth_ip(self) if self.ETH_IP is None \
                else self.ETH_IP
            os.environ["KERASTUNER_ORACLE_PORT"] = str(47808 + fold_i)
            importlib.reload(keras_tuner)
            print("Fold:%s\t ID:%s IP:%s PORT:%s"
                  % (fold_i, os.getenv("KERASTUNER_TUNER_ID"),
                     os.getenv("KERASTUNER_ORACLE_IP"),
                     os.getenv("KERASTUNER_ORACLE_PORT")))
        if distribute and noTraining:
            try:
                del os.environ["KERASTUNER_TUNER_ID"]
                del os.environ["KERASTUNER_ORACLE_IP"]
                del os.environ["KERASTUNER_ORACLE_PORT"]
            except KeyError:
                pass
            print("Fold:%s\t ID:%s IP:%s PORT:%s"
                  % (fold_i, os.getenv("KERASTUNER_TUNER_ID"),
                     os.getenv("KERASTUNER_ORACLE_IP"),
                     os.getenv("KERASTUNER_ORACLE_PORT")))

        # Constrain memory growth on physical GPUs
        physical_devices = tf.config.list_physical_devices('GPU')
        try:
            tf.config.experimental.set_memory_growth(physical_devices[0], True)
        except Exception:
            # In case of CPU or virtual devices
            pass

        # select training set
        selected = self.cv.select_training(df, fold_i)
        training_df = df[selected]

        # select validation set
        selected = self.cv.select_validation(df, fold_i)
        validation_df = df[selected]

        # seed normalizers
        norm = self.norm_cls(training_df, self.input_list)
        self.norms.append(norm)
        training_df = norm(training_df)
        validation_df = norm(validation_df)

        # search in fold
        tuner = getattr(keras_tuner, self.tuner_name)
        tuner_settings_i = self.tuner_settings.copy()
        if 'logger' in tuner_settings_i:
            tuner_settings_i['logger'].fold = fold_i
        tuner_settings_i.update(
            {'project_name': self.tuner_settings['project_name']
                             + '_' + str(fold_i)})
        tuner = tuner(self.model, **tuner_settings_i)

        tuner.search(training_df[self.input_list],
                     training_df[self.output_list],
                     validation_data=(
                         validation_df[self.input_list],
                         validation_df[self.output_list],
                         np.array(weight(validation_df)),
                     ),
                     sample_weight=np.array(weight(training_df)),
                     **kwds)

        # append for a completely saved search
        if tuners is not None and oracles is not None:
            tuners.append(tuner)
            oracles.append(tuner.oracle)
        del tuner
