from abc import ABC, abstractmethod
import os
import sys
import h5py
import json
import socket
import warnings
import multiprocessing as mp
import numpy as np
import pandas as pd
import tensorflow as tf
import keras_tuner
from freeforestml.variable import Variable
from freeforestml.helpers import python_to_str, str_to_python
class CrossValidator(ABC):
    """
    Abstract class of a cross validation method.

    Concrete validators split a dataframe into slices (select_slice) and
    decide which slices form the training, validation and test set of each
    of the k folds.
    """

    def __init__(self, k, mod_var=None, frac_var=None):
        """
        Creates a new cross validator. The argument k determines the number of
        folds. The mod_var specifies a variable whose 'mod k' value defines
        the set. The frac_var specifies a variable whose decimals define the
        set. Exactly one of the two must be used. Both options can be either a
        string naming the column in the dataframe or a variable object.

        Raises TypeError if neither or both of mod_var and frac_var are given.
        """
        self.k = k
        if (mod_var is None) == (frac_var is None):
            raise TypeError("Exactly one of mod_var or frac_var must be "
                            "used.")
        elif mod_var is not None:
            self.variable = mod_var
            self.mod_mode = True
        else:
            self.variable = frac_var
            self.mod_mode = False

        # A plain column name is wrapped into a Variable reading that column.
        if isinstance(self.variable, str):
            self.variable = Variable(self.variable, self.variable)

    def __eq__(self, other):
        """
        Compare if two cross validators are the same: same class, same k,
        same mode (mod/frac) and an equal variable.
        """
        if not isinstance(other, self.__class__):
            return False
        if self.k != other.k:
            return False
        if self.mod_mode != other.mod_mode:
            return False
        if self.variable != other.variable:
            return False
        return True

    @abstractmethod
    def select_slice(self, df, slice_id):
        """
        Returns the index array to select all events from the dataset of a
        given slice.

        NB: This method is for internal usage only. There might be more than k
        slices.
        """

    @abstractmethod
    def select_training(self, df, fold_i):
        """
        Returns the index array to select all training events from the dataset
        for the given fold.
        """

    @abstractmethod
    def select_validation(self, df, fold_i):
        """
        Returns the index array to select all validation events from the
        dataset for the given fold.
        """

    @abstractmethod
    def select_test(self, df, fold_i):
        """
        Returns the index array to select all test events from the dataset for
        the given fold.
        """

    def select_cv_set(self, df, cv, fold_i):
        """
        Returns the index array to select all events from the cross validator
        set specified with cv ('train', 'val', 'test') for the given fold.

        Raises ValueError for any other value of cv.
        """
        if cv not in ('train', 'val', 'test'):
            raise ValueError("Argument 'cv' must be one of 'train', 'val', "
                             "'test'; but was %s." % repr(cv))
        if cv == "train":
            return self.select_training(df, fold_i)
        if cv == "val":
            return self.select_validation(df, fold_i)
        return self.select_test(df, fold_i)

    def retrieve_fold_info(self, df, cv):
        """
        Returns an integer array with one entry per event of df. Each entry
        is the index of the fold in which the event belongs to the cv set
        ('train', 'val' or 'test'), or -1 if it is in that set in no fold.
        If an event is in the set in several folds, the highest fold index
        is kept.
        """
        fold_info = np.full(len(df), -1, dtype=int)
        for fold_i in range(self.k):
            selected = self.select_cv_set(df, cv, fold_i)
            fold_info[selected] = fold_i  # later folds overwrite earlier ones
        return fold_info

    def save_to_h5(self, path, key, overwrite=False):
        """
        Save cross validator definition to a hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        If overwrite is true then already existing file contents are
        overwritten.
        """
        open_mode = "w" if overwrite else "a"
        with h5py.File(path, open_mode) as output_file:
            group = output_file.create_group(key)
            # np.bytes_ exists in NumPy 1.x and 2.x (np.string_ was removed
            # in NumPy 2.0).
            group.attrs["class_name"] = np.bytes_(self.__class__.__name__)
            group.attrs["k"] = self.k
            group.attrs["mod_mode"] = self.mod_mode
        # Variable.save_to_h5 receives the file path (presumably opening the
        # file itself), so it is called once the file above is closed.
        self.variable.save_to_h5(path, os.path.join(key, "variable"))

    @classmethod
    def load_from_h5(cls, path, key):
        """
        Create a new cross validator instance from an hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        The concrete class is looked up by the stored class name in this
        module.
        """
        with h5py.File(path, "r") as input_file:
            attrs = input_file[key].attrs
            class_name = attrs["class_name"]
            # h5py may hand back bytes or str depending on how it was stored.
            if isinstance(class_name, bytes):
                class_name = class_name.decode()
            k = int(attrs["k"])
            mod_mode = bool(attrs["mod_mode"])
        class_object = getattr(sys.modules[__name__], class_name)
        variable = Variable.load_from_h5(path, os.path.join(key, "variable"))
        if mod_mode:
            return class_object(k=k, mod_var=variable)
        return class_object(k=k, frac_var=variable)
class ClassicalCV(CrossValidator):
    """
    k-fold cross validation on one half of the data set, with the other half
    held out as the test set.

    The events are divided into 2k slices. Slices 0 .. k-1 rotate between
    training and validation, while slices k .. 2k-1 form the test set of
    every fold. Example for k=5 (the last column stands for all test slices):

        fold 0: | Tr | Tr | Tr | Tr | Va | Test |
        fold 1: | Tr | Tr | Tr | Va | Tr | Test |
        fold 2: | Tr | Tr | Va | Tr | Tr | Test |
        fold 3: | Tr | Va | Tr | Tr | Tr | Test |
        fold 4: | Va | Tr | Tr | Tr | Tr | Test |

    Va=Validation, Tr=Training
    """

    def select_slice(self, df, slice_id):
        """
        Boolean selection of the events in slice slice_id (0 .. 2k-1).

        In mod mode an event is in slice s if variable % 2k == s. In frac mode
        it is in slice s if the fractional part of the variable lies in
        [s / 2k, (s + 1) / 2k).

        NB: For internal use only; there are 2k slices, not k.
        """
        n_slices = self.k * 2
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower = slice_id / (self.k * 2.0)
            upper = (slice_id + 1.0) / n_slices
            return (lower <= fraction) & (fraction < upper)
        return self.variable(df) % n_slices == slice_id

    def select_training(self, df, fold_i):
        """
        Boolean selection of the training events of fold fold_i: the union of
        the first k slices except the fold's validation slice.
        """
        validation_slice = (self.k - 1 - fold_i) % self.k
        mask = np.zeros(len(df), dtype='bool')
        for slice_i in range(self.k):
            if slice_i != validation_slice:
                mask = mask | self.select_slice(df, slice_i)
        return mask

    def select_validation(self, df, fold_i):
        """
        Boolean selection of the validation events of fold fold_i: slice
        (k - fold_i - 1) mod k.
        """
        validation_slice = (self.k - fold_i - 1) % self.k
        return self.select_slice(df, validation_slice)

    def select_test(self, df, fold_i):
        """
        Boolean selection of the test events: the union of slices k .. 2k-1.
        It is the same for every fold.
        """
        mask = np.zeros(len(df), dtype='bool')
        for slice_i in range(self.k, 2 * self.k):
            mask = mask | self.select_slice(df, slice_i)
        return mask
