https://gitlab.com/kantundpeterpan/masseltof
Raw File
Tip revision: c45a7e66a8a89c436d41b8431938b35acfa4e53b authored by Heiner Atze on 17 August 2021, 11:38:48 UTC
Update README.md
Tip revision: c45a7e6
MS_analysis_class.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 22 18:20:00 2019

@author: kantundpeterpan
"""

import pandas as pd

import matplotlib
matplotlib.use("Qt5Agg")

import matplotlib.pyplot as plt

from matplotlib.widgets import Cursor

import numpy as np

from PyQt5 import QtGui, QtCore, QtWidgets
from PyQt5.QtCore import Qt

from .snap_cursor import SnaptoCursor
from .easy_cursor import EasyCursor
from .peak_label_dialog import peak_label_dialog

import peakutils
from scipy.signal import savgol_filter

class MS_analysis():
    
    def __init__(self, msanalyzer):
        """Bind this analysis/plotting helper to its parent analyzer object.

        Parameters
        ----------
        msanalyzer : object
            Parent object. The methods of this class read its ``data``
            (``raw``, ``peaks``, ``ret_t``, ``tic``), ``name``, ``mol_db``
            and the ion masses ``m_h``, ``m_na`` and ``m_k_39``.
        """

        self.msanalyzer = msanalyzer

    def deconvolute(self, z, m_z, H=None, Na = 0, K = 0):
        
        if H == None:
            H = z 
        
        try:
            z = int(z)
            m_z = float(m_z)
            H = int(H)
            Na = int(Na)
            K = int(K)
        except:
            return
        
        m_h = self.msanalyzer.m_h
        m_na = self.msanalyzer.m_na
        m_k_39 = self.msanalyzer.m_k_39
        
        mass = (m_z * z) - H * m_h - Na * m_na - K * m_k_39
        
        mass = round(mass, 4) 
        
        return mass
    
    def calc_peak_centroid(self, xmin, xmax):
        
        try:
            xmin = float(xmin)
            xmax = float(xmax)
        except:
            print('Invalid input for centroid calculation')
        
        ind = (self.msanalyzer.data.raw.x > xmin) & (self.msanalyzer.data.raw.x < xmax)
        
        centroid = peakutils.centroid(self.msanalyzer.data.raw.x[ind],
                                      self.msanalyzer.data.raw.y[ind])
        
        return centroid
    
    def draw_centroid(self, x):
    
        ind = np.searchsorted(self.msanalyzer.data.raw.x, x)
        
        ymin = 0
        ymax = self.msanalyzer.data.raw.y[ind]
        
        self.ax.vlines(x, ymin = ymin, ymax = ymax)
    
    def append_peak(self,
                    peak_label,
                    m_z,
                    z,
                    deconv_mass,
                    intensity,
                    calc_mass,
                    delta,
                    remarks,
                    ann,
                    ann_x,
                    ann_y):
        
        print('called append')
        
        z = int(z)
        
        df = pd.DataFrame({
             'peak_label':peak_label,
              #'m/z':m_z,
              'z':z,
              'mass':deconv_mass, #(m_z*z)-z*1.007276466583, #that depends no?!
              'intensity':intensity,
              'calc_mass':calc_mass,
              'delta':delta,
              'remarks':remarks,
              'ann_obj':ann,
              'ann_x':ann_x,
              'ann_y':ann_y},
              index = [m_z])
         
        #df.set_index('m/z')
        
        self.msanalyzer.data.peaks = self.msanalyzer.data.peaks.append(df,
                                                                       ignore_index = False)
    
    
    def annotate_peak(self,
                      text,
                      m_z,
                      z,
                      deconv,
                      calc_mass,
                      delta,
                      remarks = '',
                      df = None ,
                      ax = None,
                      arrowstyle = '-|>',
                      reload = False):
        
        print('called annoate, received: ', m_z)
        
        if df == None:
            df = self.msanalyzer.data.raw
        if ax == None:
            ax = self.ax
            
        ind = (df.x > m_z - 0.05) & (df.x < m_z + 0.05)
        y = df.y[ind].max()
        x = df.x[df.y == y].values[0]
        
        print(x,y)
        
        if reload:
                      
            data = self.msanalyzer.data.peaks.loc[m_z] 
            
            x_text = data.ann_x
            y_text = data.ann_y
            
#             print(x_text, y_text)
            
            an = ax.annotate(s = '%s\n%s\n%s' % (text, r'$\frac{m}{z}=$' + str(x), r'$z=$'+str(z)),
                 xy = (x, y),
                 xycoords = 'data',
                 xytext = (x_text,y_text),
                 textcoords = 'data',
                 ha = 'left',
                 va = 'top',
                 arrowprops = {'arrowstyle':arrowstyle})
            
            self.msanalyzer.data.peaks.at[m_z, 'ann_obj'] = an 
            
        if not reload:
            an = ax.annotate(s = '%s\n%s\n%s' % (text, r'$\frac{m}{z}=$' + str(x), r'$z=$'+str(z)),
                             xy = (x, y),
                             xycoords = 'data',
                             ha = 'left',
                             va = 'top',
                             arrowprops = {'arrowstyle':arrowstyle})
            
            ann_x = float(an.get_position()[0])
            ann_y = float(an.get_position()[1])

            self.append_peak(text,
                             m_z,
                             z,
                             deconv,
                             y,
                             calc_mass,
                             delta,
                             remarks,
                             an,
                             ann_x,
                             ann_y)
            
        an.draggable()
              
        self.fig.canvas.draw_idle()
        
    def update_annotation(self, m_z, label, z, remarks):
        
        self.msanalyzer.data.peaks.at[m_z, 'peak_label'] = label
        self.msanalyzer.data.peaks.at[m_z, 'z'] = z
        self.msanalyzer.data.peaks.at[m_z, 'remarks'] = remarks
        
        self.msanalyzer.data.peaks.loc[m_z].ann_obj.set_text('%s\n%s\n%s' % (label,
                                      r'$\frac{m}{z}=$' + str(m_z),
                                      r'$z=$'+str(z)))
               
                                   
    def update_ann_positions(self, event):
        
        if not hasattr(self.ax, 'texts'):
            return
        #Disable pandas warning for chained assignment, does not apply here
        pd.options.mode.chained_assignment = None
        
        for ann in self.ax.texts:
            if isinstance(ann, plt.Annotation):
                ann_x = ann.get_position()[0]
                ann_y = ann.get_position()[1]
                ind = self.msanalyzer.data.peaks.ann_obj == ann
    
                #self.msanalyzer.data.peaks.ann_x[self.msanalyzer.data.peaks['m/z'] == m_z] = ann_x
                #self.msanalyzer.data.peaks.ann_y[self.msanalyzer.data.peaks['m/z'] == m_z] = ann_y
                
                self.msanalyzer.data.peaks.at[ind, 'ann_x'] = ann_x
                self.msanalyzer.data.peaks.at[ind, 'ann_y'] = ann_y
                
    def stuff_on_close(self, event):
        """Write the peak table and the species table to CSV files.

        Connected to the figure's ``close_event`` in ``plot_spectrum``.
        The peaks go to ``<msanalyzer.name>_peaks.csv``. For every species in
        ``msanalyzer.mol_db`` the ``labeling_variants`` cell is replaced by a
        list of ``[row label, labeling_pattern, tuple(labeling)]`` entries
        built from the species object's ``labeling_variants`` table; then all
        columns of ``mol_db`` except the last one are written to
        ``species_test_sequences.csv``.

        Parameters
        ----------
        event
            The event passed by the callback machinery; not used.
        """
        peaks_file = self.msanalyzer.name + '_peaks.csv'
        self.msanalyzer.data.peaks.to_csv(peaks_file)

        for species in self.msanalyzer.mol_db.index:
            variants = self.msanalyzer.mol_db.loc[species].species_obj.labeling_variants
            summary = []
            for variant_label, variant in variants.iterrows():
                summary.append([variant_label,
                                variant.labeling_pattern,
                                tuple(variant.labeling)])
            self.msanalyzer.mol_db.at[species, 'labeling_variants'] = summary

        without_last_column = self.msanalyzer.mol_db.iloc[:,:-1]
        without_last_column.to_csv('species_test_sequences.csv')

    def plot_tic(self, show = True):
        """Plot ``msanalyzer.data.tic`` against ``msanalyzer.data.ret_t``.

        Creates a new figure (stored as ``self.tic_fig`` / ``self.tic_ax``),
        attaches an ``EasyCursor`` (``self.tic_cursor``) and connects its
        ``open_pop_menu`` to mouse button presses.

        Parameters
        ----------
        show : bool
            If true, ``plt.show()`` is called at the end.
        """
        figure, axes = plt.subplots(1,1)
        self.tic_fig, self.tic_ax = figure, axes

        axes.set_xlabel('Retention Time (min)')
        axes.set_ylabel('TIC')

        run = self.msanalyzer.data
        axes.plot(run.ret_t, run.tic, marker = 'o', markersize = 3)

        self.tic_cursor = EasyCursor(axes, msanalyzer=self.msanalyzer)
        figure.canvas.mpl_connect('button_press_event',
                                  self.tic_cursor.open_pop_menu)

        if not show:
            return
        plt.show()
        
    def plot_spectrum(self,x='x', y='y', data=None, cursor=True,
                      savitzky_golay = False, show = True,
                      **kwargs):
        """Plot a spectrum, restore stored peak annotations and wire callbacks.

        Sets ``self.fig``, ``self.ax``, ``self.root``, ``self.data_line``,
        ``self.peak_label_dialog`` and, if requested, ``self.cursor``.

        Parameters
        ----------
        x, y : str
            Keys of the x and y values in ``data``.
        data : optional
            Data to plot; defaults to ``msanalyzer.data.raw``.
        cursor : bool
            If true, a ``SnaptoCursor`` is created and connected to mouse
            button presses and mouse motion.
        savitzky_golay : bool
            If true, the y values are passed through ``savgol_filter`` before
            plotting.
        show : bool
            If true, ``self.fig.show()`` is called at the end.
        **kwargs
            ``ax``: existing axes to draw on (a new figure is created when it
            is missing or ``None``). ``sg_params``: positional arguments for
            ``savgol_filter`` after the data, default ``(11, 5)``. Everything
            else is forwarded to ``ax.plot``.
        """
        # Identity check: ``data == None`` is evaluated element-wise for a
        # DataFrame and cannot be used as a condition.
        if data is None:
            data = self.msanalyzer.data.raw

        ax = kwargs.pop('ax', None)
        if ax is None:
            self.fig, self.ax = plt.subplots(1,1)
        else:
            self.fig = ax.figure
            self.ax = ax
        self.root = self.fig.canvas.manager.window

        # Always consume sg_params so it never reaches ax.plot as an unknown
        # line property when smoothing is switched off.
        sg_params = kwargs.pop('sg_params', (11,5))
        y_values = data[y]
        if savitzky_golay:
            y_values = savgol_filter(y_values, *sg_params)

        # Keep the line that was just drawn; ax.lines[0] would be an older
        # line when an already populated axes is passed in.
        self.data_line = self.ax.plot(data[x], y_values, **kwargs)[0]
        self.peak_label_dialog = peak_label_dialog(self.root, self)

        if cursor:
            self.cursor = SnaptoCursor(self)
            self.fig.canvas.mpl_connect('button_press_event', self.cursor.open_pop_menu)
            self.fig.canvas.mpl_connect('motion_notify_event', self.cursor.mouse_move)

        # Re-create the annotations of peaks that are already in the table.
        for m_z, peak in self.msanalyzer.data.peaks.iterrows():
            self.annotate_peak(peak.peak_label,
                               m_z,
                               peak.z,
                               peak.mass,
                               peak.calc_mass,
                               peak.delta,
                               reload = True)

        self.fig.suptitle(self.msanalyzer.name)

        self.fig.canvas.mpl_connect('draw_event', self.update_ann_positions)
        self.fig.canvas.mpl_connect('close_event', self.stuff_on_close)

        x_plotted = self.data_line.get_data()[0]
        self.ax.set_xlim(x_plotted.min(), x_plotted.max())
        self.ax.margins(0.2)

        self.fig.tight_layout()
        self.ax.set_xlabel('m/z')
        self.ax.set_ylabel('counts')
        if show:
            self.fig.show()
        
back to top