https://gitlab.com/kantundpeterpan/masseltof
Tip revision: c45a7e66a8a89c436d41b8431938b35acfa4e53b authored by Heiner Atze on 17 August 2021, 11:38:48 UTC
Update README.md
Update README.md
Tip revision: c45a7e6
fastmzml.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Aug 23 17:35:26 2019
@author: kantundpeterpan
"""
import lxml.etree
from lxml import objectify
import numpy as np
import pandas as pd
from . import mz_binning
from . import mz_binning_fixed_bins
from multiprocessing import Pool
import zlib
import base64
import os
import gc
# get spectra - ret time, tic, mz_array, intens array
# hard code zlib compression and binary data type
class FastMZml(object):
float_dtype = {'64-bit float':np.float64,
'32-bit float':np.float32}
def __init__(self, file, msanalyzer, stream=False):
self.msanalyzer = msanalyzer
self.file = os.path.abspath(file)
self.binning = mz_binning.binning
self.binning_fixed_bins = mz_binning_fixed_bins.binning_fixed
self.stream = stream
self.parse_mzML()
self.msanalyzer.data.ret_t = self.data.scan_time.values
self.msanalyzer.data.tic = self.data.scan_tic.values
def parse_mzML(self):
tree = lxml.etree.parse(self.file)
root = tree.getroot()
for elem in root.getiterator():
if not hasattr(elem.tag, 'find'): continue # (1)
i = elem.tag.find('}')
if i >= 0:
elem.tag = elem.tag[i+1:]
objectify.deannotate(root, cleanup_namespaces=True)
#self.tree = tree
#self.root = root
self.tic = np.array(root.xpath('//run//spectrum/cvParam[@name="total ion current"]/@value')).astype(float)
self.scan_times = np.array(root.xpath('//run//spectrum//scan/cvParam/@value')).astype(float)
self.scan_time_unit = np.array(root.xpath('//run//spectrum//scan/cvParam/@unitName'))
try:
dtype = root.xpath('//run//spectrum//binaryDataArray/cvParam[@accession="MS:1000523"]/@name')[0]
except:
dtype = root.xpath('//run//spectrum//binaryDataArray/cvParam[@accession="MS:1000521"]/@name')[0]
self.scan_dtype = self.float_dtype[dtype]
print('dtype: ', self.scan_dtype)
self.data = pd.DataFrame({
'scan_time':self.scan_times/60,
'scan_time_unit':self.scan_time_unit,
'scan_tic':self.tic
})
#could cause memory issues!
if not self.stream:
mz_bin_str = root.xpath('//run//spectrum//binaryDataArray/cvParam[@name="m/z array"]/../binary/text()')
ints_bin_str = root.xpath('//run//spectrum//binaryDataArray/cvParam[@name="intensity array"]/../binary/text()')
mz_list = [self.get_np_array_from_compressed_encoded_byte_string(bs).reshape(-1,1).astype(np.float64) for bs in mz_bin_str]
ints_list = [self.get_np_array_from_compressed_encoded_byte_string(bs).reshape(-1,1).astype(np.int64) for bs in ints_bin_str]
self.data['mz_array'] = mz_list
self.data['int_array'] = ints_list
def combine_spectra(self, tmin, tmax):
ind = (self.data.scan_time >= tmin) & (self.data.scan_time < tmax)
mzs = self.data.mz_array[ind].values
ints = self.data.int_array[ind].values
#return mzs, ints
array_s = np.array([np.hstack([mz, val]) for mz, val in zip(mzs, ints)])
stacked_spectrum = np.vstack(array_s)
stacked_spectrum = stacked_spectrum[np.argsort(stacked_spectrum[:,0])]
print(stacked_spectrum.shape)
binsize = self.get_min_mz_delta(array_s)
print('binsize: ', binsize)
spectra = []
for div in np.array_split(stacked_spectrum, indices_or_sections=len(stacked_spectrum)//5000):
spectrum = self.binning(div, binsize)
spectra.append(spectrum)
spectrum = np.vstack(spectra)
spectrum = spectrum[np.nonzero(spectrum[:,1])]
spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])
self.msanalyzer.data.raw = spectrum.pivot_table(values='y', index='x',
aggfunc=np.sum).reset_index()
gc.collect()
def combine_spectra_iter(self, tmin, tmax, remove_zero = True,
binning=True, **kwargs):
ind = (self.data.scan_time >= tmin) & (self.data.scan_time < tmax)
tree = lxml.etree.parse(self.file)
root = tree.getroot()
for elem in root.getiterator():
if not hasattr(elem.tag, 'find'): continue # (1)
i = elem.tag.find('}')
if i >= 0:
elem.tag = elem.tag[i+1:]
objectify.deannotate(root, cleanup_namespaces=True)
spectra = root.xpath('//run//spectrum')
mzs = []
ints = []
for idx, spec in zip(ind, spectra):
if idx:
tmp_mz = spec.xpath('binaryDataArrayList/binaryDataArray/cvParam[@name="m/z array"]/../binary/text()')[0]
tmp_ints = spec.xpath('binaryDataArrayList/binaryDataArray/cvParam[@name="intensity array"]/../binary/text()')[0]
mzs.append(self.get_np_array_from_compressed_encoded_byte_string(tmp_mz).reshape(-1,1))
ints.append(self.get_np_array_from_compressed_encoded_byte_string(tmp_ints).reshape(-1,1))
#return mzs,ints
array_s = np.array([np.hstack([mz, val]) for mz, val in zip(mzs, ints)])
if 'binsize' not in kwargs.keys():
binsize = self.get_min_mz_delta(array_s)
else:
binsize = kwargs['binsize']
del mzs, ints
gc.collect()
stacked_spectrum = np.vstack(array_s)
del array_s
gc.collect()
stacked_spectrum = stacked_spectrum[np.argsort(stacked_spectrum[:,0])]
print(stacked_spectrum.shape)
if binning:
print('binsize: ', binsize)
spectra = []
for div in np.array_split(stacked_spectrum.astype(np.float64), indices_or_sections=len(stacked_spectrum)//5000):
spectrum = self.binning(div, binsize)
spectra.append(spectrum)
del stacked_spectrum
gc.collect()
spectrum = np.vstack(spectra)
else:
spectrum = stacked_spectrum
if remove_zero:
spectrum = spectrum[np.nonzero(spectrum[:,1])]
spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])
self.msanalyzer.data.raw = spectrum.pivot_table(values='y', index='x',
aggfunc=np.sum).reset_index()
gc.collect()
def combine_spectra_fixed_bins(self, tmin, tmax,
mz_min, mz_max,
binsize,
apply_filter):
ind = (self.data.scan_time >= tmin) & (self.data.scan_time < tmax)
mzs = self.data.mz_array[ind].values
ints = self.data.int_array[ind].values
array_s = np.array([np.hstack([mz, val]) for mz, val in zip(mzs, ints)])
stacked_spectrum = np.vstack(array_s)
stacked_spectrum = stacked_spectrum[np.argsort(stacked_spectrum[:,0])]
if apply_filter:
ind = (stacked_spectrum[:,0] > mz_min) & (stacked_spectrum[:,0] < mz_max)
stacked_spectrum = stacked_spectrum[ind,:]
print(stacked_spectrum.shape)
print('binsize: ', binsize)
delta = mz_max - mz_min
bins = np.linspace(mz_min, mz_max, int(delta//binsize))
spectrum = self.binning_fixed_bins(stacked_spectrum, bins, binsize)
# for div in np.array_split(stacked_spectrum, indices_or_sections=len(stacked_spectrum)//5000):
# spectrum = self.binning_fixed_bins(div, binsize)
# spectra.append(spectrum)
#spectrum = np.vstack(spectra)
#spectrum = spectrum[np.nonzero(spectrum[:,1])]
spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])
self.msanalyzer.data.raw = spectrum.pivot_table(values='y', index='x',
aggfunc=np.sum).reset_index()
self.msanalyzer.data.raw.y = self.msanalyzer.data.raw.y.astype(int)
gc.collect()
return self.msanalyzer.data.raw.values
def get_np_array_from_compressed_encoded_byte_string(self, byte_string):
decoded_str = base64.b64decode(byte_string)
try:
inflated_str = zlib.decompress(decoded_str)
except:
inflated_str = decoded_str
array = np.frombuffer(inflated_str, dtype = self.scan_dtype)
return array
def get_min_mz_delta(self, array_of_spectra):
min_deltas = np.empty(array_of_spectra.shape[0])
i=0
for spec in array_of_spectra:
deltas = spec[1:,0] - spec[:-1,0]
min_delta = np.min(deltas)
min_deltas[i] = min_delta
i+=1
total_min_delta = np.min(min_deltas)
return total_min_delta