# -*- coding: utf-8 -*-
from . import space_updater as su
from . import cosmic_params, utils
import coplot.plot_contours as plc
import numpy as np
import warnings
import torch
from torch.distributions.multivariate_normal import MultivariateNormal
from torch.distributions.beta import Beta
from torch.distributions.laplace import Laplace
from smt.sampling_methods import LHS
from matplotlib.path import Path
import matplotlib.pyplot as plt
import itertools
import math
#%% select samples from the local data sets or previous step samples
#to be updated to use new best-fit (best values & errors)???
#it seems this works well for hypercube & LHS, but not for hypersphere & hyperellipsoid, test this!!!
#because the params_space of hypersphere & hyperellipsoid will be a bit larger
[docs]class ParametersFilter(object):
"""Select cosmological parameters from a data set according to a given parameter space.
Parameters
----------
param_names : list
A list that contains parameter names.
sim_params : array-like
The simulated cosmological parameters with the shape of (N, n),
where N is the number of samples and n is the number of parameters.
params_space : array-like
The parameter space with the shape of (n, 2), where n is the number of parameters.
For each parameter, it is: [lower_limit, upper_limit].
prev_space : array-like
The parameter space of local simulated data (or mock data in previous step),
with shape of (n, 2), where n is the number of parameters.
For each parameter, it is: [lower_limit, upper_limit].
check_include : bool, optional
If True, it will check whether ``params_space`` is in the space of ``sim_params``,
otherwise, do nothing. Default: True
rel_dev_limit : float, optional
The limit of the relative deviation when ``params_space`` is not in the space of ``sim_params``,
the default is 20% (this means if ``params_space`` is :math:`[-5\sigma, +5\sigma]`, it can deviate :math:`<1\sigma` from ``sim_params``),
note that it should be :math:`<0.4` (the deviation :math:`<2\sigma` for parameter space
:math:`[-5\sigma, +5\sigma]`). Default: 0.2
"""
def __init__(self, param_names, sim_params, params_space, prev_space,
check_include=True, rel_dev_limit=0.2):
self.param_names = param_names
self.sim_params = sim_params
self.params_lower = params_space[:,0]
self.params_upper = params_space[:,1]
self.prev_space = prev_space
self.check_include = check_include
self.relDev_limit = rel_dev_limit
def _check_relDev(self):
if self.relDev_limit >= 0.4:
warnings.warn('"rel_dev_limit" must be <0.4 for parameter space [-5\sigma, +5\sigma], it is better to set to 0.2', Warning)
@property
def include(self):
"""Check whether ``params_space`` is in the space of the ``sim_params``.
Returns
-------
bool
If ``params_space`` is in the space of the ``sim_params``,
return True, otherwise, return False.
"""
self._check_relDev()
#old,
# sim_params_min = np.min(self.sim_params, axis=0)
# sim_params_max = np.max(self.sim_params, axis=0)
# sim_params_mean = np.mean(self.sim_params, axis=0)
#new
sim_params_min = self.prev_space[:,0]
sim_params_max = self.prev_space[:,1]
sim_params_mean = np.mean(self.prev_space, axis=1)
residual_lower_min = np.min(self.params_lower - sim_params_min)
residual_upper_max = np.max(self.params_upper - sim_params_max)
relativeDev_lower = (self.params_lower - sim_params_min)/(sim_params_mean - sim_params_min)
relativeDev_upper = (self.params_upper - sim_params_max)/(sim_params_max - sim_params_mean)
# print(min(relativeDev_lower), max(relativeDev_upper), '!!!!')
if residual_lower_min>=0 and residual_upper_max<=0:
return True
elif min(relativeDev_lower)>=-self.relDev_limit and max(relativeDev_upper)<=self.relDev_limit:
return True
else:
out_index_lower = np.where(relativeDev_lower<self.relDev_limit)[0]
out_index_upper = np.where(relativeDev_upper>self.relDev_limit)[0]
out_index = np.union1d(out_index_lower, out_index_upper)
if len(out_index)==len(self.param_names):
print('The parameter space to be learned is not included in the parameter space of the local data sets')
else:
for i in out_index:
print("Learning range of %s is not included in the parameter space of the local data sets"%(self.param_names[i]))
return False
[docs] def filter_index(self):
residual_lower = self.sim_params - self.params_lower
residual_upper = self.sim_params - self.params_upper
residual_lower_min = np.min(residual_lower, axis=1)
residual_upper_max = np.max(residual_upper, axis=1)
index_min = np.where(residual_lower_min >= 0)
index_max = np.where(residual_upper_max <= 0)
self.index = np.intersect1d(index_min, index_max)
return self.index
[docs] def filter_params(self):
if self.check_include:
if self.include:
self.filter_index()
return self.sim_params[self.index]
else:
self.filter_index()
return self.sim_params[self.index]
#%% simulate parameters
[docs]class CutParams(object):
"""Cut parameter samples that crossed the parameter limits.
Parameters
----------
param_names : list
A list that contains parameter names.
params_dict : dict or None, optional
Information of cosmological parameters that include the labels, the minimum values,
and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None
"""
def __init__(self, param_names, params_dict=None):
p_property = cosmic_params.ParamsProperty(param_names, params_dict=params_dict)
self.params_limit = p_property.params_limit
[docs] def cut_params(self, params, params_limit=None):
if params_limit is None:
params_limit = self.params_limit
idx_params, idx_edge = np.where(~np.isnan(params_limit))
edge_values = params_limit[idx_params, idx_edge]
for i in range(len(idx_params)):
#left edge
if idx_edge[i]==0:
params = params[np.where(params[:,idx_params[i]]>edge_values[i])]
#right edge
elif idx_edge[i]==1:
params = params[np.where(params[:,idx_params[i]]<edge_values[i])]
return params
[docs]class SimParameters(CutParams):
"""Simulate parameters.
Parameters
----------
param_names : list
A list that contains parameter names.
chain : array-like or None
The predicted ANN chain in the previous step. If ``chain`` is an array,
``params_space`` will be ignored. If ``chain`` is None, ``params_space``
should be given. Default: None
params_space : array-like or None
The parameter space with the shape of (n, 2), where n is the number of parameters.
For each parameter, it is: [lower_limit, upper_limit]. This is only used
for space_type='hypercube' and space_type='LHS'. If ``chain`` is an array,
``params_space`` will be ignored. If ``chain`` is None,
``params_space`` should be given. Default: None
spaceSigma : int or array-like, optional
The size of parameter space to be learned. It is a int or a numpy array
with shape of (n,), where n is the number of parameters, e.g. for spaceSigma=5,
the parameter space to be learned is :math:`[-5\sigma, +5\sigma]`. Default: 5
params_dict : dict or None, optional
Information of cosmological parameters that include the labels, the minimum values,
and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None
space_type : str, optional
The type of parameter space. It can be 'hypercube', 'LHS', 'hypersphere',
'hyperellipsoid', or 'posterior_hyperellipsoid'. Default: 'hypercube'
cut_crossedLimit : bool, optional
If True, the data points that cross the parameter limits will be cut.
This only works when space_type is 'hypersphere', 'hyperellipsoid',
or 'posterior_hyperellipsoid'. Default: True
cut_crossedBest : bool, optional
If True, the folded data points that cross the best values will be cut.
It is recommended to set it to True. This only works when space_type is
'hypersphere', 'hyperellipsoid', 'or 'posterior_hyperellipsoid',
and when ``cut_crossedLimit=False``. Default: True
cross_best : bool, optional
If True, the folded data points will cross the best values, otherwise,
the folded data points will not cross the best values. This only works when
space_type is 'hypersphere', 'hyperellipsoid', or 'posterior_hyperellipsoid',
and when ``cut_crossedLimit=False`` and ``cut_crossedBest=False``. Default: False
Attributes
----------
seed : None or int, optional
Seed number which controls random draws. Default: None
Note
----
Either ``chain`` or ``params_space`` should be given to simulate samples.
"""
#remove posterior_hyperellipsoid or update it ?
def __init__(self, param_names, chain=None, params_space=None, spaceSigma=5,
params_dict=None, space_type='hypercube', cut_crossedLimit=True,
cut_crossedBest=True, cross_best=False):
self.param_names = param_names
p_property = cosmic_params.ParamsProperty(param_names, params_dict=params_dict) #
self.params_limit = p_property.params_limit #
self.chain = self._chain(chain) #the input chain should be limited using the params_limit, modify in other palce???
self.spaceSigma = spaceSigma
self.space_type = self._space_type(space_type)
self.params_space = self._params_space(params_space) #define after self.space_type
self.cut_crossedLimit = cut_crossedLimit #
self.cut_crossedBest = cut_crossedBest
self.cross_best = cross_best
self.seed = None
self.max_error = True
@property
def params_n(self):
return len(self.param_names)
def _chain(self, chain):
if chain is None:
return None
else:
chain = su.Chains.reshape_chain(chain)
# using asymmetric error, modify space_updater???
self.best_fit = su.Chains.bestFit(chain, best_type='mode', symmetry_error=False)
# self.best_fit = su.Chains.bestFit(chain, best_type='median', symmetry_error=False) #further test???
self.best = self.best_fit[:,0]
self.sigma_max = np.max(self.best_fit[:,1:], axis=1)
return chain
def _space_type(self, space_type):
if self.chain is None:
if space_type=='hypersphere' or space_type=='hyperellipsoid' or space_type=='posterior_hyperellipsoid':
raise ValueError("A chain should be given if space_type is set to 'hypersphere', 'hyperellipsoid', or 'posterior_hyperellipsoid', otherwise, set space_type to 'hypercube' or 'LHS'")
return space_type
def _params_space(self, p_space):
if self.chain is not None:
if self.space_type=='posterior_hyperellipsoid':
# this params_space will be used in self.posterior_hyperellipsoid
return self.get_edge_space()
else:
p_space = np.c_[self.best-self.sigma_max*self.spaceSigma, self.best+self.sigma_max*self.spaceSigma]
return su.CheckParameterSpace.check_limit(p_space, self.params_limit)
else:
if p_space is None:
raise ValueError("An ANN chain should be given, otherwise, params_space should not be set to None.")
else:
return su.CheckParameterSpace.check_limit(p_space, self.params_limit)
[docs] def hypercube(self, N):
"""Generate samples uniformly in a hypercube parameter space using uniform distribution.
Parameters
----------
N : int
The number of data to be simulated.
Returns
-------
array-like
Parameters.
"""
#see also https://ls11-www.cs.tu-dortmund.de/people/swessing/diversipy/doc/hycusampling.html
#Beachkofski, B.; Grandhi, R. (2002). Improved Distributed Hypercube Sampling.
print('Generating samples uniformly in a hypercube parameter space using uniform distribution')
return self.uniform_params(N, self.params_space)
[docs] def lhs(self, N):
"""Generate samples uniformly in a hypercube parameter space using Latin hypercube sampling.
https://en.wikipedia.org/wiki/Latin_hypercube_sampling
https://blog.csdn.net/yuxeaotao/article/details/108952326
Parameters
----------
N : int
The number of data to be simulated.
Returns
-------
array-like
Parameters.
"""
print('Generating samples uniformly in a hypercube parameter space using Latin hypercube sampling')
#Note: When using the LHS method, it is necessary to simulate the data of training set size at one time.
#pay attention to this when applying it???
return LHS(xlimits=self.params_space, criterion='center', random_state=self.seed)(N)
[docs] def random_ball(self, N, dimension, radius=1):
"""Generate samples uniformly in a ball with N dimension (hypersphere).
https://www.cnpython.com/qa/349434
https://www.zhihu.com/question/277712372
https://blogs.sas.com/content/iml/2016/04/06/generate-points-uniformly-in-ball.html
https://arxiv.org/pdf/1404.1347.pdf
https://www.sciencedirect.com/science/article/pii/S0047259X10001211
"""
# First generate random directions by normalizing the length of a vector of random-normal values (these distribute evenly on ball).
random_directions = np.random.normal(size=(dimension, N))
random_directions = random_directions / np.linalg.norm(random_directions, axis=0)
# Second generate a random radius with probability proportional to the surface area of a ball with a given radius.
random_radii = np.random.random(N)**(1.0/dimension)
# Return the list of random (direction & length) points.
return radius * (random_directions * random_radii).T
[docs] def fold_sphere(self, params):
"""Fold the simulated parameters using the extremum of the parameters.
https://en.wikipedia.org/wiki/Folded_normal_distribution
"""
idx_params, idx_edge = np.where(~np.isnan(self.params_limit))
edge_values = self.params_limit[idx_params, idx_edge]
best_values = self.best[idx_params]
for i in range(len(idx_params)):
#left edge
if idx_edge[i]==0:
if np.min(params[:,idx_params[i]]) < edge_values[i]:
if self.cut_crossedBest:
cut_edge_l = 2*edge_values[i] - best_values[i]
params = params[np.where(params[:,idx_params[i]]>cut_edge_l)]
params_l = params[np.where(params[:,idx_params[i]]<edge_values[i])]
params_r = params[np.where(params[:,idx_params[i]]>edge_values[i])]
if not self.cross_best and max(abs(params_l[:,idx_params[i]]-edge_values[i]))>abs(best_values[i]-edge_values[i]):
a = 2*edge_values[i] - best_values[i]
b = edge_values[i]
p_min = min(params_l[:,idx_params[i]])
p_max = max(params_l[:,idx_params[i]])
params_l[:,idx_params[i]] = a + (params_l[:,idx_params[i]]-p_min)*(b-a) / (p_max-p_min)
params_l[:,idx_params[i]] = 2*edge_values[i] - params_l[:,idx_params[i]]
params = np.r_[params_l, params_r]
#right edge
elif idx_edge[i]==1:
if np.max(params[:,idx_params[i]]) > edge_values[i]:
if self.cut_crossedBest:
cut_edge_r = 2*edge_values[i] - best_values[i]
params = params[np.where(params[:,idx_params[i]]<cut_edge_r)]
params_l = params[np.where(params[:,idx_params[i]]<edge_values[i])]
params_r = params[np.where(params[:,idx_params[i]]>edge_values[i])]
if not self.cross_best and max(abs(params_r[:,idx_params[i]]-edge_values[i]))>abs(best_values[i]-edge_values[i]):
a = edge_values[i]
b = 2*edge_values[i] - best_values[i]
p_min = min(params_r[:,idx_params[i]])
p_max = max(params_r[:,idx_params[i]])
params_r[:,idx_params[i]] = a + (params_r[:,idx_params[i]]-p_min)*(b-a) / (p_max-p_min)
params_r[:,idx_params[i]] = 2*edge_values[i] - params_r[:,idx_params[i]]
params = np.r_[params_l, params_r]
return params
[docs] def hypersphere(self, N):
"""Generate samples uniformly in a hypersphere parameter space.
Parameters
----------
N : int
The number of data to be simulated.
Returns
-------
array-like
Parameters.
"""
print('Generating samples uniformly in a hypersphere parameter space')
radius = 1.0
params_uncorr = self.random_ball(N, self.params_n, radius=radius)
params_uncorr = params_uncorr * self.sigma_max / radius * self.spaceSigma + self.best #note the radius
#cut or fold parameters
if self.cut_crossedLimit:
params_uncorr = self.cut_params(params_uncorr)
else:
params_uncorr = self.fold_sphere(params_uncorr)
return params_uncorr
[docs] def normal_params(self, N, best, sigma_max, spaceSigma):
print('Generating Gaussian parameter space using standard deviation of the parameter')
if isinstance(best, np.ndarray):
params_n = len(best)
else:
params_n = 1
return np.random.randn(N, params_n)*sigma_max/3.0 * spaceSigma + best
[docs] def hyperellipsoid(self, N):
"""Generate samples uniformly in a hyperellipsoid parameter space using covariance between parameters.
https://scipy-cookbook.readthedocs.io/items/CorrelatedRandomSamples.html
https://blogs.sas.com/content/iml/2012/02/08/use-the-cholesky-transformation-to-correlate-and-uncorrelate-variables.html
Parameters
----------
N : int
The number of data to be simulated.
Returns
-------
array-like
Parameters.
Note
----
For Cholesky decomposition, the covariance matrix :math:`C = LL^T`.
So, the transformation relationship between correlated parameters :math:`P_{corr}`
and uncorrelated parameters :math:`P_{uncorr}` is
:math:`P_{corr} = LP_{uncorr}`, :math:`P_{uncorr} = L^{-1}P_{corr}`
"""
#See also https://www.zhihu.com/question/268718682/answer/471626663 , note that u = L^-1 x should be u = L x
#check again for this method?
if self.params_n==1:
# params_corr = self.normal_params(N, self.best, self.sigma_max, self.spaceSigma) #normal distribution
params_corr = self.hypersphere(N) #uniform distribution, use this, more reasonable
else:
print('Generating samples uniformly in a hyperellipsoid parameter space using covariance between parameters')
radius = 1.0
#Covariance matrix may non-positive definite when max_error=False, so just set max_error=True
cov = su.Chains.cov_matrix(self.chain, max_error=self.max_error) #max_error here is using sigma_max
params_uncorr = self.random_ball(N, self.params_n, radius=radius)
#method 1
# params_uncorr = params_uncorr / radius * self.spaceSigma #note the radius
# L = np.linalg.cholesky(cov)
# params_corr = L.dot(params_uncorr.T).T + self.best
#method 2, use this?
sigma_max = np.sqrt(np.diagonal(cov))
sigma_scale_factor = radius / sigma_max #note the radius
idx = np.where(np.ones_like(cov, dtype=bool))
cov_scaled = np.zeros_like(cov)
cov_scaled[idx] = cov[idx] * sigma_scale_factor[idx[0]] * sigma_scale_factor[idx[1]]
L = np.linalg.cholesky(cov_scaled)
#test
# factor_sigma = 0.2
# # error_factor = np.abs(np.random.normal(1, factor_sigma, (len(L),len(L))))
# error_factor = np.abs(np.random.normal(0, factor_sigma, (len(L),len(L)))) + 1
# L = L * error_factor
params_corr = L.dot(params_uncorr.T).T
params_corr = params_corr / sigma_scale_factor / radius * self.spaceSigma + self.best #note the radius
# cut or fold parameters
if self.cut_crossedLimit:
params_corr = self.cut_params(params_corr)
else:
params_corr = self.fold_sphere(params_corr)
return params_corr
#to be updated for plc._hist2d, do not plot figure???
def _get_contour_edge(self, p1, p2, sigma=3):
"""Get the boundary of an ellipse using the chain.
Parameters
----------
p1 : array-like
1-D array with shape of (N,). The chain of the first parameter.
p2 : array-like
1-D array with shape of (N,). The chain of the second parameter.
sigma : int, optional
Multiple of the standard deviation, which can be 1, 2, 3, 4, or 5. Default: 3
Returns
-------
edge : array-like
The boundary of the ellipse.
"""
if sigma==1:
level = 0.6826
elif sigma==2:
level = 0.9544
elif sigma==3:
level = 0.9974
elif sigma==4:
level = 0.99994
elif sigma==5:
level = 0.9999994
contour_edge = plc._hist2d(p1, p2, bins=100, smooth=0, levels=(level,), line_width=1) #smooth: 0 or 1
plt.close() #close the figure plotted in plc._hist2d
segs = contour_edge.allsegs
lens = np.array([len(segs[0][i]) for i in range(len(segs[0]))])
idx_max = np.where(lens==max(lens))[0][0]
edge = segs[0][idx_max]
return edge
@property
def combinations(self):
return [c for c in itertools.combinations(range(0, self.params_n), 2)]
[docs] def get_contour_edges(self,sigma=3):
#change the chain to a new chain such as a 5sigma chain
chain_new = (self.chain - self.best) / 3.0 * self.spaceSigma + self.best
edge_ij = []
for idx in self.combinations:
_edge_ij = self._get_contour_edge(chain_new[:,idx[0]], chain_new[:,idx[1]], sigma=sigma)
p_property_ij = cosmic_params.ParamsProperty([self.param_names[idx[0]], self.param_names[idx[1]]])
params_limit_ij = p_property_ij.params_limit
_edge_ij = self.cut_params(_edge_ij, params_limit_ij)
edge_ij.append(_edge_ij)
return edge_ij
[docs] def get_edge_space(self, sigma=3):
self.edge_ij = self.get_contour_edges(sigma=sigma)
space_mins = [[] for i in range(self.params_n)]
space_maxs = [[] for i in range(self.params_n)]
for i, idx in zip(range(len(self.combinations)), self.combinations):
range_i_min, range_j_min = min(self.edge_ij[i][:,0]), min(self.edge_ij[i][:,1])
range_i_max, range_j_max = max(self.edge_ij[i][:,0]), max(self.edge_ij[i][:,1])
space_mins[idx[0]].append(range_i_min)
space_mins[idx[1]].append(range_j_min)
space_maxs[idx[0]].append(range_i_max)
space_maxs[idx[1]].append(range_j_max)
space_mins = np.array([max(e) for e in space_mins]).reshape(-1, 1)
space_maxs = np.array([min(e) for e in space_maxs]).reshape(-1, 1)
space = np.c_[space_mins, space_maxs]
return su.CheckParameterSpace.check_limit(space, self.params_limit)
[docs] def in_polygon(self, edge, x, y, get_points=True):
"""Judge whether the given points are in the area surrounded by the polygon.
Parameters
----------
edge : array-like
2-D array with shape (N, 2). The vertices of a polygon.
x : array-like
1-D array with shape (M,). The x coordinate of the data points.
y : array-like
1-D array with shape (M,). The y coordinate of the data points.
get_points : bool, optional
If True, it will return data points inside the area, if False,
it will return a bool array which is True if the (closed) path
contains the corresponding point. Default: True
Returns
-------
array-like
Points in the polygon.
"""
# https://blog.csdn.net/weixin_43794311/article/details/121027299
_path = Path(edge)
points = np.c_[x, y]
_in = _path.contains_points(points) #Judge whether the point is in the area.
# _in = _path.contains_points(points, radius=-1e-10) #Judge whether the point is in the area or on the curve.
if get_points:
idx_in = np.where(_in==True)
x_in = x[idx_in]
y_in = y[idx_in]
return x_in, y_in
else:
return _in
[docs] def unique_elements(self, list_array):
"""Find the unique elements of a list which contains various of arrays.
Parameters
----------
list_array : list
A list that contais various of arrays.
Returns
-------
array-like
The sorted unique elements of the list.
"""
if len(list_array)==1:
return list_array[0]
elif len(list_array)==2:
return np.intersect1d(list_array[0], list_array[1])
elif len(list_array)>=3:
uniq = np.intersect1d(list_array[0], list_array[1])
for i in range(len(list_array)-2):
uniq = np.intersect1d(uniq, list_array[i+2])
return uniq
[docs] def posterior_hyperellipsoid(self, N, factor=float):
if self.params_n==1:
params_corr = self.hypersphere(N) #uniform distribution
else:
print('Generating samples in a deformed hyperellipsoid parameter space using the posterior density (ANN chain) directly')
if factor==float:
factor = 1/0.43**self.params_n
sim_points = self.uniform_params(int(N*factor), self.params_space)
points_in = []
for i, idx in zip(range(len(self.combinations)), self.combinations):
points_ij = self.in_polygon(self.edge_ij[i], sim_points[:,idx[0]], sim_points[:,idx[1]], get_points=False)
idx_in_ij = np.where(points_ij==True)[0]
points_in.append(idx_in_ij)
points_in = self.unique_elements(points_in)
params_corr = sim_points[points_in]
idx_rand = np.random.choice(len(params_corr), len(params_corr), replace=False)
params_corr = params_corr[idx_rand]
# cut or fold parameters
if self.cut_crossedLimit:
params_corr = self.cut_params(params_corr)
else:
params_corr = self.fold_sphere(params_corr)
return params_corr
def _get_params(self, N):
if self.space_type=='hypercube':
return self.hypercube(N)
elif self.space_type=='LHS':
return self.lhs(N)
elif self.space_type=='hypersphere':
return self.hypersphere(N)
elif self.space_type=='hyperellipsoid':
return self.hyperellipsoid(N)
elif self.space_type=='posterior_hyperellipsoid':
#here 0.43 is based on experiments and can also be set to other values
factor = 1/0.43**self.params_n
params = self.posterior_hyperellipsoid(N, factor=factor)
while len(params)<N:
factor = factor * math.ceil(N/len(params))
params = self.posterior_hyperellipsoid(N, factor=factor)
return params[:N]
[docs] def get_params(self, N):
np.random.seed(self.seed)
if self.cut_crossedLimit or self.cut_crossedBest:
factor = 1
params = self._get_params(N*factor)
while len(params)<N:
factor += 1
params = self._get_params(N*factor)
return params[:N]
else:
return self._get_params(N)
#test, multi_params & data_seed should be changed, to let seed and parameters random combination
[docs] def get_multiParams(self, N, multi_params=1, use_dataSeed=False, reorder=True):
params = self.get_params(N)
if multi_params==1:
#!!! do not use data seed
return params
else:
multi_P = []
for i in range(multi_params):
multi_P.append(params)
multi_P = np.concatenate(multi_P, axis=0)
if reorder:
index = np.random.choice(len(multi_P), len(multi_P), replace=False)
multi_P = multi_P[index]
if use_dataSeed:
data_seed = np.random.choice(100000000, multi_params, replace=False) #test
self.data_seed = np.repeat(data_seed, N, axis=0) #test
if reorder:
self.data_seed = self.data_seed[index] #test
return multi_P
#%% simulate training set
[docs]class SimObservations(SimParameters):
"""Simulate training set.
Parameters
----------
N : int
The number of data to be simulated.
model : cosmological (or theoretical) model instance
A cosmological (or theoretical) model instance that is used to simulate
training set, it should contains a 'simulate' method, and 'simulate' should
accept input of cosmological parameters, if you use the local data sets,
it should also contain 'load_params', 'load_params_space', and 'load_sample' methods.
param_names : list
A list that contains parameter names.
chain : array-like or None
The predicted ANN chain in the previous step. If ``chain`` is an array,
``params_space`` will be ignored. If ``chain`` is None, ``params_space``
should be given. Default: None
params_space : array-like or None
The parameter space with the shape of (n, 2), where n is the number of parameters.
For each parameter, it is: [lower_limit, upper_limit]. This is only used
for space_type='hypercube' and space_type='LHS'. If ``chain`` is an array,
``params_space`` will be ignored. If ``chain`` is None,
``params_space`` should be given. Default: None
spaceSigma : int or array-like, optional
The size of parameter space to be learned. It is a int or a numpy array
with shape of (n,), where n is the number of parameters, e.g. for spaceSigma=5,
the parameter space to be learned is :math:`[-5\sigma, +5\sigma]`. Default: 5
params_dict : dict or None, optional
Information of cosmological parameters that include the labels, the minimum values,
and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None
space_type : str, optional
The type of parameter space. It can be 'hypercube', 'LHS', 'hypersphere',
'hyperellipsoid', or 'posterior_hyperellipsoid'. Default: 'hypercube'
cut_crossedLimit : bool, optional
If True, the data points that cross the parameter limits will be cut.
This only works when space_type is 'hypersphere', 'hyperellipsoid',
or 'posterior_hyperellipsoid'. Default: True
cut_crossedBest : bool, optional
If True, the folded data points that cross the best values will be cut.
It is recommended to set it to True. This only works when space_type is
'hypersphere', 'hyperellipsoid', or 'posterior_hyperellipsoid',
and when ``cut_crossedLimit=False``. Default: True
cross_best : bool, optional
If True, the folded data points will cross the best values, otherwise,
the folded data points will not cross the best values. This only works
when space_type is 'hypersphere', 'hyperellipsoid', or 'posterior_hyperellipsoid',
and when ``cut_crossedLimit=False`` and ``cut_crossedBest=False``. Default: False
local_samples : None, str, or list, optional
Path of local samples, None, 'sample' or ['sample'] or ['sample_1', 'sample_2', ...].
If None, no local samples are used. Default: None
prevStep_data : None or list, optional
Samples simulated in the previous step, if list, it should be [obs, params].
The obs or params has shape (N, n), where N is the number of samples and n is
the number of data points in one measurement (or is the number of parameters). Default: None
check_include : bool, optional
If True, will check whether ``params_space`` is in the space of ``local_samples``,
otherwise, do nothing. Default: True
rel_dev_limit : float, optional
The limit of the relative deviation when ``params_space`` is not in the
space of ``sim_params``, the default is 20% (this means if ``params_space`` is
:math:`[-5\sigma, +5\sigma]`, it can deviate :math:`<1\sigma` from ``sim_params``),
note that it should be :math:`<0.4` (the deviation :math:`<2\sigma` for parameter space
:math:`[-5\sigma, +5\sigma]`). Default: 0.2
Attributes
----------
prev_space : array-like
The parameter space of local simulated data (or mock data in previous step),
with shape of (n, 2), where n is the number of parameters. For each parameter,
it is: [lower_limit, upper_limit].
seed : None or int, optional
Seed number which controls random draws. Default: None
Note
----
Either ``chain`` or ``params_space`` should be given to simulate samples.
"""
def __init__(self, N, model, param_names, chain=None, params_space=None,
spaceSigma=5, params_dict=None, space_type='hypercube',
cut_crossedLimit=True, cut_crossedBest=True, cross_best=False,
local_samples=None, prevStep_data=None, check_include=True,
rel_dev_limit=0.2):
self.N = N
self.model = model
self.param_names = param_names
p_property = cosmic_params.ParamsProperty(param_names, params_dict=params_dict) #
self.params_limit = p_property.params_limit #
self.param_fullNames = p_property.param_fullNames #
self.chain = self._chain(chain)
self.spaceSigma = spaceSigma
self.space_type = self._space_type(space_type)
self.params_space = self._params_space(params_space) #define after self.space_type
self.cut_crossedLimit = cut_crossedLimit
self.cut_crossedBest = cut_crossedBest
self.cross_best = cross_best
self.local_samples = self._local_samples(local_samples)
self.prevStep_data = prevStep_data
self.check_include = check_include
self.rel_dev_limit = rel_dev_limit
self.prev_space = None
self.seed = None
self.max_error = True
def _local_samples(self, loc_samples):
if loc_samples is None:
return loc_samples
else:
return utils.makeList(loc_samples)
[docs] def comb_obs(self, obs_1, obs_2):
# Combine two sets of obsl samples.
is_list = isinstance(obs_1, list)
if is_list:
obs_c = []
for i in range(len(obs_1)):
obs_c.append(np.r_[obs_1[i], obs_2[i]])
return obs_c
else:
if np.ndim(obs_1)==1:
obs_1 = obs_1.reshape(1, -1)
if np.ndim(obs_2)==1:
obs_2 = obs_2.reshape(1, -1)
return np.r_[obs_1, obs_2]
def _filter_localSample(self, local_sample, N_local, local_params, p_filter):
#print('Note: parameter space of the local samples should be large enough to cover the true parameter values (it is better to larger than prior paramter space)')
index_chosen = p_filter.filter_index()
if len(index_chosen) >= N_local:
index_chosen = index_chosen[:N_local]
#to be removed
# if len(index_chosen) >= 1:
# print('Indices of the last five samples chosen from the local data sets are: {}'.format(index_chosen[-5:]))
#when len(index_chosen)=1, selected_* should be reshaped to (1,-1), to be improved, but this may not happen
selected_params = local_params[index_chosen]
#old, to be removed
# selected_obs = []
# for i in range(len(index_chosen)):
# selected_obs.append(self.model.load_sample(local_sample, index_chosen[i]))
# selected_obs = np.array(selected_obs)
#new, updated
local_obs = self.model.load_sample(local_sample)
selected_obs = local_obs[index_chosen]
return selected_obs, selected_params
[docs] def filter_localSample(self, local_sample, N_local):
"""Select samples from the local data sets.
Parameters
----------
local_sample : str
Folders of local samples.
N_local : int
The number of local samples to be selected.
Returns
-------
array-like
The selected observations and parameters.
Note
----
Parameter space of the local samples should be in the initial parameter space.
"""
local_params = self.model.load_params(local_sample)
self.local_space = self.model.load_params_space(local_sample)
p_filter = ParametersFilter(self.param_names, local_params, self.params_space, self.local_space, check_include=self.check_include, rel_dev_limit=self.rel_dev_limit)
if self.check_include:
if p_filter.include:
selected_obs, selected_params = self._filter_localSample(local_sample, N_local, local_params, p_filter)
else:
# print('The parameter space to be learned is not included in the parameter space of the local data sets')
selected_obs, selected_params = [], []
else:
selected_obs, selected_params = self._filter_localSample(local_sample, N_local, local_params, p_filter)
return selected_obs, selected_params
#to be improved, for the if... else..., other functions like this should also be improved
[docs] def filter_localSamples(self, N_local):
print('Loading samples from the local data sets ...')
if len(self.local_samples)==1:
print('Local samples: %s'%(self.local_samples[0]))
local_obs, local_params = self.filter_localSample(self.local_samples[0], N_local)
print('The number of samples selected: %s'%(len(local_params)))
return local_obs, local_params
elif len(self.local_samples)>=2:
print('Local samples: %s'%(self.local_samples[0]))
local_obs, local_params = self.filter_localSample(self.local_samples[0], N_local)
print('The number of samples selected: %s'%(len(local_params)))
if len(local_params)==N_local:
return local_obs, local_params
else:
missing_N = N_local - len(local_params)
for sample in self.local_samples[1:]:
print('\nLocal samples: %s'%(sample))
local_obs_2, local_params_2 = self.filter_localSample(sample, missing_N)
print('The number of samples selected: %s'%(len(local_params_2)))
if len(local_params)==0:
local_obs, local_params = local_obs_2, local_params_2
else:
if len(local_params_2)!=0:
local_obs = self.comb_obs(local_obs, local_obs_2)
local_params = np.r_[local_params, local_params_2]
if len(local_params)==N_local:
return local_obs, local_params
missing_N = N_local - len(local_params)
return local_obs, local_params
def _filter_previousSamples(self, N_pre, pre_obs, pre_params, p_filter):
index_chosen = p_filter.filter_index()
if len(index_chosen) >= N_pre:
index_chosen = index_chosen[:N_pre]
#when len(index_chosen)=1, selected_* should be reshaped to (1,-1), to be improved, but this may not happen
selected_params = pre_params[index_chosen]
selected_obs = pre_obs[index_chosen]
return selected_obs, selected_params
[docs] def filter_previousSamples(self, N_pre):
"""Select samples from the mock data simulated in the previous step.
Parameters
----------
N_pre : int
The number of samples to be selected.
Returns
-------
array-like
The selected observations and parameters.
"""
print('Selecting samples from the mock data of the previous step ...')
pre_obs, pre_params = self.prevStep_data[0], self.prevStep_data[1]
p_filter = ParametersFilter(self.param_names, pre_params, self.params_space, self.prev_space, check_include=self.check_include, rel_dev_limit=self.rel_dev_limit)
if self.check_include:
if p_filter.include:
selected_obs, selected_params = self._filter_previousSamples(N_pre, pre_obs, pre_params, p_filter)
else:
print('The parameter space to be learned is not included in the parameter space of the the previous step samples')
selected_obs, selected_params = [], []
else:
selected_obs, selected_params = [], []
return selected_obs, selected_params
[docs] def simulate_obs(self, N):
params = self.get_params(N)
obs = []
for i in range(N):
if (i+1)%200==0:
print('Simulating the sample: {}/{}'.format(i+1,N))
obs.append(self.model.simulate(params[i])[1])
obs = np.array(obs)
return obs, params
[docs] def simulate(self):
missing_N = self.N
#local samples
if self.local_samples is not None:
local_obs, local_params = self.filter_localSamples(missing_N)
print('%s sets of local samples are added to the training set\n'%(len(local_params)))
if len(local_params)!=0:
self.sim_obs, self.sim_params = local_obs, local_params
missing_N = missing_N - len(local_params)
#previous step samples, Note: if 'local_samples' is not None, previous step samples will not be used
if self.local_samples is None and self.prevStep_data is not None and missing_N!=0:
pre_obs, pre_params = self.filter_previousSamples(missing_N)
print('%s sets of samples of previous step are added to the training set\n'%(len(pre_params)))
if len(pre_params)!=0:
if missing_N==self.N:
self.sim_obs, self.sim_params = pre_obs, pre_params
else:
self.sim_obs = self.comb_obs(self.sim_obs, pre_obs)
self.sim_params = np.r_[self.sim_params, pre_params]
missing_N = missing_N - len(pre_params)
#simulate samples
if missing_N!=0:
print('%s sets of new samples to be simulated ...'%(missing_N))
new_obs, new_params = self.simulate_obs(missing_N)
if len(new_params)==self.N:
self.sim_obs, self.sim_params = new_obs, new_params
else:
self.sim_obs = self.comb_obs(self.sim_obs, new_obs)
self.sim_params = np.r_[self.sim_params, new_params]
return self.sim_obs, self.sim_params
#old, to be removed
# def save_samples(self, path='sim_data/sample'):
# params = self.get_params(self.N)
# utils.savenpy(path, 'parameters', params, dtype=np.float32)
# self._save_paramsName(path)
# for i in range(self.N):
# if (i+1)%100==0:
# print('Simulating the samples: {}/{}'.format(i+1,self.N))
# x, y = self.model.simulate(params[i])
# utils.savenpy(path, 'y_%s'%i, y, dtype=np.float32)
# utils.savenpy(path, 'x', x, dtype=np.float32)
#updated
[docs] def save_samples(self, path='sim_data/sample'):
params = self.get_params(self.N)
utils.savenpy(path, 'parameters', params, dtype=np.float32)
self._save_paramsName(path)
self._save_params_space(path)
obs = []
for i in range(self.N):
if (i+1)%100==0:
print('Simulating the samples: {}/{}'.format(i+1,self.N))
x, y = self.model.simulate(params[i])
obs.append(y)
obs = np.array(obs)
utils.savenpy(path, 'x', x, dtype=np.float32)
utils.savenpy(path, 'y', obs, dtype=np.float32)
#test
[docs] def save_samples_2(self, multi_params=1, path='sim_data/sample', use_dataSeed=False):
# params = self.get_multiParams(self.N, multi_params=multi_params)
params = self.get_multiParams(self.N, multi_params=multi_params, use_dataSeed=use_dataSeed, reorder=True) #test
utils.savenpy(path, 'parameters', params, dtype=np.float32)
self._save_paramsName(path)
self._save_params_space(path)
obs = []
for i in range(self.N*multi_params):
if (i+1)%100==0:
print('Simulating the samples: {}/{}'.format(i+1,self.N*multi_params))
if use_dataSeed:
np.random.seed(self.data_seed[i]) #test
x, y = self.model.simulate(params[i])
obs.append(y)
obs = np.array(obs)
utils.savenpy(path, 'x', x, dtype=np.float32)
utils.savenpy(path, 'y', obs, dtype=np.float32)
#test
[docs] def save_samples_3(self, part_size=10, multi_params=1, path='sim_data/sample'):
params = self.get_multiParams(self.N, multi_params=multi_params)
utils.savenpy(path, 'parameters', params, dtype=np.float32)
self._save_paramsName(path)
self._save_params_space(path)
part_n = len(params) // part_size
utils.savenpy(path, 'part_n_size', [part_n, part_size])
for p in range(part_n):
params_part = params[part_size*p:part_size*(p+1)]
utils.savenpy(path, 'parameters_%s'%p, params_part, dtype=np.float32)
obs = []
for i in range(part_size):
if (i+1)%100==0:
print('Simulating the samples: {}/{}'.format(part_size*p+i+1,self.N*multi_params))
x, y = self.model.simulate(params_part[i])
obs.append(y)
obs = np.array(obs)
utils.savenpy(path, 'x', x, dtype=np.float32)
utils.savenpy(path, 'y_%s'%p, obs, dtype=np.float32)
#test, test before use
[docs] def save_samples_3_onePart(self, params, part_size=10, part_idx=0, path='sim_data/sample'):
if self.seed is None:
raise ValueError("The attribute 'seed' should be assigned to an integer")
utils.savenpy(path, 'parameters', params, dtype=np.float32)
self._save_paramsName(path)
self._save_params_space(path)
part_n = len(params) // part_size
if part_idx+1>part_n:
raise ValueError("The number of part is %s, so the value of 'part_idx' should be <%s"%(part_n, part_n))
utils.savenpy(path, 'part_n_size', [part_n, part_size])
params_part = params[part_size*part_idx:part_size*(part_idx+1)]
utils.savenpy(path, 'parameters_%s'%part_idx, params_part, dtype=np.float32)
obs = []
for i in range(part_size):
if (i+1)%100==0:
print('Simulating the samples: {}/{}'.format(part_size*part_idx+i+1, len(params)))
x, y = self.model.simulate(params_part[i])
obs.append(y)
obs = np.array(obs)
utils.savenpy(path, 'x', x, dtype=np.float32)
utils.savenpy(path, 'y_%s'%part_idx, obs, dtype=np.float32)
def _save_paramsName(self, path):
np.savetxt(path+"/paramNames.txt", self.param_fullNames, fmt='%s', delimiter=' ')
def _save_params_space(self, path):
utils.savenpy(path, 'params_space', self.params_space, dtype=np.float32)
[docs]class SimMultiObservations(SimObservations):
"""Simulate training set containing multiple observations (for multi-branch network).
Parameters
----------
branch_n : int
The number of branch of the network.
N : int
The number of data to be simulated.
model : cosmological (or theoretical) model instance
A cosmological (or theoretical) model instance that is used to simulate training set,
it should contains a 'simulate' method, and 'simulate' should accept input of
cosmological parameters, if you use the local data sets, it should also
contain 'load_params', 'load_params_space', and 'load_sample' methods.
param_names : list
A list that contains parameter names.
chain : array-like or None
The predicted ANN chain in the previous step. If ``chain`` is an array, ``params_space`` will be ignored.
If ``chain`` is None, ``params_space`` should be given. Default: None
params_space : array-like or None
The parameter space with the shape of (n, 2), where n is the number of parameters.
For each parameter, it is: [lower_limit, upper_limit]. This is only used
for space_type='hypercube' and space_type='LHS'. If ``chain`` is an array,
``params_space`` will be ignored. If ``chain`` is None,
``params_space`` should be given. Default: None
spaceSigma : int or array-like, optional
The size of parameter space to be learned. It is a int or a numpy array
with shape of (n,), where n is the number of parameters, e.g. for spaceSigma=5,
the parameter space to be learned is :math:`[-5\sigma, +5\sigma]`. Default: 5
params_dict : dict or None, optional
Information of cosmological parameters that include the labels, the minimum values,
and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None
space_type : str, optional
The type of parameter space. It can be 'hypercube', 'LHS', 'hypersphere',
'hyperellipsoid', or 'posterior_hyperellipsoid'. Default: 'hypercube'
cut_crossedLimit : bool, optional
If True, the data points that cross the parameter limits will be cut.
This only works when space_type is 'hypersphere', 'hyperellipsoid',
or 'posterior_hyperellipsoid'. Default: True
cut_crossedBest : bool, optional
If True, the folded data points that cross the best values will be cut.
It is recommended to set it to True. This only works when space_type is
'hypersphere', 'hyperellipsoid', or 'posterior_hyperellipsoid',
and when ``cut_crossedLimit=False``. Default: True
cross_best : bool, optional
If True, the folded data points will cross the best values, otherwise,
the folded data points will not cross the best values. This only works
when space_type is 'hypersphere', 'hyperellipsoid', or 'posterior_hyperellipsoid',
and when ``cut_crossedLimit=False`` and ``cut_crossedBest=False``. Default: False
local_samples : None, str, or list, optional
Path of local samples, None, 'sample' or ['sample'] or ['sample_1', 'sample_2', ...].
If None, no local samples are used. Default: None
prevStep_data : None or list, optional
Samples simulated in the previous step, if list, it should be [obs, params].
The obs or params has shape (N, n), where N is the number of samples and n is
the number of data points in one measurement (or is the number of parameters). Default: None
check_include : bool, optional
If True, will check whether ``params_space`` is in the space of ``local_samples``,
otherwise, do nothing. Default: True
rel_dev_limit : float, optional
The limit of the relative deviation when ``params_space`` is not in the
space of ``sim_params``, the default is 20% (this means if ``params_space`` is
:math:`[-5\sigma, +5\sigma]`, it can deviate :math:`<1\sigma` from ``sim_params``),
note that it should be :math:`<0.4` (the deviation :math:`<2\sigma` for parameter space
:math:`[-5\sigma, +5\sigma]`). Default: 0.2
Attributes
----------
prev_space : array-like
The parameter space of local simulated data (or mock data in previous step),
with shape of (n, 2), where n is the number of parameters. For each parameter,
it is: [lower_limit, upper_limit].
seed : None or int, optional
Seed number which controls random draws. Default: None
Note
----
Either ``chain`` or ``params_space`` should be given to simulate samples.
"""
def __init__(self, branch_n, N, model, param_names, chain=None, params_space=None,
spaceSigma=5, params_dict=None, space_type='hypercube', cut_crossedLimit=True,
cut_crossedBest=True, cross_best=False, local_samples=None, prevStep_data=None,
check_include=True, rel_dev_limit=0.2):
self.branch_n = branch_n
self.N = N
self.model = model
self.param_names = param_names
p_property = cosmic_params.ParamsProperty(param_names, params_dict=params_dict) #
self.params_limit = p_property.params_limit #
self.param_fullNames = p_property.param_fullNames #
self.chain = self._chain(chain)
self.spaceSigma = spaceSigma
self.space_type = self._space_type(space_type)
self.params_space = self._params_space(params_space) #define after self.space_type
self.cut_crossedLimit = cut_crossedLimit
self.cut_crossedBest = cut_crossedBest
self.cross_best = cross_best
self.local_samples = self._local_samples(local_samples)
self.prevStep_data = prevStep_data
self.check_include = check_include
self.rel_dev_limit = rel_dev_limit
self.prev_space = None
self.seed = None
self.max_error = True
def _filter_localSample(self, local_sample, N_local, local_params, p_filter):
#print('Note: parameter space of the local samples should be large enough to cover the true parameter values (it is better to larger than prior paramter space)')
index_chosen = p_filter.filter_index()
if len(index_chosen) >= N_local:
index_chosen = index_chosen[:N_local]
#to be removed
# if len(index_chosen) >= 1:
# print('Indices of the last five samples chosen from the local data sets are: {}'.format(index_chosen[-5:]))
#when len(index_chosen)=1, selected_* should be reshaped to (1,-1), to be improved, but this may not happen
selected_params = local_params[index_chosen]
#old, to be removed
# selected_obs = []
# for comp in range(self.branch_n):
# _obs = []
# for i in range(len(index_chosen)):
# _obs.append(self.model.load_sample(local_sample, index_chosen[i])[comp])
# selected_obs.append(np.array(_obs))
#new, updated
local_obs = self.model.load_sample(local_sample)
selected_obs = [local_obs[comp][index_chosen] for comp in range(self.branch_n)]
return selected_obs, selected_params
def _filter_previousSamples(self, N_pre, pre_obs, pre_params, p_filter):
index_chosen = p_filter.filter_index()
if len(index_chosen) >= N_pre:
index_chosen = index_chosen[:N_pre]
#when len(index_chosen)=1, selected_* should be reshaped to (1,-1), to be improved, but this may not happen
selected_params = pre_params[index_chosen]
selected_obs = []
for comp in range(self.branch_n):
selected_obs.append(pre_obs[comp][index_chosen])
return selected_obs, selected_params
[docs] def simulate_obs(self, N):
params = self.get_params(N)
for i in range(self.branch_n):
exec('obs_%s=[]'%i)
for i in range(N):
if (i+1)%100==0:
print('Simulating the sample: {}/{}'.format(i+1,N))
data = self.model.simulate(params[i])[1]
for j in range(self.branch_n):
exec('obs_%s.append(data[j])'%j)
obs = []
for i in range(self.branch_n):
exec('obs_%s=np.array(obs_%s)'%(i,i))
exec('obs.append(obs_%s)'%i)
return obs, params
#old, to be removed
# def save_samples(self, root_path='sim_data', branch_paths=['comp1','comp2']):
# params = self.get_params(self.N)
# utils.savenpy(root_path, 'parameters', params, dtype=np.float32)
# self._save_paramsName(root_path)
# for i in range(self.N):
# if (i+1)%100==0:
# print('Simulating the samples: {}/{}'.format(i+1,self.N))
# xx, yy = self.model.simulate(params[i])
# for j in range(self.branch_n):
# utils.savenpy(root_path+'/'+branch_paths[j], 'y_%s'%i, yy[j], dtype=np.float32)
# for j in range(len(branch_paths)):
# utils.savenpy(root_path+'/'+branch_paths[j], 'x', xx[j], dtype=np.float32)
#new, updated
[docs] def save_samples(self, path='sim_data', branch_paths=['comp1','comp2']):
params = self.get_params(self.N)
utils.savenpy(path, 'parameters', params, dtype=np.float32)
self._save_paramsName(path)
self._save_params_space(path)
for i in range(self.branch_n):
exec('obs_%s=[]'%i)
for i in range(self.N):
if (i+1)%100==0:
print('Simulating the samples: {}/{}'.format(i+1,self.N))
xx, yy = self.model.simulate(params[i])
for j in range(self.branch_n):
exec('obs_%s.append(yy[j])'%j)
for j in range(self.branch_n):
utils.savenpy(path+'/'+branch_paths[j], 'x', xx[j], dtype=np.float32)
exec('obs_%s=np.array(obs_%s)'%(j,j))
utils.savenpy(path+'/'+branch_paths[j], 'y', eval('obs_%s'%j), dtype=np.float32)
#%% Add Gaussian noise
[docs]class AddGaussianNoise(object):
"""Add Gaussian noise for simulated data.
Parameters
----------
obs : torch tensor or a list of torch tensor
The simulated observations (measurements) with shape (N, obs_length),
or a list of observations with shape [(N,obs_length_1), (N,obs_length_2), ...]
params : torch tensor or None
The simulated cosmological parameters. Default: None
obs_errors : torch tensor, a list of torch tensor, or None, optional
Observational errors (standard deviation) with shape (obs_length,),
or a list of errors with shape [(obs_length_1,), (obs_length_2,), ...].
If ``cholesky_factor`` is set to None, the observational errors should be given. Default: None
cholesky_factor : torch tensor, a list of torch tensor, or None, optional
Cholesky factor of covariance matrix with shape (obs_length, obs_length),
or a list of Cholesky factor of covariance matrix with shape
[(obs_length_1, obs_length_1), (obs_length_2, obs_length_2), ...]. If the
cholesky factor is given, the ``obs_errors`` will be ignored. Default: None
noise_type : str, optional
The type of Gaussian noise added to the training set, 'singleNormal'
or 'multiNormal'. Default: 'multiNormal'
factor_sigma : float, optional
For the case of ``noise_type`` = 'singleNormal', ``factor_sigma`` should be
set to 1. For the case of ``noise_type`` = 'multiNormal', it is the standard
deviation of the coefficient of the observational error (standard deviation). Default: 0.2
multi_noise : int, optional
The number of realization of noise added to a measurement. Default: 5
use_GPU : bool, optional
If True, the noise will be generated by GPU, otherwise, it will be generated by CPU. Default: True
"""
#combine obs_errors & cholesky_factor?
def __init__(self, obs, params=None, obs_errors=None, cholesky_factor=None,
noise_type='multiNormal', factor_sigma=0.2, multi_noise=5, use_GPU=True):
self.obs = obs
self.params = params
self.is_list = isinstance(self.obs, list)
self.obs_errors = obs_errors
self.cholesky_factor = self._cholesky_factor(cholesky_factor)
self.noise_type = noise_type
self.factor_sigma = factor_sigma
self.multi_noise = multi_noise
self.use_GPU = use_GPU
if use_GPU:
self.epsilon = torch.cuda.FloatTensor([1e-20])
else:
self.epsilon = torch.FloatTensor([1e-20])
def _cholesky_factor(self, cholesky_factor):
if self.is_list and cholesky_factor is None:
return [None for i in range(len(self.obs))]
else:
return cholesky_factor
#to be updated for 2D/3D maps
[docs] def obs_noise(self, measurement, obs_error=None, cholesky_factor=None):
# Note 1: "torch.FloatTensor(ell_num).normal_().cuda()"(a) is equivalent to "torch.randn(ell_num).cuda()"(b)
# and equivalent to "torch.cuda.FloatTensor(ell_num).normal_()"(c), in which (c) is faster than (a) and (b)
# ##########################################################################################
# Note 2: in the method of cudaErr, if the input 'data' is in torch.cuda.FloatTensor type,
# then the input 'data' will equal to the output 'data' ! To avoid this bug,
# the input 'data' should be in torch.FloatTensor type !
# def cudaErr(data):
# for i in range(data.size(0)):
# data[i] = data[i] * 0.1
# return data
# ###########################################################################################
# Note 3: (i): m=MultivariateNormal(torch.zeros(measurement_leng), cov_torch)
# (ii): m=MultivariateNormal(torch.zeros(measurement_leng).cuda(), cov_cuda)
# (iii): m=MultivariateNormal(torch.cuda.FloatTensor(measurement_leng).zero_(), cov_cuda)
# the speed of (i), (ii) and (iii) is (ii) > (i) > (iii)
# Note 4: even though (ii) is faster, it is slower than (i) when using:
# batch_index = np.random.choice(len(self.inputs), self.batch_size, replace=False)
# xx = self.inputs[batch_index] # this step is slower, why?
# Note 5: in MultivariateNormal, using scale_tril will be more efficient than covariance_matrix and precision_matrix,
# see: https://pytorch.org/docs/stable/distributions.html?highlight=multivariatenormal#torch.distributions.multivariate_normal.MultivariateNormal
# Note 6: (i): m = MultivariateNormal(mean, covariance_matrix=cov_matrix) #slower than the case when using scale_tril
# (ii): L = torch.cholesky(cov_matrix, upper=False) #cov=LL^T
# m = MultivariateNormal(mean, scale_tril=L) #using scale_tril will be more efficient
# case (ii) is faster than case (i)
if cholesky_factor is None:
if self.use_GPU:
noise = torch.cuda.FloatTensor(measurement.size()).normal_(0,1) * obs_error
else:
noise = torch.FloatTensor(measurement.size()).normal_(0,1) * obs_error
else:
if self.use_GPU:
mean = torch.zeros(cholesky_factor.size(-1)).cuda()
else:
mean = torch.zeros(cholesky_factor.size(-1))
m = MultivariateNormal(mean, scale_tril=cholesky_factor)
noise = m.sample((measurement.size(0),))
return noise
#to be updated for 2D/3D maps
def _singleNormalObs(self, measurement, obs_error=None, cholesky_factor=None, error_factor=1):
if cholesky_factor is None:
noise = self.obs_noise(measurement, obs_error=obs_error*error_factor)
else:
noise = self.obs_noise(measurement, cholesky_factor=cholesky_factor*error_factor)
return measurement + noise
#to be updated for 2D/3D maps
def _multiNormalObs(self, measurement, obs_error=None, cholesky_factor=None, factor_sigma=0.2):
if cholesky_factor is None:
if self.use_GPU:
error_factor = torch.abs(torch.cuda.FloatTensor(measurement.size(1)).normal_(0, factor_sigma)) #A !!!+, use this
# error_factor = torch.abs(torch.cuda.FloatTensor(measurement.size(0), 1).normal_(0, factor_sigma))
# error_factor = torch.abs(torch.cuda.FloatTensor(measurement.size()).normal_(0, factor_sigma)) !!!
# error_factor = torch.abs(torch.cuda.FloatTensor(1).normal_(0, factor_sigma)) #test for map-->params
# print(error_factor.shape, (obs_error*error_factor).shape)
#test Beta distribution, need further test
# error_factor = Beta(1, 5).sample((measurement.size(1),)).cuda()
# error_factor = Beta(1, 10).sample((measurement.size(1),)).cuda()
# error_factor = Beta(1, 3).sample((measurement.size(1),)).cuda()
# print('using Beta distribution ........')
#test exponential distribution, need further test
# error_factor = torch.cuda.FloatTensor(measurement.size(1),).exponential_(6)
#test Laplace distribution, need further test
# error_factor = torch.abs(Laplace(0, 0.2).sample((measurement.size(1),)).cuda()) #not good
# error_factor = torch.abs(Laplace(0, 0.3).sample((measurement.size(1),)).cuda())
# error_factor = torch.abs(Laplace(0, 0.1).sample((measurement.size(1),)).cuda())
else:
error_factor = torch.abs(torch.FloatTensor(measurement.size(1)).normal_(0, factor_sigma))
# error_factor = torch.abs(torch.FloatTensor(measurement.size(0), 1).normal_(0, factor_sigma))
# error_factor = torch.abs(torch.FloatTensor(measurement.size()).normal_(0, factor_sigma))
# error_factor = torch.abs(torch.FloatTensor(1).normal_(0, factor_sigma))
noise = self.obs_noise(measurement, obs_error=obs_error*error_factor)
else:
if self.use_GPU:
#try to update this to make the result better???
# method 1
#the epsilon is used to ensure error_factor>0, because the diagonal of cholesky_factor*error_factor must be positive
error_factor = torch.abs(torch.cuda.FloatTensor(1).normal_(0, factor_sigma)) + self.epsilon #A !!, use this
# method 2, less good, need further test?
# mean = torch.zeros(cholesky_factor.size(-1)).cuda()
# m = MultivariateNormal(mean, scale_tril=cholesky_factor)
# obs_error = m.sample((measurement.size(0),))
# error_factor = torch.abs(torch.cuda.FloatTensor(measurement.size(0), 1).normal_(0, factor_sigma)) #!, less good
# # error_factor = torch.abs(torch.cuda.FloatTensor(measurement.size(1)).normal_(0, factor_sigma)) #?
# # error_factor = torch.abs(torch.cuda.FloatTensor(1).normal_(0, factor_sigma)) #?
# noise = obs_error * error_factor
# method 3, the same as method 2, further test?
# cov = cholesky_factor.clone()
# error_factor = torch.abs(torch.cuda.FloatTensor(measurement.size(1)).normal_(0, factor_sigma)) + 1e-4 #self.epsilon #A !!, use this
# idx = torch.where(cov)
# cov[idx] = cov[idx] * error_factor[idx[0]] * error_factor[idx[1]]
# cholesky_factor_2 = torch.cholesky(cov)
# noise = self.obs_noise(measurement, cholesky_factor=cholesky_factor_2)
# method 4, the same as method 3, further test? is more reasonable than method 5?, better?
# cov = cholesky_factor.clone()
# # print('ddddd????')
# factor_sigma_2 = 0.1
# # factor_sigma_2 = factor_sigma
# error_factor_2 = torch.abs(torch.cuda.FloatTensor(measurement.size(1)).normal_(1, factor_sigma_2))
# idx = torch.where(cov)
# cov[idx] = cov[idx] * error_factor_2[idx[0]] * error_factor_2[idx[1]]
# error_factor = torch.abs(torch.cuda.FloatTensor(1).normal_(0, factor_sigma)) + self.epsilon #A !!, use this
# cov = cov * error_factor**2
# cholesky_factor_2 = torch.cholesky(cov)
# noise = self.obs_noise(measurement, cholesky_factor=cholesky_factor_2)
# method 5, the same as method 4, but random cholesky_factor, right?
# # factor_sigma_2 = 0.01 #random < 5%
# # factor_sigma_2 = 0.03 #random < 10%
# factor_sigma_2 = 0.2 #random < 100%
# # factor_sigma_2 = factor_sigma
# # print(factor_sigma_2, 'ddd???')
# error_factor_2 = torch.abs(torch.cuda.FloatTensor(measurement.size(1),measurement.size(1)).normal_(1, factor_sigma_2))
# # cholesky_f = cholesky_f * error_factor_2 #right?
# error_factor = torch.abs(torch.cuda.FloatTensor(1).normal_(0, factor_sigma)) + self.epsilon #A !!, use this
# noise = self.obs_noise(measurement, cholesky_factor=cholesky_factor*error_factor_2*error_factor)
else:
#the epsilon is used to ensure error_factor>0, because the diagonal of cholesky_factor*error_factor must be positive
error_factor = torch.abs(torch.FloatTensor(1).normal_(0, factor_sigma)) + self.epsilon
# method 1
noise = self.obs_noise(measurement, cholesky_factor=cholesky_factor*error_factor)
return measurement + noise
[docs] def singleNormalObs(self, error_factor=1):
if self.is_list:
noisy_sp = []
for i in range(len(self.obs)):
_noisy_sp = self._singleNormalObs(self.obs[i], obs_error=self.obs_errors[i], cholesky_factor=self.cholesky_factor[i], error_factor=error_factor)
noisy_sp.append(_noisy_sp)
else:
noisy_sp = self._singleNormalObs(self.obs, obs_error=self.obs_errors, cholesky_factor=self.cholesky_factor, error_factor=error_factor)
return noisy_sp
[docs] def multiNormalObs(self, factor_sigma=0.2):
if self.is_list:
noisy_sp = []
for i in range(len(self.obs)):
_noisy_sp = self._multiNormalObs(self.obs[i], obs_error=self.obs_errors[i], cholesky_factor=self.cholesky_factor[i], factor_sigma=factor_sigma)
noisy_sp.append(_noisy_sp)
else:
noisy_sp = self._multiNormalObs(self.obs, obs_error=self.obs_errors, cholesky_factor=self.cholesky_factor, factor_sigma=factor_sigma)
return noisy_sp
[docs] def noisyObs(self):
if self.noise_type=='singleNormal':
return self.singleNormalObs(error_factor=self.factor_sigma)
elif self.noise_type=='multiNormal':
return self.multiNormalObs(factor_sigma=self.factor_sigma)
else:
raise ValueError("noise_type must be 'singleNormal' or 'multiNormal'")
[docs] def noisySample(self):
return self.noisyObs(), self.params
[docs] def multiParams(self):
multi_P = []
for i in range(self.multi_noise):
multi_P.append(self.params)
multi_P = torch.cat(multi_P, dim=0)
return multi_P
[docs] def multiNoisyObs(self):
_multi_NS = []
for i in range(self.multi_noise):
_multi_NS.append(self.noisyObs())
if self.is_list:
multi_NS = []
for comp in range(len(self.obs)):
oneComp_NS = []
for i in range(self.multi_noise):
oneComp_NS.append(_multi_NS[i][comp])
oneComp_NS = torch.cat(oneComp_NS, dim=0)
multi_NS.append(oneComp_NS)
else:
multi_NS = torch.cat(_multi_NS, dim=0)
return multi_NS
[docs] def multiNoisySample(self, reorder=True):
multi_P = self.multiParams()
multi_NS = self.multiNoisyObs()
if reorder:
index = np.random.choice(len(multi_P), len(multi_P), replace=False)
multi_P = multi_P[index]
if self.is_list:
for comp in range(len(self.obs)):
multi_NS[comp] = multi_NS[comp][index]
else:
multi_NS = multi_NS[index]
return multi_NS, multi_P