#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Mar  9 18:10:53 2019

@author: kantundpeterpan
"""
import os
import numpy as np
import pandas as pd
import pymzml
from . import mz_binning
from . import mz_binning_fixed_bins
import gc

class mzML_parser(object):
    
    def combine_spectra(self, tmin=0, tmax=0,
                        scanIDs=None, peaktype='raw',
                        binning=False, remove_zero=False,
                        **kwargs):
        """Combine selected scans into one spectrum stored in ``msanalyzer.data.raw``.

        Scans are selected either by their 0-based position in the file
        (``scanIDs``) or, when no IDs are given, by retention time in the
        half-open window ``tmin <= t < tmax`` (minutes). The peaks of all
        selected scans are stacked, sorted by the first column, optionally
        binned, and finally summed per distinct x value.

        Parameters
        ----------
        tmin, tmax : float
            Retention-time window in minutes; only used when ``scanIDs`` is
            ``None`` or empty.
        scanIDs : collection of int, optional
            Positions (enumeration index while iterating the reader) of the
            scans to combine.
        peaktype : str
            Passed to ``Spectrum.peaks`` of pymzml.
        binning : bool
            If true, the stacked peaks are passed in chunks through
            ``self.binning`` before aggregation.
        remove_zero : bool
            If true, rows whose second column is zero are dropped.
        **kwargs
            ``binsize`` overrides the bin width; by default the smallest
            spacing reported by ``get_min_mz_delta`` is used.

        Raises
        ------
        ValueError
            If the selection matches no scan.
        """
        run = pymzml.run.Reader(self.file)

        # Avoid ``if scanIDs:`` (ambiguous for NumPy arrays). An empty
        # selection still falls back to the time window, as before.
        use_ids = scanIDs is not None and len(scanIDs) > 0

        peak_lists = []
        if use_ids:
            wanted = set(scanIDs)  # O(1) membership instead of a list scan
            for i, s in enumerate(run):
                if i in wanted:
                    peak_lists.append(s.peaks(peaktype))
        else:
            for s in run:
                rt = s.scan_time_in_minutes()
                if tmin <= rt < tmax:
                    peak_lists.append(s.peaks(peaktype))

        if not peak_lists:
            raise ValueError(
                'no scans selected (tmin={}, tmax={}, scanIDs={})'.format(
                    tmin, tmax, scanIDs))

        # Stack the per-scan arrays directly: wrapping spectra of different
        # lengths in np.array() raises on current NumPy (ragged input).
        dfs = np.vstack(peak_lists).astype(np.float64)
        dfs = dfs[np.argsort(dfs[:, 0]), :]

        print(dfs.shape)

        if 'binsize' in kwargs:
            binsize = kwargs.pop('binsize')
        else:
            # Only scan all spectra for the minimal spacing when the caller
            # did not supply a bin size. An explicit object array keeps the
            # ``.shape`` interface get_min_mz_delta relies on.
            per_scan = np.empty(len(peak_lists), dtype=object)
            for i, peaks in enumerate(peak_lists):
                per_scan[i] = peaks
            binsize = self.get_min_mz_delta(per_scan)
        print('binsize: ', binsize)

        if binning:
            # At least one chunk: len(dfs)//5000 is 0 for small selections
            # and np.array_split rejects zero sections.
            n_chunks = max(1, len(dfs) // 5000)
            spectra = [self.binning(div, binsize)
                       for div in np.array_split(dfs, n_chunks)]
            spectrum = np.vstack(spectra)
        else:
            spectrum = dfs

        if remove_zero:
            spectrum = spectrum[np.nonzero(spectrum[:, 1])]

        spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])

        # Sum intensities that share the same x value (e.g. identical values
        # from different scans or from neighbouring chunks).
        self.msanalyzer.data.raw = spectrum.pivot_table(
            values='y', index='x', aggfunc='sum').reset_index()

        gc.collect()
        
    def combine_spectra_fixed_bins(self,
                                   tmin, tmax,
                                   mz_min, mz_max,
                                   binsize,
                                   remove_zero=False,
                                   apply_filter=False):
        """Combine scans of a retention-time window onto a fixed grid.

        Scans with ``tmin < t < tmax`` (minutes, both bounds exclusive) are
        read with peak type ``'raw'``, stripped of zero-intensity points,
        stacked, sorted by the first column and handed to
        ``self.binning_fixed_bins`` together with a grid spanning
        ``mz_min``..``mz_max``.

        Parameters
        ----------
        tmin, tmax : float
            Exclusive retention-time bounds in minutes.
        mz_min, mz_max : float
            Limits of the bin grid.
        binsize : float
            Requested bin width; also forwarded to the binning routine.
        remove_zero : bool
            If true, rows of the result whose second column is zero are
            dropped.
        apply_filter : bool
            If true, points outside ``(mz_min, mz_max)`` are discarded before
            binning.

        Returns
        -------
        The array returned by ``self.binning_fixed_bins`` (optionally without
        zero rows).

        Raises
        ------
        ValueError
            If no scan lies inside the time window.
        """
        run = pymzml.run.Reader(self.file)

        peak_lists = []
        for s in run:
            # scan_time_in_minutes() honours the unit stored in the file;
            # dividing scan_time[0] by 60 is only correct for seconds.
            rt = s.scan_time_in_minutes()
            if tmin < rt < tmax:
                peak_lists.append(self.get_non_zero(s.peaks('raw')))

        if not peak_lists:
            raise ValueError(
                'no scans found with {} < t < {} min'.format(tmin, tmax))

        # Stack directly; np.array() on spectra of different lengths raises
        # on current NumPy (ragged input).
        dfs = np.vstack(peak_lists)
        dfs = dfs[np.argsort(dfs[:, 0]), :]

        if apply_filter:
            ind = (dfs[:, 0] > mz_min) & (dfs[:, 0] < mz_max)
            dfs = dfs[ind]

        delta = mz_max - mz_min
        # NOTE(review): linspace with delta//binsize points yields a spacing
        # of delta/(n-1), not exactly ``binsize``. Left unchanged because the
        # expectations of binning_fixed are not visible here -- confirm.
        bins = np.linspace(mz_min, mz_max, int(delta // binsize))

        spectrum = self.binning_fixed_bins(dfs, bins, binsize)

        if remove_zero:
            spectrum = spectrum[np.nonzero(spectrum[:, 1])]

        gc.collect()

        return spectrum
        
    def get_non_zero(self,a):
        x = np.array(a)
        ind = (x[:,1] != 0)
        z = x[ind]
        return z
    
    def get_min_mz_delta(self, array_of_spectra):
    
        min_deltas = np.empty(array_of_spectra.shape[0])
        
        i=0
        for spec in array_of_spectra:
            deltas = spec[1:,0] - spec[:-1,0]
            min_delta = np.min(deltas)
            min_deltas[i] = min_delta
            i+=1
            
        total_min_delta = np.min(min_deltas)
        
        return total_min_delta    
    
    def __init__(self, mzml_file, msanalyzer):
        """Bind the parser to an mzML file and fill the chromatogram data.

        The file is read once; for every spectrum the retention time in
        minutes is appended to ``msanalyzer.data.ret_t`` and the total ion
        current to ``msanalyzer.data.tic``. Both lists are reset first.

        Parameters
        ----------
        mzml_file : str
            Path to the mzML file; stored as an absolute path in
            ``self.file``.
        msanalyzer : object
            Owner object; must expose a ``data`` attribute on which
            ``ret_t`` and ``tic`` (and later ``raw``) are set.
        """
        self.msanalyzer = msanalyzer
        self.file = os.path.abspath(mzml_file)
        self.binning = mz_binning.binning
        self.binning_fixed_bins = mz_binning_fixed_bins.binning_fixed
        self.msanalyzer.data.ret_t = []
        self.msanalyzer.data.tic = []

        run = pymzml.run.Reader(self.file)

        for s in run:
            self.msanalyzer.data.ret_t.append(s.scan_time_in_minutes())
            # Catch Exception rather than a bare ``except`` so that
            # KeyboardInterrupt/SystemExit still abort a long file read.
            try:
                tic = s.TIC
            except Exception:
                tic = None
            if tic is None:
                # No TIC available from the file: fall back to the summed
                # intensity column of the raw peaks.
                tic = np.sum(s.peaks('raw')[:, 1])
            self.msanalyzer.data.tic.append(tic)