class NoTestCV(CrossValidator):
    """
    Uses the whole dataset for training and validation; the test set is
    always empty.

        fold 0: | Training | Val |

    The NoTestCV can be useful if the test dataset is provided independently
    from the training and validation, for example if a different generator is
    used for the training or if real-time (non-hep) data is used as a "test"
    set.

    NOTE(review): k is stored as self.k, which the base class also treats as
    the number of folds. Code iterating over range(cv.k) therefore sees k
    folds with identical train/validation selections. Confirm this is
    intended.
    """

    def __init__(self, mod_var=None, frac_var=None, k=10):
        """
        k is the inverse fraction of the validation set: k=5 puts 1/5 = 20%
        of the dataset into the validation set.
        """
        super().__init__(k, mod_var=mod_var, frac_var=frac_var)

    def select_slice(self, df, slice_id):
        """
        Boolean selection of the events in slice slice_id (0 .. k-1).

        In mod mode: variable % k == slice_id. In frac mode: the fractional
        part of the variable lies in [slice_id / k, (slice_id + 1) / k).

        NB: For internal use only.
        """
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower = slice_id / self.k
            upper = (slice_id + 1.0) / self.k
            return (lower <= fraction) & (fraction < upper)
        return self.variable(df) % self.k == slice_id

    def select_training(self, df, fold_i):
        """
        Boolean selection of the training events: slices 1 .. k-1.
        fold_i is ignored.
        """
        mask = np.zeros(len(df), dtype='bool')
        for slice_i in range(1, self.k):
            mask = mask | self.select_slice(df, slice_i)
        return mask

    def select_validation(self, df, fold_i):
        """
        Boolean selection of the validation events: slice 0. fold_i is
        ignored.
        """
        return self.select_slice(df, 0)

    def select_test(self, df, fold_i):
        """
        Boolean selection of the test events, which is always all False.
        """
        return np.zeros(len(df), dtype='bool')
class BinaryCV(CrossValidator):
    """
    Splits the data into two halves which alternately serve as training set
    and as test set. There is no independent validation set: the validation
    set of a fold is identical to its test set. The BinaryCV should therefore
    not be used for parameter optimization.

        fold 0: | Training   | Test & Val |
        fold 1: | Test & Val | Training   |

    The BinaryCV can be used after parameter optimization with ClassicalCV to
    retrain the model on the full half. The validation performance contained
    in HepNet.history is then the test performance.
    """

    def __init__(self, mod_var=None, frac_var=None, k=None):
        """
        k is always set to 2; the argument k has no effect.
        """
        super().__init__(2, mod_var=mod_var, frac_var=frac_var)

    def select_slice(self, df, slice_id):
        """
        Boolean selection of the events in slice slice_id (0 or 1).

        In mod mode: variable % k == slice_id. In frac mode: the fractional
        part of the variable lies in [slice_id / k, (slice_id + 1) / k).

        NB: For internal use only.
        """
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower = slice_id / self.k
            upper = (slice_id + 1.0) / self.k
            return (lower <= fraction) & (fraction < upper)
        return self.variable(df) % self.k == slice_id

    def _held_out_slice(self, fold_i):
        """Index of the slice not used for training in fold fold_i."""
        return (1 + fold_i) % self.k

    def select_training(self, df, fold_i):
        """
        Boolean selection of the training events of fold fold_i: slice fold_i.
        """
        return self.select_slice(df, fold_i)

    def select_validation(self, df, fold_i):
        """
        Boolean selection of the validation events of fold fold_i: the other
        slice. It is identical to the test set.
        """
        return self.select_slice(df, self._held_out_slice(fold_i))

    def select_test(self, df, fold_i):
        """
        Boolean selection of the test events of fold fold_i: the slice not
        used for training.
        """
        return self.select_slice(df, self._held_out_slice(fold_i))
class MixedCV(CrossValidator):
    """
    k-fold cross validation in which both the validation and the test slice
    rotate through the data set (k slices in total). Example for k=5:

        fold 0: | Tr | Tr | Tr | Te | Va |
        fold 1: | Tr | Tr | Te | Va | Tr |
        fold 2: | Tr | Te | Va | Tr | Tr |
        fold 3: | Te | Va | Tr | Tr | Tr |
        fold 4: | Va | Tr | Tr | Tr | Te |

    Va=Validation, Tr=Training, Te=Test
    """

    def select_slice(self, df, slice_id):
        """
        Boolean selection of the events in slice slice_id (0 .. k-1).

        In mod mode: variable % k == slice_id. In frac mode: the fractional
        part of the variable lies in [slice_id / k, (slice_id + 1) / k).

        NB: For internal use only.
        """
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower = slice_id / self.k
            upper = (slice_id + 1.0) / self.k
            return (lower <= fraction) & (fraction < upper)
        return self.variable(df) % self.k == slice_id

    def select_training(self, df, fold_i):
        """
        Boolean selection of the training events of fold fold_i: all slices
        except the fold's validation and test slices.
        """
        validation_slice = (self.k - fold_i - 1) % self.k
        test_slice = (self.k - fold_i - 2) % self.k
        mask = np.zeros(len(df), dtype='bool')
        for slice_i in range(self.k):
            if slice_i in (validation_slice, test_slice):
                continue
            mask = mask | self.select_slice(df, slice_i)
        return mask

    def select_validation(self, df, fold_i):
        """
        Boolean selection of the validation events of fold fold_i: slice
        (k - fold_i - 1) mod k.
        """
        validation_slice = (self.k - fold_i - 1) % self.k
        return self.select_slice(df, validation_slice)

    def select_test(self, df, fold_i):
        """
        Boolean selection of the test events of fold fold_i: slice
        (k - fold_i - 2) mod k, the neighbour of the validation slice.
        """
        test_slice = (self.k - fold_i - 2) % self.k
        return self.select_slice(df, test_slice)
