# -*- coding: utf-8 -*-
import numpy as np
import torch
#%% data conversion
def numpy2torch(data, dtype=torch.FloatTensor):
    """Transfer data from a numpy array (on CPU) to a torch tensor (on CPU).

    Parameters
    ----------
    data : array-like
        Numpy array. Anything else (e.g. a numpy scalar or a nested list of
        numbers) is first passed through :func:`numpy.asarray`.
    dtype : torch tensor type, optional
        Tensor type handed to ``Tensor.type``. Default: ``torch.FloatTensor``

    Returns
    -------
    Tensor
        Torch tensor of type ``dtype``.
    """
    # np.asarray leaves an ndarray untouched but lets numpy scalars through,
    # which torch.from_numpy alone rejects.
    return torch.from_numpy(np.asarray(data)).type(dtype)
def numpy2cuda(data, device=None, dtype=torch.cuda.FloatTensor):
    """Transfer data from a numpy array (on CPU) to a torch tensor (on GPU).

    Parameters
    ----------
    data : array-like
        Numpy array (other array-likes go through :func:`numpy.asarray`).
    device : None, int or torch.device, optional
        Target GPU. If None, the currently selected CUDA device is used.
        Default: None
    dtype : torch CUDA tensor type, optional
        Tensor type handed to ``Tensor.type``. It is honoured whether or not
        ``device`` is given. Default: ``torch.cuda.FloatTensor``

    Returns
    -------
    Tensor
        Torch tensor of type ``dtype`` on the selected GPU.
    """
    # Selecting the device through the context manager (a no-op for None)
    # keeps one code path, so ``dtype`` is applied for every ``device``.
    with torch.cuda.device(device):
        return torch.from_numpy(np.asarray(data)).type(dtype)
def torch2cuda(data, device=None):
    """Transfer data (torch tensor) from CPU to GPU.

    Parameters
    ----------
    data : Tensor
        Torch tensor.
    device : None, int or torch.device, optional
        Forwarded unchanged to ``Tensor.cuda``. Default: None

    Returns
    -------
    Tensor
        The result of ``data.cuda(device=device)``.
    """
    return data.cuda(device=device)
def torch2numpy(data):
    """Transfer data from a torch tensor (on CPU) to a numpy array (on CPU).

    Parameters
    ----------
    data : Tensor
        Torch tensor located on the CPU.

    Returns
    -------
    numpy.ndarray
        The tensor's values as a numpy array.
    """
    # Tensor.numpy() refuses tensors that require grad; detach first so that
    # network outputs can be converted as well.
    return data.detach().numpy()
def cuda2torch(data):
    """Transfer data (torch tensor) from GPU to CPU.

    Parameters
    ----------
    data : Tensor
        Torch tensor.

    Returns
    -------
    Tensor
        The result of ``data.cpu()``.
    """
    return data.cpu()
def cuda2numpy(data):
    """Transfer data from a torch tensor (on GPU) to a numpy array (on CPU).

    Parameters
    ----------
    data : Tensor
        Torch tensor.

    Returns
    -------
    numpy.ndarray
        The tensor's values as a numpy array.
    """
    # detach() first: Tensor.numpy() raises on tensors that require grad.
    return data.detach().cpu().numpy()
