#!/usr/bin/env python3
'''
.. module:: xdf
:synopsis: Read X-ray event records from an old format XDF file.
.. moduleauthor:: Cor de Vries <c.p.de.vries@sron.nl>
'''
from tesfdmtools.utils.widgets.filechooser import getfile,getdir
from tesfdmtools.utils.cu import cu
import struct
import numpy
import sys
LONG=0
ULONG=1
DOUBLE=2
STRING=3
[docs]class Xdf:
'''
Read old format XDF file.
Kwargs:
* `path` = path where file can be selected when filename is **None**.
* `filename` = xdf file to be read. When **None**, query for file.
* `noise` = if True, read noise data from first record
* `noiselength` = length of noise data
Iterator class.
Returns:
* Dictionary with the record data
Example of use::
xfile=Xdf()
for record in xfile:
...process record...
'''
def __init__(self,path=None,filename=None,noise=False,noiselength=0):
global LONG,ULONG,DOUBLE,STRING
if type(filename) is list:
self.filelist=filename
self.fileno=1
filename=self.filelist[0]
else:
self.filelist=[filename]
self.fileno=1
if path == None:
filepath='/stage/xmmdat10/Athena/raw_data'
else:
filepath=path
if filename == None:
self.xdffile=getfile(pattern="*.xdf",path=filepath)
if self.xdffile == None:
self.xfile=None
sys.exit()
else:
self.xdffile=filename
self.noise=noise # if true, provide noise data-record from first scan record
self.noiselen=noiselength
print("Input file: ",self.xdffile)
self.hinfo={}
self.hinfo[0] =( 0, 8, LONG, 'RUNNUM','run number')
self.hinfo[1] =( 1, 8, LONG, 'FILNUM','file number')
self.hinfo[2] =( 2, 8,STRING, 'DATE','date')
self.hinfo[3] =( 3, 8,STRING, 'TIME','time')
self.hinfo[4] =( 4, 32,STRING,'DETECTOR','detector')
self.hinfo[5] =( 5, 32,STRING, 'SQUID','squid')
self.hinfo[6] =(28,256,STRING, 'COMMENT','comment')
self.hinfo[7] =( 6, 1, ULONG, 'NCHAN','number of channels')
self.hinfo[8] =( 7, 8, ULONG,'SMPLSCHN','samples per channel')
self.hinfo[9] =( 8, 8, ULONG,'SMPLRATE','sample rate')
self.hinfo[28]=( 9, 8,STRING, 'NUMBITS','number of bits')
self.hinfo[10]=(10, 8, ULONG, 'NEVENTS','number of events')
self.hinfo[11]=(11, 8, ULONG,'PTRIGSMP','number of pretrigger samples')
self.hinfo[12]=(12, 3,STRING,'TRIGSRCE','trigger source')
self.hinfo[13]=(13, 3,STRING,'TRIGSLOP','trigger slope')
self.hinfo[14]=(14, 2,STRING,'TRIGCOPL','trigger coupling')
self.hinfo[15]=(15, 8,DOUBLE, 'TRIGLVL','trigger level')
self.hinfo[16]=(16, 8, LONG,'TRIGTMOT','trigger timeout')
self.hinfo[17]=(17, 2,STRING,'ACHCOUPL','channel A coupling')
self.hinfo[18]=(18, 3,DOUBLE,'ACHRANGE','channel A range')
self.hinfo[19]=(19, 2,STRING,'BCHCOUPL','channel B coupling')
self.hinfo[20]=(20, 3,DOUBLE,'BCHRANGE','channel B range')
self.hinfo[21]=(21, 2,STRING,'CCHCOUPL','channel C coupling')
self.hinfo[22]=(22, 3,DOUBLE,'CCHRANGE','channel C range')
self.hinfo[23]=(23, 2,STRING,'DCHCOUPL','channel D coupling')
self.hinfo[24]=(24, 3,DOUBLE,'DCHRANGE','channel D range')
self.hinfo[25]=(25, 8,STRING, 'CARDID','card identifier')
self.hinfo[26]=(26, 16,STRING,'ACQPRGID','aquisition program identifier')
self.hinfo[27]=(27, 3, LONG,'ACQPRGVR','aquisition program version')
self.xopen()
def xopen(self):
global LONG,ULONG,DOUBLE,STRING
fmt=""
for i in range(len(self.hinfo)):
fmt=fmt+("%1d" % self.hinfo[i][1])+'s'
self.xfile=open(self.xdffile,mode='rb')
buff=self.xfile.read(1024)
hdata=struct.unpack_from(fmt,buff,offset=0)
# get header
self.header={}
for i in range(len(self.hinfo)):
if self.hinfo[i][2] == LONG or self.hinfo[i][2] == ULONG:
if len(hdata[i].strip()) == 0:
self.header[(self.hinfo[i][3])]=0
else:
self.header[(self.hinfo[i][3])]=int(hdata[i])
elif self.hinfo[i][2] == DOUBLE:
if len(hdata[i].strip()) == 0:
self.header[(self.hinfo[i][3])]=0.0
else:
self.header[(self.hinfo[i][3])]=float(hdata[i])
else:
self.header[(self.hinfo[i][3])]=cu(hdata[i]).strip()
print("%30s: %s" % (self.hinfo[i][4],cu(hdata[i]).strip()))
self.header["NUMBITS"]=16 # force number of bits
# define data record
self.rfmt='>8s12s2s10s'
self.dlen=self.header['NCHAN']*self.header['SMPLSCHN']
self.rlen=32
if self.noise:
self.time=numpy.arange(0,self.noiselen)/float(self.header['SMPLRATE'])
else:
self.time=numpy.arange(0,self.header['SMPLSCHN'])/float(self.header['SMPLRATE'])
self.recno=0
self.maxrecno=self.header["NEVENTS"]
self.nchan=self.header["NCHAN"]
self.nchanrange=list(range(self.header["NCHAN"]))
# close file
def __del__(self):
if self.xfile != None:
self.xfile.close()
# rewind file
def rewind(self):
if len(self.filelist) > 1:
self.xfile.close()
self.xdffile=self.filelist[0]
self.fileno=1
self.xopen()
else:
self.xfile.seek(1024)
self.recno=0
# iterable object
def __iter__(self):
return self
def __next__(self):
if self.noise:
if self.recno == 0:
record=self.xfile.read(self.rlen)
try:
nnn=int(record[0:8])
except:
nnn=0
self.data={"NUMBER":nnn,"TSTAMP":record[8:20],"TRIGTYP":record[20:22],"TIME":self.time}
data=self.data
data[1]=numpy.fromfile(self.xfile,dtype=">i2",count=self.noiselen)
self.recno=self.recno+1
else:
if ((self.recno+1)*self.noiselen) > self.header['SMPLSCHN']:
raise StopIteration
data=self.data
data[1]=numpy.fromfile(self.xfile,dtype=">i2",count=self.noiselen)
return data
if self.recno == self.maxrecno:
if self.fileno >= len(self.filelist):
raise StopIteration
else:
self.xdffile=self.filelist[self.fileno]
self.fileno=self.fileno+1
self.xfile.close()
self.xopen()
record=self.xfile.read(self.rlen)
drecord=numpy.fromfile(self.xfile,dtype=">i2",count=self.dlen)
try:
nnn=int(record[0:8])
except:
nnn=0
data={"NUMBER":nnn,"TSTAMP":cu(record[8:20]),"TRIGTYP":cu(record[20:22]),"TIME":self.time}
self.recno=self.recno+1
d1=0
d2=d1+self.header['SMPLSCHN']
for i in self.nchanrange:
data[(i+1)]=numpy.array(drecord[d1:d2],dtype='int')
d1=d1+self.header['SMPLSCHN']
d2=d2+self.header['SMPLSCHN']
return data
# =======================================================================================
[docs]def printcarray(array,key=None,cw=10):
'''
Print compound data array
Args:
* `array` = the compound data array
Kwargs:
* `key` = array of keys to be printed. If **None** print all keys
* `cw` = character width of columns to be printed
'''
if key == None:
key=array.dtype.names[0]
nk=len(array[:][0])
h1=nk*"%11s" % tuple(array.dtype.names[0:nk])
print(h1)
for i,pixel_index in enumerate(array[key]):
cf='%%%ds ' % cw
ll=cf % pixel_index
for name in array.dtype.names[1:nk]:
element=array[name][i]
if str(type(element)).find('int') >= 0:
cf='%%%dd ' % cw
elif str(type(element)).find('float') >= 0:
cf='%%%d.3f ' % cw
else:
cf='%%%ds ' % cw
ll=ll+(cf % element)
print(ll)
# =========================================================================================
import h5py
from datetime import datetime
[docs]def xdf2hdf(infile,outfile):
'''
Convert xdf input file to hdf5 output file
Args:
* `infile` = filename of xdf input file
* `outfile` = filename of hdf5 output file
'''
xdf=Xdf(filename=infile)
qxdf=None
if infile.find('_I.') > 0: # is it a I-data file only ?
iqfile=infile.replace('_I.','_Q.')
if os.path.exists(iqfile): # does an associated Q-file exists ?
qxdf=Xdf(filename=iqfile) # yes, open file
date=xdf.header['DATE']
time=xdf.header['TIME']
dd=date+' '+time
print('date/time: ',dd)
dtime=datetime.strptime(dd,'%Y%m%d %H:%M:%S')
print('dtime: ',dtime)
timestamp = (dtime - datetime(1970, 1, 1)).total_seconds()
print('timestamp: ',timestamp)
hd5=h5py.File(outfile,'w')
hd5.attrs['time_start_utc']=timestamp
conf=hd5.create_group("configuration") # create configuration group
cchan=conf.create_group("channel_000") # channel configuration
mux=hd5.create_group("mux") # create mux group
channel=mux.create_group("channel_000") # one channel only
channel.attrs['Nevents']=xdf.header['NEVENTS']
freq=channel.create_group("freq_000") # one frequency (pixel) only
rlen=xdf.header['SMPLSCHN']
cc=numpy.dtype([('pixel_index',int),
('switch',int),
('feedback',int),
('freq',float),
('ampl',float),
('ampl_Q',float),
('delay',float),
('ACB_rotate',float),
('ACB_phase',float),
('gbwp',float),
('RLL_res_I',float),
('RLL_res_Q',float),
('RLL_set_I',float),
('RLL_set_Q',float),
('trig_level',float)])
freq_conf=numpy.zeros((1,),dtype=cc)
printcarray(freq_conf)
cdata=cchan.create_dataset("freq_conf",(freq_conf.size,),dtype=cc)
freq_conf['freq'][0]=1E3
freq_conf['switch'][0]=1
freq_conf['feedback'][0]=1
freq_conf['trig_level'][0]=xdf.header['TRIGLVL']
cdata[...]=freq_conf
nchan=xdf.header['NCHAN']
dummy=numpy.zeros(rlen,dtype=int)
decnt=0
for i,record in enumerate(xdf):
rec="%9.9d" % i
drecord=numpy.empty((rlen,2),dtype=numpy.dtype('int16'))
drecord[:,0]=record[1]
if nchan > 1:
drecord[:,1]=record[2]
else:
if qxdf != None:
drecord[:,1]=qxdf.next()[1]
else:
drecord[:,1]=dummy
freq[rec]=drecord
freq[rec].attrs['data_type']='IQ'
freq[rec].attrs['sample_rate']=xdf.header['SMPLRATE']
freq[rec].attrs['pretrig_samples']=xdf.header['PTRIGSMP']
try:
rtime=datetime.strptime(date+' '+record['TSTAMP']+'000','%Y%m%d %H:%M:%S.%f')
except:
rtime=datetime.strptime(date,'%Y%m%d')
decnt=decnt+1
freq[rec].attrs['event_time']=(rtime-dtime).total_seconds()
freq[rec].attrs['scale_volt_DEMUX']=5E-6
if ( i % 1000 ) == 0:
print("read/write Xdf record: ",i)
if decnt > 0:
print("Number of record time stamp errors: ",decnt)
etimestamp = (rtime - datetime(1970, 1, 1)).total_seconds()
hd5.attrs['time_end_utc']=etimestamp
hd5.close()