https://github.com/pysam-developers/pysam
Raw File
Tip revision: 746e4d9bd149722f2dbdb99b1e15760a7c2b7979 authored by Andreas Heger on 16 June 2020, 22:52:51 UTC
bump version to 0.16.0.1 to allow upload of fixed wheels to pypi
Tip revision: 746e4d9
TestUtils.py
import sys
import os
import glob
import difflib
import gzip
import contextlib
import inspect
import tempfile
import pysam

WORKDIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       "pysam_test_work"))

BAM_DATADIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                           "pysam_data"))

TABIX_DATADIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                             "tabix_data"))

CBCF_DATADIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                            "cbcf_data"))

LINKDIR = os.path.abspath(os.path.join(
    os.path.dirname(__file__), "..", "linker_tests"))


TESTS_TEMPDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "tmp"))


IS_PYTHON3 = sys.version_info[0] >= 3


if IS_PYTHON3:
    from itertools import zip_longest
    from urllib.request import urlopen
else:
    from itertools import izip as zip_longest
    from urllib2 import urlopen


if IS_PYTHON3:
    def force_str(s):
        try:
            return s.decode('ascii')
        except AttributeError:
            return s

    def force_bytes(s):
        try:
            return s.encode('ascii')
        except AttributeError:
            return s
else:
    def force_str(s):
        return s

    def force_bytes(s):
        return s


def openfile(fn):
    if fn.endswith(".gz"):
        try:
            return gzip.open(fn, "rt", encoding="utf-8")
        except TypeError:
            return gzip.open(fn, "r")
    else:
        return open(fn)


def checkBinaryEqual(filename1, filename2):
    '''return true if the two files are binary equal.
    '''
    if os.path.getsize(filename1) != os.path.getsize(filename2):
        return False

    infile1 = open(filename1, "rb")
    infile2 = open(filename2, "rb")

    def chariter(infile):
        while 1:
            c = infile.read(1)
            if c == b"":
                break
            yield c

    found = False
    for c1, c2 in zip_longest(chariter(infile1), chariter(infile2)):
        if c1 != c2:
            break
    else:
        found = True

    infile1.close()
    infile2.close()
    return found


def checkGZBinaryEqual(filename1, filename2):
    '''return true if the decompressed contents of the two files
    are binary equal.
    '''
    with gzip.open(filename1, "rb") as infile1:
        d1 = infile1.read()
        with gzip.open(filename2, "rb") as infile2:
            d2 = infile2.read()
        if d1 == d2:
            return True
    return False


def check_samtools_view_equal(
        filename1, filename2,
        without_header=False):
    '''return true if the two files are equal in their
    content through samtools view.
    '''
    # strip MD and NM tags, as not preserved in CRAM files
    args = ["-x", "MD", "-x", "NM"]
    if not without_header:
        args.append("-h")

    lines1 = pysam.samtools.view(*(args + [filename1]))
    lines2 = pysam.samtools.view(*(args + [filename2]))

    if len(lines1) != len(lines2):
        return False

    if lines1 != lines2:
        # line by line comparison
        # sort each line, as tags get rearranged between
        # BAM/CRAM
        for n, pair in enumerate(zip(lines1, lines2)):
            l1, l2 = pair
            l1 = sorted(l1[:-1].split("\t"))
            l2 = sorted(l2[:-1].split("\t"))
            if l1 != l2:
                print("mismatch in line %i" % n)
                print(l1)
                print(l2)
                return False
        else:
            return False

    return True


def check_url(url):
    '''return True if URL is available.

    A URL might not be available if it is the wrong URL
    or there is no connection to the URL.
    '''
    try:
        urlopen(url, timeout=1)
        return True
    except:
        return False


def checkFieldEqual(cls, read1, read2, exclude=[]):
    '''check if two reads are equal by comparing each field.'''

    # add the . for refactoring purposes.
    for x in (".query_name",
              ".query_sequence",
              ".flag",
              ".reference_id",
              ".reference_start",
              ".mapping_quality",
              ".cigartuples",
              ".next_reference_id",
              ".next_reference_start",
              ".template_length",
              ".query_length",
              ".query_qualities",
              ".bin",
              ".is_paired", ".is_proper_pair",
              ".is_unmapped", ".mate_is_unmapped",
              ".is_reverse", ".mate_is_reverse",
              ".is_read1", ".is_read2",
              ".is_secondary", ".is_qcfail",
              ".is_duplicate"):
        n = x[1:]
        if n in exclude:
            continue
        cls.assertEqual(getattr(read1, n), getattr(read2, n),
                        "attribute mismatch for %s: %s != %s" %
                        (n, getattr(read1, n), getattr(read2, n)))


def check_lines_equal(cls, a, b, sort=False, filter_f=None, msg=None):
    """check if contents of two files are equal comparing line-wise.

    sort: bool
       sort contents of both files before comparing.
    filter_f:
       remover lines in both a and b where expression is True
    """
    with openfile(a) as inf:
        aa = inf.readlines()
    with openfile(b) as inf:
        bb = inf.readlines()

    if filter_f is not None:
        aa = [x for x in aa if not filter_f(x)]
        bb = [x for x in bb if not filter_f(x)]

    if sort:
        cls.assertEqual(sorted(aa), sorted(bb), msg)
    else:
        cls.assertEqual(aa, bb, msg)


def get_temp_filename(suffix=""):
    caller_name = inspect.getouterframes(inspect.currentframe(), 2)[1][3]
    try:
        os.makedirs(TESTS_TEMPDIR)
    except OSError:
        pass

    f = tempfile.NamedTemporaryFile(
        prefix="pysamtests_tmp_{}_".format(caller_name),
        suffix=suffix,
        delete=False,
        dir=TESTS_TEMPDIR)

    f.close()
    return f.name

@contextlib.contextmanager
def get_temp_context(suffix="", keep=False):
    caller_name = inspect.getouterframes(inspect.currentframe(), 3)[1][3]
    try:
        os.makedirs(TESTS_TEMPDIR)
    except OSError:
        pass

    f = tempfile.NamedTemporaryFile(
        prefix="pysamtests_tmp_{}_".format(caller_name),
        suffix=suffix,
        delete=False,
        dir=TESTS_TEMPDIR)

    f.close()
    yield f.name
    
    if not keep:
        # clear up any indices as well
        for f in glob.glob(f.name + "*"):
            os.unlink(f)


def load_and_convert(filename, encode=True):
    '''load data from filename and convert all fields to string.

    Filename can be either plain or compressed (ending in .gz).
    '''
    data = []
    if filename.endswith(".gz"):
        with gzip.open(filename) as inf:
            for line in inf:
                line = line.decode("ascii")
                if line.startswith("#"):
                    continue
                d = line.strip().split("\t")
                data.append(d)
    else:
        with open(filename) as f:
            for line in f:
                if line.startswith("#"):
                    continue
                d = line.strip().split("\t")
                data.append(d)

    return data


def flatten_nested_list(l):
    return [i for ll in l for i in ll]
back to top