Source code for tesfdmtools.methods.HDF_Cutils

#!/usr/bin/env python3
#
'''
.. module:: HDF_Cutils
   :synopsis: Methods and utility functions for the HDF_Crosstalk script 
.. moduleauthor:: Cor de Vries <c.p.de.vries@sron.nl>

'''

import sys,os
import numpy

import matplotlib
import matplotlib.pyplot as plt

from tesfdmtools.hdf.HMUX import HMUX
from tesfdmtools.utils.widgets.filechooser import getfile
from tesfdmtools.methods.IQrotate import IQphase,IQphrot

[docs]def singlepulse(pulse,bl=10,fr=0.3,dc=50): ''' see if pulse is single pulse Args: * `pulse` = array with pulse data Kwargs: * `bl` = number of bins from begin and end to compute baseline * `fr` = fraction of pulse height on which to search for double peaks * `dc` = max distance in bins, still seen as continuous (no double peak) Returns: * **True** when single peak, otherwise **False** ''' bb=(numpy.sum(pulse[0:bl])+numpy.sum(pulse[-bl:]))/(bl*2.0) # baseline level pp=pulse.min() # pulse peak tt=bb-fr*(bb-pp) # threshold indx,=numpy.where(pulse < tt) # record below threshld if indx.size < 5: return False dd=indx[1:]-indx[0:-1] # peak seperation at threshold if dd.max() < dc: # less than min separation return True # is single peak return False # is double peak
[docs]def surface(xx,yy,weights=None,range=None): ''' find surface of pulse: - fit the offset level - subtract the offset level from the pulse and integrate Args: * `xx` = x-values of pulse record * `yy` = y-values (data) of pulse Kwargs: * `weights` = weight values for each data point * `range` = range to consider within pulse record Returns: * `surf` = surface of pulse minus background * `fit` = fitted background signal (linear fit) ''' if range[0] == range[1]: range=(0,xx.size) if weights is None: weights=numpy.ones(xx.size) else: weights=weights/numpy.sum(weights)*xx.size*(xx[1]-xx[0]) # fit offset level from 5% intervals from either side of the selected range nd=int((range[1]-range[0])/20.0) xf=numpy.array(numpy.concatenate((xx[range[0]:(range[0]+nd)],xx[(range[1]-nd):range[1]])),dtype=float) yf=numpy.concatenate((yy[range[0]:(range[0]+nd)],yy[(range[1]-nd):range[1]])) # linear fit of offset pp=numpy.polyfit(xf,yf,1) fit=pp[0]*xx+pp[1] # subtract offset and integrate surface surf=numpy.sum((yy[range[0]:range[1]]-fit[range[0]:range[1]])*weights[range[0]:range[1]]) return (surf,fit)
crec=0 # initialize current starting record when only (averages of) sets of records are shown wihout analysis
[docs]def crossdata(cfile,channel,frequency,nrecs,threshold,ppos=False,prange=[-200,2000],rotate=True,debug=False): ''' get crosstalk signals by finding records with pulse for selected frequency(pixel) at which the other pixels do not have an X-ray pulse Args: * `cfile` = HDF5 file object * `channel` = channel to process * `frequency` = frequency to process * `nrecs` = if a number, number of records to process. Otherwise process all records * `threshold` = only consider records in which maximum-minimum is larger than the given threshold Kwargs: * `ppos` = when **True** assume all pulses on same position (triggered data) * `prange` = range in samples around pulse to take into acount * `rotate` = rotate pulse in I/Q plane to compensate for average (based on 50 pulses) rotation * `debug` = print/plot extra debug info when True Returns: * `tff` = array of frequencies (pixel numbers) for pixels * `pfreqs` = corresponding real frequencies (kHz) for pixels * `adata` = I-signal averages (true pulse for selected pixel and the no-pulse signals for the other pixels) * `aqdata` = Q-signal averages * `nproc` = number of records used for the averages * `apulse` = average of I-signal pulses for other pixels which do contain a proper pulse ''' global crec crec=0 # number of events for given channel nrecords=cfile.channel[channel].attrs['Nevents'] maxrecs=int(nrecords) if nrecs.isdigit(): maxproc=min(maxrecs,int(nrecs)) else: maxproc=maxrecs # find active pixels from configuration table and their frequencies conf_tab=cfile.channel[channel].freq_conf confindx,=numpy.where(conf_tab['pixel_index'] == frequency) pfreqs={frequency:conf_tab['freq'][confindx[0]]} tff=[frequency] for ff in cfile.channel[channel].freqs: if ff != frequency: confindx,=numpy.where(conf_tab['pixel_index'] == ff) if len(confindx) > 0: pfreqs[ff]=conf_tab['freq'][confindx[0]] tff.append(ff) else: print("Warning: frequencies(pixel) %d not found in the configuration table" % ff) continue tff=numpy.array(tff,dtype=int) # find phase corrections for all pixels (when rotate is requested) if rotate: phis=numpy.zeros(tff.size,dtype=float) # phi corrections for j,ff in enumerate(tff): # go for all pixels ntry=min(50,maxrecs) # average from max of 50 records phi=0.0 for irec in numpy.arange(ntry): record=cfile.channel[channel].freq[ff][irec] phi=phi+IQphase(record[:,0],record[:,1]) # find phase phis[j]=-phi/float(ntry) # average phase correction print("Phase correction for pixel %d %7.2f (deg)" % (ff,(phis[j]/numpy.pi*180.0))) # find position of proper X-ray pulse by examining first 400 records (when triggered data) if ppos: ntry=min(400,maxrecs) pindx=numpy.zeros(ntry) for irec in numpy.arange(ntry): record=cfile.channel[channel].freq[frequency][irec][:,0] if irec == 0: nsamples=record.size//2 if numpy.ptp(record) > threshold: pindx[irec]=numpy.argmin(record) hist,hedges=numpy.histogram(pindx,bins=200,range=(1,(nsamples-1))) ipos=int(hedges[numpy.argmax(hist)]+(hedges[1]-hedges[0])/2.0) p1=max([0,(ipos+prange[0])]) p2=min([nsamples,(ipos+prange[1])]) else: ipos=None p1=0 p2=prange[1]-prange[0] if debug: print("Average position of triggered pulse: ",ipos) # now go through all selected records nproc=0 for irec in numpy.arange(crec,maxrecs): if ( irec % 1000 ) == 0: print("read record: ",irec) record=cfile.channel[channel].freq[frequency][irec] # initialize data accumulation arrays if irec == crec: nsamples=record.size//2 ns=p2-p1 npix=len(tff) data=numpy.zeros((npix,ns),dtype=int) adata=numpy.zeros((npix,ns),dtype=float) qdata=numpy.zeros((npix,ns),dtype=int) aqdata=numpy.zeros((npix,ns),dtype=float) apulse=numpy.zeros(((npix-1),ns),dtype=float) npulse=numpy.zeros((npix-1),dtype=int) # get proper pulses for other frequencies(pixels) then the selected frequency(pixel) for j,ff in enumerate(tff[1:]): if rotate: rrr=cfile.channel[channel].freq[ff][irec] idt,idq=IQphrot(rrr[:,0],rrr[:,1],phis[(j+1)]) else: idt=cfile.channel[channel].freq[ff][irec][:,0] jpos=numpy.argmin(idt) op1=jpos+prange[0] op2=jpos+prange[1] if ( op1 > 0 ) and ( op2 <= nsamples ): idt=idt[op1:op2] if (numpy.ptp(idt) > threshold) and singlepulse(idt): # only single pulses apulse[j,:]=apulse[j,:]+idt npulse[j]=npulse[j]+1 # rotate record in i/Q plane ? if rotate: irecord,qrecord=IQphrot(record[:,0],record[:,1],phis[0]) else: irecord=record[:,0] qrecord=record[:,1] # for selected frequency(pixel) get proper X-ray records if not ppos: jpos=numpy.argmin(irecord) p1=jpos+prange[0] p2=jpos+prange[1] if ( p1 <= 0 ) or ( p2 > nsamples ): continue irecord=irecord[p1:p2] qrecord=qrecord[p1:p2] # print ipos,numpy.argmin(record[:,0]) if ( numpy.ptp(irecord) > threshold ) and singlepulse(irecord): # single pulses only store=True data[0,:]=irecord qdata[0,:]=qrecord # see if there are no pulses for the other frequencies(pixels) for j,ff in enumerate(tff[1:]): if rotate: rrr=cfile.channel[channel].freq[ff][irec][p1:p2,:] data[j+1,:],qdata[j+1,:]=IQphrot(rrr[:,0],rrr[:,1],phis[j+1]) else: data[j+1,:]=cfile.channel[channel].freq[ff][irec][p1:p2,0] qdata[j+1,:]=cfile.channel[channel].freq[ff][irec][p1:p2,1] if numpy.ptp(data[j+1,:]) > threshold: store=False # if ok, store the data if store: adata=adata+data aqdata=aqdata+qdata nproc=nproc+1 if nproc >= maxproc: break # make the averages crec=irec+1 adata=adata/float(nproc) aqdata=aqdata/float(nproc) for i,nn in enumerate(npulse): if nn > 0: apulse[i,:]=apulse[i,:]/float(nn) if debug: print("xpulse",i,apulse[i,:].max(),apulse[i,:].min()) if debug: print('average of ',nproc,' records') return (tff,pfreqs,adata,aqdata,nproc,apulse)
[docs]def cprint(txtname,tff,freqs,xax,idata,qdata,xrange=(0,0),filename='Hdf5'): ''' print basic averaged crosstalk records data to file Args: * `txtname` = name of file for text file * `tff` = list of frequencies (pixel numbers) for pixels * `freqs` = list of real frequencies (kHz) * `xax` = sample x-axis * `idata` = I-data array * `qdata` = Q-data array Kwargs: * `xrange` = x-range of data to plot. (0,0) plot entire range * `filename` = name of file to open and write ''' if xrange[0] == xrange[1]: xrange=(0,xax.size) x1=xrange[0] x2=xrange[1] txtfile=open(txtname,'w') npix=len(tff) txtfile.write('#Crosstalk data for datafile: %s\n' % filename) txtfile.write('# pixel = %d\n#\n' % tff[0]) for i,px in enumerate(tff): txtfile.write('# pixel %3d :: %9.2f (kHz)\n' % (px,freqs[px])) txtfile.write('#\n') line='#%6.6s ' % 'i' line=line+(npix*(' pixel %3d ') % tuple(tff)) txtfile.write(line+'\n') txtfile.write('# '+npix*(' I Q ')+'\n') for i in numpy.arange(x1,x2): line='% 7d' % xax[i] for px in numpy.arange(npix): line=line+' %10.2f %10.2f' % (idata[px,i],qdata[px,i]) line=line+'\n' txtfile.write(line) txtfile.close()
[docs]def cplot(tff,data,xax,xtitle,ytitle,titles,ps=None,filename='hdf5',num='0',xrange=(0,0),qdata=None,qtitle='',\ dlabel=None,qlabel=None,dbckgr=None,qbckgr=None,ptitle=None): ''' make plot (on screen, or postscript file) of the data. For the given pixel, plot its average pulse and the average signals at the same time of all other pixels when those do not have a pulse of their own. Plot I-signals in blue (black labels) and Q-signals in red. Args: * `tff` = array of frequencies (pixel numbers) for the pixels of the data * `data` = 2dim array with I-signal averages for each pixel * `xax` = array with x-values for a data record * `xtitle` = x-axis title for plots * `ytitle` = y-axis title for I-signal data * `titles` = titles for plots Kwargs: * `ps` = if not **None**, hardcopy plots on pdf file * `filename` = name of file which is being processed * ( num not used ) * `xrange` = x-range of data to plot. (0,0) plot entire range * `qdata` = 2dim array with Q-signal averages for each pixel * `qtitle` = y-axis title for Q data * `dlabel` = I-signal label * `qlabel` = Q-signal label * `dbckgr` = I-signal background data * `qbckgr` = Q-signal background data * `ptitle` = title for plot page return: none ''' nplot=len(titles) nrow=4 ncol=1 fontsize=10 if nplot > 4: ncol=2 fontsize=7 if (ps is None) and (ncol == 1) : nrow=nplot//ncol fig = plt.figure() if ps is not None: matplotlib.rcParams.update({'font.size': fontsize}) if xrange[0] == xrange[1]: xrange=(0,xax.size) x1=xrange[0] x2=xrange[1] pagetitle=True for jplot in numpy.arange(nplot): iplot=jplot % 8 if iplot == 0: plots={} if qdata is not None: qplots={} plots[iplot] = plt.subplot2grid((nrow,ncol), ((iplot//ncol),(iplot % ncol)) ) plots[iplot].set_xlabel(xtitle) plots[iplot].set_ylabel(ytitle) plots[iplot].set_title("%d: %7.2f (kHz)" % (tff[jplot],titles[tff[jplot]])) plots[iplot].plot(xax[x1:x2],data[jplot,x1:x2],'b-') if qdata is not None: qplots[iplot]=plots[iplot].twinx() qplots[iplot].set_ylabel(qtitle,color='r') for tl in qplots[iplot].get_yticklabels(): tl.set_color('r') qplots[iplot].plot(xax[x1:x2],qdata[jplot,x1:x2],'r-') if qlabel is not None: qrange=qplots[iplot].axis() xq=qrange[0]+0.97*(qrange[1]-qrange[0]) yr=0.05 if qdata[jplot,x1] < (qrange[2]+0.5*(qrange[3]-qrange[2])): yr=0.85 if qbckgr is not None: qplots[iplot].plot(xax[x1:x2],qbckgr[jplot,x1:x2],'r--') yq=qrange[2]+yr*(qrange[3]-qrange[2]) qplots[iplot].text(xq,yq,qlabel[jplot],verticalalignment='bottom',color='r',horizontalalignment='right') if dlabel is not None: prange=plots[iplot].axis() xt=prange[0]+0.03*(prange[1]-prange[0]) yr=0.05 if data[jplot,x1] < (prange[2]+0.5*(prange[3]-prange[2])): yr=0.78 if dbckgr is not None: plots[iplot].plot(xax[x1:x2],dbckgr[jplot,x1:x2],'b--') yt=prange[2]+yr*(prange[3]-prange[2]) plots[iplot].text(xt,yt,dlabel[jplot],verticalalignment='bottom',color='k',horizontalalignment='left') plots[iplot].set_zorder(2) plots[iplot].patch.set_visible(False) if qdata is not None: qplots[iplot].set_zorder(1) qplots[iplot].patch.set_visible(True) if ( iplot == 7 ) or ( jplot == (nplot-1) ): if ( ptitle is not None ) and ( pagetitle == True ) : plt.suptitle(ptitle,size=12) pagetitle=False if ps is not None: fpage = plt.gcf() # fpage.set_size_inches(8.27, 11.69) fpage.set_size_inches(8.27, 10.75) plt.tight_layout() if ptitle is not None: plt.subplots_adjust(top=0.94) plt.savefig(ps,format='pdf',papertype='a4') else: plt.tight_layout() if ptitle is not None: plt.subplots_adjust(top=0.94) plt.show() plt.close('all')
[docs]def pixelmap(ax,data,pixelax,title='',fig=None,ypixelax=None): ''' Plot matrix values in falsecolor plot Args: * `ax` = plot instance for plotting * `data` = matrix with the data * `pixelax` = array with matrix coordinates Kwargs: * `title` = title for plot * `fig` = figure instance for colorbar * `ypixelax` = array with matrix y-coordinates ''' if ypixelax is None: ypixelax=pixelax npix=pixelax.size ypix=ypixelax.size axlabs=numpy.empty(npix,dtype=object) aylabs=numpy.empty(ypix,dtype=object) for i,pp in enumerate(pixelax): axlabs[i]="%d" % pp for i in numpy.arange(ypix): aylabs[i]="%d" % ypixelax[-i-1] ax.set_xlabel('Pixel') ax.set_ylabel('Pixel') ax.set_title(title) cname='hot' axranges=(-0.5,npix-0.5,-0.5,ypix-0.5) pp=ax.imshow(data,aspect='equal',interpolation='nearest',\ extent=axranges,cmap=plt.get_cmap(cname)) ax.axis(axranges) xticks=numpy.arange(npix) ax.set_xticks(xticks) yticks=numpy.arange(ypix) ax.set_yticks(yticks) ax.set_xticklabels(axlabs) ax.set_yticklabels(aylabs) for i in numpy.arange(ypix-1): ax.plot(axranges[0:2],[i+0.5,i+0.5],'k-') for i in numpy.arange(npix-1): ax.plot([i+0.5,i+0.5],axranges[2:4],'k-') if fig is not None: fig.colorbar(pp,ax=ax)