https://gitlab.com/kantundpeterpan/masseltof
Tip revision: c45a7e66a8a89c436d41b8431938b35acfa4e53b authored by Heiner Atze on 17 August 2021, 11:38:48 UTC
Update README.md
Update README.md
Tip revision: c45a7e6
mzml_Parser.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Mar 9 18:10:53 2019
@author: kantundpeterpan
"""
import os
import numpy as np
import pandas as pd
import pymzml
from . import mz_binning
from . import mz_binning_fixed_bins
import gc
class mzML_parser(object):
    """Read an mzML file with pymzml and combine its scans into one spectrum.

    On construction the file is scanned once to fill
    ``msanalyzer.data.ret_t`` (scan times in minutes) and
    ``msanalyzer.data.tic`` (one total-ion-current value per scan).

    Peak lists are handled as 2-column arrays: column 0 is m/z, column 1 is
    the intensity.
    """

    def __init__(self, mzml_file, msanalyzer):
        """Store the file path and collect retention times and TIC values.

        Parameters
        ----------
        mzml_file : path-like
            Path of the mzML file; stored as an absolute path in ``self.file``.
        msanalyzer : object
            Owner object; its ``data.ret_t`` and ``data.tic`` attributes are
            (re)initialised to lists and filled here.
        """
        self.msanalyzer = msanalyzer
        self.file = os.path.abspath(mzml_file)

        self.binning = mz_binning.binning
        self.binning_fixed_bins = mz_binning_fixed_bins.binning_fixed

        self.msanalyzer.data.ret_t = []
        self.msanalyzer.data.tic = []

        run = pymzml.run.Reader(self.file)
        for s in run:
            self.msanalyzer.data.ret_t.append(s.scan_time_in_minutes())
            # NOTE(review): if the reader returns None for a missing TIC
            # instead of raising, that None is stored as-is -- confirm.
            try:
                tic = s.TIC
            except Exception:
                # No usable TIC on this scan: fall back to the summed
                # intensities of the raw peaks.
                tic = np.sum(s.peaks('raw')[:, 1])
            self.msanalyzer.data.tic.append(tic)

    def combine_spectra(self, tmin=0, tmax=0,
                        scanIDs=None, peaktype='raw',
                        binning=False, remove_zero=False,
                        **kwargs):
        """Merge the selected scans and store them in ``msanalyzer.data.raw``.

        Scans are selected by position in the run when ``scanIDs`` is given
        and non-empty, otherwise by ``tmin <= scan time (min) < tmax``.

        Parameters
        ----------
        tmin, tmax : float
            Time window in minutes (lower bound inclusive, upper exclusive).
        scanIDs : collection of int, optional
            Zero-based positions of the scans to use; overrides the time
            window when non-empty.
        peaktype : str
            Passed to ``spectrum.peaks``.
        binning : bool
            If true, the merged peaks are binned with ``self.binning``.
        remove_zero : bool
            If true, rows with zero intensity are dropped.
        **kwargs
            ``binsize`` may be given; otherwise the smallest m/z spacing
            found in the selected scans is used.

        Raises
        ------
        ValueError
            If no scan matches the selection.
        """
        run = pymzml.run.Reader(self.file)

        if scanIDs is not None and len(scanIDs) > 0:
            # A set gives constant-time membership tests while iterating.
            wanted = set(scanIDs)
            scans = [s for i, s in enumerate(run) if i in wanted]
        else:
            scans = [s for s in run
                     if tmin <= s.scan_time_in_minutes() < tmax]

        # Kept as a list: scans generally hold different numbers of peaks
        # and cannot be packed into one rectangular array.
        peak_arrays = [np.asarray(s.peaks(peaktype)) for s in scans]
        if not peak_arrays:
            raise ValueError('no scans matched the requested selection')

        dfs = np.vstack(peak_arrays).astype(np.float64)
        dfs = dfs[np.argsort(dfs[:, 0]), :]
        print(dfs.shape)

        # Only derive the bin size from the data when the caller gave none.
        if 'binsize' in kwargs:
            binsize = kwargs.pop('binsize')
        else:
            binsize = self.get_min_mz_delta(peak_arrays)
        print('binsize: ', binsize)

        if binning:
            # Bin in chunks of roughly 5000 rows; at least one chunk so that
            # small inputs do not request zero sections.
            n_chunks = max(1, len(dfs) // 5000)
            spectra = [self.binning(chunk, binsize)
                       for chunk in np.array_split(dfs, n_chunks)]
            spectrum = np.vstack(spectra)
        else:
            spectrum = dfs

        if remove_zero:
            spectrum = spectrum[np.nonzero(spectrum[:, 1])]

        spectrum = pd.DataFrame(spectrum, columns=['x', 'y'])
        # Sum the intensities of rows that share the same m/z value.
        self.msanalyzer.data.raw = spectrum.pivot_table(
            values='y', index='x', aggfunc='sum').reset_index()

        gc.collect()

    def combine_spectra_fixed_bins(self,
                                   tmin, tmax,
                                   mz_min, mz_max,
                                   binsize,
                                   remove_zero=False,
                                   apply_filter=False):
        """Merge scans with ``tmin < time (min) < tmax`` onto a fixed m/z grid.

        Parameters
        ----------
        tmin, tmax : float
            Time window in minutes; both bounds are exclusive.
        mz_min, mz_max : float
            Limits of the bin grid.
        binsize : float
            Requested bin width, also forwarded to ``self.binning_fixed_bins``.
        remove_zero : bool
            If true, rows with zero intensity are dropped from the result.
        apply_filter : bool
            If true, peaks outside ``(mz_min, mz_max)`` are discarded before
            binning.

        Returns
        -------
        The array produced by ``self.binning_fixed_bins``.

        Raises
        ------
        ValueError
            If no scan falls inside the time window.
        """
        run = pymzml.run.Reader(self.file)

        # scan_time_in_minutes() is used as everywhere else in this class,
        # instead of dividing the raw scan time by 60.
        peak_arrays = [self.get_non_zero(s.peaks('raw'))
                       for s in run
                       if tmin < s.scan_time_in_minutes() < tmax]
        if not peak_arrays:
            raise ValueError('no scans found in the requested time window')

        dfs = np.vstack(peak_arrays)
        dfs = dfs[np.argsort(dfs[:, 0]), :]

        if apply_filter:
            ind = (dfs[:, 0] > mz_min) & (dfs[:, 0] < mz_max)
            dfs = dfs[ind]

        delta = mz_max - mz_min
        # NOTE(review): linspace with delta // binsize points gives a spacing
        # of delta / (n - 1), slightly wider than binsize -- confirm this is
        # what binning_fixed expects.
        bins = np.linspace(mz_min, mz_max, int(delta // binsize))

        spectrum = self.binning_fixed_bins(dfs, bins, binsize)

        if remove_zero:
            spectrum = spectrum[np.nonzero(spectrum[:, 1])]

        gc.collect()

        return spectrum

    def get_non_zero(self, a):
        """Return the rows of the 2-column array ``a`` whose column 1 is non-zero."""
        x = np.array(a)
        return x[x[:, 1] != 0]

    def get_min_mz_delta(self, array_of_spectra):
        """Return the smallest spacing between consecutive m/z values.

        Parameters
        ----------
        array_of_spectra : sequence of 2-column arrays
            One peak array per scan; the scans may differ in length.
            Scans with fewer than two peaks have no spacing and are skipped.

        Raises
        ------
        ValueError
            If no scan contains at least two peaks.
        """
        min_deltas = []
        for spec in array_of_spectra:
            spec = np.asarray(spec)
            if spec.shape[0] < 2:
                continue
            min_deltas.append(np.min(np.diff(spec[:, 0])))

        if not min_deltas:
            raise ValueError('need at least one spectrum with two or more '
                             'peaks to determine the m/z spacing')
        return np.min(min_deltas)