Source code for so_pysm_models.interpolating

import os
import numpy as np
from scipy.interpolate import interp1d

import healpy as hp

import pysm


[docs]class InterpolatingComponent: def __init__( self, path, input_units, target_nside, interpolation_kind="linear", has_polarization=True, pixel_indices=None, mpi_comm=None, verbose=False, ): """PySM component interpolating between precomputed maps See more details at https://so-pysm-models.readthedocs.io/en/latest/so_pysm_models/models.html Parameters ---------- path : str Path should contain maps named as the frequency in GHz e.g. 20.fits or 20.5.fits or 00100.fits input_units : str Any unit available in PySM (see `pysm.convert_units` e.g. `Jysr`, `MJsr`, `uK_RJ`, `K_CMB`). target_nside : int HEALPix NSIDE of the output maps has_polarization : bool whether or not to simulate also polarization maps pixel_indices : ndarray of ints Outputs partial maps given HEALPix pixel indices in RING ordering mpi_comm : mpi4py communicator See the documentation of pysm.read_map verbose : bool Control amount of output """ self.maps = {} self.maps = self.get_filenames(path) self.freqs = np.array(list(self.maps.keys())) self.freqs.sort() self.input_units = input_units self.nside = target_nside self.pixel_indices = pixel_indices self.has_polarization = has_polarization self.interpolation_kind = interpolation_kind self.mpi_comm = mpi_comm self.verbose = verbose
[docs] def get_filenames(self, path): # Override this to implement name convention filenames = {} for f in os.listdir(path): if f.endswith(".fits"): freq = float(os.path.splitext(f)[0]) filenames[freq] = os.path.join(path, f) return filenames
[docs] def signal(self, nu, **kwargs): """Return map at given frequency or array of frequencies""" if np.isscalar(nu): # special case: we request only 1 frequency and that is among the ones # available as input check_isclose = np.isclose(self.freqs, nu) if np.any(check_isclose): freq = self.freqs[check_isclose][0] out = self.read_map(freq) if self.has_polarization: return out else: zeros = np.zeros_like(out) return np.array([out, zeros, zeros]) else: # continue with interpolation as with an array of nus nu = np.array([nu]) else: nu = np.asarray(nu) assert ( nu[0] >= self.freqs[0] ), "Frequency not supported, requested {} Ghz < lower bound {} GHz".format( nu[0], self.freqs[0] ) assert ( nu[-1] <= self.freqs[-1] ), "Frequency not supported, requested {} Ghz > upper bound {} GHz".format( nu[-1], self.freqs[-1] ) first_freq_i, last_freq_i = np.searchsorted(self.freqs, [nu[0], nu[-1]]) first_freq_i -= 1 last_freq_i += 1 freq_range = self.freqs[first_freq_i:last_freq_i] if self.verbose: print("Frequencies considered:", freq_range) npix = ( len(self.pixel_indices) if self.pixel_indices is not None else hp.nside2npix(self.nside) ) # allocate a single array for all maps to be used by the interpolator # always use size 3 for polarization because PySM always expects IQU maps all_maps = np.zeros( (len(freq_range), 3 if self.has_polarization else 1, npix), dtype=np.double ) for i, freq in enumerate(freq_range): if self.has_polarization: all_maps[i] = self.read_map(freq) if self.verbose: for i_pol, pol in enumerate("IQU"): print( "Mean emission at {} GHz in {}: {:.4g} uK_RJ".format( freq, pol, all_maps[i][i_pol].mean() ) ) else: all_maps[i][0] = self.read_map(freq) out = interp1d(freq_range, all_maps, axis=0, kind=self.interpolation_kind)(nu) # the output of out is always 3D, (num_freqs, IQU, npix), if num_freqs is one # we return only a 2D array. if len(out) == 1: return out[0] else: return out
[docs] def read_map(self, freq): if self.verbose: print("Reading map {}".format(self.maps[freq])) m = pysm.read_map( self.maps[freq], nside=self.nside, field=(0, 1, 2) if self.has_polarization else 0, pixel_indices=self.pixel_indices, mpi_comm=self.mpi_comm, ) return m * pysm.convert_units(self.input_units, "uK_RJ", freq)