https://github.com/thompsar/Venison
Revision c622d9ab2eb3089cb7e4b1f73fe57c4de25de9a2 authored by Andrew Thompson on 12 May 2020, 20:45:14 UTC, committed by GitHub on 12 May 2020, 20:45:14 UTC
1 parent e9dc3e5
Tip revision: c622d9ab2eb3089cb7e4b1f73fe57c4de25de9a2 authored by Andrew Thompson on 12 May 2020, 20:45:14 UTC
Update README.md
Update README.md
Tip revision: c622d9a
spectrum.py
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, cpu_count
from scipy import optimize
class spectrum():
"""Class that loads and parses Bruker spectrum files, assumes DTA and DSC files present
Parameters
----------
path: Full path to .DSC or .DTA file
Attributes:
metadata: (dict) full metadata from DSC file, portions of which are explicitly saved as attributes
real: (1D or 2D array)
imaginary: (1D or 2D array)
abcissa: (1D array) the x-axis for the file (time (ns) for FLTPR and wavelength (nm) for SUPR)
ordinate: (2D array) the y-axis for the data (data,slice_number)
"""
def __init__(self,path):
"""Instantiate the object
Args:
path (str): Full path to either the DSC or DTA file
"""
self.path = path
self.metadata = {}
self.parse_metadata()
self.load_data()
self.set_defaults()
def parse_metadata(self):
self.metadata = dict.fromkeys(['XMIN','XWID','XPTS','YPTS'])
self.metadata['YPTS'] = 1 #always at least one slice, but not listed in DSC if YPTS = 1
with open(self.path.split('.')[0]+".DSC") as fd: #Extract parameters from DSC File
for line in fd:
if line.startswith(tuple(self.metadata.keys())):
[key,value] = line.split()
self.metadata[key] = float(value)
for key in ['XPTS','YPTS']:
self.metadata[key] = int(self.metadata[key])
def load_data(self):
raw_data=np.fromfile(self.path.split('.')[0]+".DTA",dtype=np.float64,sep="").byteswap() #Bruker data is 64bit big endian
array_shape = (self.metadata['YPTS'],self.metadata['XPTS'])
self.real2D=raw_data[::2].reshape(array_shape) #Real portion of data in ypts x xpts matrix
self.imaginary2D=raw_data[1::2].reshape(array_shape) #Imaginary portion of data in ypts x xpts matrix
havedata=np.sum(self.real2D[:,-1]>0) #find number of slices that have complete datasets (i.e. non zero ends)
self.real2D=self.real2D[0:havedata] #Drop empty slices
self.imaginary2D=self.imaginary2D[0:havedata] #Drop empty slices
self.metadata['YPTS'] = havedata #correct YPTS, is this the correct thing to do or will it cause issues?
self.dt = self.metadata['XWID']/(self.metadata['XPTS']-1)/1000 #delta t in µs
self.raw_time=(np.arange(self.metadata['XPTS'])*self.dt+self.metadata['XMIN']/1000) #Time axis of raw data in µs
self.cutoff =self.raw_time[-1] #aribitrary start of cutoff
self.background = self.raw_time[int(self.raw_time.size/2)] #arbitrary choice of start of background region
self.slice_mask = np.ones(self.metadata['YPTS'],dtype=bool)
self.avg_slices()
def set_defaults(self):
self.dimensions = 512
self.rmin = 1.5
self.rmax = 10
self.alphas = np.logspace(-4,6, 51)
def delete_slice(self,index):
self.slice_mask[index] = False
def avg_slices(self):
self.real = np.sum(self.real2D[self.slice_mask],axis=0)
maximum = np.max(self.real)
self.real = self.real/maximum
self.imaginary = np.sum(self.imaginary2D[self.slice_mask],axis=0)/maximum
def build_kernel(self):
ny0 = 52.04 # dipolar frequency at 1 nm for g=ge
w0 = 2*np.pi*ny0 #angular frequency
self.full_t = np.arange(0., self.tmax+2*self.dt,self.dt) #time axis in μs, +2*dt to ensure tmax is included
self.r = np.linspace(self.rmin,self.rmax,self.dimensions) #distance axis in nm
self.full_kernel = np.zeros((len(self.r),len(self.full_t)))
rarray=(w0/self.r[:,np.newaxis]**3)
tarray=self.full_t[np.newaxis,:]
kslice=rarray*tarray
for l in range(1001):
self.full_kernel=self.full_kernel+np.cos(kslice*(3*(l/1000)**2-1))
self.full_kernel=self.full_kernel/self.full_kernel[:,0][:,np.newaxis]
self.full_kernel=self.full_kernel.T
#initialize waveform
self.waveform = np.zeros(self.full_kernel.shape[0])
self.Lmatrix = np.zeros((self.dimensions-2,self.dimensions))
for i in range(0, self.Lmatrix.shape[0]):
self.Lmatrix[i, 1 + (i - 1)] = 1.
self.Lmatrix[i, 1 + (i - 0)] = -2.
self.Lmatrix[i, 1 + (i + 1)] = 1.
def hMat(alpha):
return np.dot(self.full_kernel,
np.dot(
np.linalg.pinv(np.dot(self.full_kernel.T,self.full_kernel)+alpha**2*np.dot(self.Lmatrix.T,self.Lmatrix)),
self.full_kernel.T))
self.full_hMats=np.asarray([np.diag(hMat(alpha)) for alpha in self.alphas])
#Kernel, t and hMats may be subsets of the full computed version.
self.kernel = self.full_kernel
self.t = self.full_t
self.hMats = self.full_hMats
#initialize tikhonov parameters
self.bvector = np.concatenate([self.waveform,
np.zeros(self.Lmatrix.shape[0]) ]) #insert reference
self.nnls_solutions = np.zeros([self.alphas.shape[0],self.kernel.shape[1]])
def background_correct(self,background_curve_full):
background = background_curve_full[self.waveform_range]
self.normamp=(self.zeroamp-background[0])/(background[0])
self.waveform=(self.real[self.waveform_range]-background)/background
self.waveform=self.waveform/self.normamp
self.bvector = np.concatenate([self.waveform,
np.zeros(self.Lmatrix.shape[0])])
if self.kernel.shape[0]!=self.waveform.shape[0]:
self.resize_kernel()
def resize_kernel(self):
new_size = self.waveform.shape[0]
self.kernel=self.full_kernel[:new_size,:]
self.t=self.full_t[:new_size]
self.hMats=self.full_hMats[:,:new_size]
def update_ranges(self):
self.waveform_range = range(abs(self.raw_time-self.zeropoint).argmin(),
abs(self.raw_time-self.cutoff).argmin()+1) #plus one to account for non-inclusive indexing
self.background_range = range(abs(self.raw_time-self.background).argmin(),
abs(self.raw_time-self.cutoff).argmin())
def regularize(self, alpha):
return optimize.nnls(np.concatenate([self.kernel,alpha*self.Lmatrix]),self.bvector)[0]
def generate_lcurve(self):
pool = mp.Pool()
results = [pool.apply_async(self.regularize, args=(alpha,)) for alpha in self.alphas]
self.solutions = [p.get() for p in results]
self.solutions=np.array(self.solutions)
pool.close()
self.tikhonovfits = np.dot(self.kernel,self.solutions.T).T #generate all the spectra from solutions
self.tikhonovfits = self.tikhonovfits/self.tikhonovfits[:,0][:,np.newaxis] #use broadcasting to normalize solutions

Computing file changes ...