Source code for colfi.plotter

# -*- coding: utf-8 -*-

from . import space_updater as su
from . import cosmic_params, utils
import numpy as np
import matplotlib.pyplot as plt
import coplot.plots as pl
import coplot.plot_contours as plc
import coplot.plot_settings as pls
import copy
f_c = pl.fiducial_colors


[docs]class BestFitsData(object): """Best fit values of each steps, used to plot steps in :class:`PlotPosterior`""" def __init__(self, chain_all, chain, param_labels='', burnInEnd_step=None, nde_type_pair=['ANN','MNN'], show_initParams=False, init_params=float, chain_true=None, label_true='True', show_idx=None): self.chain_all = chain_all self.steps_n = len(self.chain_all) self.chain = chain self.param_labels = param_labels self.burnInEnd_step = burnInEnd_step self.nde_type_pair = nde_type_pair self.show_initParams = show_initParams self.init_params = init_params self.chain_true = chain_true self.label_true = label_true self.show_idx = show_idx @property def bestFits_all(self): best_ann = [] for i in range(self.steps_n): best_ann.append(su.Chains.bestFit(self.chain_all[i], symmetry_error=False)) return np.array(best_ann) @property def best_fit(self): if self.chain is None: return None else: return su.Chains.bestFit(self.chain, symmetry_error=False) @property def best_fit_true(self): if self.chain_true is None: return None else: return su.Chains.bestFit(self.chain_true, symmetry_error=False)
[docs] def panel_data(self, p_index): data = {'labels' : [r'$N_{\rm est}$', self.param_labels[p_index]]} if self.chain is None: best_mean = self.bestFits_all[-1][p_index][0]# else: best_mean = self.best_fit[p_index][0]# data['best_mean'] = best_mean data['err_mean'] = [self.best_fit[p_index][1], self.best_fit[p_index][2]] best_fits = self.bestFits_all[:, p_index, :] data['bestFits'] = best_fits y_max = max(best_fits[:,0] + best_fits[:,2]) y_min = min(best_fits[:,0] - best_fits[:,1]) if self.show_initParams: init_p = self.init_params[p_index] data['init_param'] = init_p y_max = max(y_max, init_p[1]) y_min = min(y_min, init_p[0]) dy = y_max - y_min ylim_min = y_min - dy*0.1 if y_max-best_mean > 0.7*dy or y_max-best_mean < 0.3*dy: ylim_max = y_max + dy*0.1 else: ylim_max = y_max + dy*0.7 lims = [0, self.steps_n+1, ylim_min, ylim_max] data['ylim_max'], data['ylim_min'] = ylim_max, ylim_min data['lims'] = lims data['steps_n'] = self.steps_n if self.chain_true is not None: best_mean_true = self.best_fit_true[p_index][0]# data['best_mean_true'] = best_mean_true data['err_mean_true'] = [self.best_fit_true[p_index][1], self.best_fit_true[p_index][2]] return data
[docs] def panels_data(self): datasets = [] if self.show_idx is None: idx_all = [i for i in range(len(self.param_labels))] else: idx_all = [idx - 1 for idx in self.show_idx] for index in idx_all: data = self.panel_data(index) data['burnInEnd_step'] = self.burnInEnd_step datasets.append(data) return datasets
[docs] def panel(self, data): #initial parameter if self.show_initParams: plt.fill_between([0, data['steps_n']+1], data['init_param'][0], data['init_param'][1], color='#FF3030', label='Initial '+data['labels'][1], alpha=0.15) ##FF3030, 63B8FF #nde if self.nde_type_pair[0]==self.nde_type_pair[1]: plt.errorbar(range(1, data['steps_n']+1), data['bestFits'][:,0], yerr=[data['bestFits'][:,1],data['bestFits'][:,2]], fmt='o', color='r', label=self.nde_type_pair[1]) else: burnInEnd_step = data['burnInEnd_step'] steps_n = data['steps_n'] best = data['bestFits'][:,0] err_low = data['bestFits'][:,1] err_up = data['bestFits'][:,2] if burnInEnd_step is None: plt.errorbar(range(1, steps_n+1), best, yerr=[err_low, err_up], fmt='o', color='r', label=self.nde_type_pair[0]) else: plt.errorbar(range(1, burnInEnd_step+1), best[:burnInEnd_step], yerr=[err_low[:burnInEnd_step], err_up[:burnInEnd_step]], fmt='o', color='r', label=self.nde_type_pair[0], alpha=0.3) plt.errorbar(range(burnInEnd_step+1, steps_n+1), best[burnInEnd_step:], yerr=[err_low[burnInEnd_step:], err_up[burnInEnd_step:]], fmt='o', color='r', label=self.nde_type_pair[1]) #burn-in-end if self.burnInEnd_step is not None: plt.plot([data['burnInEnd_step']+0.5, data['burnInEnd_step']+0.5], [data['ylim_min'], data['ylim_max']], '--', color='grey', label='End of burn-in', lw=2) #mean values if self.chain is not None: if self.chain_true is None: plt.plot([0, data['steps_n']+1], [data['best_mean'], data['best_mean']], 'k-', label='Best-fit', lw=2) plt.fill_between([0, data['steps_n']+1], data['best_mean']-data['err_mean'][0], data['best_mean']+data['err_mean'][1], color='grey', alpha=0.3) else: plt.plot([0, data['steps_n']+1], [data['best_mean_true'], data['best_mean_true']], 'k-', label=self.label_true, lw=2) plt.fill_between([0, data['steps_n']+1], data['best_mean_true']-data['err_mean_true'][0], data['best_mean_true']+data['err_mean_true'][1], color='grey', alpha=0.3) plt.legend(fontsize=12)
[docs]class LossesData(object): """Losses of training set and validataion set of steps after burn-in phase, which are used to plot losses in :class:`PlotPosterior`""" def __init__(self, good_losses, alpha=0.6, title_labels='', text_labels='', show_minLoss=True): self.good_losses = good_losses self.alpha = alpha self.title_labels = title_labels self.text_labels = text_labels self.show_minLoss = show_minLoss
[docs] def panel_data(self, index): data = {'labels' : [r'Epochs', r'Loss']} train_loss, vali_loss = self.good_losses[index] data['train_loss'] = train_loss data['vali_loss'] = vali_loss x = np.linspace(1, len(train_loss), len(train_loss)) data['x'] = x vali_loss_size = len(vali_loss) data['vali_loss_size'] = vali_loss_size train_loss_mean = np.mean(train_loss[-100:]) data['train_loss_mean'] = train_loss_mean train_loss_min = np.min(train_loss[-100:]) # train_loss_max = np.max(train_loss[-100:]) if vali_loss_size==0: train_loss_max = np.max(train_loss) # else: train_loss_max = np.max(train_loss[-100:]) # vali_loss_mean = np.mean(vali_loss[-100:]) data['vali_loss_mean'] = vali_loss_mean vali_loss_min = np.min(vali_loss[-100:]) vali_loss_max = np.max(vali_loss[-100:]) if vali_loss_size==0: loss_min, loss_max = train_loss_min, train_loss_max else: # loss_min, loss_max = min(train_loss_min, vali_loss_min), max(train_loss_max, vali_loss_max) loss_min, loss_max = min(train_loss_min, vali_loss_min), max(train_loss_max, vali_loss_min) #use this loss_diff = loss_max - loss_min fraction_loss = 0.18 fraction_low = 0.08 if vali_loss_size==0: ylim_tot = loss_diff * 1.15 else: ylim_tot = loss_diff / fraction_loss delta_low = fraction_low * ylim_tot ylim_min = loss_min - delta_low ylim_max = ylim_min + ylim_tot lims = [0, len(train_loss), ylim_min, ylim_max] data['lims'] = lims text_x = lims[0] + (lims[1]-lims[0])*0.8 text_y = lims[2] + (lims[3]-lims[2])*0.645 data['text_x'] = text_x data['text_y'] = text_y return data
[docs] def panels_data(self): datasets = [] for index in range(len(self.good_losses)): data = self.panel_data(index) datasets.append(data) return datasets
[docs] def panel(self, data): if self.show_minLoss: plt.plot(data['x'], data['train_loss'], label=r'Training set $(\mathcal{L}_{\rm train}=%.3f)$'%(data['train_loss_mean'])) else: plt.plot(data['x'], data['train_loss'], label=r'Training set') if data['vali_loss_size']!=0: if self.show_minLoss: plt.plot(data['x'], data['vali_loss'], label=r'Validation set $(\mathcal{L}_{\rm vali}=%.3f)$'%(data['vali_loss_mean']), alpha=self.alpha) else: plt.plot(data['x'], data['vali_loss'], label=r'Validation set', alpha=self.alpha) plt.title(self.title_labels, fontsize=11) # plt.text(data['text_x'], data['text_y'], self.text_labels, fontsize=12) plt.legend(fontsize=10.5)
[docs]class PosteriorInfo(object): """Some information of NDEs, cosmological parameters, and chains, which will be used in :class:`PlotPosterior`""" def __init__(self, param_names, path='ann', params_dict=None): self.param_names = param_names self.path = path self.params_dict = params_dict self.chain_true_path = '' #only support .npy or .txt file self.label_true = 'True' self.file_identity_str = '' @property def param_labels(self): return cosmic_params.ParamsProperty(self.param_names,params_dict=self.params_dict).labels @property def chain_true(self): if self.chain_true_path: suffix = self.chain_true_path.split('.')[-1] else: return None if suffix=='npy': chain = np.load(self.chain_true_path) elif suffix=='txt': chain = np.loadtxt(self.chain_true_path) else: raise ValueError("The file type supported is .npy or .txt file.") return chain
[docs] def load_ndeInfo(self, randn_num): file_path = utils.FilePath(filedir=self.path+'/nde_info', randn_num=randn_num, suffix='.npy').filePath() self.nde_type_pair, self.nde_type_str, self.branch_n = np.load(file_path, allow_pickle=True)
[docs]class PlotPosterior(PosteriorInfo): """Plot posterior distribution using the ANN chains. Parameters ---------- chain_all : list The ANN chains obtained in all steps. chain : array-like The good ANN chain obtained after burn-in phase. param_names : list A list which contains the parameter names, e.g. ['H0','ombh2','omch2']. path : str, optional The path of the results saved. Default: 'ann' nde_type_pair : list, optional A list that contains two NDEs, the first NDE is used to estimate parameters in the burn-in phase, the second NDE is used to estimate parameters after the burn-in phase. Therefore, the first NDE is ued to find the burn-in end step and the second NDE is used to obtain the posterior. Default: ['ANN','MNN'] randn_num : float or str, optional A random number that identifies the saved results. Default: 1.234 burnInEnd_step : None or int, optional The burn-in end step. Default: None params_dict : dict or None, optional Information of cosmological parameters that include the labels, the minimum values, and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None good_losses : None or list, optional The losses of training set and validation set after the burn-in phase. Default: None Attributes ---------- chain_true_path : str, optional The path of the true chain of the posterior which can be obtained by using other methods, such as the MCMC method. Note: only ``.npy`` and ``.txt`` file is supported. Default: '' label_true : str, optional The legend label of the true chain. Default: 'True' fiducial_params : list, optional A list that contains the fiducial cosmological parameters. Default: [] show_idx : None or list, optional The index of cosmological parameters when plotting contours. This allows us to change the order of the cosmological parameters. If None, the order of parameters follows that in the ANN chain. If list, the minimum value of it should be 1. See :class:`~.plotter.PlotPosterior`. Default: None file_identity_str : str, optional A string that identifies the files saved to the disk, which is useful to identify the saved files. Default: '' """ def __init__(self, chain_all, chain, param_names, path='ann', nde_type_pair=['ANN','MNN'], randn_num=1.234, burnInEnd_step=None, params_dict=None, good_losses=None): self.chain_all = chain_all self.steps_n = len(self.chain_all) self.chain = chain self.param_names = param_names self.path = path self.nde_type_pair = nde_type_pair self.nde_type_str = nde_type_pair[0] + '_' + nde_type_pair[1] self.randn_num = randn_num self.burnInEnd_step = burnInEnd_step self.params_dict = params_dict self.good_losses = good_losses self.chain_true_path = '' self.label_true = 'True' self.fiducial_params = [] self.show_idx = None self.file_identity_str = ''
[docs] def get_steps(self, show_initParams=False, layout_adjust=[0.3, 0.25], suptitle='', save_fig=True): file_path = utils.FilePath(filedir=self.path+'/initial_params', randn_num=self.randn_num, suffix='.npy').filePath() self.init_params = np.load(file_path) panel_model = BestFitsData(self.chain_all, self.chain, param_labels=self.param_labels, burnInEnd_step=self.burnInEnd_step, nde_type_pair=self.nde_type_pair, show_initParams=show_initParams, init_params=self.init_params, chain_true=self.chain_true, label_true=self.label_true, show_idx=self.show_idx) self.fig_steps = pl.MultiplePanels(panel_model).plot(layout_adjust=layout_adjust, ticks_size=10, major_locator_integers=[True,False]) plt.suptitle(suptitle, fontsize=16) if save_fig: pl.savefig(self.path+'/figures', 'steps%s_%s_%s.pdf'%(self.file_identity_str, self.nde_type_str,self.randn_num), self.fig_steps) return self.fig_steps
[docs] def get_contour(self, bins=100, smooth=3, fill_contours=False, sigma=2, show_titles=True, line_width=2, lims=None, legend=True, save_fig=True): if self.chain is None: return None if self.show_idx is None: chain_show = self.chain chain_true_show = self.chain_true labels_show = self.param_labels else: index = [idx - 1 for idx in self.show_idx] chain_show = self.chain[:, index] if self.chain_true is not None: chain_true_show = self.chain_true[:, index] labels_show = [] for idx in index: labels_show.append(self.param_labels[idx]) show_num = chain_show.shape[1] if self.chain_true is None: legend_labels = [self.nde_type_pair[1]] else: print('\ndev_max: %.2f\\sigma'%np.max(su.Chains.param_devs(chain_true_show, chain_show))) print('dev_mean: %.2f\\sigma'%np.mean(su.Chains.param_devs(chain_true_show, chain_show))) print('error_dev_mean: %.2f%%'%(np.mean(su.Chains.error_devs(chain_show, chain_true_show))*100)) chain_show = [chain_show, chain_true_show] legend_labels = [self.nde_type_pair[1], self.label_true] #fiducial parameters if len(self.fiducial_params)==0: best_values = None show_best_value_lines = False best_value_colors = None else: best_values = self.fiducial_params show_best_value_lines = True best_value_colors = f_c[7] if self.chain_true is None else [f_c[7], f_c[7]] if show_num==1: self.fig_contour = plc.Plot_1d(chain_show).plot(bins=bins,labels=labels_show,smooth=smooth, show_title=show_titles,line_width=line_width, legend=legend,legend_labels=legend_labels) else: self.fig_contour = plc.Contours(chain_show).plot(bins=bins,labels=labels_show,smooth=smooth,fill_contours=fill_contours, show_titles=show_titles,line_width=line_width,layout_adjust=[0.0,0.0], sigma=sigma,lims=lims,legend=legend,legend_labels=legend_labels, best_values=best_values, show_best_value_lines=show_best_value_lines, best_value_colors=best_value_colors) if self.chain is not None and save_fig: pl.savefig(self.path+'/figures', 'contour%s_%s_%s.pdf'%(self.file_identity_str, self.nde_type_pair[1],self.randn_num), self.fig_contour) return self.fig_contour
[docs] def get_losses(self, alpha=0.6, show_minLoss=True, layout_adjust=[0.25, 0.25], save_fig=True): if self.burnInEnd_step is None: return None panel_model = LossesData(self.good_losses, alpha=alpha, text_labels=self.nde_type_pair[1], show_minLoss=show_minLoss) self.fig_loss = pl.MultiplePanels(panel_model).plot(layout_adjust=layout_adjust, ticks_size=10, major_locator_integers=[True,False]) if save_fig: pl.savefig(self.path+'/figures', 'losses%s_%s_%s.pdf'%(self.file_identity_str, self.nde_type_pair[1],self.randn_num), self.fig_loss)
[docs]class PlotMultiPosterior(PosteriorInfo): """Plot posterior distribution for multiple ANN chains. Parameters ---------- chains : list The ANN chains obtained after burn-in phase. param_names : list A list which contains the parameter names, e.g. ['H0','ombh2','omch2']. path : str, optional The path of the results saved. Default: 'ann' nde_types : list, optional A list that contains names of NDEs. Default: ['ANN','MDN'] randn_nums : list, optional A list that contains random number which identifies the saved results. Default: [1.123,1.123] params_dict : dict or None, optional Information of cosmological parameters that include the labels, the minimum values, and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None Attributes ---------- chain_true_path : str, optional The path of the true chain of the posterior which can be obtained by using other methods, such as the MCMC method. Note: only ``.npy`` and ``.txt`` file is supported. Default: '' label_true : str, optional The legend label of the true chain. Default: 'True' fiducial_params : list, optional A list that contains the fiducial cosmological parameters. Default: [] show_idx : None or list, optional The index of cosmological parameters when plotting contours. This allows us to change the order of the cosmological parameters. If None, the order of parameters follows that in the ANN chain. If list, the minimum value of it should be 1. See :class:`~.plotter.PlotPosterior`. Default: None file_identity_str : str, optional A string that identifies the files saved to the disk, which is useful to identify the saved files. Default: '' """ def __init__(self, chains, param_names, path='ann', nde_types=['ANN','MDN'], randn_nums=[1.123,1.123], params_dict=None): self.chains = chains self.param_labels = cosmic_params.ParamsProperty(param_names,params_dict=params_dict).labels self.path = path self.nde_types = nde_types self.randn_nums = randn_nums self.chain_n = len(randn_nums) self.chain_true_path = '' #only support .npy or .txt file self.label_true = 'True' self.fiducial_params = [] self.show_idx = None #min(show_idx)=1 self.file_identity_str = '' @property def contour_name(self): name = 'contour%s'%self.file_identity_str for i in range(self.chain_n): name = name + '_%s%s'%(self.nde_types[i], self.randn_nums[i]) name = name + '.pdf' return name
[docs] def get_contours(self, bins=100, smooth=3, fill_contours=False, sigma=2, show_titles=True, line_width=2, lims=None, legend=True, save_fig=True): if self.show_idx is None: chains_show = self.chains chain_true_show = self.chain_true labels_show = self.param_labels else: index = [idx - 1 for idx in self.show_idx] chains_show = [self.chains[i][:, index] for i in range(self.chain_n)] if self.chain_true is not None: chain_true_show = self.chain_true[:, index] labels_show = [] for idx in index: labels_show.append(self.param_labels[idx]) show_num = chains_show[0].shape[1] if self.chain_true is None: legend_labels = self.nde_types else: chains_show.append(chain_true_show) legend_labels = self.nde_types + [self.label_true] #fiducial parameters if len(self.fiducial_params)==0: best_values = None show_best_value_lines = False best_value_colors = None else: best_values = [self.fiducial_params for i in range(self.chain_n)] show_best_value_lines = True best_value_colors = [f_c[7] for i in range(self.chain_n)] if show_num==1: self.fig_contour = plc.Plot_1d(chains_show).plot(bins=bins,labels=labels_show,smooth=smooth, show_title=show_titles,line_width=line_width, legend=legend,legend_labels=legend_labels) else: self.fig_contour = plc.Contours(chains_show).plot(bins=bins,labels=labels_show,smooth=smooth,fill_contours=fill_contours, show_titles=show_titles,line_width=line_width,layout_adjust=[0.0,0.0], sigma=sigma,lims=lims,legend=legend,legend_labels=legend_labels, best_values=best_values,show_best_value_lines=show_best_value_lines, best_value_colors=best_value_colors) if save_fig: pl.savefig(self.path+'/figures', self.contour_name, self.fig_contour) return self.fig_contour
#%%
[docs]def pcc(x, y): '''Pearson correlation coefficient https://en.wikipedia.org/wiki/Pearson_correlation_coefficient ''' data = np.c_[x, y].T cov = np.cov(data) rho = cov[0,1] / np.sqrt(cov[0,0]) / np.sqrt(cov[1,1]) return rho
[docs]def R2(obs, pred): '''Coefficient of determination https://en.wikipedia.org/wiki/Coefficient_of_determination https://baike.baidu.com/item/%E5%8F%AF%E5%86%B3%E7%B3%BB%E6%95%B0/8020809?fromtitle=coefficient%20of%20determination&fromid=18081717&fr=aladdin https://doi.org/10.1093/mnras/stz010 obs: observed data pred: predicted data ''' obs_mean = np.mean(obs) #method 1 # r2 = np.sum((pred - obs_mean)**2) / np.sum((obs - obs_mean)**2) #method 2, use this ss_res = np.sum((obs - pred)**2) ss_tot = np.sum((obs - obs_mean)**2) r2 = 1 - ss_res/ss_tot return r2
[docs]class BestPredictedData(object): def __init__(self, params_testSet, predParams_testSet, params_trainingSet=None, predParams_trainingSet=None, param_labels='', show_reErr=True, coef_type='R2'): self.sim_params = params_testSet self.pred_params = predParams_testSet self.params_trainingSet = params_trainingSet self.predParams_trainingSet = predParams_trainingSet self.param_labels = param_labels self.show_reErr = show_reErr self.coef_type = coef_type
[docs] def panel_data(self, p_index): data = {'labels' : [self.param_labels[p_index]+' (True)', self.param_labels[p_index]+' (Predicted)']} data['sim_param'] = self.sim_params[:, p_index] data['pred_param'] = self.pred_params[:, p_index] if self.coef_type=='r': data['coef'] = pcc(self.sim_params[:, p_index], self.pred_params[:, p_index]) #r elif self.coef_type=='R2': data['coef'] = R2(self.sim_params[:, p_index], self.pred_params[:, p_index]) #R^2 param_min_1, param_max_1 = self.sim_params[:, p_index].min(), self.sim_params[:, p_index].max() param_min_2, param_max_2 = self.pred_params[:, p_index].min(), self.pred_params[:, p_index].max() param_min, param_max = min([param_min_1, param_min_2]), max([param_max_1, param_max_2]) lims = [param_min, param_max, param_min, param_max] data['lims'] = lims pp = np.linspace(param_min, param_max, 100) data['pp'] = pp data['xx'] = (param_max-param_min)*0.06 + param_min #0.1, 0.06 data['yy'] = (param_max-param_min)*0.85 + param_min #0.8, 0.85 data['yy_coef'] = (param_max-param_min)*0.72 + param_min #0.68, 0.72 return data
[docs] def panels_data(self): if self.show_reErr: re_err = (self.pred_params - self.sim_params) / self.sim_params reErr_bestfit = su.Chains.bestFit(re_err, symmetry_error=False)#relative error of predicted parameters datasets = [] for index in range(len(self.param_labels)): data = self.panel_data(index) if self.show_reErr: data['reErr_bestfit'] = reErr_bestfit[index] datasets.append(data) return datasets
[docs] def panel(self, data): plt.plot(data['sim_param'], data['pred_param'], '.') plt.plot(data['pp'], data['pp'], 'r', lw=1.618) if self.show_reErr: plt.text(data['xx'], data['yy'], '$\delta: %.3f_{-%.3f}^{+%.3f}$'%(data['reErr_bestfit'][0],data['reErr_bestfit'][1],data['reErr_bestfit'][2]), fontsize=16) if self.coef_type=='r': plt.text(data['xx'], data['yy_coef'], '$r: %.3f$'%(data['coef']), fontsize=16) print('r (%s): %.3f'%(data['labels'][0].split('(')[0], data['coef'])) #r, to be removed? elif self.coef_type=='R2': plt.text(data['xx'], data['yy_coef'], '$R^2: %.3f$'%(data['coef']), fontsize=16) print('R^2 (%s): %.3f'%(data['labels'][0].split('(')[0], data['coef'])) #R^2, to be removed?
# plt.legend(fontsize=12)
[docs]class PlotPrediction(object): """Plot predicted cosmological parameters. Parameters ---------- params_testSet : array-like Cosmological parameters in the test set. predParams_testSet : array-like Predicted cosmological parameters for the test set. param_names : list A list which contains the parameter names, e.g. ['H0','ombh2','omch2']. params_trainingSet : array-like, optional Cosmological parameters in the training set. Default: None predParams_trainingSet : array-like, optional Predicted cosmological parameters for the training set. Default: None params_dict : dict or None, optional Information of cosmological parameters that include the labels, the minimum values, and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None show_reErr : bool, optional If True, will calculate and show the best-fit values and standard deviations of the relative errors between the predicted parameters and the true parameters. Default: True randn_num : float or str, optional A random number that identifies the saved results. Default: '' path : str, optional The path of the results saved. Default: 'ann' nde_type : str, optional A string that indicate which NDE should be used. See :class:`~.nde.NDEs`. Default: 'ANN' dataSet_type : str, optional The type of the data set. Default: 'testSet' coef_type : str, optional A quantity that quantifies the degree of linear correlation, which can be Pearson correlation coefficient ('r') or coefficient of determination ('R2'). Default: 'R2' show_idx : None or list, optional The index of cosmological parameters when plotting figures. This allows us to change the order of the cosmological parameters. If None, the order of parameters follows that in the training data. If list, the minimum value of it should be 1. See :class:`~.plotter.PlotPosterior`. Default: None file_identity_str : str, optional A string that identifies the files saved to the disk, which is useful to identify the saved files. Default: '' Returns ------- None. """ def __init__(self, params_testSet, predParams_testSet, param_names, params_trainingSet=None, predParams_trainingSet=None, params_dict=None, show_reErr=True, randn_num='', path='ann', nde_type='ANN', dataSet_type='testSet', coef_type='R2'): self.params_testSet = params_testSet self.predParams_testSet = predParams_testSet self.param_labels = cosmic_params.ParamsProperty(param_names,params_dict=params_dict).labels # self.params_trainingSet = params_trainingSet #remove ? # self.predParams_trainingSet = predParams_trainingSet #remove? self.show_reErr = show_reErr self.randn_num = randn_num self.path = path self.nde_type = nde_type self.dataSet_type = dataSet_type self.coef_type = coef_type self.show_idx = None #min(show_idx)=1 self.file_identity_str = ''
[docs] def plot(self, lat_n=3, panel_size=(4,3), layout_adjust=[0.3, 0.25], suptitle=''): if self.show_idx is None: params_testSet_show = self.params_testSet predParams_testSet_show = self.predParams_testSet labels_show = self.param_labels else: index = [idx - 1 for idx in self.show_idx] params_testSet_show = self.params_testSet[:, index] predParams_testSet_show = self.predParams_testSet[:, index] labels_show = [] for idx in index: labels_show.append(self.param_labels[idx]) # panel_model = BestPredictedData(self.params_testSet, self.predParams_testSet, param_labels=self.param_labels, show_reErr=self.show_reErr) panel_model = BestPredictedData(params_testSet_show, predParams_testSet_show, param_labels=labels_show, show_reErr=self.show_reErr, coef_type=self.coef_type) self.fig_pred = pl.MultiplePanels(panel_model, lat_n=lat_n).plot(panel_size=panel_size, layout_adjust=layout_adjust, ticks_size=10) plt.suptitle(suptitle, fontsize=16) return self.fig_pred
[docs] def save_fig(self): pl.savefig(self.path+'/figures', 'prediction%s_%s_%s_%s.pdf'%(self.file_identity_str, self.dataSet_type, self.nde_type,self.randn_num), self.fig_pred)
#%%
[docs]class PlotHparamsEffect(object): def __init__(self, fiducial_params, chain_mcmc=None, randn_nums=0.123, path='ann'): """ Plot the effect of hyperparameters. Parameters ---------- fiducial_params : array-like The fiducial values of parameters. chain_mcmc : array-like, optional The MCMC chain. Default: None randn_nums : float or list, optional The random numbers that corresponding to the saved ANN chains. Format of randn_nums: [L1, L2, ...], [L1, [L2_1, L2_2, ...], ...], [[p1, p2, ...], [p1, p2, ...], ...], or [[p1, p2, ...], [p1, [p2_1, p2_2, ...], ...]] path : str or list, optional The path of the saved ANN chains. file_identity_str : str, optional A string that identifies the files saved to the disk, which is useful to identify the saved files. Default: '' Returns ------- None. """ self.fiducial_params = fiducial_params self.chain_mcmc = chain_mcmc self.randn_nums = [randn_nums] if type(randn_nums) is float else randn_nums self.path = [path] if type(path) is str else path self.file_identity_str = '' # def sublines2line(self, chains, values, nde_type): # return
[docs] def points2line(self, chains, values, nde_type): chains_comb = {} values_comb = [v[0] for v in values] for i in range(len(values_comb)): chains_comb[str(values_comb[i])] = chains[i][str(values_comb[i])] if nde_type.count(nde_type[0])==len(nde_type): nde_type_comb = nde_type[0] return chains_comb, values_comb, nde_type_comb
# def subpoints2point(self, chains, values, nde_type): # return
[docs] def load_subLine_point_chains(self, path, randn_num): #load chains for sub-line or points try: file_path = utils.FilePath(filedir=path+'/auto_chains', randn_num=randn_num, suffix='.npy').filePath() chains_ann, _, key, value, nde_type, _, _ = np.load(file_path, allow_pickle=True) except OSError: file_path = utils.FilePath(filedir=path+'/auto_settings', randn_num=randn_num, suffix='.npy').filePath() finished_randn_nums_2, key, value, nde_type, _, _ = np.load(file_path, allow_pickle=True) print('Loading chains from folder chains/ for %s=%s'%(key, value[0])) if len(value)==1: #for point c_list = [] for k in range(len(finished_randn_nums_2[0])): file_p = utils.FilePath(filedir=path+'/chains', randn_num=finished_randn_nums_2[0][k], suffix='.npy').filePath() c_list.append(np.load(file_p)) chains_ann = {str(value[0]) : c_list} else: #for sub-line raise ValueError('The function of this part will be added later !!!') pass return chains_ann, key, value, nde_type
[docs] def load_chains(self): #Format of randn_nums: [L1, [L2_1, L2_2, ...], [p1, p2, p3, ...], [p1, [p2_1, p2_2, ...], ...], ...] self.chains_ann, keys, values, self.nde_type = [], [], [], [] for i in range(len(self.randn_nums)): #line, [L1, ...] if isinstance(self.randn_nums[i], float): file_path = utils.FilePath(filedir=self.path[i]+'/auto_chains', randn_num=self.randn_nums[i], suffix='.npy').filePath() chains_ann, _, key, value, nde_type, self.param_names, self.params_dict = np.load(file_path, allow_pickle=True) self.chains_ann.append(chains_ann) keys.append(key) # values.append(value) # self.nde_type.append(nde_type) elif isinstance(self.randn_nums[i], list): #check if all element of self.randn_nums[i] is float #sub-line or point, [[L2_1, L2_2, ...], [p1, p2, p3, ...], ...] if all([isinstance(r, float) for r in self.randn_nums[i]]): chains_tmp, values_tmp, nde_type_tmp = [], [], [] for j in range(len(self.randn_nums[i])): chains_ann, key, value, nde_type = self.load_subLine_point_chains(self.path[i], self.randn_nums[i][j]) chains_tmp.append(chains_ann) keys.append(key) values_tmp.append(value) nde_type_tmp.append(nde_type) if len(value)==1: #point chains_comb, values_comb, nde_type_comb = self.points2line(chains_tmp, values_tmp, nde_type_tmp) else: #sub-line chains_comb, values_comb, nde_type_comb = self.sublines2line(chains_tmp, values_tmp, nde_type_tmp) self.chains_ann.append(chains_comb) values.append(values_comb) self.nde_type.append(nde_type_comb) else: #sub-point, [[p1, [p2_1, p2_2, ...], ...], ...] chains_tmp, values_tmp, nde_type_tmp = [], [], [] for j in range(len(self.randn_nums[i])): if isinstance(self.randn_nums[i][j], float): #point chains_ann, key, value, nde_type = self.load_subLine_point_chains(self.path[i], self.randn_nums[i][j]) elif isinstance(self.randn_nums[i][j], list): #sub-point chains_ann, key, value, nde_type = [], [], [], [] for k in range(len(self.randn_nums[i][j])): _chains_ann, _key, _value, _nde_type = self.load_subLine_point_chains(self.path[i], self.randn_nums[i][j][k]) chains_ann = chains_ann + _chains_ann[str(_value[0])] key.append(_key) value.append(_value) nde_type.append(_nde_type) chains_ann = {str(_value[0]) : chains_ann} if key.count(key[0])==len(key): key = key[0] if value.count(value[0])==len(value): value = value[0] if nde_type.count(nde_type[0])==len(nde_type): nde_type = nde_type[0] chains_tmp.append(chains_ann) keys.append(key) values_tmp.append(value) nde_type_tmp.append(nde_type) chains_comb, values_comb, nde_type_comb = self.points2line(chains_tmp, values_tmp, nde_type_tmp) self.chains_ann.append(chains_comb) values.append(values_comb) self.nde_type.append(nde_type_comb) if keys.count(keys[0])==len(keys): self.key = keys[0] if values.count(values[0])==len(values): self.values = values[0]
[docs] def get_bestFits_ann(self): if self.chain_mcmc is not None: self.bestFit_mcmc = su.Chains.bestFit(self.chain_mcmc, symmetry_error=True) self.bestFits_ann = [{} for i in range(len(self.randn_nums))] for i in range(len(self.randn_nums)): repeat_net = {str(v) : len(self.chains_ann[i][str(v)]) for v in self.values} for v in self.values: self.bestFits_ann[i][str(v)] = [] for j in range(repeat_net[str(v)]): self.bestFits_ann[i][str(v)].append( su.Chains.bestFit(self.chains_ann[i][str(v)][j], symmetry_error=True) )
[docs] def get_devs(self, bins=10, smooth=1, show_hist=True): self.load_chains() self.get_bestFits_ann() # self.devs_from_fid_mean = [{} for i in range(len(self.randn_nums))] self.devs_from_fid_mean = [] for i in range(len(self.randn_nums)): repeat_net = {str(v) : len(self.chains_ann[i][str(v)]) for v in self.values} dev_tmp_1 = [] for v in self.values: # self.devs_from_fid_mean[i][str(v)] = [] self.dev_tmp_2 = [] for j in range(repeat_net[str(v)]): # self.devs_from_fid_mean[i][str(v)].append( np.mean(np.abs( (self.bestFits_ann[i][str(v)][j][:,0]-self.fiducial_params) / self.bestFits_ann[i][str(v)][j][:,1] )) ) self.dev_tmp_2.append( np.mean(np.abs( (self.bestFits_ann[i][str(v)][j][:,0]-self.fiducial_params) / self.bestFits_ann[i][str(v)][j][:,1] )) ) self.dev_tmp_2 = np.array(self.dev_tmp_2) dev_bestFit = su.Chains.bestFit(self.dev_tmp_2, symmetry_error=False, bins=bins, smooth=smooth) #symmetry_error=False #plot histogram if show_hist: # print(self.dev_tmp_2.mean()) plt.figure() plt.hist(self.dev_tmp_2, bins=bins, density=True) x, prob = su.pdf_1(self.dev_tmp_2, bins, smooth) plt.plot(x, prob, 'r', lw=2) plt.title(self.nde_type[i] + ' (%s=%s; %s points; $%.3f_{-%.3f}^{+%.3f}$)'%(self.key, v, len(self.dev_tmp_2), dev_bestFit[0][0],dev_bestFit[0][1],dev_bestFit[0][2]), fontsize=16) plt.xlabel(r'${\rm Mean\ deviation}\ [\sigma]$', fontsize=18) plt.ylabel('PDF', fontsize=18) dev_tmp_1.append(dev_bestFit.reshape(-1)) dev_tmp_1 = np.array(dev_tmp_1) self.devs_from_fid_mean.append(dev_tmp_1) #devs from mcmc if self.chain_mcmc is None: self.devs_from_mcmc_mean = None self.dev_error_from_mcmc_mean = None else: self.devs_from_mcmc_mean = [{} for i in range(len(self.randn_nums))] self.dev_error_from_mcmc_mean = [{} for i in range(len(self.randn_nums))] for i in range(len(self.randn_nums)): repeat_net = {str(v) : len(self.chains_ann[i][str(v)]) for v in self.values} for v in self.values: self.devs_from_mcmc_mean[i][str(v)] = [] self.dev_error_from_mcmc_mean[i][str(v)] = [] for j in range(repeat_net[str(v)]): self.devs_from_mcmc_mean[i][str(v)].append( np.mean(su.Chains.param_devs(self.chains_ann[i][str(v)][j], self.chain_mcmc)) ) self.dev_error_from_mcmc_mean[i][str(v)].append( np.mean(su.Chains.error_devs(self.chains_ann[i][str(v)][j], self.chain_mcmc)) )
@property def xlabel(self): if self.key=='hidden_layer': return 'Number\ of\ hidden\ layers' elif self.key=='num_train': return 'Number\ of\ training\ samples' elif self.key=='epoch': return 'Number\ of\ epochs' elif self.key=='activation_func': return 'Activation\ function'
[docs] def panel_data(self, bins=10, smooth=1, show_hist=True): self.get_devs(bins=bins, smooth=smooth, show_hist=show_hist) labels = [r'$\rm %s$'%self.xlabel, r'${\rm Mean\ deviation}\ [\sigma]$'] data = {'labels':labels} data['devs_from_fid'] = self.devs_from_fid_mean if self.chain_mcmc is not None: data['devs_from_mcmc'] = self.devs_from_mcmc_mean data['devs_error_from_mcmc'] = self.dev_error_from_mcmc_mean if self.key=='activation_func': x = np.linspace(1, len(self.values), len(self.values)) else: x = self.values data['x'] = x # # y_max, y_min = np.max(np.array([dev_best,dev_err])), np.min(np.array([dev_best,dev_err])) # y_max, y_min = max(self.devs_from_fid_mean), min(self.devs_from_fid_mean) # dy = y_max - y_min # ylim_max = y_max + dy*0.1 # ylim_min = y_min - dy*0.1 # if ylim_min<=0: # ylim_min = 0 x_max, x_min = max(x), min(x) dx = x_max - x_min if self.key=='activation_func': # xlim_max = x_max + dx*0.03 #for activation function # xlim_min = x_min - dx*0.03 #for activation function # xlim_max = x_max + dx*0.101 #for figsize=(6, 4.5) # xlim_min = x_min - dx*0.101 #for figsize=(6, 4.5) xlim_max = x_max + dx*0.03 #for figsize=(6*3, 4.5) xlim_min = x_min - dx*0.03 #for figsize=(6*3, 4.5) else: # xlim_max = x_max + dx*0.101 #for figsize=(6, 4.5) # xlim_min = x_min - dx*0.101 #for figsize=(6, 4.5) xlim_max = x_max + dx*0.05 #for figsize=(6, 4.5) xlim_min = x_min - dx*0.05 #for figsize=(6, 4.5) # lims = [xlim_min, xlim_max, ylim_min, ylim_max] lims = [xlim_min, xlim_max] # data['lims'] = None data['lims'] = lims return data
[docs] def plot(self, bins=10, smooth=1, show_hist=True, save_fig=True, save_path='figures'): data = self.panel_data(bins=bins, smooth=smooth, show_hist=show_hist) if self.key=='activation_func': fig = plt.figure(figsize=(3*6*1.2, 4.5*1.2)) #for activation function else: fig = plt.figure(figsize=(6, 4.5)) pls.PlotSettings().setting(labels=data['labels'], ticks_size=12, major_locator_N=None) for i in range(len(self.randn_nums)): plt.plot(data['x'], data['devs_from_fid'][i][:,0], '-o', label=self.nde_type[i], color=pl.fiducial_colors[i], lw=2) # plt.errorbar(data['x'], data['devs_from_fid'][i][:,0], yerr=data['devs_from_fid'][i][:,1], # fmt='%so'%(pl.fiducial_line_styles[i]), label=self.nde_type[i], color=pl.fiducial_colors[i], lw=2) # plt.fill_between(data['x'], data['devs_from_fid'][i][:,0]-data['devs_from_fid'][i][:,1], # data['devs_from_fid'][i][:,0]+data['devs_from_fid'][i][:,1], color=pl.fiducial_colors[i], alpha=0.3) plt.fill_between(data['x'], data['devs_from_fid'][i][:,0]-data['devs_from_fid'][i][:,1], data['devs_from_fid'][i][:,0]+data['devs_from_fid'][i][:,2], color=pl.fiducial_colors[i], alpha=0.3) if self.key == 'activation_func': # plt.xticks(data['x'], self.xtick_label, fontsize=12, rotation=0)#for activation function plt.legend(fontsize=11*1.2) else: plt.legend(fontsize=12*1.2) if self.key == 'hidden_layer' or self.key=='activation_func': plt.xticks(data['x'], self.values, fontsize=12) plt.xlim(data['lims'][0], data['lims'][1]) # plt.ylim(data['lims'][2], data['lims'][3]) randn_num_str = copy.deepcopy(self.randn_nums) while type(randn_num_str) is not float: randn_num_str = randn_num_str[0] fig_name = 'hparams%s_'%self.file_identity_str + self.key + '_%s.pdf'%(randn_num_str) print(fig_name) if save_fig: pl.savefig(save_path, fig_name, fig) return fig