class Normalizer(ABC):
    """
    Abstract normalizer which shifts and scales the distribution such that it
    has zero mean and unit width.
    """

    @abstractmethod
    def __init__(self, df, input_list=None):
        """
        Returns a normalizer object with the normalization moments stored
        internally. The input_list argument specifies which inputs should be
        normalized. All other columns are left untouched.
        """

    @abstractmethod
    def __call__(self, df):
        """
        Applies the normalization of the input columns to the given dataframe
        and returns a normalized copy.
        """

    @abstractmethod
    def __eq__(self, other):
        """
        Check if two normalizers are the same.
        """

    @property
    @abstractmethod
    def scales(self):
        """
        Every normalizer must reduce to a simple (offset + scale * x)
        normalization to be used with lwtnn. This property returns the scale
        parameters for all variables.
        """

    @property
    @abstractmethod
    def offsets(self):
        """
        Every normalizer must reduce to a simple (offset + scale * x)
        normalization to be used with lwtnn. This property returns the offset
        parameters for all variables.
        """

    def save_to_h5(self, path, key, overwrite=False):
        """
        Save normalizer definition to a hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        If overwrite is true then already existing file contents are
        overwritten. The class name is stored as an attribute of the group
        so load_from_h5 can restore the right subclass.
        """
        open_mode = "w" if overwrite else "a"
        with h5py.File(path, open_mode) as output_file:
            group = output_file.create_group(key)
            # np.bytes_ exists in NumPy 1.x and 2.x (np.string_ was removed
            # in NumPy 2.0).
            group.attrs["class_name"] = np.bytes_(self.__class__.__name__)
        # Subclasses write via their own file handles (e.g. pandas.to_hdf),
        # so this runs after the h5py file above has been closed.
        self._save_to_h5(path, key)

    @abstractmethod
    def _save_to_h5(self, path, key):
        """
        Save child class specific definitions to a hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        """

    @classmethod
    def load_from_h5(cls, path, key):
        """
        Create a new normalizer instance from an hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        Returns None if key does not exist in the file.
        """
        with h5py.File(path, "r") as input_file:
            if key not in input_file:
                return None
            class_name = input_file[key].attrs["class_name"]
            # h5py may hand back bytes or str depending on how it was stored.
            if isinstance(class_name, bytes):
                class_name = class_name.decode()
        class_object = getattr(sys.modules[__name__], class_name)
        return class_object._load_from_h5(path, key)

    @classmethod
    @abstractmethod
    def _load_from_h5(cls, path, key):
        """
        Load child class specific definitions from a hdf5 file.
        """
class EstimatorNormalizer(Normalizer):
    """
    Normalizer which uses the sample mean (center) and the sample standard
    deviation (width) as estimators of the normalization moments. This
    method might lead to sub-optimal results if there are outliers.
    """

    def __init__(self, df, input_list=None, center=None, width=None):
        """
        See base class.

        If both center and width are given they are used as-is and df is
        ignored (this is how _load_from_h5 restores an instance). Otherwise
        they are estimated from the input_list columns of df (all columns if
        input_list is None). A nested input_list is flattened and
        de-duplicated in first-occurrence order. Zero widths are replaced by
        1 to avoid dividing by zero for constant inputs.
        """
        if center is None or width is None:
            if input_list is not None:
                if isinstance(input_list[0], list):
                    flat = sum(input_list, [])
                    input_list = sorted(set(flat), key=flat.index)
                df = df[input_list]
            self.center = df.mean()
            spread = df.std()
            spread[spread == 0] = 1
            self.width = spread
        else:
            self.center = center
            self.width = width

    def __call__(self, df):
        """
        See base class.

        The normalized columns come first, followed by every other column of
        df copied unchanged.
        """
        columns = list(self.center.index)
        normed = (df[columns] - self.center) / self.width
        passthrough = [name for name in df.columns if name not in columns]
        normed[passthrough] = df[passthrough]
        return normed

    def __eq__(self, other):
        """
        See base class. Two instances are equal if center and width are
        equal Series.
        """
        return (isinstance(other, self.__class__)
                and self.center.equals(other.center)
                and self.width.equals(other.width))

    def _save_to_h5(self, path, key):
        """
        See base class. center and width are stored as pandas objects under
        '<key>/center' and '<key>/width'.
        """
        for name, series in (("center", self.center), ("width", self.width)):
            series.to_hdf(path, key=os.path.join(key, name))

    @classmethod
    def _load_from_h5(cls, path, key):
        """
        See base class.
        """
        stored_center = pd.read_hdf(path, os.path.join(key, "center"))
        stored_width = pd.read_hdf(path, os.path.join(key, "width"))
        return cls(None, center=stored_center, width=stored_width)

    @property
    def scales(self):
        """Per-variable scale 1 / width."""
        return 1 / self.width

    @property
    def offsets(self):
        """Per-variable offset -center / width."""
        return -self.center / self.width
class IdentityNormalizer(Normalizer):
    """
    Normalizer that has no effect: IdentityNormalizer(df)(df) returns df
    itself. It still records a center of 0.0 and a width of 1.0 for every
    input variable. Its scales/offsets are therefore per-variable Series,
    like those of the other normalizers, as required by the lwtnn export.
    """

    def __init__(self, df, input_list=None, center=None, width=None):
        """
        If both center and width are given they are stored directly (used by
        _load_from_h5). Otherwise a center of 0.0 and a width of 1.0 is
        recorded for every selected column of df. The selection is the
        input_list (a nested list is flattened and de-duplicated), or all
        columns if input_list is None.
        """
        if center is not None and width is not None:
            self.center = center
            self.width = width
        else:
            if input_list is not None:
                if isinstance(input_list[0], list):
                    input_list = sum(input_list, [])
                    input_list = sorted(set(input_list), key=input_list.index)
                df = df[input_list]
            keys = df.keys()
            self.center = pd.Series({name: 0.0 for name in keys})
            self.width = pd.Series({name: 1.0 for name in keys})

    def __call__(self, df):
        """Returns df unchanged (the same object, not a copy)."""
        return df

    def __eq__(self, other):
        """Equal if of the same class with equal center and width Series."""
        if not isinstance(other, self.__class__):
            return False
        if not self.center.equals(other.center):
            return False
        if not self.width.equals(other.width):
            return False
        return True

    def _save_to_h5(self, path, key):
        """Stores center and width under '<key>/center' and '<key>/width'."""
        self.center.to_hdf(path, key=os.path.join(key, "center"))
        self.width.to_hdf(path, key=os.path.join(key, "width"))

    @classmethod
    def _load_from_h5(cls, path, key):
        """Restores an instance from the stored center and width."""
        center = pd.read_hdf(path, os.path.join(key, "center"))
        width = pd.read_hdf(path, os.path.join(key, "width"))
        return cls(None, center=center, width=width)

    @property
    def scales(self):
        """
        Per-variable scales (all 1.0) as a Series indexed like center, so
        callers can iterate over them or call .tolist() as for the other
        normalizers.
        """
        return 1.0 / self.width

    @property
    def offsets(self):
        """Per-variable offsets (all -0.0) as a Series indexed like center."""
        return -self.center / self.width
