https://gitlab.com/kantundpeterpan/masseltof
Raw File
Tip revision: c45a7e66a8a89c436d41b8431938b35acfa4e53b authored by Heiner Atze on 17 August 2021, 11:38:48 UTC
Update README.md
Tip revision: c45a7e6
fastmzml.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Aug 23 17:35:26 2019

@author: kantundpeterpan
"""

import lxml.etree

from lxml import objectify

import numpy as np
import pandas as pd

from . import mz_binning
from . import mz_binning_fixed_bins

from multiprocessing import Pool

import zlib
import base64
import os

import gc

# get spectra - ret time, tic, mz_array, intens array

# hard code zlib compression and binary data type

class FastMZml(object):
    
    float_dtype = {'64-bit float':np.float64,
                   '32-bit float':np.float32}
    
    
    def __init__(self, file, msanalyzer, stream=False):
        
        self.msanalyzer = msanalyzer
        self.file = os.path.abspath(file)
        self.binning = mz_binning.binning
        self.binning_fixed_bins = mz_binning_fixed_bins.binning_fixed
        self.stream = stream
        
        self.parse_mzML()

        self.msanalyzer.data.ret_t = self.data.scan_time.values
        self.msanalyzer.data.tic = self.data.scan_tic.values
        
    def parse_mzML(self):
        
        tree = lxml.etree.parse(self.file)
        root = tree.getroot()
        
        for elem in root.getiterator():
            if not hasattr(elem.tag, 'find'): continue  # (1)
            i = elem.tag.find('}')
            if i >= 0:
                elem.tag = elem.tag[i+1:]
        objectify.deannotate(root, cleanup_namespaces=True)
        
        #self.tree = tree
        #self.root = root
        
        self.tic = np.array(root.xpath('//run//spectrum/cvParam[@name="total ion current"]/@value')).astype(float)    
        self.scan_times = np.array(root.xpath('//run//spectrum//scan/cvParam/@value')).astype(float)    
        self.scan_time_unit = np.array(root.xpath('//run//spectrum//scan/cvParam/@unitName'))

        try:
            dtype = root.xpath('//run//spectrum//binaryDataArray/cvParam[@accession="MS:1000523"]/@name')[0]
        except:
            dtype = root.xpath('//run//spectrum//binaryDataArray/cvParam[@accession="MS:1000521"]/@name')[0]
        
        self.scan_dtype = self.float_dtype[dtype]
        print('dtype: ', self.scan_dtype)
        
        self.data = pd.DataFrame({
                'scan_time':self.scan_times/60,
                'scan_time_unit':self.scan_time_unit,
                'scan_tic':self.tic
                })
        
        #could cause memory issues!
        if not self.stream:
            mz_bin_str = root.xpath('//run//spectrum//binaryDataArray/cvParam[@name="m/z array"]/../binary/text()')
            ints_bin_str = root.xpath('//run//spectrum//binaryDataArray/cvParam[@name="intensity array"]/../binary/text()')
            
            mz_list = [self.get_np_array_from_compressed_encoded_byte_string(bs).reshape(-1,1).astype(np.float64) for bs in mz_bin_str]
            ints_list = [self.get_np_array_from_compressed_encoded_byte_string(bs).reshape(-1,1).astype(np.int64) for bs in ints_bin_str]
            
            self.data['mz_array'] = mz_list
            self.data['int_array'] = ints_list
            
    def combine_spectra(self, tmin, tmax):
        
        ind = (self.data.scan_time >= tmin) & (self.data.scan_time < tmax)
        
        mzs = self.data.mz_array[ind].values
        ints = self.data.int_array[ind].values
        
        #return mzs, ints
        
        array_s = np.array([np.hstack([mz, val]) for mz, val in zip(mzs, ints)])
        stacked_spectrum = np.vstack(array_s)
        stacked_spectrum = stacked_spectrum[np.argsort(stacked_spectrum[:,0])]
        
        print(stacked_spectrum.shape)
        binsize = self.get_min_mz_delta(array_s)
        print('binsize: ', binsize)
        
        spectra = []
        
        for div in np.array_split(stacked_spectrum, indices_or_sections=len(stacked_spectrum)//5000):
            spectrum = self.binning(div, binsize)
            spectra.append(spectrum)
    
        spectrum = np.vstack(spectra)
        
        spectrum = spectrum[np.nonzero(spectrum[:,1])]
        
        spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])
        
        self.msanalyzer.data.raw = spectrum.pivot_table(values='y', index='x',
                                                        aggfunc=np.sum).reset_index()
        
        gc.collect()
        
    def combine_spectra_iter(self, tmin, tmax, remove_zero = True, 
                             binning=True, **kwargs):
        
        ind = (self.data.scan_time >= tmin) & (self.data.scan_time < tmax)
        
        tree = lxml.etree.parse(self.file)
        root = tree.getroot()
        
        for elem in root.getiterator():
            if not hasattr(elem.tag, 'find'): continue  # (1)
            i = elem.tag.find('}')
            if i >= 0:
                elem.tag = elem.tag[i+1:]
        objectify.deannotate(root, cleanup_namespaces=True)
        
        spectra = root.xpath('//run//spectrum')
        
        mzs = []
        ints = []
        
        for idx, spec in zip(ind, spectra):
            if idx:
                tmp_mz = spec.xpath('binaryDataArrayList/binaryDataArray/cvParam[@name="m/z array"]/../binary/text()')[0]
                tmp_ints = spec.xpath('binaryDataArrayList/binaryDataArray/cvParam[@name="intensity array"]/../binary/text()')[0]
                
                mzs.append(self.get_np_array_from_compressed_encoded_byte_string(tmp_mz).reshape(-1,1))
                ints.append(self.get_np_array_from_compressed_encoded_byte_string(tmp_ints).reshape(-1,1))
                
        #return mzs,ints
    
        array_s = np.array([np.hstack([mz, val]) for mz, val in zip(mzs, ints)])
        if 'binsize' not in kwargs.keys():
            binsize = self.get_min_mz_delta(array_s)
        else:
            binsize = kwargs['binsize']
        del mzs, ints
        gc.collect()
        stacked_spectrum = np.vstack(array_s)
        del array_s
        gc.collect()
        stacked_spectrum = stacked_spectrum[np.argsort(stacked_spectrum[:,0])]
        
        print(stacked_spectrum.shape)
        
        if binning:
            print('binsize: ', binsize)
            spectra = []
            
            for div in np.array_split(stacked_spectrum.astype(np.float64), indices_or_sections=len(stacked_spectrum)//5000):
                spectrum = self.binning(div, binsize)
                spectra.append(spectrum)
            
            del stacked_spectrum
            gc.collect()
            
            spectrum = np.vstack(spectra)
        
        else:
            spectrum = stacked_spectrum
        
        
        if remove_zero:
            spectrum = spectrum[np.nonzero(spectrum[:,1])]
        
        spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])
        
        self.msanalyzer.data.raw = spectrum.pivot_table(values='y', index='x',
                                                        aggfunc=np.sum).reset_index()
        
        gc.collect()
        
    def combine_spectra_fixed_bins(self, tmin, tmax,
                                         mz_min, mz_max,
                                         binsize,
                                         apply_filter):
        
        ind = (self.data.scan_time >= tmin) & (self.data.scan_time < tmax)
        
        mzs = self.data.mz_array[ind].values
        ints = self.data.int_array[ind].values
        
        array_s = np.array([np.hstack([mz, val]) for mz, val in zip(mzs, ints)])
        stacked_spectrum = np.vstack(array_s)
        stacked_spectrum = stacked_spectrum[np.argsort(stacked_spectrum[:,0])]
        
        if apply_filter:
            ind = (stacked_spectrum[:,0] > mz_min) & (stacked_spectrum[:,0] < mz_max)
            stacked_spectrum = stacked_spectrum[ind,:]
        
        print(stacked_spectrum.shape)
        print('binsize: ', binsize)
              
        delta = mz_max - mz_min
        bins = np.linspace(mz_min, mz_max, int(delta//binsize))
        
        spectrum = self.binning_fixed_bins(stacked_spectrum, bins, binsize)
        
       # for div in np.array_split(stacked_spectrum, indices_or_sections=len(stacked_spectrum)//5000):
        #    spectrum = self.binning_fixed_bins(div, binsize)
         #   spectra.append(spectrum)
    
        #spectrum = np.vstack(spectra)
        
        #spectrum = spectrum[np.nonzero(spectrum[:,1])]
        
        spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])
        
        self.msanalyzer.data.raw = spectrum.pivot_table(values='y', index='x',
                                                        aggfunc=np.sum).reset_index()
        
        self.msanalyzer.data.raw.y = self.msanalyzer.data.raw.y.astype(int)
        
        gc.collect()
        
        return self.msanalyzer.data.raw.values
        
    def get_np_array_from_compressed_encoded_byte_string(self, byte_string):
        
        decoded_str = base64.b64decode(byte_string)
        try:
            inflated_str = zlib.decompress(decoded_str)
        except:
            inflated_str = decoded_str
        array = np.frombuffer(inflated_str, dtype = self.scan_dtype)
        
        return array
    
    def get_min_mz_delta(self, array_of_spectra):
        
    
        min_deltas = np.empty(array_of_spectra.shape[0])
        
        i=0
        for spec in array_of_spectra:
            deltas = spec[1:,0] - spec[:-1,0]
            min_delta = np.min(deltas)
            min_deltas[i] = min_delta
            i+=1
            
        total_min_delta = np.min(min_deltas)
        
        return total_min_delta   
back to top