Source code for tesfdmtools.hdf.xdf

#!/usr/bin/env python3
'''
.. module:: xdf
   :synopsis: Read X-ray event records from an old format XDF file. 
.. moduleauthor:: Cor de Vries <c.p.de.vries@sron.nl>
'''

from tesfdmtools.utils.widgets.filechooser import getfile,getdir
from tesfdmtools.utils.cu import cu
import struct
import numpy
import sys

LONG=0
ULONG=1
DOUBLE=2
STRING=3

[docs]class Xdf: ''' Read old format XDF file. Kwargs: * `path` = path where file can be selected when filename is **None**. * `filename` = xdf file to be read. When **None**, query for file. * `noise` = if True, read noise data from first record * `noiselength` = length of noise data Iterator class. Returns: * Dictionary with the record data Example of use:: xfile=Xdf() for record in xfile: ...process record... ''' def __init__(self,path=None,filename=None,noise=False,noiselength=0): global LONG,ULONG,DOUBLE,STRING if type(filename) is list: self.filelist=filename self.fileno=1 filename=self.filelist[0] else: self.filelist=[filename] self.fileno=1 if path == None: filepath='/stage/xmmdat10/Athena/raw_data' else: filepath=path if filename == None: self.xdffile=getfile(pattern="*.xdf",path=filepath) if self.xdffile == None: self.xfile=None sys.exit() else: self.xdffile=filename self.noise=noise # if true, provide noise data-record from first scan record self.noiselen=noiselength print("Input file: ",self.xdffile) self.hinfo={} self.hinfo[0] =( 0, 8, LONG, 'RUNNUM','run number') self.hinfo[1] =( 1, 8, LONG, 'FILNUM','file number') self.hinfo[2] =( 2, 8,STRING, 'DATE','date') self.hinfo[3] =( 3, 8,STRING, 'TIME','time') self.hinfo[4] =( 4, 32,STRING,'DETECTOR','detector') self.hinfo[5] =( 5, 32,STRING, 'SQUID','squid') self.hinfo[6] =(28,256,STRING, 'COMMENT','comment') self.hinfo[7] =( 6, 1, ULONG, 'NCHAN','number of channels') self.hinfo[8] =( 7, 8, ULONG,'SMPLSCHN','samples per channel') self.hinfo[9] =( 8, 8, ULONG,'SMPLRATE','sample rate') self.hinfo[28]=( 9, 8,STRING, 'NUMBITS','number of bits') self.hinfo[10]=(10, 8, ULONG, 'NEVENTS','number of events') self.hinfo[11]=(11, 8, ULONG,'PTRIGSMP','number of pretrigger samples') self.hinfo[12]=(12, 3,STRING,'TRIGSRCE','trigger source') self.hinfo[13]=(13, 3,STRING,'TRIGSLOP','trigger slope') self.hinfo[14]=(14, 2,STRING,'TRIGCOPL','trigger coupling') self.hinfo[15]=(15, 8,DOUBLE, 'TRIGLVL','trigger level') self.hinfo[16]=(16, 8, LONG,'TRIGTMOT','trigger timeout') self.hinfo[17]=(17, 2,STRING,'ACHCOUPL','channel A coupling') self.hinfo[18]=(18, 3,DOUBLE,'ACHRANGE','channel A range') self.hinfo[19]=(19, 2,STRING,'BCHCOUPL','channel B coupling') self.hinfo[20]=(20, 3,DOUBLE,'BCHRANGE','channel B range') self.hinfo[21]=(21, 2,STRING,'CCHCOUPL','channel C coupling') self.hinfo[22]=(22, 3,DOUBLE,'CCHRANGE','channel C range') self.hinfo[23]=(23, 2,STRING,'DCHCOUPL','channel D coupling') self.hinfo[24]=(24, 3,DOUBLE,'DCHRANGE','channel D range') self.hinfo[25]=(25, 8,STRING, 'CARDID','card identifier') self.hinfo[26]=(26, 16,STRING,'ACQPRGID','aquisition program identifier') self.hinfo[27]=(27, 3, LONG,'ACQPRGVR','aquisition program version') self.xopen() def xopen(self): global LONG,ULONG,DOUBLE,STRING fmt="" for i in range(len(self.hinfo)): fmt=fmt+("%1d" % self.hinfo[i][1])+'s' self.xfile=open(self.xdffile,mode='rb') buff=self.xfile.read(1024) hdata=struct.unpack_from(fmt,buff,offset=0) # get header self.header={} for i in range(len(self.hinfo)): if self.hinfo[i][2] == LONG or self.hinfo[i][2] == ULONG: if len(hdata[i].strip()) == 0: self.header[(self.hinfo[i][3])]=0 else: self.header[(self.hinfo[i][3])]=int(hdata[i]) elif self.hinfo[i][2] == DOUBLE: if len(hdata[i].strip()) == 0: self.header[(self.hinfo[i][3])]=0.0 else: self.header[(self.hinfo[i][3])]=float(hdata[i]) else: self.header[(self.hinfo[i][3])]=cu(hdata[i]).strip() print("%30s: %s" % (self.hinfo[i][4],cu(hdata[i]).strip())) self.header["NUMBITS"]=16 # force number of bits # define data record self.rfmt='>8s12s2s10s' self.dlen=self.header['NCHAN']*self.header['SMPLSCHN'] self.rlen=32 if self.noise: self.time=numpy.arange(0,self.noiselen)/float(self.header['SMPLRATE']) else: self.time=numpy.arange(0,self.header['SMPLSCHN'])/float(self.header['SMPLRATE']) self.recno=0 self.maxrecno=self.header["NEVENTS"] self.nchan=self.header["NCHAN"] self.nchanrange=list(range(self.header["NCHAN"])) # close file def __del__(self): if self.xfile != None: self.xfile.close() # rewind file def rewind(self): if len(self.filelist) > 1: self.xfile.close() self.xdffile=self.filelist[0] self.fileno=1 self.xopen() else: self.xfile.seek(1024) self.recno=0 # iterable object def __iter__(self): return self def __next__(self): if self.noise: if self.recno == 0: record=self.xfile.read(self.rlen) try: nnn=int(record[0:8]) except: nnn=0 self.data={"NUMBER":nnn,"TSTAMP":record[8:20],"TRIGTYP":record[20:22],"TIME":self.time} data=self.data data[1]=numpy.fromfile(self.xfile,dtype=">i2",count=self.noiselen) self.recno=self.recno+1 else: if ((self.recno+1)*self.noiselen) > self.header['SMPLSCHN']: raise StopIteration data=self.data data[1]=numpy.fromfile(self.xfile,dtype=">i2",count=self.noiselen) return data if self.recno == self.maxrecno: if self.fileno >= len(self.filelist): raise StopIteration else: self.xdffile=self.filelist[self.fileno] self.fileno=self.fileno+1 self.xfile.close() self.xopen() record=self.xfile.read(self.rlen) drecord=numpy.fromfile(self.xfile,dtype=">i2",count=self.dlen) try: nnn=int(record[0:8]) except: nnn=0 data={"NUMBER":nnn,"TSTAMP":cu(record[8:20]),"TRIGTYP":cu(record[20:22]),"TIME":self.time} self.recno=self.recno+1 d1=0 d2=d1+self.header['SMPLSCHN'] for i in self.nchanrange: data[(i+1)]=numpy.array(drecord[d1:d2],dtype='int') d1=d1+self.header['SMPLSCHN'] d2=d2+self.header['SMPLSCHN'] return data
# =======================================================================================
[docs]def printcarray(array,key=None,cw=10): ''' Print compound data array Args: * `array` = the compound data array Kwargs: * `key` = array of keys to be printed. If **None** print all keys * `cw` = character width of columns to be printed ''' if key == None: key=array.dtype.names[0] nk=len(array[:][0]) h1=nk*"%11s" % tuple(array.dtype.names[0:nk]) print(h1) for i,pixel_index in enumerate(array[key]): cf='%%%ds ' % cw ll=cf % pixel_index for name in array.dtype.names[1:nk]: element=array[name][i] if str(type(element)).find('int') >= 0: cf='%%%dd ' % cw elif str(type(element)).find('float') >= 0: cf='%%%d.3f ' % cw else: cf='%%%ds ' % cw ll=ll+(cf % element) print(ll)
# ========================================================================================= import h5py from datetime import datetime
[docs]def xdf2hdf(infile,outfile): ''' Convert xdf input file to hdf5 output file Args: * `infile` = filename of xdf input file * `outfile` = filename of hdf5 output file ''' xdf=Xdf(filename=infile) qxdf=None if infile.find('_I.') > 0: # is it a I-data file only ? iqfile=infile.replace('_I.','_Q.') if os.path.exists(iqfile): # does an associated Q-file exists ? qxdf=Xdf(filename=iqfile) # yes, open file date=xdf.header['DATE'] time=xdf.header['TIME'] dd=date+' '+time print('date/time: ',dd) dtime=datetime.strptime(dd,'%Y%m%d %H:%M:%S') print('dtime: ',dtime) timestamp = (dtime - datetime(1970, 1, 1)).total_seconds() print('timestamp: ',timestamp) hd5=h5py.File(outfile,'w') hd5.attrs['time_start_utc']=timestamp conf=hd5.create_group("configuration") # create configuration group cchan=conf.create_group("channel_000") # channel configuration mux=hd5.create_group("mux") # create mux group channel=mux.create_group("channel_000") # one channel only channel.attrs['Nevents']=xdf.header['NEVENTS'] freq=channel.create_group("freq_000") # one frequency (pixel) only rlen=xdf.header['SMPLSCHN'] cc=numpy.dtype([('pixel_index',int), ('switch',int), ('feedback',int), ('freq',float), ('ampl',float), ('ampl_Q',float), ('delay',float), ('ACB_rotate',float), ('ACB_phase',float), ('gbwp',float), ('RLL_res_I',float), ('RLL_res_Q',float), ('RLL_set_I',float), ('RLL_set_Q',float), ('trig_level',float)]) freq_conf=numpy.zeros((1,),dtype=cc) printcarray(freq_conf) cdata=cchan.create_dataset("freq_conf",(freq_conf.size,),dtype=cc) freq_conf['freq'][0]=1E3 freq_conf['switch'][0]=1 freq_conf['feedback'][0]=1 freq_conf['trig_level'][0]=xdf.header['TRIGLVL'] cdata[...]=freq_conf nchan=xdf.header['NCHAN'] dummy=numpy.zeros(rlen,dtype=int) decnt=0 for i,record in enumerate(xdf): rec="%9.9d" % i drecord=numpy.empty((rlen,2),dtype=numpy.dtype('int16')) drecord[:,0]=record[1] if nchan > 1: drecord[:,1]=record[2] else: if qxdf != None: drecord[:,1]=qxdf.next()[1] else: drecord[:,1]=dummy freq[rec]=drecord freq[rec].attrs['data_type']='IQ' freq[rec].attrs['sample_rate']=xdf.header['SMPLRATE'] freq[rec].attrs['pretrig_samples']=xdf.header['PTRIGSMP'] try: rtime=datetime.strptime(date+' '+record['TSTAMP']+'000','%Y%m%d %H:%M:%S.%f') except: rtime=datetime.strptime(date,'%Y%m%d') decnt=decnt+1 freq[rec].attrs['event_time']=(rtime-dtime).total_seconds() freq[rec].attrs['scale_volt_DEMUX']=5E-6 if ( i % 1000 ) == 0: print("read/write Xdf record: ",i) if decnt > 0: print("Number of record time stamp errors: ",decnt) etimestamp = (rtime - datetime(1970, 1, 1)).total_seconds() hd5.attrs['time_end_utc']=etimestamp hd5.close()