#!/usr/bin/env python3
#
'''
.. module:: HDF_Cutils
:synopsis: Methods and utility functions for the HDF_Crosstalk script
.. moduleauthor:: Cor de Vries <c.p.de.vries@sron.nl>
'''
import sys,os
import numpy
import matplotlib
import matplotlib.pyplot as plt
from tesfdmtools.hdf.HMUX import HMUX
from tesfdmtools.utils.widgets.filechooser import getfile
from tesfdmtools.methods.IQrotate import IQphase,IQphrot
[docs]def singlepulse(pulse,bl=10,fr=0.3,dc=50):
'''
see if pulse is single pulse
Args:
* `pulse` = array with pulse data
Kwargs:
* `bl` = number of bins from begin and end to compute baseline
* `fr` = fraction of pulse height on which to search for double peaks
* `dc` = max distance in bins, still seen as continuous (no double peak)
Returns:
* **True** when single peak, otherwise **False**
'''
bb=(numpy.sum(pulse[0:bl])+numpy.sum(pulse[-bl:]))/(bl*2.0) # baseline level
pp=pulse.min() # pulse peak
tt=bb-fr*(bb-pp) # threshold
indx,=numpy.where(pulse < tt) # record below threshld
if indx.size < 5:
return False
dd=indx[1:]-indx[0:-1] # peak seperation at threshold
if dd.max() < dc: # less than min separation
return True # is single peak
return False # is double peak
[docs]def surface(xx,yy,weights=None,range=None):
'''
find surface of pulse:
- fit the offset level
- subtract the offset level from the pulse and integrate
Args:
* `xx` = x-values of pulse record
* `yy` = y-values (data) of pulse
Kwargs:
* `weights` = weight values for each data point
* `range` = range to consider within pulse record
Returns:
* `surf` = surface of pulse minus background
* `fit` = fitted background signal (linear fit)
'''
if range[0] == range[1]:
range=(0,xx.size)
if weights is None:
weights=numpy.ones(xx.size)
else:
weights=weights/numpy.sum(weights)*xx.size*(xx[1]-xx[0])
# fit offset level from 5% intervals from either side of the selected range
nd=int((range[1]-range[0])/20.0)
xf=numpy.array(numpy.concatenate((xx[range[0]:(range[0]+nd)],xx[(range[1]-nd):range[1]])),dtype=float)
yf=numpy.concatenate((yy[range[0]:(range[0]+nd)],yy[(range[1]-nd):range[1]]))
# linear fit of offset
pp=numpy.polyfit(xf,yf,1)
fit=pp[0]*xx+pp[1]
# subtract offset and integrate surface
surf=numpy.sum((yy[range[0]:range[1]]-fit[range[0]:range[1]])*weights[range[0]:range[1]])
return (surf,fit)
crec=0 # initialize current starting record when only (averages of) sets of records are shown wihout analysis
[docs]def crossdata(cfile,channel,frequency,nrecs,threshold,ppos=False,prange=[-200,2000],rotate=True,debug=False):
'''
get crosstalk signals by finding records with pulse for selected frequency(pixel)
at which the other pixels do not have an X-ray pulse
Args:
* `cfile` = HDF5 file object
* `channel` = channel to process
* `frequency` = frequency to process
* `nrecs` = if a number, number of records to process. Otherwise process all records
* `threshold` = only consider records in which maximum-minimum is larger than the given threshold
Kwargs:
* `ppos` = when **True** assume all pulses on same position (triggered data)
* `prange` = range in samples around pulse to take into acount
* `rotate` = rotate pulse in I/Q plane to compensate for average (based on 50 pulses) rotation
* `debug` = print/plot extra debug info when True
Returns:
* `tff` = array of frequencies (pixel numbers) for pixels
* `pfreqs` = corresponding real frequencies (kHz) for pixels
* `adata` = I-signal averages (true pulse for selected pixel and the no-pulse signals for the other pixels)
* `aqdata` = Q-signal averages
* `nproc` = number of records used for the averages
* `apulse` = average of I-signal pulses for other pixels which do contain a proper pulse
'''
global crec
crec=0
# number of events for given channel
nrecords=cfile.channel[channel].attrs['Nevents']
maxrecs=int(nrecords)
if nrecs.isdigit():
maxproc=min(maxrecs,int(nrecs))
else:
maxproc=maxrecs
# find active pixels from configuration table and their frequencies
conf_tab=cfile.channel[channel].freq_conf
confindx,=numpy.where(conf_tab['pixel_index'] == frequency)
pfreqs={frequency:conf_tab['freq'][confindx[0]]}
tff=[frequency]
for ff in cfile.channel[channel].freqs:
if ff != frequency:
confindx,=numpy.where(conf_tab['pixel_index'] == ff)
if len(confindx) > 0:
pfreqs[ff]=conf_tab['freq'][confindx[0]]
tff.append(ff)
else:
print("Warning: frequencies(pixel) %d not found in the configuration table" % ff)
continue
tff=numpy.array(tff,dtype=int)
# find phase corrections for all pixels (when rotate is requested)
if rotate:
phis=numpy.zeros(tff.size,dtype=float) # phi corrections
for j,ff in enumerate(tff): # go for all pixels
ntry=min(50,maxrecs) # average from max of 50 records
phi=0.0
for irec in numpy.arange(ntry):
record=cfile.channel[channel].freq[ff][irec]
phi=phi+IQphase(record[:,0],record[:,1]) # find phase
phis[j]=-phi/float(ntry) # average phase correction
print("Phase correction for pixel %d %7.2f (deg)" % (ff,(phis[j]/numpy.pi*180.0)))
# find position of proper X-ray pulse by examining first 400 records (when triggered data)
if ppos:
ntry=min(400,maxrecs)
pindx=numpy.zeros(ntry)
for irec in numpy.arange(ntry):
record=cfile.channel[channel].freq[frequency][irec][:,0]
if irec == 0:
nsamples=record.size//2
if numpy.ptp(record) > threshold:
pindx[irec]=numpy.argmin(record)
hist,hedges=numpy.histogram(pindx,bins=200,range=(1,(nsamples-1)))
ipos=int(hedges[numpy.argmax(hist)]+(hedges[1]-hedges[0])/2.0)
p1=max([0,(ipos+prange[0])])
p2=min([nsamples,(ipos+prange[1])])
else:
ipos=None
p1=0
p2=prange[1]-prange[0]
if debug:
print("Average position of triggered pulse: ",ipos)
# now go through all selected records
nproc=0
for irec in numpy.arange(crec,maxrecs):
if ( irec % 1000 ) == 0:
print("read record: ",irec)
record=cfile.channel[channel].freq[frequency][irec]
# initialize data accumulation arrays
if irec == crec:
nsamples=record.size//2
ns=p2-p1
npix=len(tff)
data=numpy.zeros((npix,ns),dtype=int)
adata=numpy.zeros((npix,ns),dtype=float)
qdata=numpy.zeros((npix,ns),dtype=int)
aqdata=numpy.zeros((npix,ns),dtype=float)
apulse=numpy.zeros(((npix-1),ns),dtype=float)
npulse=numpy.zeros((npix-1),dtype=int)
# get proper pulses for other frequencies(pixels) then the selected frequency(pixel)
for j,ff in enumerate(tff[1:]):
if rotate:
rrr=cfile.channel[channel].freq[ff][irec]
idt,idq=IQphrot(rrr[:,0],rrr[:,1],phis[(j+1)])
else:
idt=cfile.channel[channel].freq[ff][irec][:,0]
jpos=numpy.argmin(idt)
op1=jpos+prange[0]
op2=jpos+prange[1]
if ( op1 > 0 ) and ( op2 <= nsamples ):
idt=idt[op1:op2]
if (numpy.ptp(idt) > threshold) and singlepulse(idt): # only single pulses
apulse[j,:]=apulse[j,:]+idt
npulse[j]=npulse[j]+1
# rotate record in i/Q plane ?
if rotate:
irecord,qrecord=IQphrot(record[:,0],record[:,1],phis[0])
else:
irecord=record[:,0]
qrecord=record[:,1]
# for selected frequency(pixel) get proper X-ray records
if not ppos:
jpos=numpy.argmin(irecord)
p1=jpos+prange[0]
p2=jpos+prange[1]
if ( p1 <= 0 ) or ( p2 > nsamples ):
continue
irecord=irecord[p1:p2]
qrecord=qrecord[p1:p2]
# print ipos,numpy.argmin(record[:,0])
if ( numpy.ptp(irecord) > threshold ) and singlepulse(irecord): # single pulses only
store=True
data[0,:]=irecord
qdata[0,:]=qrecord
# see if there are no pulses for the other frequencies(pixels)
for j,ff in enumerate(tff[1:]):
if rotate:
rrr=cfile.channel[channel].freq[ff][irec][p1:p2,:]
data[j+1,:],qdata[j+1,:]=IQphrot(rrr[:,0],rrr[:,1],phis[j+1])
else:
data[j+1,:]=cfile.channel[channel].freq[ff][irec][p1:p2,0]
qdata[j+1,:]=cfile.channel[channel].freq[ff][irec][p1:p2,1]
if numpy.ptp(data[j+1,:]) > threshold:
store=False
# if ok, store the data
if store:
adata=adata+data
aqdata=aqdata+qdata
nproc=nproc+1
if nproc >= maxproc:
break
# make the averages
crec=irec+1
adata=adata/float(nproc)
aqdata=aqdata/float(nproc)
for i,nn in enumerate(npulse):
if nn > 0:
apulse[i,:]=apulse[i,:]/float(nn)
if debug:
print("xpulse",i,apulse[i,:].max(),apulse[i,:].min())
if debug:
print('average of ',nproc,' records')
return (tff,pfreqs,adata,aqdata,nproc,apulse)
[docs]def cprint(txtname,tff,freqs,xax,idata,qdata,xrange=(0,0),filename='Hdf5'):
'''
print basic averaged crosstalk records data to file
Args:
* `txtname` = name of file for text file
* `tff` = list of frequencies (pixel numbers) for pixels
* `freqs` = list of real frequencies (kHz)
* `xax` = sample x-axis
* `idata` = I-data array
* `qdata` = Q-data array
Kwargs:
* `xrange` = x-range of data to plot. (0,0) plot entire range
* `filename` = name of file to open and write
'''
if xrange[0] == xrange[1]:
xrange=(0,xax.size)
x1=xrange[0]
x2=xrange[1]
txtfile=open(txtname,'w')
npix=len(tff)
txtfile.write('#Crosstalk data for datafile: %s\n' % filename)
txtfile.write('# pixel = %d\n#\n' % tff[0])
for i,px in enumerate(tff):
txtfile.write('# pixel %3d :: %9.2f (kHz)\n' % (px,freqs[px]))
txtfile.write('#\n')
line='#%6.6s ' % 'i'
line=line+(npix*(' pixel %3d ') % tuple(tff))
txtfile.write(line+'\n')
txtfile.write('# '+npix*(' I Q ')+'\n')
for i in numpy.arange(x1,x2):
line='% 7d' % xax[i]
for px in numpy.arange(npix):
line=line+' %10.2f %10.2f' % (idata[px,i],qdata[px,i])
line=line+'\n'
txtfile.write(line)
txtfile.close()
[docs]def cplot(tff,data,xax,xtitle,ytitle,titles,ps=None,filename='hdf5',num='0',xrange=(0,0),qdata=None,qtitle='',\
dlabel=None,qlabel=None,dbckgr=None,qbckgr=None,ptitle=None):
'''
make plot (on screen, or postscript file) of the data. For the given pixel, plot its average pulse and
the average signals at the same time of all other pixels when those do not have a pulse of their own.
Plot I-signals in blue (black labels) and Q-signals in red.
Args:
* `tff` = array of frequencies (pixel numbers) for the pixels of the data
* `data` = 2dim array with I-signal averages for each pixel
* `xax` = array with x-values for a data record
* `xtitle` = x-axis title for plots
* `ytitle` = y-axis title for I-signal data
* `titles` = titles for plots
Kwargs:
* `ps` = if not **None**, hardcopy plots on pdf file
* `filename` = name of file which is being processed
* ( num not used )
* `xrange` = x-range of data to plot. (0,0) plot entire range
* `qdata` = 2dim array with Q-signal averages for each pixel
* `qtitle` = y-axis title for Q data
* `dlabel` = I-signal label
* `qlabel` = Q-signal label
* `dbckgr` = I-signal background data
* `qbckgr` = Q-signal background data
* `ptitle` = title for plot page
return:
none
'''
nplot=len(titles)
nrow=4
ncol=1
fontsize=10
if nplot > 4:
ncol=2
fontsize=7
if (ps is None) and (ncol == 1) :
nrow=nplot//ncol
fig = plt.figure()
if ps is not None:
matplotlib.rcParams.update({'font.size': fontsize})
if xrange[0] == xrange[1]:
xrange=(0,xax.size)
x1=xrange[0]
x2=xrange[1]
pagetitle=True
for jplot in numpy.arange(nplot):
iplot=jplot % 8
if iplot == 0:
plots={}
if qdata is not None:
qplots={}
plots[iplot] = plt.subplot2grid((nrow,ncol), ((iplot//ncol),(iplot % ncol)) )
plots[iplot].set_xlabel(xtitle)
plots[iplot].set_ylabel(ytitle)
plots[iplot].set_title("%d: %7.2f (kHz)" % (tff[jplot],titles[tff[jplot]]))
plots[iplot].plot(xax[x1:x2],data[jplot,x1:x2],'b-')
if qdata is not None:
qplots[iplot]=plots[iplot].twinx()
qplots[iplot].set_ylabel(qtitle,color='r')
for tl in qplots[iplot].get_yticklabels():
tl.set_color('r')
qplots[iplot].plot(xax[x1:x2],qdata[jplot,x1:x2],'r-')
if qlabel is not None:
qrange=qplots[iplot].axis()
xq=qrange[0]+0.97*(qrange[1]-qrange[0])
yr=0.05
if qdata[jplot,x1] < (qrange[2]+0.5*(qrange[3]-qrange[2])):
yr=0.85
if qbckgr is not None:
qplots[iplot].plot(xax[x1:x2],qbckgr[jplot,x1:x2],'r--')
yq=qrange[2]+yr*(qrange[3]-qrange[2])
qplots[iplot].text(xq,yq,qlabel[jplot],verticalalignment='bottom',color='r',horizontalalignment='right')
if dlabel is not None:
prange=plots[iplot].axis()
xt=prange[0]+0.03*(prange[1]-prange[0])
yr=0.05
if data[jplot,x1] < (prange[2]+0.5*(prange[3]-prange[2])):
yr=0.78
if dbckgr is not None:
plots[iplot].plot(xax[x1:x2],dbckgr[jplot,x1:x2],'b--')
yt=prange[2]+yr*(prange[3]-prange[2])
plots[iplot].text(xt,yt,dlabel[jplot],verticalalignment='bottom',color='k',horizontalalignment='left')
plots[iplot].set_zorder(2)
plots[iplot].patch.set_visible(False)
if qdata is not None:
qplots[iplot].set_zorder(1)
qplots[iplot].patch.set_visible(True)
if ( iplot == 7 ) or ( jplot == (nplot-1) ):
if ( ptitle is not None ) and ( pagetitle == True ) :
plt.suptitle(ptitle,size=12)
pagetitle=False
if ps is not None:
fpage = plt.gcf()
# fpage.set_size_inches(8.27, 11.69)
fpage.set_size_inches(8.27, 10.75)
plt.tight_layout()
if ptitle is not None:
plt.subplots_adjust(top=0.94)
plt.savefig(ps,format='pdf',papertype='a4')
else:
plt.tight_layout()
if ptitle is not None:
plt.subplots_adjust(top=0.94)
plt.show()
plt.close('all')
[docs]def pixelmap(ax,data,pixelax,title='',fig=None,ypixelax=None):
'''
Plot matrix values in falsecolor plot
Args:
* `ax` = plot instance for plotting
* `data` = matrix with the data
* `pixelax` = array with matrix coordinates
Kwargs:
* `title` = title for plot
* `fig` = figure instance for colorbar
* `ypixelax` = array with matrix y-coordinates
'''
if ypixelax is None:
ypixelax=pixelax
npix=pixelax.size
ypix=ypixelax.size
axlabs=numpy.empty(npix,dtype=object)
aylabs=numpy.empty(ypix,dtype=object)
for i,pp in enumerate(pixelax):
axlabs[i]="%d" % pp
for i in numpy.arange(ypix):
aylabs[i]="%d" % ypixelax[-i-1]
ax.set_xlabel('Pixel')
ax.set_ylabel('Pixel')
ax.set_title(title)
cname='hot'
axranges=(-0.5,npix-0.5,-0.5,ypix-0.5)
pp=ax.imshow(data,aspect='equal',interpolation='nearest',\
extent=axranges,cmap=plt.get_cmap(cname))
ax.axis(axranges)
xticks=numpy.arange(npix)
ax.set_xticks(xticks)
yticks=numpy.arange(ypix)
ax.set_yticks(yticks)
ax.set_xticklabels(axlabs)
ax.set_yticklabels(aylabs)
for i in numpy.arange(ypix-1):
ax.plot(axranges[0:2],[i+0.5,i+0.5],'k-')
for i in numpy.arange(npix-1):
ax.plot([i+0.5,i+0.5],axranges[2:4],'k-')
if fig is not None:
fig.colorbar(pp,ax=ax)