def normalize_category_weights(df, categories, weight='weight'):
    """
    The categorical weight normalizer acts on the weight variable only. The
    returned dataframe will satisfy the following conditions:

     - The sum of weights of all events is equal to the total number of
       entries.
     - The sum of weights of a category is equal to the total number of
       entries divided by the number of categories. Therefore the sums of
       weights of any two categories are equal.
     - The relative weights within a category are unchanged.

    Each category must provide idx_array(df) returning a selection of its
    events. If an event is selected by several categories, the last one
    wins. Events selected by no category receive a NaN weight. The input
    dataframe is not modified; a copy is returned.
    """
    df_out = df.copy()
    # NaN (rather than uninitialized memory) marks events outside all
    # categories, so they cannot silently receive arbitrary weights.
    category_sums = np.full(len(df), np.nan)
    for category in categories:
        idx = category.idx_array(df)
        category_sums[idx] = df[idx][weight].sum()
    df_out[weight] = (df_out[weight] / category_sums
                      * len(df) / len(categories))
    return df_out
class HepNet:
    """
    Meta model of a concrete neural network around the underlying Keras model.
    The HEP net handles cross validation, normalization of the input
    variables, the input weights, and the actual Keras model. A HEP net has no
    free parameters.
    """

    def __init__(self, keras_model, cross_validator, normalizer, input_list,
                 output_list):
        """
        Creates a new HEP model. The keras model parameter must be a class that
        returns a new instance of the compiled model (The HEP net needs to be
        able to create multiple models, one for each cross validation fold.)

        The cross_validator must be a CrossValidator object.

        The normalizer must be a Normalizer class that returns a normalizer.
        Each cross validation fold uses a separate normalizer with independent
        normalization weights. If normalizer is None, IdentityNormalizer is
        used.

        The input and output lists are lists of variables or column names used
        as input and target of the keras model. The input is normalized. A
        list of lists defines one group of columns per input/output node of a
        multi-input/multi-output model.
        """
        self.model_cls = keras_model
        self.cv = cross_validator
        self.norm_cls = IdentityNormalizer if normalizer is None else normalizer
        self.input_list = input_list
        self.output_list = output_list
        self.norms = []
        self.models = []
        self.history = pd.DataFrame()

    def __eq__(self, other):
        """
        Check if two models have the same configuration, normalizers and
        training history.
        """
        if not isinstance(other, self.__class__):
            return False
        if python_to_str(self.model_cls) != python_to_str(other.model_cls):
            return False
        if self.cv != other.cv:
            return False
        if python_to_str(self.norm_cls) != python_to_str(other.norm_cls):
            return False
        if self.input_list != other.input_list:
            return False
        if self.output_list != other.output_list:
            return False
        if self.norms != other.norms:
            return False
        # DataFrame.equals requires identical shape, dtypes and values (NaNs
        # in the same place count as equal). An element-wise `!=` would raise
        # for differently shaped histories.
        return self.history.equals(other.history)

    def _get_model_path(self, path, fold_i):
        """
        Returns the path of the fold_i model: the fold tag is inserted before
        the file extension, e.g. 'net.h5' -> 'net.fold_0.h5'.
        """
        root, extension = os.path.splitext(path)
        return f"{root}.fold_{fold_i}{extension}"

    @staticmethod
    def _split_nodes(df, columns):
        """Select columns from df; a list of lists gives one frame per node."""
        if isinstance(columns[0], list):
            return [df[node] for node in columns]
        return df[columns]

    def _flat_inputs(self):
        """Input column names, flattened and de-duplicated if nested."""
        if isinstance(self.input_list[0], list):
            flat = sum(self.input_list, [])
            return sorted(set(flat), key=flat.index)
        return self.input_list

    def _flat_outputs(self):
        """Output column names, flattened if nested."""
        if isinstance(self.output_list[0], list):
            return sum(self.output_list, [])
        return list(self.output_list)

    def fit(self, df, weight=None, **kwds):
        """
        Calls fit() on all folds. All kwds are passed to fit().

        For every fold a fresh normalizer is created from the fold's training
        set and applied to the training and validation sets, and a fresh
        model is created from the model class. weight may be None (unit
        weights), a column name or a Variable; it is evaluated on the
        normalized frames. The Keras history of all folds is collected in
        self.history with extra 'fold' and 'epoch' columns.
        """
        if weight is None:
            weight = Variable("unity", lambda d: np.ones(len(d)))
        elif isinstance(weight, str):
            weight = Variable(weight, weight)

        self.norms = []
        self.models = []
        self.history = pd.DataFrame()
        for fold_i in range(self.cv.k):
            training_df = df[self.cv.select_training(df, fold_i)]
            validation_df = df[self.cv.select_validation(df, fold_i)]

            # Seed the fold's normalizer on its training set only.
            norm = self.norm_cls(training_df, self.input_list)
            self.norms.append(norm)
            training_df = norm(training_df)
            validation_df = norm(validation_df)

            model = self.model_cls()
            self.models.append(model)

            history = model.fit(
                self._split_nodes(training_df, self.input_list),
                self._split_nodes(training_df, self.output_list),
                validation_data=(
                    self._split_nodes(validation_df, self.input_list),
                    self._split_nodes(validation_df, self.output_list),
                    np.array(weight(validation_df)),
                ),
                sample_weight=np.array(weight(training_df)),
                **kwds)
            history = history.history
            n_epochs = len(history['loss'])
            history['fold'] = np.ones(n_epochs, dtype='int') * fold_i
            history['epoch'] = np.arange(n_epochs)
            self.history = pd.concat([self.history, pd.DataFrame(history)])

    def predict(self, df, cv='val', retrieve_fold_info=False, **kwds):
        """
        Calls predict() on the Keras model. The argument cv specifies the
        cross validation set to select: 'train', 'val', 'test'.
        Default is 'val'.

        All other keywords are passed to predict.

        Returns the rows of df that are in the selected set in any fold, with
        one 'pred_<output>' column per output. If retrieve_fold_info is True,
        a '<cv>_fold' column holds the fold index assigned by
        cross_validator.retrieve_fold_info for each returned row.

        Raises ValueError for an unknown cv.
        """
        if cv not in ('train', 'val', 'test'):
            raise ValueError("Argument 'cv' must be one of 'train', 'val', "
                             "'test'; but was %s." % repr(cv))

        output_names = self._flat_outputs()
        input_names = self._flat_inputs()
        nested_inputs = isinstance(self.input_list[0], list)
        nested_outputs = isinstance(self.output_list[0], list)

        out = np.zeros((len(df), len(output_names)))
        in_set = np.zeros(len(df), dtype='bool')
        for fold_i in range(self.cv.k):
            model = self.models[fold_i]
            norm = self.norms[fold_i]
            selected = self.cv.select_cv_set(df, cv, fold_i)
            in_set |= selected
            df_inputs = norm(df[selected][input_names])
            if nested_inputs:
                df_inputs = [df_inputs[node] for node in self.input_list]
            prediction = model.predict(df_inputs, **kwds)
            if nested_outputs:
                # One array per output node; stack them column-wise.
                prediction = np.concatenate(prediction, axis=1)
            out[selected] = prediction

        result = df[in_set]
        columns = out[in_set].transpose()
        result = result.assign(**{"pred_" + name: values
                                  for name, values in zip(output_names,
                                                          columns)})
        if retrieve_fold_info:
            fold_info = self.cv.retrieve_fold_info(df, cv)
            # fold_info has one entry per row of df; keep the returned rows.
            result = result.assign(**{cv + "_fold": fold_info[in_set]})
        return result

    @staticmethod
    def _save_name_lists(path, prefix, names):
        """Store a flat or nested list of names under '<prefix>0', '<prefix>1', ..."""
        nodes = names if isinstance(names[0], list) else [names]
        for node_i, node in enumerate(nodes):
            pd.DataFrame(node).to_hdf(path, key=prefix + str(node_i))

    @staticmethod
    def _load_name_lists(path, prefix):
        """Inverse of _save_name_lists; a single stored list is returned flat."""
        nodes = []
        while True:
            try:
                nodes.append(list(pd.read_hdf(path, prefix + str(len(nodes)))[0]))
            except KeyError:
                break
        return nodes[0] if len(nodes) == 1 else nodes

    def save(self, path):
        """
        Save the model and all associated components to a hdf5 file. The
        Keras models of the folds are saved to separate files (see
        _get_model_path), and only if the net has been trained.
        """
        if len(self.models) == self.cv.k:
            for fold_i, model in enumerate(self.models):
                # this is the built-in save function from keras
                model.save(self._get_model_path(path, fold_i))

        with h5py.File(path, "w") as output_file:
            # The model class is arbitrary python code, hence python_to_str.
            group = output_file.create_group("models/default")
            group.attrs["model_cls"] = np.bytes_(python_to_str(self.model_cls))
            # The default normalizer is stored by class name.
            group = output_file.create_group("normalizers/default")
            group.attrs["norm_cls"] = np.bytes_(self.norm_cls.__name__)

        self.cv.save_to_h5(path, "cross_validator")
        if len(self.norms) == self.cv.k:
            for fold_i, norm in enumerate(self.norms):
                norm.save_to_h5(path, "normalizers/fold_{}".format(fold_i))

        self._save_name_lists(path, "input_list", self.input_list)
        self._save_name_lists(path, "output_list", self.output_list)
        self.history.to_hdf(path, key="history")

    @classmethod
    def load(cls, path, **kwds):
        """
        Restore a model from a hdf5 file written by save(). kwds are passed
        to tf.keras.models.load_model. The per-fold Keras models are loaded
        only if all of their files exist (i.e. the net was trained when
        saved).
        """
        with h5py.File(path, "r") as input_file:
            model = str_to_python(
                input_file["models/default"].attrs["model_cls"].decode())
            normalizer_class_name = \
                input_file["normalizers/default"].attrs["norm_cls"].decode()
        normalizer = getattr(sys.modules[__name__], normalizer_class_name)

        cv = CrossValidator.load_from_h5(path, "cross_validator")
        output_list = cls._load_name_lists(path, "output_list")
        input_list = cls._load_name_lists(path, "input_list")

        instance = cls(model, cv, normalizer, input_list, output_list)
        instance.history = pd.read_hdf(path, "history")

        model_paths = [instance._get_model_path(path, fold_i)
                       for fold_i in range(cv.k)]
        if all(os.path.exists(model_path) for model_path in model_paths):
            instance.models = [tf.keras.models.load_model(model_path, **kwds)
                               for model_path in model_paths]

        for fold_i in range(cv.k):
            norm = Normalizer.load_from_h5(path,
                                           "normalizers/fold_{}".format(fold_i))
            if norm is not None:
                instance.norms.append(norm)
        return instance

    @staticmethod
    def _lwtnn_label(name, expression):
        """Variable name, or 'name=EXPR' if expression maps the name."""
        return "%s=%s" % (name, expression[name]) if name in expression else name

    @staticmethod
    def _lwtnn_parameters(values, n_variables):
        """
        Normalization parameters as a list of floats. A scalar is repeated
        n_variables times; sequences are converted element-wise.
        """
        array = np.asarray(values, dtype=float)
        if array.ndim == 0:
            return [float(array)] * n_variables
        return array.tolist()

    def _lwtnn_sequential_spec(self, norm, expression):
        """lwtnn variable specification for a Sequential model."""
        n_variables = len(self.input_list)
        scales = self._lwtnn_parameters(norm.scales, n_variables)
        offsets = self._lwtnn_parameters(norm.offsets, n_variables)
        # lwtnn computes (x + offset) * scale, hence offset / scale.
        inputs = [dict(name=self._lwtnn_label(v, expression),
                       offset=o / s, scale=s)
                  for v, o, s in zip(self.input_list, offsets, scales)]
        return dict(inputs=inputs, class_labels=self.output_list)

    def _lwtnn_functional_spec(self, norm, arch, expression):
        """lwtnn variable specification for a functional model."""
        output_layers = [layer[0] for layer
                         in json.loads(arch)['config']['output_layers']]
        nodes = (self.input_list if isinstance(self.input_list[0], list)
                 else [self.input_list])
        n_variables = sum(len(node) for node in nodes)
        scales = self._lwtnn_parameters(norm.scales, n_variables)
        offsets = self._lwtnn_parameters(norm.offsets, n_variables)

        inputs = []
        start = 0
        for node_i, node in enumerate(nodes):
            stop = start + len(node)
            node_variables = [dict(name=self._lwtnn_label(v, expression),
                                   offset=o / s, scale=s)
                              for v, o, s in zip(node, offsets[start:stop],
                                                 scales[start:stop])]
            inputs.append(dict(name='node_' + str(node_i),
                               variables=node_variables))
            start = stop

        if isinstance(self.output_list[0], list):
            outputs = [dict(name=output_layers[i], labels=labels)
                       for i, labels in enumerate(self.output_list)]
        else:
            outputs = [dict(name=output_layers[0], labels=self.output_list)]
        return dict(input_sequences=[], inputs=inputs, outputs=outputs)

    def export(self, path_base, command="converters/keras2json.py",
               expression=None):
        """
        Exports the network such that it can be converted to lwtnn's json
        format. The method generates a set of files for each cross validation
        fold. For every fold, the architecture, the weights, the input
        variables and their normalization are exported. To simplify the
        conversion to lwtnn's json format, the method also creates a bash
        script which converts all folds.

        The path_base argument should be a path or a name of the network. The
        names of the generated files are created by appending to path_base.

        The optional expression can be used to inject the CAF expression when
        the NN is used. The final json file will contain an entry KEY=VALUE if
        a variable matches the dict key.

        For functional (non-Sequential) models every input node is written as
        'node_<i>'; a flat input_list is exported as the single node 'node_0'.
        """
        expression = {} if expression is None else expression
        for fold_i in range(self.cv.k):
            model = self.models[fold_i]
            norm = self.norms[fold_i]
            arch_file_name = '%s_arch_%d.json' % (path_base, fold_i)
            weight_file_name = '%s_wght_%d.h5' % (path_base, fold_i)
            vars_file_name = '%s_vars_%d.json' % (path_base, fold_i)

            arch = model.to_json()
            with open(arch_file_name, 'w') as arch_file:
                arch_file.write(arch)
            model.save_weights(weight_file_name)

            sequential = isinstance(model, tf.keras.Sequential)
            if sequential:
                spec = self._lwtnn_sequential_spec(norm, expression)
            else:
                spec = self._lwtnn_functional_spec(norm, arch, expression)
            with open(vars_file_name, "w") as variable_file:
                json.dump(spec, variable_file)

            # The converter argument order differs between sequential and
            # functional models.
            if sequential:
                arguments = [arch_file_name, vars_file_name, weight_file_name]
            else:
                arguments = [arch_file_name, weight_file_name, vars_file_name]
            mode = "w" if fold_i == 0 else "a"
            with open("%s.sh" % path_base, mode) as script_file:
                print(" ".join([command, *arguments, ">",
                                f"{path_base}_{fold_i}.json"]),
                      file=script_file)
