"""
This module defines the Match class that is used in fitting routines.
"""
import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.interpolate import LSQUnivariateSpline
from scipy.interpolate import splrep, spleval
[docs]class Match(object):
def __init__(self, *args, **kwargs):
"""
The Match object used for fitting functions
Args:
spec (smsyn.spectrum.Spectrum): Spectrum object containing
the data to be fit
lib (smsyn.library.Library): Library object containing
the model library and `synth` method
wavmask (boolean array): same length as spec.wav. If True
ignore in the likelihood calculation
cont_method (str): Method for accounting for the mismatch in the
continuum between observed and model spectra. One of:
`spline-forward-model` or `spline-double-div`
"""
spec, lib, wavmask = args
assert wavmask.dtype==np.dtype('bool'), "mask must be boolean"
self.spec = spec
self.lib = lib
self.wavmask = wavmask
cont_method = 'spline-fm'
if kwargs.has_key('cont_method'):
cont_method = kwargs['cont_method']
self.cont_method = cont_method
[docs] def model(self, params, wav=None, **kwargs):
"""Calculate model
Return the model for a given set of parameters
Args:
params (lmfit.Parameters): Parameters object containing at least
teff, logg, fe, vsini, psf, and spline coefficients
wav (array): (optional) array of wavelengths at which to
calculate the model. Useful for generating a more finely
sampled model for plotting
**kwargs: extra keyword arguments passed to lib.synth
"""
if wav is None:
wav = self.spec.wav
teff = params['teff'].value
logg = params['logg'].value
fe = params['fe'].value
vsini = params['vsini'].value
psf = params['psf'].value
_model = self.lib.synth(wav, teff, logg, fe, vsini, psf, **kwargs)
return _model
[docs] def spline(self, params, wav):
"""Continuum model
Unpacks the params object and returns a spline evaluated at specified
wavelengths.
Args:
params (lmfit.Parameters): See params in self.model
wav: array of wavelengths at which to calculate the continuum model.
Returns:
array: spline
"""
node_wav, node_flux = get_spline_nodes(params)
assert len(node_wav) > 3 and len(node_flux) > 3, "Must pass > 3 nodes"
node_wav = np.array(node_wav)
node_flux = np.array(node_flux)
splrep = InterpolatedUnivariateSpline(node_wav, node_flux)
cont = splrep(wav)
return cont
[docs] def resid(self, params, **kwargs):
"""Residuals
Return the residuals
Args:
params (lmfit.Parameters): see params in self.model
Returns:
array: model minus data
"""
flux = self.spec.flux.copy()
wav = self.spec.wav
model = self.model(params, wav=wav, **kwargs)
if self.cont_method=='spline-fm':
model /= self.spline(params, wav)
if self.cont_method=='spline-dd':
node_wav, node_flux = get_spline_nodes(params)
t = node_wav[1:-1]
spl = LSQUnivariateSpline(wav, flux, t, k=3, ext=0)
flux /= spl(wav)
spl = LSQUnivariateSpline(wav, model, t, k=3, ext=0)
model /= spl(wav)
res = flux - model
return res
[docs] def nresid(self, params, **kwargs):
"""Normalized residuals
Args:
params (lmfit.Parameters): see params in self.model
Returns:
array: model minus data divided by errors
"""
return self.resid(params, **kwargs) / self.spec.uflux
[docs] def masked_nresid(self, params, **kwargs):
"""Masked normalized residuals
Return the normalized residuals with masked wavelengths excluded
Args:
params (lmfit.Parameters): see params in self.model
Returns:
array: normalized residuals where self.wavmask == 1
"""
return self.nresid(params, **kwargs)[~self.wavmask]
def chi2med(self, params):
_resid = self.resid(params)
med = np.median(_resid)
_resid -= med
_chi2med = np.sum(_resid**2)
return _chi2med
class MatchLincomb(Match):
def __init__(self, *args, **kwargs):
"""
The Match object used for fitting functions
Args:
spec (smsyn.spectrum.Spectrum): Spectrum object containing
the data to be fit
lib (smsyn.library.Library): Library object containing
the model library and `synth` method
wavmask (boolean array): same length as spec.wav. If True
ignore in the likelihood calculation
cont_method (str): Method for accounting for the mismatch in the
continuum between observed and model spectra. One of:
`spline-forward-model` or `spline-double-div`
"""
spec, lib, wavmask, model_indecies = args
assert wavmask.dtype==np.dtype('bool'), "mask must be boolean"
self.spec = spec
self.lib = lib
self.wavmask = wavmask
self.model_indecies= model_indecies
cont_method = 'spline-fm'
if kwargs.has_key('cont_method'):
cont_method = kwargs['cont_method']
self.cont_method = cont_method
def model(self, params, wav=None):
if wav is None:
wav = self.spec.wav
mw = get_model_weights(params)
vsini = params['vsini'].value
psf = params['psf'].value
_model = self.lib.synth_lincomb(
wav, self.model_indecies, mw, vsini, psf
)
return _model
def spline_nodes(wav_min, wav_max, angstroms_per_node=20,):
# calculate number of spline nodes
node_wav_min = np.floor(wav_min)
node_wav_max = np.ceil(wav_max)
nodes = (node_wav_max - node_wav_min) / angstroms_per_node
nodes = int(np.round(nodes))
node_wav = np.linspace(node_wav_min, node_wav_max, nodes)
node_wav = node_wav.astype(int)
return node_wav
def add_spline_nodes(params, node_wav, vary=True):
for node in node_wav:
params.add('sp%i' % node, value=1.0, vary=vary)
def get_spline_nodes(params):
node_wav = []
node_flux = []
for key in params.keys():
if key.startswith('sp'):
node_wav.append( float( key.replace('sp','') ) )
node_flux.append( params[key].value )
node_wav = np.array(node_wav)
node_flux = np.array(node_flux)
return node_wav, node_flux
def add_model_weights(params, nmodels, min=0, max=1):
value = 1.0 / nmodels
for i in range(nmodels):
params.add('mw%i' % i ,value=value,min=min,max=max)
def get_model_weights(params):
nmodels = len([k for k in params.keys() if k[:2]=='mw'])
model_weights = [params['mw%i' % i].value for i in range(nmodels)]
model_weights = np.array(model_weights)
return model_weights