https://github.com/vejnar/LabxPipe
02 May 2023, 09:53:20 UTC
f4c2bd6 / src / labxpipe_scripts / lxpipe_merge_count.py
To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.

  • content: swh:1:cnt:ed07dce1f8232708e5637557406241188cd8325b
  • directory: swh:1:dir:6bf3e7e1f8ddcbe72188ffdef81445fa3dc9c885
  • revision: swh:1:rev:adf29e860d11a9d1d3ecb06afff81bbf46207d62
  • snapshot: swh:1:snp:cc771c9e970c5a9ae56f266e0d2aa3ecec08a923
Tip revision: adf29e860d11a9d1d3ecb06afff81bbf46207d62 authored by vejnar on 19 April 2023, 15:50:13 UTC
Add --suffix to extract command

lxpipe_merge_count.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright © 2013 Charles E. Vejnar
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://www.mozilla.org/MPL/2.0/.
#

"""Merge & normalize count"""

import argparse
import collections
import json
import os
import sys

import numpy as np
import pandas as pd
import zstandard as zstd

import labxdb

import pyfnutils as pfu
import pyfnutils.log

from labxpipe import utils

def read_data(path_data, step_name, name_column):
    """Read one count file produced by the given step."""
    if step_name == 'counting':
        m = pd.read_csv(path_data+'.csv')
    elif step_name == 'counting_cufflinks':
        m = pd.read_table(path_data+'.fpkm_tracking')
        m.sort_values([name_column], inplace=True)
        m.index = range(len(m))
    else:
        raise ValueError(f'Unknown step: {step_name}')
    return m
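For illustration, the `counting` branch of `read_data` expects a CSV with a name column, a length column, and one or more count columns; this tiny sketch uses hypothetical gene names and values:

```python
import io

import pandas as pd

# Minimal stand-in for a 'counting' step output file (hypothetical content)
csv_text = "name,length,count_1\ngeneA,1000,12\ngeneB,500,7\n"
m = pd.read_csv(io.StringIO(csv_text))
# m now has the columns the merge step reads: 'name', 'length', 'count_1'
```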

def is_spike_in(prefix, name):
    # A feature is a spike-in when its name lacks the main-genome prefix
    return name.find(prefix) == -1
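A quick check of the spike-in logic above: a feature counts as a spike-in exactly when its name lacks the main-genome prefix. The gene names below are illustrative:

```python
def is_spike_in(prefix, name):
    # Spike-in features do not carry the main-genome prefix
    return name.find(prefix) == -1

print(is_spike_in('ENSDAR', 'ENSDARG00000000001'))  # False: main-genome gene
print(is_spike_in('ENSDAR', 'YDL248W'))             # True: treated as spike-in
```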

def main(argv=None):
    if argv is None:
        argv = sys.argv
    # Started from wrapper?
    prog = os.path.basename(argv[0])
    if len(argv) > 1 and argv[1] == 'merge-count':
        job_cmd = argv[:2]
        argv_parser = argv[2:]
        prog += ' merge-count'
    else:
        job_cmd = argv[:1]
        argv_parser = argv[1:]
    # Parse arguments
    parser = argparse.ArgumentParser(prog=prog, description='Merge & Normalize count.')
    parser.add_argument('-c', '--pipeline', dest='path_pipeline', action='store', required=True, help='Path to pipeline')
    parser.add_argument('-s', '--step', dest='step_name', action='store', default='counting', help='Step name')
    parser.add_argument('-l', '--levels', dest='levels', action='store', default='sample,replicate', help='Levels to include (comma-separated; choose from: sample, replicate)')
    parser.add_argument('-a', '--fontools_path_main', dest='fontools_path_main', action='store', default='.', help='Root path to annotation files')
    parser.add_argument('-m', '--columns', dest='columns', action='store', default='count_1,count_2,count_900', help='Column name(s) to merge (comma separated) (default:count_1,count_2,count_900)')
    parser.add_argument('-n', '--name_column', dest='name_column', action='store', default='name', help='Name column (i.e. name or tracking_id)')
    parser.add_argument('-x', '--suffix', dest='suffix', action='store', default='', help='Output filename suffix')
    parser.add_argument('-t', '--strict_column_names', dest='strict_column_names', action='store_true', default=False, help='Use strict column labels (letters, numbers and underscore only)')
    parser.add_argument('-k', '--spike_in_main_prefix', dest='spike_in_main_prefix', action='store', default='', help='Prefix of main gene (other genes are spike-in), i.e. ENSDAR')
    parser.add_argument('-p', '--spike_in_names', dest='spike_in_names', action='store', default='yst,zbf', help='Spike-in and non spike-in normalization filenames, i.e. yst,zbf (comma separated)')
    parser.add_argument('--log', dest='path_log', action='store', help='Path to log')
    parser.add_argument('--path_config', dest='path_config', action='store', help='Path to config')
    parser.add_argument('--http_url', '--labxdb_http_url', dest='labxdb_http_url', action='store', help='Database HTTP URL')
    parser.add_argument('--http_login', '--labxdb_http_login', dest='labxdb_http_login', action='store', help='Database HTTP login')
    parser.add_argument('--http_password', '--labxdb_http_password', dest='labxdb_http_password', action='store', help='Database HTTP password')
    parser.add_argument('--http_path', '--labxdb_http_path', dest='labxdb_http_path', action='store', help='Database HTTP path')
    parser.add_argument('--http_db', '--labxdb_http_db', dest='labxdb_http_db', action='store', help='Database HTTP DB')
    args = parser.parse_args(argv_parser)

    # Get config (JSON single file or all files in path_config)
    config = {}
    paths = []
    if args.path_config is None:
        if 'HTS_CONFIG_PATH' in os.environ:
            paths.append(os.environ['HTS_CONFIG_PATH'])
        elif 'XDG_CONFIG_HOME' in os.environ:
            paths.append(os.path.join(os.environ['XDG_CONFIG_HOME'], 'hts'))
    else:
        paths.append(args.path_config)
    for path in paths:
        if os.path.isdir(path):
            for f in sorted(os.listdir(path)):
                if f.endswith('.json'):
                    config = {**config, **json.load(open(os.path.join(path, f)))}
        elif os.path.isfile(path):
            config = {**config, **json.load(open(path))}

    # Input local config from args
    vargs = vars(args)
    for a, v in vargs.items():
        if v is not None and (a not in config or v != parser.get_default(a)):
            config[a] = v
        if a in ['levels', 'columns', 'spike_in_names']:
            if v is None:
                config[a] = []
            else:
                config[a] = [r.strip() for r in v.split(',')]
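The precedence implemented above (JSON config files merged in order, then command-line values overriding a file value only when they differ from the parser defaults) can be sketched in isolation; the keys and values below are hypothetical:

```python
def merge_configs(file_configs, cli_args, defaults):
    # Later files override earlier ones, mirroring the dict-union loop above
    config = {}
    for fc in file_configs:
        config = {**config, **fc}
    # A CLI value wins when the key is unset or the user changed the default
    for key, value in cli_args.items():
        if value is not None and (key not in config or value != defaults.get(key)):
            config[key] = value
    return config

merged = merge_configs(
    [{'step_name': 'counting', 'path_log': 'a.log'}, {'path_log': 'b.log'}],
    {'step_name': 'counting', 'name_column': 'id'},
    {'step_name': 'counting', 'name_column': 'name'},
)
# step_name stays 'counting' (CLI equals the default), path_log is 'b.log'
# (later file wins), name_column is 'id' (user overrode the default)
```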

    # Logging
    logger = pfu.log.define_root_logger('merge_count', level='info', filename=config.get('path_log'))

    # Init. DBLink
    if 'labxdb_http_path' not in config and 'labxdb_http_db' not in config:
        if 'labxdb_http_path_seq' in config:
            config['labxdb_http_path'] = config['labxdb_http_path_seq']
        else:
            config['labxdb_http_db'] = 'seq'
    dbl = labxdb.DBLink(config.get('labxdb_http_url'), config.get('labxdb_http_login'), config.get('labxdb_http_password'), config.get('labxdb_http_path'), config.get('labxdb_http_db'))

    # Get run config
    pipe_config = json.load(open(config['path_pipeline']))
    run_name = pipe_config['name']
    run_path = pipe_config['path_output']
    # Get step config
    step_config = None
    for s in pipe_config['analysis']:
        if s['step_name'] == config['step_name']:
            step_config = s
    if step_config is None:
        logger.error(f"Could not find «{config['step_name']}» step in pipeline.")
        return 1
    # Get feature names
    feature_names = []
    if 'features' in step_config:
        feature_names = [f['name'] for f in step_config['features']]
    # Get data annotation
    pipe_config_replicate_refs = pipe_config.get('replicate_refs', [])
    pipe_config_replicate_refs.sort()
    pipe_config_run_refs = pipe_config.get('run_refs', [])
    pipe_config_run_refs.sort()
    result = []
    for level, search, refs in [('replicate', '2 replicate_ref', pipe_config_replicate_refs), ('run', '3 run_ref', pipe_config_run_refs)]:
        if len(refs) > 0:
            result += [(level, r) for r in dbl.post('tree', {'search_criterion':[search+' EQUAL '+r for r in refs], 'search_gate':'OR', 'sort_criterion':['1 track_priority ASC', '2 replicate_order ASC', '3 run_order ASC'], 'limit':'ALL'})]

    for feature_name in feature_names:
        # Determine feature_name type based on feature name
        if feature_name.find('gene') != -1:
            feature_name_type = 'gene'
        elif feature_name.find('transcript') != -1:
            feature_name_type = 'transcript'
        else:
            logger.error(f'Could not determine the feature type of {feature_name}.')
            return 1
        logger.info(f'Detected feature: {feature_name_type}')

        # Data
        data = {}

        # Get how to merge data
        merges = []
        merges_labels = set()
        for level, project in result:
            for sample in project['children']:
                merge_sample_refs = []
                for replicate in sample['children']:
                    merge_replicate_refs = []
                    # Open replicate data files
                    if level == 'replicate' and replicate['replicate_ref'] in pipe_config_replicate_refs:
                        merge_replicate_refs.append(replicate['replicate_ref'])
                        if replicate['replicate_ref'] not in data:
                            logger.info(f"Opening {replicate['replicate_ref']} {feature_name}")
                            path_data = os.path.join(run_path, replicate['replicate_ref'], config['step_name'], feature_name)
                            data[replicate['replicate_ref']] = read_data(path_data, config['step_name'], config['name_column'])
                    # Open run data files
                    elif level == 'run':
                        for run in replicate['children']:
                            if run['run_ref'] in pipe_config_run_refs:
                                merge_replicate_refs.append(run['run_ref'])
                                if run['run_ref'] not in data:
                                    logger.info(f"Opening {run['run_ref']} {feature_name}")
                                    path_data = os.path.join(run_path, run['run_ref'], config['step_name'], feature_name)
                                    data[run['run_ref']] = read_data(path_data, config['step_name'], config['name_column'])
                    # Add to merges
                    if 'replicate' in config['levels']:
                        label_short = replicate['label_short']
                        if 'sample' in config['levels'] and label_short == sample['label_short']:
                            label_short += ' R' + str(replicate['replicate_order'])
                        if label_short in merges_labels:
                            logger.warning(f'Duplicate label: {label_short}')
                            label_short += ' dup'
                        if config['strict_column_names']:
                            label_short = utils.label2var(label_short)
                        merges.append({'refs':merge_replicate_refs, 'label_short':label_short, 'level':'replicate'})
                        merges_labels.add(label_short)
                    merge_sample_refs.extend(merge_replicate_refs)
                if 'sample' in config['levels']:
                    label_short = sample['label_short']
                    if label_short in merges_labels:
                        logger.warning(f'Duplicate label: {label_short}')
                        label_short += ' dup'
                    if config['strict_column_names']:
                        label_short = utils.label2var(label_short)
                    merges.append({'refs':merge_sample_refs, 'label_short':label_short, 'level':'sample'})
                    merges_labels.add(label_short)
        logger.info('Merging')
        for merge in merges:
            logger.info(f"{merge['level']: >11}{merge['label_short']: >30}  {','.join(merge['refs'])}")

        # Find annot path from pipeline
        for f in step_config['features']:
            if f['name'] == feature_name:
                annot_fon = f['path_json']
                break

        # Open annotation file
        path_annot = os.path.join(config['fontools_path_main'], 'annots', annot_fon)
        path_annot_fon2 = os.path.join(config['fontools_path_main'], 'annots', annot_fon.replace('fon1', 'fon2'))
        if os.path.exists(path_annot_fon2):
            legacy_fon2 = True
            logger.info(f'Opening {path_annot_fon2}')
            annots = json.load(open(path_annot_fon2))
            Annot = collections.namedtuple('Annot', annots['columns'])
        elif os.path.exists(path_annot):
            legacy_fon2 = False
            logger.info(f'Opening {path_annot}')
            if path_annot.endswith('.zst'):
                annots = json.load(zstd.open(path_annot, 'rt'))
            else:
                annots = json.load(open(path_annot, 'rt'))
        else:
            logger.info(f'{path_annot} not found')
            annots = None

        for main_column in config['columns']:
            logger.info(f'Step {feature_name} {main_column}')
            first_sample = True

            # Init output dataframe
            if first_sample:
                # Get two columns from first run
                main = data[merges[0]['refs'][0]].loc[:,[config['name_column'], 'length']]

                # Renaming main feature to its name
                if feature_name_type == 'gene':
                    main.columns = ['gene', 'gene_length']
                    first_datacol_idx = 2
                elif feature_name_type == 'transcript':
                    main.columns = ['transcript', 'transcript_length']
                    first_datacol_idx = 2

                # Add annotations
                if annots is not None:
                    if feature_name_type == 'gene':
                        # Add gene name
                        if legacy_fon2:
                            dgn = pd.DataFrame([(a.gene_stable_id, a.gene_name) for a in map(Annot._make, annots['annotations'])], columns=['gene', 'gene_name']).drop_duplicates()
                        else:
                            dgn = pd.DataFrame([(a['gene_stable_id'], a['gene_name']) for a in annots['features']], columns=['gene', 'gene_name']).drop_duplicates()
                        main = pd.merge(main, dgn, on='gene', how='left')
                        first_datacol_idx = 3
                    elif feature_name_type == 'transcript':
                        # Add gene ID and name
                        if legacy_fon2:
                            dgn = pd.DataFrame([(a.transcript_stable_id, a.gene_stable_id, a.gene_name) for a in map(Annot._make, annots['annotations'])], columns=['transcript', 'gene', 'gene_name']).drop_duplicates()
                        else:
                            dgn = pd.DataFrame([(a['transcript_stable_id'], a['gene_stable_id'], a['gene_name']) for a in annots['features']], columns=['transcript', 'gene', 'gene_name']).drop_duplicates()
                        main = pd.merge(main, dgn, on='transcript', how='left')
                        first_datacol_idx = 4

                first_sample = False

            # Merge count
            count_labels = []
            count_sums = []
            for merge in merges:
                series = []
                for ref in merge['refs']:
                    m = data[ref]
                    # Check that the feature order matches the output dataframe
                    if np.all(main.loc[:,feature_name_type] == m.loc[:,config['name_column']]):
                        series.append(m.loc[:,main_column])
                    else:
                        logger.warning(f'Format of {ref} incorrect')
                        continue
                count_labels.append(merge['label_short'])
                count_sums.append(np.sum(series, axis=0))

            # Export
            main = pd.concat([main, pd.DataFrame(np.vstack(count_sums).T, columns=count_labels)], axis=1)
            main.to_csv(f"{run_name}_{main_column}_{feature_name}{config['suffix']}.csv", index=False)

            # Normalization
            if config['step_name'] == 'counting':
                if config['spike_in_main_prefix'] != '':
                    spike_in_names = config['spike_in_names']
                    # Normalize by *Spike-in Total*
                    sel_spike = main[feature_name_type].map(lambda x: is_spike_in(config['spike_in_main_prefix'], x))
                    sel_spike[0] = False # Remove total
                    if sel_spike.sum() == 0:
                        logger.error('No spike-in feature found')
                        return 1
                    # Normalize by *Main Total* (Regular RPKM)
                    sel_main = np.invert(sel_spike)
                    sel_main[0] = False # Remove total
                    # Normalize by *(Main + Spike-in) Total*
                    sel_all = sel_main.copy()
                    sel_all.loc[:] = True
                    sel_all[0] = False # Remove total
                    sels = [(spike_in_names[0], '_'+spike_in_names[0], sel_spike), (spike_in_names[1], '_'+spike_in_names[1], sel_main), ('all', '_all', sel_all)]
                else:
                    # Normalize by *Total* (Regular RPKM)
                    sel_all = np.ones(len(main[feature_name_type]), 'bool')
                    sel_all[0] = False # Remove total
                    sels = [('all', '', sel_all)]

                totals_data = []
                for sel_name, sel_suffix, sel in sels:
                    rpkm_labels = []
                    rpkms = []
                    for col in main.columns[first_datacol_idx:]:
                        total = main.loc[sel, col].sum()
                        rpkm = (main.loc[:, col]) * (1000./main.loc[:, feature_name_type+'_length']) * (1000000./total)
                        rpkms.append(rpkm)
                        rpkm_labels.append(col)
                        totals_data.append([col, sel_name, total])

                    # Save dataframe
                    head = main.iloc[:, :first_datacol_idx].copy()
                    rpkm = pd.concat([head, pd.DataFrame(np.vstack(rpkms).T, columns=rpkm_labels)], axis=1)
                    rpkm.to_csv(f"{run_name}_rpkm_{main_column[main_column.find('_')+1:]}_{feature_name}{sel_suffix}{config['suffix']}.csv", index=False)

                # Save totals
                pd.DataFrame(totals_data, columns=['sample', 'name', 'total']).to_csv(f"{run_name}_total_{main_column}_{feature_name}{config['suffix']}.csv", index=False)

if __name__ == '__main__':
    sys.exit(main())
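For reference, the normalization above computes RPKM as count × (1000 / feature_length) × (1e6 / total counts over the selected features). A small numeric check of that formula, with hypothetical counts and lengths:

```python
import numpy as np

counts = np.array([100.0, 50.0])     # hypothetical per-feature counts
lengths = np.array([2000.0, 500.0])  # hypothetical feature lengths (bp)
total = counts.sum()                 # total over the selected features

# Same formula as in the script: reads per kilobase per million
rpkm = counts * (1000.0 / lengths) * (1e6 / total)
```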

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.