class HepNetSearch:
"""
A hyperparameter tuner/search for HepNet, based on keras-tuner.
The class support multi-processing and distributed hyperparameter tuning on demand.
"""
def __init__(self, keras_model, tuner_name, cross_validator, normalizer, input_list,
             output_list, ETH_IP=None):
    """
    HepNetSearch arguments tightly follow those of HepNet, except for the following:
    tuner_name: Name of the keras-tuner class {RandomSearch, BayesianOptimization,
                Hyperband, GridSearch}. Other names trigger a warning but are still
                stored.
    ETH_IP: In case distributed training is planned in the form of a chief-worker model,
            the Ethernet IP has to be passed.
    """
    self.model = keras_model
    self.cv = cross_validator
    self.norm_cls = normalizer
    self.input_list = input_list
    self.output_list = output_list
    self.norms = []
    self.tuner_settings = None
    self.ETH_IP = ETH_IP
    tested_tuners = ['Hyperband', 'RandomSearch', 'BayesianOptimization', 'GridSearch']
    if tuner_name not in tested_tuners:
        warnings.warn("%s is not a tested tuner, the program might break.\nTested tuners: %s"
                      % (tuner_name, repr(tested_tuners)))
    # Always record the name so later code does not fail on a missing attribute.
    self.tuner_name = tuner_name