def cpu2cuda(data):
    """Transfer data from CPU to GPU.

    Parameters
    ----------
    data : array-like or tensor
        Numpy array or torch tensor (subclasses of either are accepted).

    Raises
    ------
    TypeError
        The data type should be :class:`np.ndarray` or :class:`torch.Tensor`.

    Returns
    -------
    Tensor
        Torch tensor.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses such as torch.nn.Parameter.
    if isinstance(data, np.ndarray):
        return numpy2cuda(data)
    if isinstance(data, torch.Tensor):
        return torch2cuda(data)
    raise TypeError('The data type should be numpy.ndarray or torch.Tensor')
#%% network and data transfer
class Transfer(object):
    """Network and data transfer.

    Moves the network to the GPU when one is available and converts the numpy
    data sets held on the instance into torch tensors (on the GPU if
    ``use_GPU`` is true, on the CPU otherwise).

    Notes
    -----
    * ``use_GPU`` and ``use_multiGPU`` are set by :meth:`call_GPU` (which
      :meth:`transfer_net` calls); the ``transfer_*`` data methods read
      ``use_GPU`` and therefore have to run after it.
    * Several methods read ``self.params_base``, which is not assigned in
      ``__init__`` here. NOTE(review): presumably it is provided by the class
      that mixes this one in - confirm against the callers.
    * ``branch_n`` defaults to the type ``int``, which only acts as a
      placeholder; the ``transfer_MB_*`` methods need an actual integer.
    """

    def __init__(self, net, obs, params, obs_base, obs_vali=None, params_vali=None,
                 obs_errors=None, cholesky_factor=None, branch_n=int):
        self.net = net
        self.obs = obs
        self.params = params
        self.obs_base = obs_base
        self.obs_vali = obs_vali
        self.params_vali = params_vali
        self.obs_errors = obs_errors
        self.cholesky_factor = cholesky_factor
        self.branch_n = branch_n

    def check_GPU(self):
        """Return ``(device_ids, device)``: all GPU indices and the first one,
        or ``(None, None)`` when CUDA is not available."""
        if not torch.cuda.is_available():
            return None, None
        device_ids = list(range(torch.cuda.device_count()))
        return device_ids, device_ids[0]

    def _prints(self, items, prints=True):
        """Print ``items`` unless ``prints`` is false."""
        if prints:
            print(items)

    def call_GPU(self, prints=True):
        """Detect the hardware and set ``use_GPU`` and ``use_multiGPU``."""
        if torch.cuda.is_available():
            self.use_GPU = True
            gpu_num = torch.cuda.device_count()
            if gpu_num > 1:
                self.use_multiGPU = True
                self._prints('\nTraining the network using {} GPUs'.format(gpu_num), prints=prints)
            else:
                self.use_multiGPU = False
                self._prints('\nTraining the network using 1 GPU', prints=prints)
        else:
            self.use_GPU = False
            # Defined on the CPU path too, so the attribute always exists.
            self.use_multiGPU = False
            self._prints('\nTraining the network using CPU', prints=prints)

    def transfer_net(self, use_DDP=False, device_ids=None, prints=True):
        """Move the network to the GPU (if any) and wrap it for multi-GPU use.

        Parameters
        ----------
        use_DDP : bool, optional
            With several GPUs, wrap the network in ``DistributedDataParallel``
            instead of ``DataParallel``. Default: False
        device_ids : None or list, optional
            GPU indices; the first one receives the network. Default: None
        prints : bool, optional
            Whether to print which hardware is used. Default: True
        """
        device = None if device_ids is None else device_ids[0]
        self.call_GPU(prints=prints)
        if not self.use_GPU:
            return
        self.net = self.net.cuda(device=device)
        if self.use_multiGPU:
            if use_DDP:
                self.net = torch.nn.parallel.DistributedDataParallel(self.net, device_ids=device_ids)
            else:
                self.net = torch.nn.DataParallel(self.net, device_ids=device_ids)

    def _np2tensor(self, data):
        """Convert one numpy array with numpy2cuda if ``use_GPU`` else numpy2torch."""
        if self.use_GPU:
            return numpy2cuda(data)
        return numpy2torch(data)

    def _branches2tensor(self, branches):
        """Convert the first ``branch_n`` entries of ``branches`` into a list of tensors."""
        return [self._np2tensor(branches[i]) for i in range(self.branch_n)]

    def transfer_base(self):
        """Create ``obs_base_torch`` and ``params_base_torch`` from the base values."""
        self.obs_base_torch = self._np2tensor(self.obs_base)
        self.params_base_torch = self._np2tensor(self.params_base)

    def transfer_trainSet(self, transfer_base=True):
        """Convert the training set (and optionally the base values) to tensors."""
        self.obs = self._np2tensor(self.obs)
        self.params = self._np2tensor(self.params)
        if transfer_base:
            self.obs_base_torch = self._np2tensor(self.obs_base)
            self.params_base_torch = self._np2tensor(self.params_base)

    def transfer_valiSet(self):
        """Convert the validation set to tensors; does nothing if ``obs_vali`` is None."""
        if self.obs_vali is not None:
            self.obs_vali = self._np2tensor(self.obs_vali)
            self.params_vali = self._np2tensor(self.params_vali)

    def transfer_data(self):
        """Convert training set, errors, base values and validation set to tensors.

        ``obs_errors`` is converted when ``cholesky_factor`` is None, otherwise
        ``cholesky_factor`` is converted instead.
        """
        self.obs = self._np2tensor(self.obs)
        self.params = self._np2tensor(self.params)
        if self.cholesky_factor is None:
            self.obs_errors = self._np2tensor(self.obs_errors)
        else:
            self.cholesky_factor = self._np2tensor(self.cholesky_factor)
        self.obs_base_torch = self._np2tensor(self.obs_base)
        self.params_base_torch = self._np2tensor(self.params_base)
        # vali_set
        if self.obs_vali is not None:
            self.obs_vali = self._np2tensor(self.obs_vali)
            self.params_vali = self._np2tensor(self.params_vali)

    def transfer_MB_trainSet(self):
        """Multi-branch version of the training-set transfer: ``obs`` and
        ``obs_base`` are indexed per branch."""
        self.obs = self._branches2tensor(self.obs)
        self.params = self._np2tensor(self.params)
        self.obs_base_torch = self._branches2tensor(self.obs_base)
        self.params_base_torch = self._np2tensor(self.params_base)

    def transfer_MB_valiSet(self):
        """Multi-branch version of :meth:`transfer_valiSet`."""
        if self.obs_vali is not None:
            self.obs_vali = self._branches2tensor(self.obs_vali)
            self.params_vali = self._np2tensor(self.params_vali)

    def transfer_MB_data(self):
        """Multi-branch version of :meth:`transfer_data`.

        For every branch, ``obs_errors[i]`` is converted when
        ``cholesky_factor[i]`` is None, otherwise ``cholesky_factor[i]`` is.
        Both containers are updated in place.
        """
        self.obs = self._branches2tensor(self.obs)
        self.params = self._np2tensor(self.params)
        for i in range(self.branch_n):
            if self.cholesky_factor[i] is None:
                self.obs_errors[i] = self._np2tensor(self.obs_errors[i])
            else:
                self.cholesky_factor[i] = self._np2tensor(self.cholesky_factor[i])
        self.obs_base_torch = self._branches2tensor(self.obs_base)
        self.params_base_torch = self._np2tensor(self.params_base)
        # vali_set
        if self.obs_vali is not None:
            self.obs_vali = self._branches2tensor(self.obs_vali)
            self.params_vali = self._np2tensor(self.params_vali)
#%% statistic of a numpy array
class Statistic(object):
    """Minimum, maximum, mean and standard deviation of an array or tensor."""

    def __init__(self, x, dim=None):
        """Statistics of an array.

        Parameters
        ----------
        x : array-like or tensor
            The data to be calculated, a :class:`np.ndarray` or a
            :class:`torch.Tensor`.
        dim : None or int, optional
            The dimension to reduce, it should be set to None or 0. If None, all dimensions
            will be reduced; if 0, only the mini-batch dimension will be reduced,
            which means each element will be normalized independently. Default: None

        Returns
        -------
        None.
        """
        self.x = x
        self.dtype = type(x)
        self.dim = dim

    def _is_numpy(self):
        """Return True for numpy input, False for torch input.

        Raises
        ------
        TypeError
            If ``x`` is neither a numpy array nor a torch tensor (instead of
            letting every statistic silently evaluate to None).
        """
        if isinstance(self.x, np.ndarray):
            return True
        if isinstance(self.x, torch.Tensor):
            return False
        raise TypeError('The data type should be numpy.ndarray or torch.Tensor')

    @property
    def mean(self):
        if self._is_numpy():
            return np.mean(self.x, axis=self.dim)
        if self.dim is None:
            return torch.mean(self.x)
        return torch.mean(self.x, dim=self.dim)

    @property
    def xmin(self):
        if self._is_numpy():
            return np.min(self.x, axis=self.dim)
        if self.dim is None:
            return torch.min(self.x)
        # With ``dim`` torch.min returns (values, indices); keep the values.
        return torch.min(self.x, dim=self.dim)[0]

    @property
    def xmax(self):
        if self._is_numpy():
            return np.max(self.x, axis=self.dim)
        if self.dim is None:
            return torch.max(self.x)
        # With ``dim`` torch.max returns (values, indices); keep the values.
        return torch.max(self.x, dim=self.dim)[0]

    @property
    def std(self):
        # NOTE(review): np.std defaults to the population estimator (ddof=0)
        # whereas torch.std applies Bessel's correction by default, so the two
        # input types do not give the same number. Left unchanged here.
        if self._is_numpy():
            return np.std(self.x, axis=self.dim)
        if self.dim is None:
            return torch.std(self.x)
        return torch.std(self.x, dim=self.dim)

    #change the name to get_st?
    def statistic(self):
        """Return a dict with the keys 'min', 'max', 'mean' and 'std'."""
        return {
            'min': self.xmin,
            'max': self.xmax,
            'mean': self.mean,
            'std': self.std,
        }

    def statistic_torch(self, use_GPU=True):
        """Return :meth:`statistic` with every value converted to a torch tensor.

        Parameters
        ----------
        use_GPU : bool, optional
            If True the values are converted with ``numpy2cuda``, otherwise
            with ``numpy2torch``. Default: True
        """
        convert = numpy2cuda if use_GPU else numpy2torch
        # With dim=None the numpy reductions return numpy scalars, which
        # torch.from_numpy rejects; np.asarray turns them into 0-d arrays.
        return {key: convert(np.asarray(value)) for key, value in self.statistic().items()}
#%% normalization & inverse normalization
class Normalize(object):
    """ Normalize data.

    Parameters
    ----------
    x : array-like or tensor
        The data to be normalized.
    statistic : dict, optional
        Statistics of the data; the keys 'min', 'max', 'mean' and 'std' are
        read depending on ``norm_type``. Default: None (an empty dict)
    norm_type : str, optional
        Name of the normalization: 'minmax', 'mean' or 'z_score'.
        Default: 'z_score'
    a, b : float, optional
        Lower and upper bound of the target range, only for 'minmax'.
    """

    # Method names that ``norm_type`` is allowed to select.
    _NORM_TYPES = ('minmax', 'mean', 'z_score')

    def __init__(self, x, statistic=None, norm_type='z_score', a=1e-6, b=0.999999):
        self.x = x
        # A fresh dict per instance instead of one shared mutable default.
        self.stati = {} if statistic is None else statistic
        self.norm_type = norm_type
        self.a = a #only for minmax
        self.b = b #only for minmax

    def minmax(self):
        """min-max normalization

        Rescaling the range of features to scale the range in [0, 1] or [a,b]
        https://en.wikipedia.org/wiki/Feature_scaling
        """
        return self.a + (self.x-self.stati['min'])*(self.b-self.a) / (self.stati['max']-self.stati['min'])

    def mean(self):
        """ mean normalization """
        return (self.x-self.stati['mean'])/(self.stati['max']-self.stati['min'])

    def z_score(self):
        """ standardization/z-score/zero-mean normalization """
        return (self.x-self.stati['mean'])/self.stati['std']

    def norm(self):
        """Apply the normalization selected by ``norm_type``.

        Raises
        ------
        ValueError
            If ``norm_type`` is not one of the supported names.
        """
        # Attribute lookup on a validated name replaces eval() on a
        # caller-supplied string.
        if self.norm_type not in self._NORM_TYPES:
            raise ValueError('norm_type should be one of %s, got %r' % (self._NORM_TYPES, self.norm_type))
        return getattr(self, self.norm_type)()
class InverseNormalize(object):
    """ Inverse transformation of class :class:`~Normalize`.

    Parameters
    ----------
    x1 : array-like or tensor
        The normalized data.
    statistic : dict, optional
        Statistics used for the normalization; the keys 'min', 'max', 'mean'
        and 'std' are read depending on ``norm_type``. Default: None (an
        empty dict)
    norm_type : str, optional
        Name of the normalization to invert: 'minmax', 'mean' or 'z_score'.
        Default: 'z_score'
    a, b : float, optional
        Lower and upper bound of the normalized range, only for 'minmax'.
    """

    # Method names that ``norm_type`` is allowed to select.
    _NORM_TYPES = ('minmax', 'mean', 'z_score')

    def __init__(self, x1, statistic=None, norm_type='z_score', a=1e-6, b=0.999999):
        self.x = x1
        # A fresh dict per instance instead of one shared mutable default.
        self.stati = {} if statistic is None else statistic
        self.norm_type = norm_type
        self.a = a #only for minmax
        self.b = b #only for minmax

    def minmax(self):
        """ inverse of the min-max normalization """
        return (self.x-self.a) * (self.stati['max']-self.stati['min']) / (self.b-self.a) + self.stati['min']

    def mean(self):
        """ inverse of the mean normalization """
        return self.x * (self.stati['max']-self.stati['min']) + self.stati['mean']

    def z_score(self):
        """ inverse of the z-score normalization """
        return self.x * self.stati['std'] + self.stati['mean']

    def inverseNorm(self):
        """Apply the inverse transformation selected by ``norm_type``.

        Raises
        ------
        ValueError
            If ``norm_type`` is not one of the supported names.
        """
        # Attribute lookup on a validated name replaces eval() on a
        # caller-supplied string.
        if self.norm_type not in self._NORM_TYPES:
            raise ValueError('norm_type should be one of %s, got %r' % (self._NORM_TYPES, self.norm_type))
        return getattr(self, self.norm_type)()
#%% data preprocessing
class DataPreprocessing(object):
    """Data preprocessing of measurements and cosmological parameters.

    Notes
    -----
    The methods read ``self.independent_norm_obs``, ``self.statistic_dim_obs``,
    ``self.statistic_dim_params`` and ``self.use_GPU``, none of which is
    assigned in ``__init__`` here. NOTE(review): presumably they are provided
    by the class that uses this one as a base - confirm against the callers.
    """

    def __init__(self, obs, params, obs_base, params_base, params_vali=None):
        self.obs = obs
        self.params = params
        self.obs_base = obs_base
        self.params_base = params_base
        self.params_vali = params_vali
        self.scale_obs = False
        self.scale_params = True
        self.norm_obs = True
        self.norm_params = True
        self.independent_norm_params = True
        self.norm_type = 'z_score'

    def _get_params_tot(self):
        """Return the training parameters, with the validation parameters
        appended along axis 0 when they are given."""
        if self.params_vali is None:
            return self.params
        return np.concatenate((self.params, self.params_vali), axis=0)

    def _set_params_statistic(self):
        """Set ``params_tot``, ``params_statistic`` and, for independent
        normalization, ``params_statistic_torch``."""
        self.params_tot = self._get_params_tot() #the using of params_tot will avoid nan in vali_loss when using Beta components
        if self.scale_params:
            data = self.params_tot / self.params_base
        else:
            data = self.params_tot
        stat = Statistic(data, dim=self.statistic_dim_params)
        self.params_statistic = stat.statistic()
        if self.independent_norm_params:
            self.params_statistic_torch = stat.statistic_torch(use_GPU=self.use_GPU)

    def get_statistic(self, max_idx=None):
        """Get statistics of observations and parameters.

        Parameters
        ----------
        max_idx : None or int, optional
            The maximum index of obs when calculating statistics of observations.
            It is useful to set a maximum index for the training set with a lot of data,
            which will reduce the use of computer resources. Default: None

        Returns
        -------
        None.
        """
        obs = self.obs[:max_idx]
        if self.scale_obs:
            obs = obs / self.obs_base
        # One Statistic object serves both results, so the (possibly large)
        # scaled array is built only once.
        stat = Statistic(obs, dim=self.statistic_dim_obs)
        self.obs_statistic = stat.statistic()
        if self.independent_norm_obs:
            self.obs_statistic_torch = stat.statistic_torch(use_GPU=self.use_GPU)
        self._set_params_statistic()

    def get_MB_statistic(self):
        """Multi-branch version of :meth:`get_statistic`: ``obs`` and
        ``obs_base`` are indexed per branch and the observation statistics are
        stored as lists with one entry per branch."""
        obs_statistic = []
        obs_statistic_torch = []
        for i, branch in enumerate(self.obs):
            if self.scale_obs:
                branch = branch / self.obs_base[i]
            stat = Statistic(branch, dim=self.statistic_dim_obs)
            obs_statistic.append(stat.statistic())
            if self.independent_norm_obs:
                obs_statistic_torch.append(stat.statistic_torch(use_GPU=self.use_GPU))
        self.obs_statistic = obs_statistic
        if self.independent_norm_obs:
            self.obs_statistic_torch = obs_statistic_torch
        self._set_params_statistic()

    def normalize_obs(self, obs, obs_base):
        """Scale ``obs`` by ``obs_base`` (if ``scale_obs``) and normalize it
        (if ``norm_obs``). The torch statistics are used for tensor input
        under independent normalization."""
        if self.scale_obs:
            obs = obs / obs_base
        if self.norm_obs:
            if self.independent_norm_obs and isinstance(obs, torch.Tensor):
                obs = Normalize(obs, self.obs_statistic_torch, norm_type=self.norm_type).norm()
            else:
                obs = Normalize(obs, self.obs_statistic, norm_type=self.norm_type).norm()
        return obs

    def inverseNormalize_obs(self, obs, obs_base):
        """Inverse of :meth:`normalize_obs`: undo the normalization first,
        then the scaling."""
        if self.norm_obs:
            if self.independent_norm_obs and isinstance(obs, torch.Tensor):
                obs = InverseNormalize(obs, self.obs_statistic_torch, norm_type=self.norm_type).inverseNorm()
            else:
                obs = InverseNormalize(obs, self.obs_statistic, norm_type=self.norm_type).inverseNorm()
        if self.scale_obs:
            obs = obs * obs_base
        return obs

    def normalize_params(self, params, params_base):
        """Scale ``params`` by ``params_base`` (if ``scale_params``) and
        normalize it (if ``norm_params``)."""
        if self.scale_params:
            params = params / params_base
        if self.norm_params:
            if self.independent_norm_params and isinstance(params, torch.Tensor):
                params = Normalize(params, self.params_statistic_torch, norm_type=self.norm_type).norm()
            else:
                params = Normalize(params, self.params_statistic, norm_type=self.norm_type).norm()
        return params

    def inverseNormalize_params(self, params, params_base):
        """Inverse of :meth:`normalize_params`: undo the normalization first,
        then the scaling."""
        if self.norm_params:
            if self.independent_norm_params and isinstance(params, torch.Tensor):
                params = InverseNormalize(params, self.params_statistic_torch, norm_type=self.norm_type).inverseNorm()
            else:
                params = InverseNormalize(params, self.params_statistic, norm_type=self.norm_type).inverseNorm()
        if self.scale_params:
            params = params * params_base
        return params

    def normalize_MB_obs(self, obs, obs_base):
        """Multi-branch version of :meth:`normalize_obs`; returns a list with
        one entry per branch."""
        if self.scale_obs:
            obs = [branch / obs_base[i] for i, branch in enumerate(obs)]
        if self.norm_obs:
            if self.independent_norm_obs and isinstance(obs[0], torch.Tensor):
                statistic = self.obs_statistic_torch
            else:
                statistic = self.obs_statistic
            obs = [Normalize(branch, statistic[i], norm_type=self.norm_type).norm()
                   for i, branch in enumerate(obs)]
        return obs

    def inverseNormalize_MB_obs(self, obs, obs_base):
        """Multi-branch version of :meth:`inverseNormalize_obs`; returns a
        list with one entry per branch."""
        if self.norm_obs:
            if self.independent_norm_obs and isinstance(obs[0], torch.Tensor):
                statistic = self.obs_statistic_torch
            else:
                statistic = self.obs_statistic
            obs = [InverseNormalize(branch, statistic[i], norm_type=self.norm_type).inverseNorm()
                   for i, branch in enumerate(obs)]
        if self.scale_obs:
            obs = [branch * obs_base[i] for i, branch in enumerate(obs)]
        return obs