Source code for rfpipe.preferences
from __future__ import print_function, division, absolute_import, unicode_literals
from builtins import bytes, dict, object, range, map, input, str
from future.utils import itervalues, viewitems, iteritems, listvalues, listitems
from io import open
import attr
import yaml
import json
from os import getcwd
from collections import OrderedDict
import hashlib
from rfpipe.version import __version__
import logging
logger = logging.getLogger(__name__)
[docs]@attr.s
class Preferences(object):
""" Preferences express half of info needed to
define state. Using preferences with metadata produces a unique state and
pipeline outcome.
"""
rfpipe_version = attr.ib(default=__version__)
# data selection
chans = attr.ib(default=None)
spw = attr.ib(default=None)
excludeants = attr.ib(default=())
selectpol = attr.ib(default='auto') # 'auto', 'all', 'cross'
fileroot = attr.ib(default=None)
# preprocessing
read_tdownsample = attr.ib(default=1)
read_fdownsample = attr.ib(default=1)
l0 = attr.ib(default=0.) # in radians
m0 = attr.ib(default=0.) # in radians
timesub = attr.ib(default=None)
flaglist = attr.ib(default=[('badchtslide', 4., 10), ('blstd', 3.0, 0.05)])
flagantsol = attr.ib(default=True)
badspwpol = attr.ib(default=2.) # 0 means no flagging done
applyonlineflags = attr.ib(default=True)
gainfile = attr.ib(default=None)
# simulate transients from list of tuples with
# values/units: (segment, i0/int, dm/pc/cm3, dt/s, amp/sys, dl/rad, dm/rad)
simulated_transient = attr.ib(default=None)
# processing
nthread = attr.ib(default=1)
# nsegment = attr.ib(default=0)
segmenttimes = attr.ib(default=None) # list of lists of float pairs
memory_limit = attr.ib(default=16) # in GB; includes typical freqs/configs
maximmem = attr.ib(default=16) # in GB; defines chunk for fftw imaging
# search
fftmode = attr.ib(default='fftw') # either 'fftw' or 'cuda'. defines segment size and algorithm used.
dmarr = attr.ib(default=None) # in pc/cm3
dtarr = attr.ib(default=None) # in samples
dm_maxloss = attr.ib(default=0.05) # fractional sensitivity loss
mindm = attr.ib(default=0) # in pc/cm3
maxdm = attr.ib(default=0) # in pc/cm3
dm_pulsewidth = attr.ib(default=3000) # in microsec
searchtype = attr.ib(default='image1') # supported: image1, image1stat
sigma_image1 = attr.ib(default=7) # threshold for image1 algorithm
sigma_image2 = attr.ib(default=None) # second threshold (no algo yet)
nfalse = attr.ib(default=None) # number of thermal false positives per scan
uvres = attr.ib(default=0) # in lambda
npixx = attr.ib(default=0) # set number of x pixels in image
npixy = attr.ib(default=0) # set number of y pixels in image
npix_max = attr.ib(default=0) # set max number of pixels in image
uvoversample = attr.ib(default=1.) # scale factor for to overresolve grid
savenoise = attr.ib(default=False)
savecands = attr.ib(default=False)
candsfile = attr.ib(default=None)
workdir = attr.ib(default=getcwd()) # set upon import
timewindow = attr.ib(default=30)
# logfile = attr.ib(default=True)
loglevel = attr.ib(default='INFO')
@property
def ordered(self):
""" Get OrderedDict of preferences sorted by key
"""
keys = sorted(self.__dict__)
return OrderedDict([(key, self.__dict__[key]) for key in keys])
@property
def json(self):
""" json string that can be loaded into elasticsearch or hashed.
"""
return json.dumps(self.ordered).encode('utf-8')
@property
def name(self):
""" Unique name for an instance of preferences.
To be used to look up preference set for a given candidate or data set.
"""
return hashlib.md5(self.json).hexdigest()
def parsejson(jsonstring):
""" Take json string (as from elasticsearch) and creates preference object.
"""
inprefs = json.loads(jsonstring)
return Preferences(**inprefs)
def parsepreffile(preffile=None, name=None, inprefs=None):
""" Read preference file and set parameter values.
File should have python-like syntax. Full file name needed.
name can be used to select a parameter set if multiple are defined in the
yaml file.
"""
# define baseline dicts
prefs = {}
if inprefs is None:
inprefs = {}
# optionally overload
if preffile:
ptype = preffiletype(preffile)
if ptype == 'yaml':
prefs = _parsepref_yaml(preffile, name=name)
elif ptype == 'old':
prefs = _parsepref_old(preffile)
else:
logger.warn('Preffile type ({0}) not recognized.'.format(preffile))
# optionall overload
for key in inprefs:
prefs[key] = inprefs[key]
return prefs
def _parsepref_old(preffile):
""" Parse parameter file of old type (using exec of Python commands)
"""
pars = {}
with open(preffile, 'r') as fp:
for line in fp.readlines():
# trim out comments and trailing cr
line_clean = line.rstrip('\n').split('#')[0]
if line_clean and '=' in line: # use valid lines only
attribute, value = line_clean.split('=')
try:
value_eval = eval(value.strip())
except NameError:
value_eval = value.strip()
finally:
pars[attribute.strip()] = value_eval
return pars
def _parsepref_yaml(preffile, name=None):
""" Parse parameter file from yaml format.
"""
name = 'default' if not name else name
logger.info("Parsing preffile for preference set {0}".format(name))
with open(preffile, 'r') as fp:
yamlpars = yaml.load(fp)
pars = yamlpars['rfpipe'][name]
return pars
def preffiletype(preffile):
""" Infer type from first uncommented line in preffile
"""
with open(preffile, 'r') as fp:
while True:
line = fp.readline().split('#')[0]
if line:
break
if '=' in line:
return 'old'
elif ':' in line:
return 'yaml'
else:
return None
def oldstate_preferences(d, scan=None):
""" Parse old state dictionary "d" and to define preferences instance
If no scan is given, assumed to be a single scan state dict.
"""
prefs = {}
allowed = [kk for kk in attr.asdict(Preferences()).keys()]
for key in list(d.keys()):
if key in allowed:
if key == 'segmenttimes':
prefs[key] = d[key].tolist()
else:
prefs[key] = d[key]
# prefs['nsegment'] = d['nsegments']
prefs['selectpol'] = 'auto'
if scan is not None:
prefs['segmenttimes'] = d['segmenttimesdict'][scan].tolist()
return prefs