def _get_eth_ip(self):
    """
    Return the IPv4 address of the interface the OS would use for outgoing
    traffic, or '127.0.0.1' if it cannot be determined.

    A UDP "connect" sends no packets; it only makes the OS pick a local
    address, so the target address does not have to be reachable.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.settimeout(0)
        try:
            probe.connect(('10.254.254.254', 1))
            return probe.getsockname()[0]
        except Exception:
            return '127.0.0.1'
def _get_model(self, trial_index=None, fold=0):
    """
    Build the Keras model of fold `fold` for the hyperparameter set stored
    at row trial_index of self.search_book. If trial_index is None, the first
    index of self.book(sort=True, filter_nan=True) is used (presumably the
    best trial; book() is defined elsewhere).

    The fold's tuner trials are searched for one whose values of the
    hyperparameters in self.hps_str equal the selected set. The model is
    built from that matching trial.

    Raises ValueError if no trial of the fold matches.
    """
    if trial_index is None:
        trial_index = self.book(sort=True, filter_nan=True).index[0]
    hps_dict = self.search_book.iloc[trial_index][self.hps_str].to_dict()
    oracle_trials = len(self.oracles[fold].get_state()['tried_so_far'])
    hps_trials = self.tuners[fold].get_best_hyperparameters(num_trials=oracle_trials)

    missing = object()  # never equal to a stored value
    trial_position = None
    for position, hps in enumerate(hps_trials):
        # Compare only the model hyperparameters present in hps_dict.
        candidate = {key: hps.values.get(key, missing) for key in hps_dict}
        if candidate == hps_dict:
            trial_position = position
            break
    if trial_position is None:
        raise ValueError("No trained model in the (%d) fold matches the hyperparameter set "
                         "indexed at (%d):\n%s"
                         % (fold, trial_index, hps_dict))
    # Build from the matching trial; trial_index refers to search_book, not
    # to the tuner's trial ordering.
    return self.tuners[fold].hypermodel.build(hps_trials[trial_position])
def set_tuner(self, **kwds):
    """
    Store the keyword arguments for the keras-tuner tuner (presumably passed
    to the tuner class named by tuner_name). search() raises ValueError
    while no settings have been stored.
    """
    settings = kwds
    self.tuner_settings = settings
def search(self, df, weight=None, Nfmp= False, distribute= False, tuner_id= None, **kwds):
    """
    Perform the hyperparameter search in every cross-validation fold.

    weight: Training weights. A Variable, the name of a dataframe column, or
        None for unit weights.
    Nfmp: Execute the Nfold searches in parallel as multiple processes
        (multi-processing).
    distribute: Allow distributed (chief/worker) training.
    tuner_id: Chief-worker model ID. This is either 'chief' or 'tunerX',
        where X is a unique tuner number.
    **kwds: Passed directly to the tuner's search() (and from there to fit()).

    On the chief, or when tuner_id is None, the per-fold results are
    combined afterwards. Trials are matched across folds by their
    hyperparameter values, and the results are stored in self.search_book,
    self.hps_book, self.scores_book, self.hps_str, self.tuners and
    self.oracles.
    """
    # Meant for internal use only
    self.tuner_id = tuner_id
    if self.tuner_settings is None:
        raise ValueError("The tuner is not yet set. You need to setup tuner_settings as a dictionary kwds for the Keras tuner.")
    if weight is None:
        weight = Variable("unity", lambda d: np.ones(len(d)))
    elif isinstance(weight, str):
        weight = Variable(weight, weight)

    k = self.cv.k
    # tuner_mp copies self.__dict__ shallowly, so search_body() appends the
    # fold normalizers directly to this list (in the same process).
    self.norms = []
    oracles = []
    tuners = []
    multi_tuner = tuner_mp(self)
    if Nfmp:
        # One process per fold. Tuners/oracles stay in the child processes,
        # so nothing is collected here; they are registered below.
        procs = []
        for fold_i in range(k):
            p = mp.Process(target=multi_tuner.search_body,
                           args=(fold_i, df, weight, None, None, distribute, kwds, tuner_id))
            procs.append(p)
            p.start()
        for p in procs:
            p.join()
    else:
        for fold_i in range(k):
            multi_tuner.search_body(fold_i, df, weight, tuners, oracles, distribute, kwds, tuner_id)

    if tuner_id not in ("chief", None):
        # Workers only run trials; combining the folds is done by the chief.
        return

    # On search completion, register the combined Nfold searches (with
    # noTraining=True the distributed environment variables are removed first).
    for fold_i in range(k):
        multi_tuner.search_body(fold_i, df, weight, tuners, oracles, distribute, kwds, tuner_id, noTraining=True)
    # In serial mode every fold was already collected once above, so the
    # registration appended a second copy. Keep one entry per fold: the first
    # ones, which are the entries indexed by fold everywhere.
    del tuners[k:]
    del oracles[k:]
    del self.norms[k:]

    # Fetch hyperparameter names from the best fold-0 trial
    hp_space = oracles[0].get_best_trials()[0].get_state()['hyperparameters']['space']
    hps_str = [entry['config']['name'] for entry in hp_space]
    self.hps_str = hps_str

    # Ranked trials of every fold. get_best_trials() may return fewer trials
    # than oracle.trials holds, so always iterate over the returned lists.
    fold_trials = [oracle.get_best_trials(num_trials=len(oracle.trials))
                   for oracle in oracles]

    # Evaluate the validation score of each fold-0 trial in every fold
    search_book = []
    for fold_0_trial in fold_trials[0]:
        fold_0_state = fold_0_trial.get_state()
        all_values = fold_0_state['hyperparameters']['values']
        # Only the model hyperparameters, not the tuner bookkeeping values
        fold_0_hps = {p: all_values[p] for p in hps_str}
        scores = {'fold_0_score': fold_0_state['score']}
        # Search fold i for the trial with the same hyperparameter values
        for i in range(1, k):
            for fold_i_trial in fold_trials[i]:
                fold_i_state = fold_i_trial.get_state()
                if fold_0_hps.items() <= fold_i_state['hyperparameters']['values'].items():
                    scores['fold_' + str(i) + '_score'] = fold_i_state['score']
                    break
        search_book.append({**fold_0_hps, **scores})

    # Convert search_book to a dataframe then evaluate mean and std
    search_book = pd.DataFrame(search_book)
    scores_book = search_book.drop(hps_str, axis=1)
    hps_book = search_book.drop(scores_book.columns, axis=1)
    search_book = search_book.assign(mean=np.mean(scores_book, axis=1))
    if scores_book.shape[1] > 1:
        search_book = search_book.assign(std=np.std(scores_book, axis=1))
    else:
        # A single score column has no spread across folds
        search_book = search_book.assign(std=[0]*len(scores_book))
    self.tuners = tuners
    self.oracles = oracles
    self.search_book = search_book
    self.hps_book = hps_book
    self.scores_book = scores_book
def book(self, tolerance=np.inf, sort=False, filter_nan=False):
    """
    Return the search_book of the HPO.

    tolerance: A positive number defining the tolerance level of the tracked
        metric validation std. Trials with Nfold_std(val_metric) > tolerance
        are discarded from the returned book. Rows with a NaN std never pass
        this comparison.
    sort: Sort by mean score, best first. The order is ascending for a 'min'
        objective and descending otherwise.
    filter_nan: Remove trials with any missing (NaN) entry.

    Raises ValueError for a non-positive tolerance. Raises TypeError if the
    objective is neither a string nor a keras_tuner.Objective.
    """
    if tolerance <= 0:
        raise ValueError("Tolerance value must be a positive number.")
    # Support both plain metric names and custom keras_tuner.Objective objects
    objective = self.tuner_settings['objective']
    if isinstance(objective, str):
        objective_name = objective
    elif isinstance(objective, keras_tuner.Objective):
        objective_name = objective.name
    else:
        raise TypeError("The objective must be a string or keras_tuner.Objective")
    score_direction = self.oracles[0].get_best_trials()[0].get_state()\
        ['metrics']['metrics'][objective_name]['direction']
    if score_direction == 'min':
        ascending = True
    else:
        ascending = False
        if score_direction != 'max':
            warnings.warn('Objective direction is neither max nor min! '
                          'Defaulted to descending order for optimal trial mean.')
    ###TODO: The following is a work around dropped trials. It is indeed a performance issue.
    ###1-It affects run time 2-It affects the optimal hyperparameter set
    ###Currently, the best course of action is to seed the models identically
    # Remove tuner dropped trials (rows with any NaN entry)
    search_book = self.search_book.dropna() if filter_nan else self.search_book
    if sort:
        search_book = search_book.sort_values(by=['mean'], axis=0, ascending=ascending)
    # Filter by tolerance level
    return search_book[search_book['std'] <= tolerance]
def trial_summary(self, trial_index=None, detailed=False, **kwds):
    """
    Print a summary of the trial at trial_index.

    The summary contains the trial's mean/std score, its hyperparameters and
    the summary of the model built from them (fold 0).

    trial_index: Index label in self.search_book. By default, the best trial
        of self.book(sort=True, filter_nan=True) is used (no tolerance).
    detailed: Also print the model configuration as indented JSON.
    **kwds: Passed to model.summary().
    """
    if trial_index is None:
        trial_index = self.book(sort=True, filter_nan=True).index[0]
    trial = dict(self.search_book.loc[trial_index])
    print("Index: %s" % trial_index)
    print("Mean score: %s \nstd: %s" % (trial['mean'], trial['std']))
    print("Hyperparameters:")
    for key in self.hps_str:
        print("\t%s: %s" % (key, trial[key]))
    print('\nTrial model summary:')
    model = self._get_model(trial_index=trial_index)
    model.summary(**kwds)
    if detailed:
        print('\n')
        print(json.dumps(model.get_config(), indent=1))
class tuner_mp(object):
    """
    Helper that runs the hyperparameter search of a single fold.

    It carries a shallow copy of the attributes (__dict__) of the search
    object it is created from, and its search_body is used as the
    multiprocessing target. Because the copy is shallow, mutable attributes
    such as the norms list are the same objects as on the original (within
    one process).
    """

    def __init__(self, class_object):
        self.__dict__ = class_object.__dict__.copy()

    @staticmethod
    def _get_eth_ip():
        """
        Return the local IPv4 address used for outgoing traffic, or
        '127.0.0.1' if it cannot be determined.

        Methods of the original search object are not part of its __dict__,
        so this class needs its own copy of the lookup.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(0)
            try:
                # doesn't have to be reachable
                s.connect(('10.254.254.254', 1))
                return s.getsockname()[0]
            except OSError:
                return '127.0.0.1'

    def search_body(self, fold_i, df, weight, tuners, oracles, distribute, kwds, tuner_id, noTraining=False):
        """
        Run the hyperparameter search of a single fold.

        fold_i: Index of the cross-validation fold.
        df: Full dataframe; training/validation events are selected via self.cv.
        weight: Callable (Variable) returning the event weights of a dataframe.
        tuners, oracles: Lists to which the fold's tuner and its oracle are
            appended, or None to not collect them.
        distribute: Whether chief/worker distributed tuning is used.
        kwds: Extra keyword arguments passed to tuner.search().
        tuner_id: Chief-worker ID ('chief' or 'tunerX'); required when
            distributing and training.
        noTraining: Registration pass. When distributing, the chief/worker
            environment variables are removed instead of set.

        Raises ValueError if distributing for training without a tuner_id.
        """
        # The module header does not import importlib
        import importlib

        env_keys = ("KERASTUNER_TUNER_ID", "KERASTUNER_ORACLE_IP", "KERASTUNER_ORACLE_PORT")
        # Set chief/worker environment communication ports
        if distribute and not noTraining:
            if tuner_id is None:
                raise ValueError("tuner_id was not passed.")
            os.environ["KERASTUNER_TUNER_ID"] = tuner_id
            os.environ["KERASTUNER_ORACLE_IP"] = self._get_eth_ip() if self.ETH_IP is None else self.ETH_IP
            # One oracle port per fold
            os.environ["KERASTUNER_ORACLE_PORT"] = str(47808 + fold_i)
            # presumably so keras_tuner picks up the new environment
            importlib.reload(keras_tuner)
            print("Fold:%s\t ID:%s IP:%s PORT:%s" % ((fold_i,) + tuple(os.getenv(key) for key in env_keys)))
        if distribute and noTraining:
            # Remove each variable independently; missing ones are ignored
            for key in env_keys:
                os.environ.pop(key, None)
            print("Fold:%s\t ID:%s IP:%s PORT:%s" % ((fold_i,) + tuple(os.getenv(key) for key in env_keys)))

        # Constrain memory growth on physical GPUs. TensorFlow requires the
        # same memory-growth setting on all GPUs, so apply it to every one.
        for gpu in tf.config.list_physical_devices('GPU'):
            try:
                tf.config.experimental.set_memory_growth(gpu, True)
            except (ValueError, RuntimeError):
                # invalid/virtual device, or the GPUs are already initialized
                pass

        # select training and validation sets
        training_df = df[self.cv.select_training(df, fold_i)]
        validation_df = df[self.cv.select_validation(df, fold_i)]

        # seed normalizers on the training set only
        norm = self.norm_cls(training_df, self.input_list)
        self.norms.append(norm)
        training_df = norm(training_df)
        validation_df = norm(validation_df)

        # search in fold
        tuner_cls = getattr(keras_tuner, self.tuner_name)
        tuner_settings_i = self.tuner_settings.copy()
        if 'logger' in tuner_settings_i:
            # shallow copy: this updates the logger object shared by all folds
            tuner_settings_i['logger'].fold = fold_i
        # separate project name per fold
        tuner_settings_i['project_name'] = self.tuner_settings['project_name'] + '_' + str(fold_i)
        tuner = tuner_cls(self.model, **tuner_settings_i)
        tuner.search(training_df[self.input_list],
                     training_df[self.output_list],
                     validation_data=(
                         validation_df[self.input_list],
                         validation_df[self.output_list],
                         np.array(weight(validation_df)),
                     ),
                     sample_weight=np.array(weight(training_df)),
                     **kwds)

        # append for completely saved search
        if tuners is not None and oracles is not None:
            tuners.append(tuner)
            oracles.append(tuner.oracle)