# -*- coding: utf-8 -*-
# from ..colfi_mc import models_annmc #version_key
from . import data_simulator as ds
from . import space_updater as su
from .models_ann import Loader
from . import models_ann, models_mdn, models_g, utils, cosmic_params, plotter
import numpy as np
from decimal import Decimal
import coplot.plots as pl
import time
import warnings
#%% NDE
[docs]class NDEs(plotter.PlotPosterior):
"""Estimating (cosmological) parameters with Neural Density Estimators (NDEs).
Parameters
----------
obs_data : array-like or list
The observations (measurements) with shape (obs_length,3), or a list of
observations with shape [(obs_length_1,3), (obs_length_2,3), ...].
The first column is the observational variable, the second column is the
best values of the measurement, and the third column is the error of the measurement.
model : cosmological (or theoretical) model instance
A cosmological (or theoretical) model instance that is used to simulate
training set, it should contains a 'simulate' method, and 'simulate'
should accept input of cosmological parameters, if you use local data sets,
it should also contain 'load_params' and 'load_sample' methods.
param_names : list
A list which contains the parameter names, e.g. ['H0','ombh2','omch2'].
params_dict : dict or None, optional
Information of cosmological parameters that include the labels, the minimum values,
and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None
cov_matrix : array-like, list, or None, optional
Covariance matrix of the observational data. It should be an array with
shape (obs_length, obs_length), or a list of covariance matrix with
shape [(obs_length_1, obs_length_1), (obs_length_2, obs_length_2), ...].
If there is no covariance for some observations, the covariance matrix
should be set to None. e.g. [cov_matrix_1, None, cov_matrix_3]. Default: None
init_chain : None or array-like, optional
The initial ANN or MCMC chain, which is usually based on prvious parameter
estimation. Default: None
init_params : None or array-like, optional
The initial settings of the parameter space. If ``init_chain`` is given,
``init_params`` will be ignored. Default: None
nde_type : str or list, optional
A string (or a list with two strings in it) that indicate which NDE should be used.
There are four NDEs that can be used: 'ANN', 'MDN', 'MNN', or 'ANNMC'.
If a string is given, such as 'ANN', only ANN will be used for parameter estimation.
If a list that contains two NDEs is given, such as ['ANN', 'MNN'], then
ANN will be used in the burn-in phase to find the burn-in end step,
MNN will be used after the burn-in phase to obtain the posterior. Default: 'MNN'
num_train : int, optional
The number of samples of the training set. Default: 3000
num_vali : int, optional
The number of samples of the validation set. Default: 100
base_N : int, optional
The basic (or minimum) number of samples in the training set, which works only
in the burn-in phase. Default: 1000
space_type : str, optional
The type of parameter space. It can be 'hypercube', 'LHS', 'hypersphere',
'hyperellipsoid', or 'posterior_hyperellipsoid'. Default: 'hyperellipsoid'
local_samples : None, str, or list, optional
Path of local samples, None, or 'sample' or ['sample'] or ['sample_1', 'sample_2', ...].
If None, no local samples are used. Default: None
chain_n : int, optional
If the number of ANN chains to be obtained, which also equals to the steps
after the burn-in phase, it will be used to stop the whole training process.
This only works after burn-in phase. Default: 3
chain_leng : int, optional
The length of each ANN chain. Default: 10000
Attributes
----------
activation_func : str, optional
The name of activation function, which can be 'ReLU', 'LeakyReLU', 'PReLU', 'RReLU',
'ReLU6', 'ELU', 'CELU', 'SELU', 'SiLU', 'Sigmoid', 'LogSigmoid', 'Tanh',
'Tanhshrink', 'Softsign', or 'Softplus' (see :func:`~.element.activation`). Default: 'Softplus'
hidden_layer : int, optional
The number of the hidden layer of the network (for a single branch network). Default: 3
branch_hiddenLayer : int, optional
The number of the hidden layer for the branch part of the network (for a multibranch network). Default: 1
trunk_hiddenLayer : int, optional
The number of the hidden layer for the trunk part of the network (for a multibranch network). Default: 2
lr : float, optional
The learning rate setting of the network. Default: 1e-2
lr_min : float, optional
The minimum of the learning rate. Default: 1e-8
batch_size : int, optional
The batch size setting of the network. Default: 1250
auto_batchSize : bool, optional
If True, the batch size will be set automatically in the training process,
otherwise, use the setting of ``batch_size``. Default: True
epoch : int, optional
The number of epoch of the training process. Default: 2000
epoch_branch : int, optional
The number of epoch of the training process (for the branch part of the multibranch network). Default: 2000
auto_epoch : bool, optional
If True, the epoch will be set automatically in the training process,
otherwise, use the setting of ``epoch``. Default: False
comp_type : str, optional
The name of component used in the ``MDN`` method, which can be 'Gaussian' or 'Beta'.
Default: 'Gaussian'
comp_n : int, optional
The number of components used in the ``MDN`` method. Default: 3
spaceSigma : int or array-like, optional
The size of parameter space to be learned. It is a int or a numpy array
with shape of (n,), where n is the number of parameters, e.g. for spaceSigma=5,
the parameter space to be learned is :math:`[-5\sigma, +5\sigma]`. Default: 5
noise_type : str, optional
The type of Gaussian noise added to the training set, which should be
'singleNormal' or 'multiNormal'. It only works for the NDEs ``ANN``, ``MDN``,
and ``MNN``. For ``ANN`` and ``MNN``, both 'singleNormal' and 'multiNormal'
can be used, but it is recommended to use 'multiNormal'; For ``MDN``, only
'singleNormal' can be used. Default: 'multiNormal'
factor_sigma : float, optional
For the case of ``noise_type`` = 'singleNormal', ``factor_sigma`` should be
set to 1. For the case of ``noise_type`` = 'multiNormal', it is the standard
deviation of the coefficient of the observational error (standard deviation). Default: 0.2
multi_noise : int, optional
The number of realization of noise added to the measurement in one epoch. Default: 5
scale_obs : bool, optional
If True, the observational data (measurements) will be scaled based on the
base values of the data. Default: False
scale_params : bool, optional
If True, the cosmological parameters will be scaled based on the base
values of parameters. See :class:`~.data_processor.ParamsScaling`. Default: True
norm_obs : bool, optional
If True, the observational data feed to the network will be normalized. Default: True
norm_params : bool, optional
If True, the cosmological parameters will be normalized. Default: True
independent_norm_obs : bool, optional
If True, each data point in the observational data (measurements) will be
normalized independently. Default: False
independent_norm_params : bool, optional
If True, each cosmological parameters will be normalized independently. Default: True
norm_type : str, optional
The method of normalization, which can be 'z_score', 'minmax', or 'mean'
(see :class:`~.data_processor.Normalize`). Default: 'z_score'
train_branch : bool, optional
If True, the branch part of the multibranch network will be trained before
training the entire network. Default: False
repeat_n : int, optional
The number of iterations using the same batch of data during network training,
which is usually set to 1 or 3. Default: 3
fast_training : bool, optional
If True, the batch size will be set to ``batch_size*multi_noise`` and
the network will be trained fast. Default: False
randn_num : float or str, optional
A random number that identifies the saved results. Default: float
file_identity : str, optional
A string that identifies the files saved to the disk, which is useful to
identify the saved files. Default: ''
expectedBurnInEnd_step : int, optional
The expected burn-in end step. If the burn-in phase does not end at a step
equal to ``expectedBurnInEnd_step``, the training process will be broken,
which means the setting of hyperparameters is not good or the NDE used is not suitable.
chain_true_path : str, optional
The path of the true chain of the posterior which can be obtained by using other methods,
such as the MCMC method. Note: only ``.npy`` and ``.txt`` file is supported. Default: ''
label_true : str, optional
The legend label of the true chain. Default: 'True'
fiducial_params : list, optional
A list that contains the fiducial cosmological parameters. Default: []
Note
----
The number of samples of the training set should be large enough to ensure
the network learns a reliable mapping. For example, set num_train to 1000,
or a larger value like 3000.
The epoch should also be set large enough to ensure a well-learned network.
e.g. set epoch to 2000, or a larger value like 3000.
The initial parameter space is suggested to set large enough to cover the true parameters.
In this case, it be easier for the network to find the best-fit value of parameters.
It is better to set the number of ANN chains ``chain_n`` a large value like 3,
and this will minimize the effect of randomness on the results. However,
it is also acceptable to set a smaller value like 1.
The advantage of this method is that we can analyze the results before the end of
the training process, and determine how many steps can be used to estimate parameters.
Local samples can be used as training set to save time, so when using this method,
you can generate a sample library for later reuse.
"""
def __init__(self, obs_data, model, param_names, params_dict=None, cov_matrix=None,
init_chain=None, init_params=None, nde_type='MNN', num_train=3000,
num_vali=100, base_N=1000, space_type='hyperellipsoid', local_samples=None,
chain_n=3, chain_leng=10000):
#observational data & cosmological model
self.obs_data = obs_data
self.model = model
self.param_names = param_names
self.params_dict = params_dict
self.cov_matrix = cov_matrix
self.init_chain = init_chain
self.init_params = self._init_params(init_params)
#NDE model
self.nde_type_pair = self._nde_type_pair(nde_type)
self.activation_func = 'Softplus'
self.hidden_layer = 3
self.branch_hiddenLayer = 1
self.branch_n = self._branch_n()
self.trunk_hiddenLayer = 2
self.lr = 1e-2
self.lr_min = 1e-8
self.batch_size = 1250 #change to 750? set batch_size=0.75*num_train and < 1500?
self.auto_batchSize = False #re-set & optimize this? because small batchSize will let the ann trained better
self.epoch = 2000
self.epoch_branch = 2000
self.auto_epoch = False
self.comp_type = 'Gaussian'
self.comp_n = 3
#training data
self.num_train = num_train
self.num_vali = num_vali
self.base_N = base_N
self.spaceSigma = 5
self.space_type = space_type
self.local_samples = local_samples
#data preprocessing
self.noise_type = 'multiNormal'
self.factor_sigma = 0.2
self.multi_noise = 5
#need to tongyi setting
if self.branch_n==1:
self.scale_obs = False #set True for multiBranch network? need to test, & test oneBranchNet when setting True
else:
self.scale_obs = True
self.scale_params = True
self.norm_obs = True
self.norm_params = True
self.independent_norm_obs = False
self.independent_norm_params = True
self.norm_type = 'z_score'
#training
self.train_branch = False
self.repeat_n = 3
self.fast_training = False
self.randn_num = round(abs(np.random.randn()/5.), 5)
self.file_identity = ''
#updating
self.chain_n = chain_n
self.chain_leng = chain_leng
self.expectedBurnInEnd_step = 10
#True posterior & parameters
self.chain_true_path = '' #only support .npy or .txt file
self.label_true = 'True'
self.fiducial_params = []
self.show_idx = None
def _init_params(self, prior):
if prior is None:
prior = np.array([[-100, 100] for i in range(len(self.param_names))])
params_limit = cosmic_params.ParamsProperty(self.param_names, params_dict=self.params_dict).params_limit
return su.CheckParameterSpace.check_limit(prior, params_limit)
def _nde_type_pair(self, nde_type):
## ----- version_key -----
if isinstance(nde_type, str):
if nde_type=='ANNMC':
raise ValueError("The ANNMC method is on the way, please use the ANN, MDN, or MNN method instead.")
elif isinstance(nde_type, list):
if nde_type[0]=='ANNMC' or nde_type[1]=='ANNMC':
raise ValueError("The ANNMC method is on the way, please use the ANN, MDN, or MNN method instead.")
## -----------------------
if isinstance(nde_type, str):
self.nde_type_str = nde_type
return [nde_type, nde_type]
elif isinstance(nde_type, list):
self.nde_type_str = nde_type[0] + '_' + nde_type[1]
return nde_type
def _branch_n(self):
if type(self.obs_data) is list:
return len(self.obs_data)
else:
return 1
def _hparams_shared(self):
return {
#NDE model
'activation_func' : self.activation_func,
'hidden_layer' : self.hidden_layer,
'branch_hiddenLayer' : self.branch_hiddenLayer,
'trunk_hiddenLayer' : self.trunk_hiddenLayer,
'lr' : self.lr,
'lr_branch' : self.lr,
'lr_min' : self.lr_min,
'batch_size' : self.batch_size,
'auto_batchSize' : self.auto_batchSize,
'epoch' : self.epoch,
'epoch_branch' : self.epoch_branch,
'auto_epoch' : self.auto_epoch,
#data preprocessing
'scale_obs' : self.scale_obs,
'scale_params' : self.scale_params,
'norm_obs' : self.norm_obs,
'norm_params' : self.norm_params,
'independent_norm_obs' : self.independent_norm_obs,
'independent_norm_params' : self.independent_norm_params,
'norm_type' : self.norm_type,
#training
'file_identity_str' : self.file_identity_str,
}
def _hparams_ANN(self):
hp_shared = self._hparams_shared().copy()
hp_ann = {
#data preprocessing
'noise_type' : self.noise_type,
'factor_sigma' : self.factor_sigma,
'multi_noise' : self.multi_noise,
}
hp_shared.update(hp_ann)
return hp_shared
def _hparams_MDN(self):
hp_ann = self._hparams_ANN().copy()
hp_mdn = {
#data preprocessing
'noise_type' : 'singleNormal',
'factor_sigma' : 1,
#MDN model
'comp_type' : self.comp_type,
'comp_n' : self.comp_n,
}
hp_ann.update(hp_mdn)
return hp_ann
def _hparams_MNN(self):
return self._hparams_ANN().copy()
def _hparams_ANNMC(self):
hp_shared = self._hparams_shared().copy()
hp_mc = {
#data preprocessing
'scale_obs' : True, #
'scale_params' : True,
'independent_norm_obs' : True, #
'independent_norm_params' : False, #
}
hp_shared.update(hp_mc)
return hp_shared
@property
def obs_variables(self):
if self.branch_n==1:
return self.obs_data[:,0]
else:
obs_varis = []
for i in range(self.branch_n):
obs_varis.append(self.obs_data[i][:,0])
return np.array(obs_varis, dtype=object)
@property
def obs_errors(self):
if self.branch_n==1:
return self.obs_data[:,2]
else:
obs_errs = []
for i in range(self.branch_n):
obs_errs.append(self.obs_data[i][:,2])
return obs_errs
@property
def cov_copy(self):
if self.cov_matrix is None:
return None
else:
return self.cov_matrix.copy()
@property
def num_train_burnIn(self):
bn = self.num_train//2
if bn <= self.base_N:
return self.base_N
else:
return bn
#remove?
@property
def base_epoch(self):
return self.epoch//2
@property
def file_identity_str(self):
if self.file_identity:
return '_%s'%self.file_identity
else:
return ''
[docs] def print_hparams(self):
#print all useful hyper parameters
#only print the first randn_num
pass
[docs] def update_params_space(self, step, chain_all, burnInEnd, burnInEnd_step):
# update parameter space
if step==2:
chain_0 = self.init_chain
elif step>=3:
chain_0 = chain_all[-2]
updater = su.UpdateParameterSpace(step,self.param_names,chain_all[-1],chain_0=chain_0,init_params=self.init_params,spaceSigma=self.spaceSigma,params_dict=self.params_dict)
if step>=3 and updater.small_dev(limit_dev=0.001):
#to be improved to get chain_ann after exit()???, or remove these two lines???
exit()
#this is based on experiments, update this??? eg. max(updater.param_devs)<0.5?
# if burnInEnd_step is None and max(updater.param_devs)<1 and max(updater.error_devs)<0.5:
# if burnInEnd_step is None and max(updater.param_devs)<0.5 and max(updater.error_devs)<0.5: #test!!!
# emoji & unicode: https://unicode.org/emoji/charts/full-emoji-list.html & http://www.jsons.cn/unicode
param_devs, error_devs, spaceSigma_all = updater.param_devs, updater.error_devs, updater.spaceSigma_all
if burnInEnd_step is None and max(param_devs)<=0.25 and max(error_devs)<=0.25: #test
burnInEnd = True
burnInEnd_step = step - 1 #the good chain will not contain the burn-in step chain; if set step-2, the good chain will contain the burn-in step chain!
print('\n\n'+'='*73)
print('*'*5+' '*11+'\u53c2\u6570\u5df2\u8fbe\u5230\u7a33\u5b9a\u503c\uff0c\u540e\u9762\u7684\u94fe\u53ef\u7528\u4e8e\u53c2\u6570\u4f30\u8ba1\u3002'+' '*10+'*'*5)
print('*'*5+' '*11+'The parameters have reached stable values'+' '*11+'*'*5)
print('*'*5+' '*1+'The chains of later steps can be used for parameter inference'+' '*1+'*'*5)
print('*'*5+' '*28+'\U0001F386\U0001F388\U0001F389'+' '*29+'*'*5)
print('='*73+'\n')
if burnInEnd:
print('\n'+'#'*25+' step {}/{} '.format(step, burnInEnd_step+self.chain_n)+'#'*25)
else:
print('\n'+'#'*25+' step {} '.format(step)+'#'*25)
self.spaceSigma_min = updater.spaceSigma_min
updater.print_learningRange()
return burnInEnd, burnInEnd_step, param_devs, error_devs, spaceSigma_all
[docs] def simulate(self, nde_type, space_type, step, burnInEnd, param_devs, error_devs, spaceSigma_all,
space_type_all=[], prev_space=None, chain_all=[], sim_obs=None, sim_params=None):
"""Simulate training data."""
if step==1:
# set training number & space_type
training_n = self.num_train_burnIn
if self.init_chain is None:
if space_type=='hypersphere' or space_type=='hyperellipsoid' or space_type=='posterior_hyperellipsoid':
s_type = 'hypercube' #use LHS?
# s_type = 'LHS'
else:
s_type = space_type
else:
s_type = space_type
space_type_all.append(s_type)
#simulate data
print('\n'+'#'*25+' step {} '.format(step)+'#'*25)
if self.branch_n==1:
simor = ds.SimObservations(training_n, self.model, self.param_names, chain=self.init_chain, params_space=self.init_params,
spaceSigma=self.spaceSigma, params_dict=self.params_dict, space_type=s_type,
cut_crossedLimit=True, local_samples=self.local_samples, prevStep_data=None)
else:
simor = ds.SimMultiObservations(self.branch_n, training_n, self.model, self.param_names, chain=self.init_chain, params_space=self.init_params,
spaceSigma=self.spaceSigma, params_dict=self.params_dict, space_type=s_type,
cut_crossedLimit=True, local_samples=self.local_samples, prevStep_data=None)
sim_obs, sim_params = simor.simulate()
prev_space = simor.params_space #used for next step
else:
# set training number & space_type
if burnInEnd:
training_n = self.num_train + self.num_vali
else:
if max(param_devs)<=0.5 and max(error_devs)<=0.25 and nde_type!='ANNMC':
training_n = self.num_train
else:
training_n = self.num_train_burnIn
s_type = space_type
space_type_all.append(s_type)
#simulate data
if space_type_all[-1]==space_type_all[-2]:
prevStep_data = [sim_obs, sim_params]
else:
prevStep_data = None
# check if it has problems when using prevStep_data???
rel_dev_limit = 0.1
if self.branch_n==1:
simor = ds.SimObservations(training_n, self.model, self.param_names, chain=chain_all[-1], params_space=None, spaceSigma=spaceSigma_all,
params_dict=self.params_dict, space_type=s_type, cut_crossedLimit=True, local_samples=None, prevStep_data=prevStep_data, rel_dev_limit=rel_dev_limit) #reset local_samples???
else:
simor = ds.SimMultiObservations(self.branch_n, training_n, self.model, self.param_names, chain=chain_all[-1], params_space=None, spaceSigma=spaceSigma_all,
params_dict=self.params_dict, space_type=s_type, cut_crossedLimit=True, local_samples=None, prevStep_data=prevStep_data, rel_dev_limit=rel_dev_limit) #reset local_samples???
simor.prev_space = prev_space
sim_obs, sim_params = simor.simulate()
prev_space = simor.params_space #used for next step
#test remove nan, to be added to the code???
# good_index = np.where(~np.isnan(sim_obs[:,0])) #test
# sim_obs = sim_obs[good_index] #test
# sim_params = sim_params[good_index] #test
return sim_obs, sim_params, space_type_all, prev_space
[docs] def split_data(self, sim_obs, sim_params, burnInEnd=False):
"""Split the simulated data into training set and validation set."""
if burnInEnd:
idx = np.random.choice(self.num_train+self.num_vali, self.num_train+self.num_vali, replace=False)
if self.branch_n==1:
train_set = [sim_obs[idx[:self.num_train]], sim_params[idx[:self.num_train]]]
vali_set = [sim_obs[idx[self.num_train:]], sim_params[idx[self.num_train:]]]
else:
sim_obs_train = [sim_obs[i][idx[:self.num_train]] for i in range(self.branch_n)]
sim_params_train = sim_params[idx[:self.num_train]]
sim_obs_vali = [sim_obs[i][idx[self.num_train:]] for i in range(self.branch_n)]
sim_params_vali = sim_params[idx[self.num_train:]]
train_set = [sim_obs_train, sim_params_train]
vali_set = [sim_obs_vali, sim_params_vali]
else:
train_set = [sim_obs, sim_params]
vali_set = [None, None]
return train_set, vali_set
def _train_ANN(self, train_set, vali_set, step=1, burnInEnd=False, burnInEnd_step=None,
randn_num=0.123, save_items=True, showEpoch_n=100, params_space=None):
"""Train an ANN using the given data."""
self.hparams_ANN = self._hparams_ANN()
if self.branch_n==1:
self.eco = models_ann.OneBranchMLP(train_set, self.param_names, vali_set=vali_set, obs_errors=self.obs_errors,
cov_matrix=self.cov_copy, params_dict=self.params_dict, loss_name='L1')
else:
self.eco = models_ann.MultiBranchMLP(train_set, self.param_names, vali_set=vali_set, obs_errors=self.obs_errors,
cov_matrix=self.cov_copy, params_dict=self.params_dict, loss_name='L1')
if step==1:
self.eco.print_info = True
if step>=2:
self.eco.spaceSigma_min = self.spaceSigma_min
self.eco.burnInEnd = burnInEnd
self.eco.burnInEnd_step = burnInEnd_step
self.eco.randn_num = randn_num
self.eco.params_space = params_space
for key, value in self.hparams_ANN.items():
exec('self.eco.%s = value'%(key))
if self.branch_n==1:
self.eco.train(repeat_n=self.repeat_n, showEpoch_n=showEpoch_n, fast_training=self.fast_training)
else:
self.eco.train(repeat_n=self.repeat_n, train_branch=self.train_branch, parallel=False, showEpoch_n=showEpoch_n, fast_training=self.fast_training) #reset parallel???
#predict chain
#Note: here we use self.cov_copy to avoid data type error in "eco"
chain_1 = self.eco.predict_chain(self.obs_data, cov_matrix=self.cov_copy, chain_leng=self.chain_leng)
#save results
if save_items:
self.eco.save_net(path=self.path)
self.eco.save_loss(path=self.path)
self.eco.save_chain(path=self.path)
self.eco.save_ndeType(path=self.path)
self.eco.save_hparams(path=self.path)
return chain_1, [self.eco.train_loss, self.eco.vali_loss]
def _train_MDN(self, train_set, vali_set, step=1, burnInEnd=False, burnInEnd_step=None,
randn_num=0.123, save_items=True, showEpoch_n=100, params_space=None):
"""Train an MDN using the given data."""
self.hparams_MDN = self._hparams_MDN()
if self.branch_n==1:
self.eco = models_mdn.OneBranchMDN(train_set, self.param_names, vali_set=vali_set, obs_errors=self.obs_errors,
cov_matrix=self.cov_copy, params_dict=self.params_dict)
else:
self.eco = models_mdn.MultiBranchMDN(train_set, self.param_names, vali_set=vali_set, obs_errors=self.obs_errors,
cov_matrix=self.cov_copy, params_dict=self.params_dict)
if step==1:
self.eco.print_info = True
if step>=2:
self.eco.spaceSigma_min = self.spaceSigma_min
self.eco.burnInEnd = burnInEnd
self.eco.burnInEnd_step = burnInEnd_step
self.eco.randn_num = randn_num
self.eco.params_space = params_space
for key, value in self.hparams_MDN.items():
exec('self.eco.%s = value'%(key))
if self.branch_n==1:
self.eco.train(repeat_n=self.repeat_n, showEpoch_n=showEpoch_n, fast_training=self.fast_training)
# self.eco.train_AvgMultiNoise(repeat_n=self.repeat_n, showEpoch_n=showEpoch_n) #need further test
else:
self.eco.train(repeat_n=self.repeat_n, train_branch=self.train_branch, parallel=False, showEpoch_n=showEpoch_n, fast_training=self.fast_training) #reset parallel???
#predict chain
#Note: here use self.cov_copy is to avoid data type error in "eco"
chain_1 = self.eco.predict_chain(self.obs_data, chain_leng=self.chain_leng)
#save results
if save_items and chain_1 is not None:
self.eco.save_net(path=self.path)
self.eco.save_loss(path=self.path)
self.eco.save_chain(path=self.path)
self.eco.save_ndeType(path=self.path)
self.eco.save_hparams(path=self.path)
return chain_1, [self.eco.train_loss, self.eco.vali_loss]
def _train_MNN(self, train_set, vali_set, step=1, burnInEnd=False, burnInEnd_step=None,
randn_num=0.123, save_items=True, showEpoch_n=100, params_space=None):
"""Train an MNN using the given data."""
self.hparams_MNN = self._hparams_MNN()
if self.branch_n==1:
self.eco = models_g.OneBranchMLP_G(train_set, self.param_names, vali_set=vali_set, obs_errors=self.obs_errors,
cov_matrix=self.cov_copy, params_dict=self.params_dict)
else:
self.eco = models_g.MultiBranchMLP_G(train_set, self.param_names, vali_set=vali_set, obs_errors=self.obs_errors,
cov_matrix=self.cov_copy, params_dict=self.params_dict)
if step==1:
self.eco.print_info = True
if step>=2:
self.eco.spaceSigma_min = self.spaceSigma_min
self.eco.burnInEnd = burnInEnd
self.eco.burnInEnd_step = burnInEnd_step
self.eco.randn_num = randn_num
self.eco.params_space = params_space
for key, value in self.hparams_MNN.items():
exec('self.eco.%s = value'%(key))
if self.branch_n==1:
self.eco.train(repeat_n=self.repeat_n, showEpoch_n=showEpoch_n, fast_training=self.fast_training)
# self.eco.train_AvgMultiNoise(repeat_n=self.repeat_n, showEpoch_n=showEpoch_n) #need further test
else:
self.eco.train(repeat_n=self.repeat_n, train_branch=self.train_branch, parallel=False, showEpoch_n=showEpoch_n, fast_training=self.fast_training) #reset parallel???
#predict chain
#Note: here use self.cov_copy is to avoid data type error in "eco"
chain_1 = self.eco.predict_chain(self.obs_data, cov_matrix=self.cov_copy, chain_leng=self.chain_leng)
#save results
if save_items:
self.eco.save_net(path=self.path)
self.eco.save_loss(path=self.path)
self.eco.save_chain(path=self.path)
self.eco.save_ndeType(path=self.path)
self.eco.save_hparams(path=self.path)
return chain_1, [self.eco.train_loss, self.eco.vali_loss]
def _train_ANNMC(self, train_set, vali_set, step=1, burnInEnd=False, burnInEnd_step=None,
randn_num=0.123, save_items=True, showEpoch_n=100, params_space=None):
"""Train an ANNMC using the given data."""
self.hparams_ANNMC = self._hparams_ANNMC()
if self.branch_n==1:
self.eco = models_annmc.OneBranchMLP_MC(train_set, self.param_names, vali_set=vali_set, params_dict=self.params_dict, loss_name='L1')
else:
self.eco = models_annmc.MultiBranchMLP_MC(train_set, self.param_names, vali_set=vali_set, params_dict=self.params_dict, loss_name='L1')
if step==1:
self.eco.print_info = True
if step>=2:
self.eco.spaceSigma_min = self.spaceSigma_min
self.eco.burnInEnd = burnInEnd
self.eco.burnInEnd_step = burnInEnd_step
self.eco.randn_num = randn_num
self.eco.params_space = params_space
for key, value in self.hparams_ANNMC.items():
exec('self.eco.%s = value'%(key))
if self.branch_n==1:
self.eco.train(repeat_n=self.repeat_n, showEpoch_n=showEpoch_n)
else:
self.eco.train(repeat_n=self.repeat_n, train_branch=self.train_branch, parallel=False, showEpoch_n=showEpoch_n) #reset parallel???
#predict chain
#Note: here we use self.cov_copy to avoid data type error in "eco"
chain_1 = self.eco.predict_chain(self.obs_data, cov_matrix=self.cov_copy, lnlike=None, nwalkers=2*len(self.param_names))
#save results
if save_items:
self.eco.save_net(path=self.path)
self.eco.save_loss(path=self.path)
self.eco.save_chain(path=self.path)
self.eco.save_ndeType(path=self.path)
self.eco.save_hparams(path=self.path)
return chain_1, [self.eco.train_loss, self.eco.vali_loss]
[docs] def get_space_type_parts(self):
if self.nde_type_pair[0]=='ANNMC':
if self.space_type=='hypercube' or self.space_type=='LHS':
space_type_part_1 = [self.space_type for i in range(self.expectedBurnInEnd_step)]
else:
space_type_part_1 = ['hypercube' for i in range(self.expectedBurnInEnd_step)]
else:
space_type_part_1 = [self.space_type for i in range(self.expectedBurnInEnd_step)]
if self.nde_type_pair[1]=='ANNMC':
if self.space_type=='hypercube' or self.space_type=='LHS':
space_type_part_2 = [self.space_type for i in range(self.chain_n)]
else:
space_type_part_2 = ['hypercube' for i in range(self.chain_n)]
else:
space_type_part_2 = [self.space_type for i in range(self.chain_n)]
return space_type_part_1, space_type_part_2
def _check_randn_num(self):
self.randn_num = round(abs(self.randn_num), 5)
if self.randn_num>1:
self.randn_num = round(abs(self.randn_num/int(self.randn_num)), 5)
if self.randn_num<1:
self.randn_num = round(self.randn_num+1, 5)
def _randn_nums(self):
return [round(self.randn_num+i, 5) for i in range(self.expectedBurnInEnd_step+self.chain_n)]
[docs] def save_variables(self, randn_num=0.123):
fileName = 'variables%s_%s_%s'%(self.file_identity_str, self.nde_type_str, randn_num)
utils.savenpy(self.path+'/variables', fileName, self.obs_variables, dtype=object)
[docs] def save_ndeInfo(self, path='ann', randn_num=0.123):
fileName = 'ndeInfo%s_%s_%s'%(self.file_identity_str, self.nde_type_str, randn_num)
utils.savenpy(path+'/nde_info', fileName, np.array([self.nde_type_pair, self.nde_type_str, self.branch_n], dtype=object), dtype=object)
[docs] def train(self, path='ann', save_items=True, showEpoch_n=100):
"""Train NDEs and save the results.
Parameters
----------
path : str, optional
The path of the results to be saved. Default: 'ann'
save_items : bool, optional
If True, results will be saved to disk, otherwise, results will not be saved.
showEpoch_n : int, optional
The number of iterations interval for printing. Default: 100
Returns
-------
list
A list of chains and losses.
"""
self.path = path
self._check_randn_num()
randn_nums = self._randn_nums()
#logs & variables & nde_info
if save_items:
logName = 'log%s_%s_%s'%(self.file_identity_str, self.nde_type_str, randn_nums[0])
utils.logger(path=self.path+'/logs', fileName=logName)
self.save_variables(randn_num=randn_nums[0])
self.save_ndeInfo(path=self.path, randn_num=randn_nums[0])
print('randn_num: %s'%randn_nums[0])
self.chain_all = []
self.loss_all = []
burnInEnd = False
self.burnInEnd_step = None
space_type_all = []
nde_type_part_1 = [self.nde_type_pair[0] for i in range(self.expectedBurnInEnd_step)]
nde_type_part_2 = [self.nde_type_pair[1] for i in range(self.chain_n)]
space_type_part_1, space_type_part_2 = self.get_space_type_parts()
param_devs, error_devs, spaceSigma_all = None, None, None
self.sim_obs, self.sim_params, prev_space = None, None, None
start_time = time.time()
for step in range(1, self.expectedBurnInEnd_step+self.chain_n+1):
#update parameter space & set nde_type and space_type
if step>=2:
burnInEnd, self.burnInEnd_step, param_devs, error_devs, spaceSigma_all = self.update_params_space(step, self.chain_all, burnInEnd, self.burnInEnd_step)
if burnInEnd:
nde_type_i = nde_type_part_2[step-self.burnInEnd_step-1]
space_type_i = space_type_part_2[step-self.burnInEnd_step-1]
else:
nde_type_i = nde_type_part_1[step-1]
space_type_i = space_type_part_1[step-1]
#simulate data & split data
self.sim_obs, self.sim_params, space_type_all, prev_space = self.simulate(nde_type_i, space_type_i, step, burnInEnd, param_devs, error_devs, spaceSigma_all,
space_type_all=space_type_all, prev_space=prev_space,
chain_all=self.chain_all, sim_obs=self.sim_obs, sim_params=self.sim_params)
self.train_set, self.vali_set = self.split_data(self.sim_obs, self.sim_params, burnInEnd=burnInEnd)
#training
self._train = eval("self._train_%s"%nde_type_i)
print('\nThe neural density estimator used here is %s'%nde_type_i)
chain_1, losses = self._train(self.train_set, self.vali_set, step=step, burnInEnd=burnInEnd, burnInEnd_step=self.burnInEnd_step,
randn_num=randn_nums[step-1], save_items=save_items, showEpoch_n=showEpoch_n, params_space=prev_space)
#works for MDN in the case None ann chain obtained, see also models_ann --> auto_train
retrain_n = 0
while chain_1 is None:
retrain_n += 1
print("\nRe-training step %s because no ANN chain is obtained!!! The number of retrains: %s\n"%(step,retrain_n))
chain_1, losses = self._train(self.train_set, self.vali_set, step=step, burnInEnd=burnInEnd, burnInEnd_step=self.burnInEnd_step,
randn_num=randn_nums[step-1], save_items=save_items, showEpoch_n=showEpoch_n, params_space=prev_space)
if chain_1 is None and retrain_n>=10:
print('The %s has been retrained %s times, but the ANN chain still cannot be obtained \U0001F602\U0001F602\U0001F602'%(nde_type_i, retrain_n))
print('please reset the hyperparameters of the network (such as the number of epochs, the number of training samples, etc.), or use the ANN or MNN method instead.')
break
self.chain_all.append(chain_1)
self.loss_all.append(losses)
if step==1:
utils.save_predict(path=self.path, nde_type=self.nde_type_str, randn_num=self.randn_num, file_identity_str=self.file_identity_str, chain_true_path=self.chain_true_path, label_true=self.label_true, fiducial_params=self.fiducial_params)
utils.savenpy(self.path+'/initial_params', 'initParams%s_%s_%s'%(self.file_identity_str, self.nde_type_str, self.randn_num), prev_space)
#if burn-in not end after 10 steps, will stop the training process
if step==self.expectedBurnInEnd_step and not burnInEnd:
print('Burn-in not end even after %s steps, please reset hyperparameters of the network or use other NDEs!'%self.expectedBurnInEnd_step)
break
#if chain_n ANN chains are obtained, the training process will stop
if burnInEnd and step-self.burnInEnd_step==self.chain_n:
break
if save_items and self.chain is not None:
fileName = 'chain%s_%s_%s'%(self.file_identity_str, self.nde_type_str, self.randn_num)
utils.savenpy(self.path+'/chain_ann', fileName, self.chain)
print("\nTime elapsed for the training process: %.3f minutes"%((time.time()-start_time)/60))
return self.chain_all, self.loss_all
@property
def good_chains(self):
"""The ANN chians after the burn-in phase."""
if self.burnInEnd_step is None:
warnings.warn('The number of steps is too small to find the Burn-In step and good chains!')
return None
else:
return self.chain_all[self.burnInEnd_step:]
@property
def chain(self):
"""Combined ANN chain using the result of steps after burn-in."""
if self.good_chains is None:
return None
else:
return np.concatenate(self.good_chains, axis=0)
@property
def good_losses(self):
if self.burnInEnd_step is None:
return None
else:
return self.loss_all[self.burnInEnd_step:]
#%% predict
[docs]class Predict(plotter.PlotPosterior):
"""Reanalysis using the saved chains or the well-trained NDEs.
Parameters
----------
obs_data : array-like, list, or None, optional
The observations (measurements) with shape (obs_length,3), or a list of
observations with shape [(obs_length_1,3), (obs_length_2,3), ...].
The first column is the observational variable, the second column is
the best values of the measurement, and the third column is the error of
the measurement. If None, only the saved chains can be used for parameter
estimations, and will not check variables. Default: None
cov_matrix : array-like, list, or None, optional
Covariance matrix of the observational data. It should be an array with
shape (obs_length, obs_length), or a list of covariance matrix with
shape [(obs_length_1, obs_length_1), (obs_length_2, obs_length_2), ...].
If there is no covariance for some observations, the covariance matrix
should be set to None. e.g. [cov_matrix_1, None, cov_matrix_3]. Default: None
path : str, optional
The path of the results saved. Default: 'ann'
randn_num : float or str, optional
A random number that identifies the saved results. Default: float
steps_n : None or int, optional
The number of steps of the training process to be used. If None, the files will be
found automatically. Default: None
Attributes
----------
chain_leng : int, optional
The length of each ANN chain, which equals the number of samples to be
generated by a NDE model when predicting an ANN chain. This only works when
using the :func:`from_net` method. Default: 10000
chain_true_path : str, optional
The path of the true chain of the posterior which can be obtained by using other methods,
such as the MCMC method. Note: only ``.npy`` and ``.txt`` file is supported. Default: ''
label_true : str, optional
The legend label of the true chain. Default: 'True'
fiducial_params : list, optional
A list that contains the fiducial cosmological parameters. Default: []
show_idx : None or list, optional
The index of cosmological parameters when plotting contours. This allows
us to change the order of the cosmological parameters. If None, the order
of parameters follows that in the ANN chain. If list, the minimum value
of it should be 1. See :class:`~.plotter.PlotPosterior`. Default: None
"""
def __init__(self, obs_data=None, cov_matrix=None, path='ann', randn_num=float,
steps_n=None):
self.obs_data = obs_data
self.cov_matrix = cov_matrix
self.path = path
self.randn_num = str(randn_num)
self.steps_n = steps_n
if self.steps_n is not None:
self.randn_nums = [str(Decimal(str(randn_num)) + Decimal(str(i))) for i in range(steps_n)]
self.chain_leng = 10000
self.chain_true_path = '' #only support .npy or .txt file
self.label_true = 'True'
self.fiducial_params = []
self.show_idx = None
self.loader = Loader(path=path)
@property
def obs_variables(self):
if self.branch_n==1:
return self.obs_data[:,0]
else:
obs_varis = []
for i in range(self.branch_n):
obs_varis.append(self.obs_data[i][:,0])
return obs_varis
@property
def trained_variables(self):
file_path = utils.FilePath(filedir=self.path+'/variables', randn_num=self.randn_num, suffix='.npy').filePath()
return np.load(file_path, allow_pickle=True)
@property
def same_variables(self):
if self.branch_n==1:
return np.all(self.obs_variables==self.trained_variables)
else:
same_varis = [np.all(self.obs_variables[i]==self.trained_variables[i]) for i in range(self.branch_n)]
return np.all(same_varis)
@property
def cov_copy(self):
if self.cov_matrix is None:
return None
else:
return np.copy(self.cov_matrix)
[docs] def get_eco(self):
if self.branch_n==1:
if self.nde_type=='ANN':
self.eco = models_ann.PredictOBMLP(path=self.path)
elif self.nde_type=='MDN':
self.eco = models_mdn.PredictOBMDN(path=self.path)
elif self.nde_type=='MNN':
self.eco = models_g.PredictOBMLP_G(path=self.path)
elif self.nde_type=='ANNMC':
self.eco = models_annmc.PredictOBMLP_MC(path=self.path)
else:
if self.nde_type=='ANN':
self.eco = models_ann.PredictMBMLP(path=self.path)
elif self.nde_type=='MDN':
self.eco = models_mdn.PredictMBMDN(path=self.path)
elif self.nde_type=='MNN':
self.eco = models_g.PredictMBMLP_G(path=self.path)
elif self.nde_type=='ANNMC':
self.eco = models_annmc.PredictMBMLP_MC(path=self.path)
[docs] def from_chain(self):
"""Predict using saved chains.
Raises
------
ValueError
If variables of the input observational data are different from those
used to train the NDE, an error will be raised.
"""
self.load_ndeInfo(self.randn_num)
if self.obs_data is not None and not self.same_variables:
raise ValueError('Variables of the input observational data are different from those used to train the network!')
self.chain_all = []
self.good_chains = []
self.good_losses = []
if self.steps_n is None:
# automatically find files
running_num = self.randn_num
while True:
self.loader.randn_num = running_num
nde_type, p_name, p_dict, burnInEnd_step, _ = self.loader.load_ndeType()
if nde_type is None:
break
self.nde_type, self.param_names, self.params_dict, self.burnInEnd_step = nde_type, p_name, p_dict, burnInEnd_step
self.get_eco()
self.eco.randn_num = running_num
self.eco.load_chain()
self.eco.load_hparams()
self.eco.load_loss()
self.eco.load_ndeType()
self.chain_all.append(self.eco.chain)
if self.burnInEnd_step is not None:
self.good_chains.append(self.eco.chain)
self.good_losses.append([self.eco.train_loss, self.eco.vali_loss])
running_num = str(Decimal(running_num)+Decimal('1'))
else:
for i in range(self.steps_n):
self.loader.randn_num = self.randn_nums[i]
nde_type, p_name, p_dict, burnInEnd_step, _ = self.loader.load_ndeType()
if nde_type is None:
break
self.nde_type, self.param_names, self.params_dict, self.burnInEnd_step = nde_type, p_name, p_dict, burnInEnd_step
self.get_eco()
self.eco.randn_num = self.randn_nums[i]
self.eco.load_chain()
self.eco.load_hparams()
self.eco.load_loss()
self.eco.load_ndeType()
self.chain_all.append(self.eco.chain)
if self.burnInEnd_step is not None:
self.good_chains.append(self.eco.chain)
self.good_losses.append([self.eco.train_loss, self.eco.vali_loss])
self.file_identity_str = self.eco.file_identity_str
if len(self.good_chains)==0:
print('The number of steps is too small to find the Burn-In step and good chains \U0001F602\U0001F602\U0001F602')
self.chain = None
else:
# Combined ANN chain using the result of steps after burn-in.
print('The parameters have reached stable values and good chains are obtained \U0001F386\U0001F388\U0001F389')
self.chain = np.concatenate(self.good_chains, axis=0)
fileName = 'chain%s_%s_%s'%(self.file_identity_str, self.nde_type_str, self.randn_num)
utils.savenpy(self.path+'/chain_ann', fileName, self.chain)
[docs] def from_net(self):
"""Predict using saved NDEs.
Raises
------
ValueError
If variables of the input observational data are different from those
used to train the NDE, an error will be raised.
"""
self.load_ndeInfo(self.randn_num)
if self.obs_data is None:
raise ValueError('The observational data should be given, otherwise, use the function `from_chain`')
if self.obs_data is not None and not self.same_variables:
raise ValueError('Variables of the input observational data are different from those used to train the network!')
self.chain_all = []
self.good_chains = []
self.good_losses = []
if self.steps_n is None:
# automatically find files
running_num = self.randn_num
while True:
self.loader.randn_num = running_num
nde_type, p_name, p_dict, burnInEnd_step, _ = self.loader.load_ndeType()
if nde_type is None:
break
self.nde_type, self.param_names, self.params_dict, self.burnInEnd_step = nde_type, p_name, p_dict, burnInEnd_step
self.get_eco()
self.eco.randn_num = running_num
self.eco.load_net()
self.eco.load_hparams()
self.eco.load_loss()
self.eco.load_ndeType()
if self.nde_type=='MDN':
self.eco.predict_chain(self.obs_data, chain_leng=self.chain_leng)
elif self.nde_type=='ANNMC':
self.eco.predict_chain(self.obs_data, cov_matrix=self.cov_copy, lnlike=None, nwalkers=2*len(self.param_names))
else:
self.eco.predict_chain(self.obs_data, cov_matrix=self.cov_copy, chain_leng=self.chain_leng)
self.chain_all.append(self.eco.chain)
if self.burnInEnd_step is not None:
self.good_chains.append(self.eco.chain)
self.good_losses.append([self.eco.train_loss, self.eco.vali_loss])
running_num = str(Decimal(running_num)+Decimal('1'))
else:
for i in range(self.steps_n):
self.loader.randn_num = self.randn_nums[i]
nde_type, p_name, p_dict, burnInEnd_step, _ = self.loader.load_ndeType()
if nde_type is None:
break
self.nde_type, self.param_names, self.params_dict, self.burnInEnd_step = nde_type, p_name, p_dict, burnInEnd_step
self.get_eco()
self.eco.randn_num = self.randn_nums[i]
self.eco.load_net()
self.eco.load_hparams()
self.eco.load_loss()
self.eco.load_ndeType()
if self.nde_type=='MDN':
self.eco.predict_chain(self.obs_data, chain_leng=self.chain_leng)
elif self.nde_type=='ANNMC':
self.eco.predict_chain(self.obs_data, cov_matrix=self.cov_copy, lnlike=None, nwalkers=2*len(self.param_names))
else:
self.eco.predict_chain(self.obs_data, cov_matrix=self.cov_copy, chain_leng=self.chain_leng)
self.chain_all.append(self.eco.chain)
if self.burnInEnd_step is not None:
self.good_chains.append(self.eco.chain)
self.good_losses.append([self.eco.train_loss, self.eco.vali_loss])
self.file_identity_str = self.eco.file_identity_str
if len(self.good_chains)==0:
print('The number of steps is too small to find the Burn-In step and good chains \U0001F602\U0001F602\U0001F602')
self.chain = None
else:
print('The parameters have reached stable values and good chains are obtained \U0001F386\U0001F388\U0001F389')
self.chain = np.concatenate(self.good_chains, axis=0)
fileName = 'chain%s_%s_%s'%(self.file_identity_str, self.nde_type_str, self.randn_num)
utils.savenpy(self.path+'/chain_ann', fileName, self.chain)
[docs] def get_loss(self, alpha=0.6, show_logLoss=False, save_fig=True, show_minLoss=True):
self.fig_loss, _ = self.eco.plot_loss(alpha=alpha, show_logLoss=show_logLoss, show_minLoss=show_minLoss)
if save_fig:
pl.savefig(self.path+'/figures', 'loss%s_%s_%s.pdf'%(self.file_identity_str, self.nde_type,self.randn_num), self.fig_loss)
[docs]class PredictNDEs(plotter.PlotMultiPosterior):
"""Reanalysis using the saved chains for several NDE results.
Parameters
----------
path : str, optional
The path of the results saved. Default: 'ann'
randn_nums : list, optional
A series of random numbers identify the saved results. Default: [1.123,1.123]
Attributes
----------
chain_true_path : str, optional
The path of the true chain of the posterior which can be obtained by using other methods,
such as the MCMC method. Note: only ``.npy`` and ``.txt`` file is supported. Default: ''
label_true : str, optional
The legend label of the true chain. Default: 'True'
show_idx : None or list, optional
The index of cosmological parameters when plotting contours. This allows
us to change the order of the cosmological parameters. If None, the order
of parameters follows that in the ANN chain. If list, the minimum value
of it should be 1. See :class:`~.plotter.PlotPosterior`. Default: None
"""
def __init__(self, path='ann', randn_nums=[1.123,1.123]):
self.path = path
self.randn_nums = randn_nums
self.chain_n = len(randn_nums)
self.loader = Loader(path=path)
self.chain_true_path = ''
self.label_true = 'True'
self.show_idx = None
[docs] def from_chain(self):
"""Predict using saved chains."""
self.chains = []
self.nde_types = []
for i in range(self.chain_n):
self.loader.randn_num = self.randn_nums[i]
nde_type, p_name, p_dict, _, self.file_identity_str = self.loader.load_ndeType(raise_err=True)
self.load_ndeInfo(self.randn_nums[i])
self.nde_types.append(self.nde_type_pair[1])
self.param_names = p_name
self.params_dict = p_dict
self.loader.load_chain_ann()
self.chains.append(self.loader.chain_ann)
self.chain_n = len(self.chains)