Source code for rfpipe.metadata
from __future__ import print_function, division, absolute_import, unicode_literals
from builtins import bytes, dict, object, range, map, input, str
from future.utils import itervalues, viewitems, iteritems, listvalues, listitems
from io import open
import os.path
import attr
import numpy as np
from rfpipe import source, util
import pwkit.environments.casa.util as casautil
import logging
logger = logging.getLogger(__name__)
qa = casautil.tools.quanta()
me = casautil.tools.measures()
[docs]@attr.s
class Metadata(object):
""" Metadata we need to translate parameters into a pipeline state.
Called from a function defined for a given source (e.g., an sdm file).
Built from nominally immutable attributes and properties.
To modify metadata, use attr.assoc(inst, key=newval)
"""
# basics
datasource = attr.ib(default=None)
datasetId = attr.ib(default=None)
filename = attr.ib(default=None) # full path to SDM (optional)
scan = attr.ib(default=None) # int
subscan = attr.ib(default=None) # int
bdfdir = attr.ib(default=None)
bdfstr = attr.ib(default=None)
# data structure and source properties
source = attr.ib(default=None)
radec = attr.ib(default=None) # (radians, radians)
inttime = attr.ib(default=None) # seconds
nints_ = attr.ib(default=None)
telescope = attr.ib(default=None)
# array/antenna info
starttime_mjd = attr.ib(default=None) # float
endtime_mjd_ = attr.ib(default=None)
dishdiameter = attr.ib(default=None)
intent = attr.ib(default=None)
antids = attr.ib(default=None)
# stationids = attr.ib(default=None) # needed?
xyz = attr.ib(default=None) # in m, geocentric
# spectral info
spworder = attr.ib(default=None)
spw_orig = attr.ib(default=None) # indexes for spw
spw_nchan = attr.ib(default=None) # channels per spw
spw_reffreq = attr.ib(default=None) # reference frequency (ch0) in Hz
spw_chansize = attr.ib(default=None) # channel size in Hz
pols_orig = attr.ib(default=None)
[docs] def atdefaults(self):
""" Is metadata still set at default values? """
return not any([self.__dict__[ab] for ab in self.__dict__])
# @property
# def spw_chanr(self):
# chanr = []
# i0 = 0
# for nch in self.spw_nchan:
# chanr.append((i0, i0+nch))
# i0 = nch
# return chanr
@property
def freq_orig(self):
"""Spacing of channel centers in GHz.
Out of order metadata order is sorted in state/data reading"""
return np.concatenate([np.linspace(self.spw_reffreq[ii],
self.spw_reffreq[ii] +
(self.spw_nchan[ii]-1) *
self.spw_chansize[ii],
self.spw_nchan[ii])
for ii in range(len((self.spw_reffreq)))]).astype('float32')/1e9
@property
def nspw_orig(self):
return len(self.spw_reffreq)
@property
def nchan_orig(self):
return len(self.freq_orig)
@property
def nants_orig(self):
return len(self.antids)
@property
def nbl_orig(self):
return int(self.nants_orig*(self.nants_orig-1)/2)
@property
def antpos(self):
x = self.xyz[:, 0].tolist()
y = self.xyz[:, 1].tolist()
z = self.xyz[:, 2].tolist()
return me.position('itrf', qa.quantity(x, 'm'),
qa.quantity(y, 'm'), qa.quantity(z, 'm'))
@property
def starttime_string(self):
return qa.time(qa.quantity(self.starttime_mjd, 'd'),
form='ymd', prec=8)[0]
@property
def endtime_mjd(self):
""" If nints\_ is defined (e.g., for SDM data), then endtime_mjd is calculated.
Otherwise (e.g., for scan_config/vys data), it looks for endtime_mjd\_
attribute
"""
if self.endtime_mjd_:
return self.endtime_mjd_
elif self.nints_:
assert self.nints_ > 0, "nints_ must be greater than zero"
return self.starttime_mjd + (self.nints_*self.inttime)/(24*3600)
else:
raise AttributeError("Either endtime_mjd_ or nints_ need to be "
"defined.")
@property
def nints(self):
""" If endtime_mjd\_ is defined (e.g., for scan_config/vys
data), then endtime_mjd is calculated.
Otherwise (e.g., for SDM data), it looks for nints\_ attribute
"""
if self.nints_:
return self.nints_
elif self.endtime_mjd_:
assert self.endtime_mjd > self.starttime_mjd, "endtime_mjd must be larger than starttime_mjd"
return np.round((self.endtime_mjd_ - self.starttime_mjd)*(24*3600)/self.inttime).astype(int)
else:
raise AttributeError("Either endtime_mjd_ or nints_ need to be "
"defined.")
@property
def uvrange_orig(self):
(ur, vr, wr) = util.calc_uvw(datetime=self.starttime_string,
radec=self.radec,
antpos=self.antpos,
telescope=self.telescope)
u = ur * self.freq_orig.min() * (1e9/3e8) * (-1)
v = vr * self.freq_orig.min() * (1e9/3e8) * (-1)
return (u.max() - u.min(), v.max() - v.min())
@property
def npol_orig(self):
return len(self.pols_orig)
@property
def scanId(self):
assert self.datasetId is not None
return '{0}.{1}.{2}'.format(self.datasetId, self.scan, self.subscan)
def config_metadata(config, datasource='vys'):
""" Creates dict holding metadata from evla_mcast scan config object.
Parallel structure to sdm_metadata, so this inherits some of its
nomenclature. datasource defines expected data source (vys expected when
using scan config)
spworder is required for proper indexing of vys data.
"""
logger.info('Reading metadata from config object')
meta = {}
meta['datasource'] = datasource
meta['datasetId'] = config.datasetId
meta['scan'] = config.scanNo
meta['subscan'] = config.subscanNo
# meta['configid'] = config.Id
meta['starttime_mjd'] = config.startTime
meta['endtime_mjd_'] = config.stopTime
meta['source'] = str(config.source)
meta['intent'] = str(config.scan_intent)
meta['telescope'] = str(config.telescope)
antennas = config.get_antennas()
meta['antids'] = [str(ant.name) for ant in antennas]
# meta['stationids'] = config.listOfStations
meta['xyz'] = np.array([ant.xyz for ant in antennas])
meta['radec'] = (np.radians(config.ra_deg), np.radians(config.dec_deg))
meta['dishdiameter'] = 25. # ?
subbands = config.get_subbands()
subband0 = subbands[0] # **parsing single subband for now
meta['inttime'] = subband0.hw_time_res # assumes vys stream post-hw-integ
meta['pols_orig'] = subband0.pp
meta['spw_nchan'] = [sb.spectralChannels for sb in subbands]
meta['spw_chansize'] = [1e6*sb.bw/subband0.spectralChannels for sb in subbands]
meta['spw_orig'] = ['{0}-{1}'.format(sb.IFid, sb.sbid) for sb in subbands]
meta['spw_reffreq'] = [(sb.sky_center_freq-sb.bw/subband0.spectralChannels*(sb.spectralChannels/2))*1e6 for sb in subbands]
meta['spworder'] = sorted([('{0}-{1}'.format(sb.IFid, sb.sbid),
meta['spw_reffreq'][subbands.index(sb)])
for sb in subbands], key=lambda x: x[1])
return meta
[docs]def sdm_metadata(sdmfile, scan, bdfdir=None):
""" Wraps Metadata call to provide immutable, attribute-filled class instance.
"""
logger.info('Reading metadata from {0}, scan {1}'.format(sdmfile, scan))
sdm = source.getsdm(sdmfile, bdfdir=bdfdir)
scanobj = sdm.scan(scan)
meta = {}
meta['datasource'] = 'sdm'
meta['datasetId'] = os.path.basename(sdmfile.rstrip('/'))
meta['filename'] = sdmfile
meta['scan'] = int(scan)
meta['subscan'] = 1 # TODO: update for more than one subscan per scan
meta['bdfdir'] = bdfdir
# meta['configid'] = scanobj.configDescriptionId
bdfstr = scanobj.bdf.fname
if (not os.path.exists(bdfstr)) or ('X1' in bdfstr):
meta['bdfstr'] = None
else:
meta['bdfstr'] = bdfstr
meta['starttime_mjd'] = scanobj.startMJD
meta['nints_'] = scanobj.numIntegration
try:
inttime = scanobj.bdf.get_integration(0).interval
meta['inttime'] = inttime
except AttributeError:
logger.warn("No BDF found. inttime not set.")
meta['source'] = str(scanobj.source)
meta['intent'] = ' '.join(scanobj.intents)
meta['telescope'] = str(sdm['ExecBlock'][0]['telescopeName']).strip()
meta['antids'] = [str(ant) for ant in scanobj.antennas]
# meta['stationids'] = scanobj.stations
meta['xyz'] = np.array(scanobj.positions)
meta['radec'] = scanobj.coordinates.tolist()
meta['dishdiameter'] = float(str(sdm['Antenna'][0].dishDiameter).strip())
meta['spw_orig'] = [int(str(spw).split('_')[1]) for spw in scanobj.spws]
meta['spw_nchan'] = scanobj.numchans
meta['spw_reffreq'] = scanobj.reffreqs
meta['spw_chansize'] = scanobj.chanwidths
try:
meta['pols_orig'] = scanobj.bdf.spws[0].pols('cross')
except AttributeError:
logger.warn("No BDF found. Inferring pols from xml.")
meta['pols_orig'] = [pol for pol in (str(sdm['Polarization'][0]
.corrType)).strip().split(' ')
if pol in ['XX', 'YY', 'XY', 'YX',
'RR', 'LL', 'RL', 'LR',
'A*A', 'A*B', 'B*A', 'B*B']]
try:
# TODO: remove for datasource=vys or sim?
meta['spworder'] = sorted(zip(['{0}-{1}'.format(spw.swbb.rstrip('_8BIT'),
spw.sw-1)
for spw in scanobj.bdf.spws],
np.array(scanobj.reffreqs)/1e6),
key=lambda x: x[1])
except AttributeError:
logger.warn("No BDF found. spworder not defined.")
return meta
[docs]def mock_metadata(t0, t1, nants, nspw, chans, npol, inttime_micros, scan=1,
subscan=1, datasource='vys', datasetid='test', **kwargs):
""" Wraps Metadata call to provide immutable, attribute-filled class instance.
Parallel structure to sdm_metadata, so this inherits some of its
nomenclature. t0, t1 are times in mjd. Supports up to nant=27, npol=4, and
nspw=32. chans is total number of channels over all spw (equal per spw).
datasource is expected source of data (typically vys when mocking).
"""
logger.info('Generating mock metadata')
meta = {}
meta['datasource'] = datasource
meta['datasetId'] = datasetid
meta['scan'] = scan
meta['subscan'] = subscan
meta['bdfdir'] = ''
meta['starttime_mjd'] = t0
meta['endtime_mjd_'] = t1
meta['inttime'] = inttime_micros/1e6
meta['source'] = 'testsource'
meta['intent'] = 'OBSERVE_TARGET'
meta['telescope'] = 'VLA'
meta['antids'] = ['ea{0:02}'.format(ant) for ant in range(1, nants+1)]
# meta['stationids'] = range(nants)
meta['xyz'] = np.array([[-1604008.7444, -5042135.8251, 3553403.7108],
[-1601315.9005, -5041985.30747, 3554808.311],
[-1604865.6575, -5042190.032, 3552962.3635],
[-1601068.806, -5042051.9327, 3554824.8388],
[-1596127.7308, -5045193.7421, 3552652.4197],
[-1601110.022, -5041488.0826, 3555597.4446],
[-1601061.9544, -5041175.8753, 3556058.0267],
[-1602044.9123, -5042025.8014, 3554427.8357],
[-1600863.6922, -5039885.3167, 3557965.3178],
[-1599340.8001, -5043150.963, 3554065.2315],
[-1601004.6988, -5040802.801, 3556610.1493],
[-1597899.8959, -5044068.6847, 3553432.4502],
[-1600801.9314, -5042219.3826, 3554706.4294],
[-1600930.0836, -5040316.3864, 3557330.39],
[-1603249.6721, -5042091.4281, 3553797.7842],
[-1601173.9647, -5041902.6458, 3554987.5342],
[-1606841.961, -5042279.6752, 3551913.0214],
[-1602592.8535, -5042054.9924, 3554140.7028],
[-1599926.1041, -5042772.9772, 3554319.8011],
[-1598663.082, -5043581.3912, 3553767.0141],
[-1605808.6341, -5042230.084, 3552459.1978],
[-1600416.518, -5042462.4305, 3554536.0417],
[-1601614.0832, -5042001.6569, 3554652.5059],
[-1601147.9425, -5041733.8336, 3555235.947],
[-1597053.1244, -5044604.675, 3553058.9927],
[-1600690.6125, -5038758.7161, 3559632.0571],
[-1600781.0607, -5039347.4391, 3558761.5271]])[:nants]
meta['radec'] = [0., 0.]
meta['dishdiameter'] = 25
meta['spw_orig'] = list(range(nspw))
meta['spw_reffreq'] = np.linspace(2e9, 4e9, 33)[:nspw]
meta['spw_chansize'] = [2000000]*nspw
chanperspw = chans//nspw
meta['spw_nchan'] = [chanperspw]*nspw
if npol == 4:
meta['pols_orig'] = ['A*A', 'A*B', 'B*A', 'B*B']
elif npol == 2:
meta['pols_orig'] = ['A*A', 'B*B']
else:
logger.warn("npol must be 2 or 4 (autos or full pol)")
meta['spworder'] = sorted([('{0}-{1}'.format('AC1', sbid),
meta['spw_reffreq'][sbid])
for sbid in range(nspw)], key=lambda x: x[1])
return meta
def oldstate_metadata(d, scan=None, bdfdir=None):
""" Parses old state function ("d", a dictionary) into new metadata instance
Note: d from merged candidate file will have some parameters defined by
last scan.
If scan is None, it will assume d is from single scan, not merged
"""
meta = {}
meta['datasource'] = 'sdm'
meta['datasetId'] = d['fileroot']
meta['subscan'] = 1
meta['bdfdir'] = bdfdir
if scan is not None:
meta['starttime_mjd'] = d['starttime_mjddict'][scan]
meta['scan'] = scan
else:
meta['starttime_mjd'] = d['starttime_mjd']
meta['scan'] = d['scan']
meta['inttime'] = d['inttime']
meta['nints_'] = d['nints']
meta['source'] = str(d['source'])
meta['telescope'] = 'VLA'
meta['antids'] = ['ea'+str(ant) for ant in d['ants']] # ** test that these are the same as what we expected with rtpipe **
# meta['xyz'] = #
meta['radec'] = d['radec'] # ** for last scan! **
meta['dishdiameter'] = d['dishdiameter']
meta['spw_orig'] = d['spw_orig']
meta['spw_nchan'] = d['spw_nchan']
meta['spw_reffreq'] = d['spw_reffreq']
meta['spw_chansize'] = d['spw_chansize']
meta['pols_orig'] = d['pols_orig']
logger.info('Read metadata from old state dictionary for scan {0}'
.format(scan))
return meta