https://github.com/pysam-developers/pysam
Raw File
Tip revision: 746e4d9bd149722f2dbdb99b1e15760a7c2b7979 authored by Andreas Heger on 16 June 2020, 22:52:51 UTC
bump version to 0.16.0.1 to allow upload of fixed wheels to pypi
Tip revision: 746e4d9
AlignmentFile_test.py
#!/usr/bin/env python
'''unit testing code for pysam.

Execute in the :file:`tests` directory as it requires the Makefile
and data files located there.
'''

import unittest
import os
import shutil
import sys
import collections
import subprocess
import logging
import array
if sys.version_info.major >= 3:
    from io import StringIO
else:
    from StringIO import StringIO

from functools import partial

import pysam
import pysam.samtools
from TestUtils import checkBinaryEqual, checkGZBinaryEqual, check_url, \
    check_samtools_view_equal, checkFieldEqual, force_str, \
    get_temp_filename, BAM_DATADIR


##################################################
#
# Detailed test of file contents
#
# Data are read either through file based iterator
# access (BasicTestBAMFromFile) or by calling fetch
# without coordinates (BasicTestBAMFromFetch)
##################################################
class BasicTestBAMFromFetch(unittest.TestCase):

    '''basic first test - detailed testing
    if information in file is consistent
    with information in AlignedSegment object.'''

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex3.bam"),
            "rb")
        self.reads = list(self.samfile.fetch())

    def tearDown(self):
        self.samfile.close()

    def testARqname(self):
        self.assertEqual(self.reads[0].query_name,
                         "read_28833_29006_6945")
        self.assertEqual(self.reads[1].query_name,
                         "read_28701_28881_323b")

    def testARflag(self):
        self.assertEqual(self.reads[0].flag, 99)
        self.assertEqual(self.reads[1].flag, 147)

    def testARrname(self):
        self.assertEqual(self.reads[0].reference_id, 0)
        self.assertEqual(self.reads[1].reference_id, 1)

    def testARpos(self):
        self.assertEqual(self.reads[0].reference_start, 33 - 1)
        self.assertEqual(self.reads[1].reference_start, 88 - 1)

    def testARmapq(self):
        self.assertEqual(self.reads[0].mapping_quality, 20)
        self.assertEqual(self.reads[1].mapping_quality, 30)

    def testARcigar(self):
        self.assertEqual(self.reads[0].cigartuples, [(0, 10), (2, 1), (0, 25)])
        self.assertEqual(self.reads[1].cigartuples, [(0, 35)])

    def testARcigarstring(self):
        self.assertEqual(self.reads[0].cigarstring, '10M1D25M')
        self.assertEqual(self.reads[1].cigarstring, '35M')

    def testARmrnm(self):
        self.assertEqual(self.reads[0].next_reference_id, 0)
        self.assertEqual(self.reads[1].next_reference_id, 1)
        self.assertEqual(self.reads[0].next_reference_id, 0)
        self.assertEqual(self.reads[1].next_reference_id, 1)

    def testARmpos(self):
        self.assertEqual(self.reads[0].next_reference_start, 200 - 1)
        self.assertEqual(self.reads[1].next_reference_start, 500 - 1)
        self.assertEqual(self.reads[0].next_reference_start, 200 - 1)
        self.assertEqual(self.reads[1].next_reference_start, 500 - 1)

    def testARQueryLength(self):
        self.assertEqual(
            self.reads[0].query_length, 35,
            "insert size mismatch in read 1: %s != %s" %
            (self.reads[0].query_length, 35))
        self.assertEqual(
            self.reads[1].query_length, 35,
            "insert size mismatch in read 2: %s != %s" %
            (self.reads[1].query_length, 35))
        self.assertEqual(
            self.reads[0].query_length, 35,
            "insert size mismatch in read 1: %s != %s" %
            (self.reads[0].query_length, 35))
        self.assertEqual(
            self.reads[1].query_length, 35,
            "insert size mismatch in read 2: %s != %s" %
            (self.reads[1].query_length, 35))

    def testARseq(self):
        self.assertEqual(
            self.reads[0].query_sequence,
            "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG")
        self.assertEqual(
            self.reads[1].query_sequence,
            "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA")
        self.assertEqual(
            self.reads[3].query_sequence,
            "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG")

    def testARqual(self):
        self.assertEqual(
            pysam.qualities_to_qualitystring(self.reads[0].query_qualities),
            "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<")
        self.assertEqual(
            pysam.qualities_to_qualitystring(self.reads[1].query_qualities),
            "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<")
        self.assertEqual(
            pysam.qualities_to_qualitystring(self.reads[3].query_qualities),
            "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<")

    def testARquery(self):
        self.assertEqual(
            self.reads[0].query_alignment_sequence,
            "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG",
            "query mismatch in read 1: %s != %s" %
            (self.reads[0].query_alignment_sequence,
             "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"))
        self.assertEqual(
            self.reads[1].query_alignment_sequence,
            "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA",
            "query size mismatch in read 2: %s != %s" %
            (self.reads[1].query_alignment_sequence,
             "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"))
        self.assertEqual(
            self.reads[3].query_alignment_sequence,
            "TAGCTAGCTACCTATATCTTGGTCTT",
            "query mismatch in read 4: %s != %s" %
            (self.reads[3].query_alignment_sequence,
             "TAGCTAGCTACCTATATCTTGGTCTT"))

    def testARqqual(self):
        self.assertEqual(
            pysam.qualities_to_qualitystring(
                self.reads[0].query_alignment_qualities),
            "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<",
            "qquality string mismatch in read 1: %s != %s" %
            (pysam.qualities_to_qualitystring(self.reads[0].query_alignment_qualities),
             "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"))
        self.assertEqual(
            pysam.qualities_to_qualitystring(
                self.reads[1].query_alignment_qualities),
            "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<",
            "qquality string mismatch in read 2: %s != %s" %
            (pysam.qualities_to_qualitystring(self.reads[1].query_alignment_qualities),
             "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"))
        self.assertEqual(
            pysam.qualities_to_qualitystring(
                self.reads[3].query_alignment_qualities),
            "<<<<<<<<<<<<<<<<<:<9/,&,22",
            "qquality string mismatch in read 3: %s != %s" %
            (pysam.qualities_to_qualitystring(self.reads[3].query_alignment_qualities),
             "<<<<<<<<<<<<<<<<<:<9/,&,22"))

    def testPresentOptionalFields(self):
        self.assertEqual(
            self.reads[0].opt('NM'), 22,
            "optional field mismatch in read 1, NM: %s != %s" %
            (self.reads[0].opt('NM'), 22))
        self.assertEqual(
            self.reads[0].opt('RG'), 'L1',
            "optional field mismatch in read 1, RG: %s != %s" %
            (self.reads[0].opt('RG'), 'L1'))
        self.assertEqual(
            self.reads[1].opt('RG'), 'L2',
            "optional field mismatch in read 2, RG: %s != %s" %
            (self.reads[1].opt('RG'), 'L2'))
        self.assertEqual(
            self.reads[1].opt('MF'), 18,
            "optional field mismatch in read 2, MF: %s != %s" %
            (self.reads[1].opt('MF'), 18))

    def testPairedBools(self):
        self.assertEqual(self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (
            self.reads[0].is_paired, True))
        self.assertEqual(self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (
            self.reads[1].is_paired, True))
        self.assertEqual(self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (
            self.reads[0].is_proper_pair, True))
        self.assertEqual(self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (
            self.reads[1].is_proper_pair, True))

    def testTags(self):
        self.assertEqual(self.reads[0].tags,
                         [('NM', 22), ('RG', 'L1'),
                          ('PG', 'P1'), ('XT', 'U')])
        self.assertEqual(self.reads[1].tags,
                         [('MF', 18), ('RG', 'L2'),
                          ('PG', 'P2'), ('XT', 'R')])

    def testOpt(self):
        self.assertEqual(self.reads[0].opt("XT"), "U")
        self.assertEqual(self.reads[1].opt("XT"), "R")


class BasicTestSAMFromFetch(BasicTestBAMFromFetch):

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex3.sam"),
            "r")
        self.reads = list(self.samfile.fetch())


class BasicTestCRAMFromFetch(BasicTestBAMFromFetch):

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex3.cram"),
            "rc")
        self.reads = list(self.samfile.fetch())

    def testTags(self):
        self.assertEqual(
            sorted(self.reads[0].tags),
            sorted([('RG', 'L1'),
                    ('NM', 22),
                    ('MD', '0C0T1G1C0C0A1G0^G0C1C1G1A0T2G0G0G0A1C1G1G1A2C0'),
                    ('PG', 'P1'),
                    ('XT', 'U'),
                    ]))
        self.assertEqual(
            sorted(self.reads[1].tags),
            sorted([('RG', 'L2'),
                    ('NM', 26),
                    ('MD',
                     '1G0A0A1G1G0G2C0A0G0A0A0C0T0T0G0A0A0G0A0C0A0A1T2C0T0T1'),
                    ('MF', 18),
                    ('PG', 'P2'),
                    ('XT', 'R')]))

    def testPresentOptionalFields(self):
        self.assertEqual(
            self.reads[0].opt('NM'), 22,
            "optional field mismatch in read 1, NM: %s != %s" %
            (self.reads[0].opt('NM'), 22))
        self.assertEqual(
            self.reads[0].opt('RG'), 'L1',
            "optional field mismatch in read 1, RG: %s != %s" %
            (self.reads[0].opt('RG'), 'L1'))
        self.assertEqual(
            self.reads[1].opt('RG'), 'L2',
            "optional field mismatch in read 2, RG: %s != %s" %
            (self.reads[1].opt('RG'), 'L2'))
        self.assertEqual(
            self.reads[1].opt('MF'), 18,
            "optional field mismatch in read 2, MF: %s != %s" %
            (self.reads[1].opt('MF'), 18))


class BasicTestSAMFromFilename(BasicTestBAMFromFetch):

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex3.sam"),
            "r")
        self.reads = [r for r in self.samfile]


class BasicTestCRAMFromFilename(BasicTestCRAMFromFetch):

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex3.cram"),
            "rc")
        self.reads = [r for r in self.samfile]


class BasicTestBAMFromFilename(BasicTestBAMFromFetch):

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex3.bam"),
            "rb")
        self.reads = [r for r in self.samfile]


class BasicTestBAMFromFile(BasicTestBAMFromFetch):

    def setUp(self):
        with open(os.path.join(BAM_DATADIR, "ex3.bam")) as f:
            self.samfile = pysam.AlignmentFile(
                f, "rb")
        self.reads = [r for r in self.samfile]


class BasicTestBAMFromFileNo(BasicTestBAMFromFetch):

    def setUp(self):
        with open(os.path.join(BAM_DATADIR, "ex3.bam")) as f:
            self.samfile = pysam.AlignmentFile(
                f.fileno(), "rb")
        self.reads = [r for r in self.samfile]


class BasicTestSAMFromFile(BasicTestBAMFromFetch):

    def setUp(self):
        with open(os.path.join(BAM_DATADIR, "ex3.sam")) as f:
            self.samfile = pysam.AlignmentFile(
                f, "r")
        self.reads = [r for r in self.samfile]


class BasicTestSAMFromFileNo(BasicTestBAMFromFetch):

    def setUp(self):
        with open(os.path.join(BAM_DATADIR, "ex3.sam")) as f:
            self.samfile = pysam.AlignmentFile(
                f.fileno(), "r")
        self.reads = [r for r in self.samfile]


class BasicTestCRAMFromFile(BasicTestCRAMFromFetch):

    def setUp(self):
        with open(os.path.join(BAM_DATADIR, "ex3.cram")) as f:
            self.samfile = pysam.AlignmentFile(f, "rc")
        self.reads = [r for r in self.samfile]


class BasicTestCRAMFromFileNo(BasicTestCRAMFromFetch):

    def setUp(self):
        with open(os.path.join(BAM_DATADIR, "ex3.cram")) as f:
            self.samfile = pysam.AlignmentFile(
                f.fileno(), "rc")
        self.reads = [r for r in self.samfile]


class BasicTestSAMFromStringIO(BasicTestBAMFromFetch):

    def testRaises(self):
        statement = "samtools view -h {}".format(
            os.path.join(BAM_DATADIR, "ex3.bam"))
        stdout = subprocess.check_output(statement.split(" "))
        bam = StringIO()
        if sys.version_info.major >= 3:
            bam.write(stdout.decode('ascii'))
        else:
            bam.write(stdout)
        bam.seek(0)
        self.assertRaises(NotImplementedError,
                          pysam.AlignmentFile, bam)
        # self.reads = [r for r in samfile]


##################################################
#
# Test of basic File I/O
#
# * format conversions
# * reading with/without index
# * reading from closed files
#
##################################################
class TestIO(unittest.TestCase):

    '''check if reading samfile and writing a samfile
    are consistent.'''

    def checkEcho(self,
                  input_filename,
                  reference_filename,
                  output_filename,
                  input_mode,
                  output_mode,
                  sequence_filename=None,
                  use_template=True,
                  checkf=checkBinaryEqual,
                  **kwargs):
        '''iterate through *input_filename* writing to
        *output_filename* and comparing the output to
        *reference_filename*.

        The files are opened according to the *input_mode* and
        *output_mode*.

        If *use_template* is set, the header is copied from infile
        using the template mechanism, otherwise target names and
        lengths are passed explicitly.

        The *checkf* is used to determine if the files are
        equal.
        '''

        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, input_filename),
                input_mode) as infile:

            if "b" in input_mode:
                self.assertTrue(infile.is_bam)
                self.assertFalse(infile.is_cram)
            elif "c" in input_mode:
                self.assertFalse(infile.is_bam)
                self.assertTrue(infile.is_cram)
            else:
                self.assertFalse(infile.is_cram)
                self.assertFalse(infile.is_bam)

            if use_template:
                outfile = pysam.AlignmentFile(
                    output_filename,
                    output_mode,
                    reference_filename=sequence_filename,
                    template=infile, **kwargs)
            else:
                outfile = pysam.AlignmentFile(
                    output_filename,
                    output_mode,
                    reference_names=infile.references,
                    reference_lengths=infile.lengths,
                    reference_filename=sequence_filename,
                    add_sq_text=False,
                    **kwargs)

            iter = infile.fetch()

            for x in iter:
                outfile.write(x)

            outfile.close()

        self.assertTrue(checkf(
            os.path.join(BAM_DATADIR, reference_filename),
            output_filename),
            "files %s and %s are not the same" %
            (reference_filename,
             output_filename))

        os.unlink(output_filename)

    def testSAM2SAM(self):
        self.checkEcho("ex2.sam",
                       "ex2.sam",
                       "tmp_ex2.sam",
                       "r", "wh")

    def testSAM2SAMWithoutHeader(self):
        self.checkEcho("ex2.sam",
                       "ex1.sam",
                       "tmp_ex2.sam",
                       "r", "w",
                       add_sam_header=False)

    def testBAM2BAM(self):
        self.checkEcho("ex2.bam",
                       "ex2.bam",
                       "tmp_ex2.bam",
                       "rb", "wb",
                       checkf=checkGZBinaryEqual)

    def testCRAM2CRAM(self):
        # in some systems different reference sequence paths might be
        # embedded in the CRAM files which will result in different headers
        # see #542
        self.checkEcho("ex2.cram",
                       "ex2.cram",
                       "tmp_ex2.cram",
                       "rc", "wc",
                       sequence_filename=os.path.join(BAM_DATADIR, "ex1.fa"),
                       checkf=partial(
                           check_samtools_view_equal,
                           without_header=True))

    def testSAM2BAM(self):
        self.checkEcho("ex2.sam",
                       "ex2.bam",
                       "tmp_ex2.bam",
                       "r", "wb",
                       checkf=checkGZBinaryEqual)

    def testBAM2SAM(self):
        self.checkEcho("ex2.bam",
                       "ex2.sam",
                       "tmp_ex2.sam",
                       "rb", "wh")

    def testBAM2CRAM(self):
        # ignore header (md5 sum)
        self.checkEcho("ex2.bam",
                       "ex2.cram",
                       "tmp_ex2.cram",
                       "rb", "wc",
                       sequence_filename=os.path.join(BAM_DATADIR, "ex1.fa"),
                       checkf=partial(
                           check_samtools_view_equal,
                           without_header=True))

    def testCRAM2BAM(self):
        # ignore header (md5 sum)
        self.checkEcho("ex2.cram",
                       "ex2.bam",
                       "tmp_ex2.bam",
                       "rc", "wb",
                       sequence_filename=os.path.join(BAM_DATADIR, "ex1.fa"),
                       checkf=partial(
                           check_samtools_view_equal,
                           without_header=True))

    def testSAM2CRAM(self):
        self.checkEcho("ex2.sam",
                       "ex2.cram",
                       "tmp_ex2.cram",
                       "r", "wc",
                       sequence_filename=os.path.join(BAM_DATADIR, "ex1.fa"),
                       checkf=partial(
                           check_samtools_view_equal,
                           without_header=True))

    def testCRAM2SAM(self):
        self.checkEcho("ex2.cram",
                       "ex2.sam",
                       "tmp_ex2.sam",
                       "rc", "wh",
                       sequence_filename=os.path.join(BAM_DATADIR, "ex1.fa"),
                       checkf=partial(
                           check_samtools_view_equal,
                           without_header=True))

    # Disabled - should work, files are not binary equal, but are
    # non-binary equal:
    # diff <(samtools view pysam_ex1.bam) <(samtools view pysam_data/ex1.bam)
    # def testReadWriteBamWithTargetNames(self):
    #     input_filename = "ex1.bam"
    #     output_filename = "pysam_ex1.bam"
    #     reference_filename = "ex1.bam"

    #     self.checkEcho(input_filename, reference_filename, output_filename,
    #                    "rb", "wb", use_template=False)

    def testReadSamWithoutTargetNames(self):
        '''see issue 104.'''
        input_filename = os.path.join(
            BAM_DATADIR,
            "example_unmapped_reads_no_sq.sam")

        # raise exception in default mode
        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          input_filename,
                          "r")

        # raise exception if no SQ files
        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          input_filename, "r",
                          check_header=True)

        with pysam.AlignmentFile(
                input_filename,
                check_header=False,
                check_sq=False) as infile:
            result = list(infile.fetch(until_eof=True))
            self.assertEqual(2, len(result))

    def testReadBamWithoutTargetNames(self):
        '''see issue 104.'''
        input_filename = os.path.join(
            BAM_DATADIR, "example_unmapped_reads_no_sq.bam")

        # raise exception in default mode
        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          input_filename,
                          "r")

        # raise exception if no SQ files
        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          input_filename,
                          "r",
                          check_header=True)

        with pysam.AlignmentFile(
                input_filename, check_sq=False) as infile:
            result = list(infile.fetch(until_eof=True))

    def test_fail_read_sam_without_header(self):
        input_filename = os.path.join(BAM_DATADIR, "ex1.sam")

        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          input_filename,
                          "r")

    def test_pass_read_sam_without_header_with_refs(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.sam"),
                                 "r",
                                 reference_names=["chr1", "chr2"],
                                 reference_lengths=[1575, 1584]) as samfile:
            self.assertEqual(len(list(samfile.fetch(until_eof=True))), 3270)

    def test_pass_read_sam_with_header_without_header_check(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex2.sam"),
                                 "r", check_header=False) as samfile:
            self.assertEqual(len(list(samfile.fetch(until_eof=True))), 3270)

    def test_fail_when_reading_unformatted_files(self):
        '''test reading from a file that is not bam/sam formatted'''
        input_filename = os.path.join(BAM_DATADIR, 'Makefile')

        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          input_filename,
                          "rb")

        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          input_filename,
                          "r")

    def testBAMWithoutAlignedSegments(self):
        '''see issue 117'''
        input_filename = os.path.join(BAM_DATADIR, "test_unaligned.bam")
        samfile = pysam.AlignmentFile(input_filename,
                                      "rb",
                                      check_sq=False)
        samfile.fetch(until_eof=True)

    def testBAMWithShortBAI(self):
        '''see issue 116'''
        input_filename = os.path.join(BAM_DATADIR, "example_bai.bam")
        samfile = pysam.AlignmentFile(input_filename,
                                      "rb",
                                      check_sq=False)
        samfile.fetch('chr2')

    def testFetchFromClosedFile(self):

        samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex1.bam"),
            "rb")
        samfile.close()
        self.assertRaises(ValueError, samfile.fetch, 'chr1', 100, 120)

    def testFetchFromClosedFileObject(self):

        f = open(os.path.join(BAM_DATADIR, "ex1.bam"))
        samfile = pysam.AlignmentFile(f, "rb")
        f.close()
        self.assertTrue(f.closed)
        # access to Samfile still works
        self.checkEcho("ex1.bam",
                       "ex1.bam",
                       "tmp_ex1.bam",
                       "rb", "wb",
                       checkf=checkGZBinaryEqual)

        f = open(os.path.join(BAM_DATADIR, "ex1.bam"))
        samfile = pysam.AlignmentFile(f, "rb")
        self.assertFalse(f.closed)
        samfile.close()
        # python file needs to be closed separately
        self.assertFalse(f.closed)

    def test_accessing_attributes_in_closed_file_raises_errors(self):
        '''test that access to a closed samfile raises ValueError.'''

        samfile = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"),
                                      "rb")
        samfile.close()
            
        self.assertRaises(ValueError, samfile.fetch, 'chr1', 100, 120)
        self.assertRaises(ValueError, samfile.pileup, 'chr1', 100, 120)
        self.assertRaises(ValueError, samfile.getrname, 0)
        # TODO
        self.assertRaises(ValueError, samfile.tell)
        self.assertRaises(ValueError, samfile.seek, 0)
        self.assertRaises(ValueError, getattr, samfile, "nreferences")
        self.assertRaises(ValueError, getattr, samfile, "references")
        self.assertRaises(ValueError, getattr, samfile, "lengths")

        self.assertEqual(samfile.header, None)
        # write on closed file
        self.assertEqual(0, samfile.write(None))

    def test_header_available_after_closing_file(self):

        def load_bam():
            with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"), "rb") as inf:
                header = inf.header
            return header

        header = load_bam()
        self.assertTrue(header)
        self.assertEqual(header.nreferences, 2)
        self.assertEqual(header.references, ("chr1", "chr2"))

    def test_reference_name_available_after_closing_file(self):
        """read tids can be mapped to references after AlignmentFile has been closed.
        
        see issue #517"""
        
        def load_bam():
            with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"), "rb") as inf:
                read = next(inf)
            return read

        read = load_bam()
        self.assertEqual(read.reference_name, "chr1")
        
    # TOOD
    # def testReadingFromSamFileWithoutHeader(self):
    #     '''read from samfile without header.
    #     '''
    #     samfile = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex7.sam"),
    #                             check_header=False,
    #                             check_sq=False)
    #     self.assertRaises(NotImplementedError, samfile.__iter__)

    def testReadingFromFileWithoutIndex(self):
        '''read from bam file without index.'''

        dest = get_temp_filename("tmp_ex2.bam")
        shutil.copyfile(os.path.join(BAM_DATADIR, "ex2.bam"),
                        dest)
        samfile = pysam.AlignmentFile(dest,
                                      "rb")
        self.assertRaises(ValueError, samfile.fetch)
        self.assertEqual(
            len(list(samfile.fetch(until_eof=True))),
            3270)
        os.unlink(dest)

    # def testReadingUniversalFileMode(self):
    #     '''read from samfile without header.
    #     '''

    #     input_filename = "ex2.sam"
    #     output_filename = "pysam_ex2.sam"
    #     reference_filename = "ex1.sam"

    #     self.checkEcho(input_filename,
    #                    reference_filename,
    #                    output_filename,
    #                    "rU", "w")

    def testHead(self):
        '''test IteratorRowHead'''
        samfile = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"),
                                      "rb")
        l10 = list(samfile.head(10))
        l100 = list(samfile.head(100))
        self.assertEqual(len(l10), 10)
        self.assertEqual(len(l100), 100)
        self.assertEqual(list(map(str, l10)),
                         list(map(str, l100[:10])))

    def testWriteUncompressedBAMFile(self):
        '''read from uncompressed BAM file, see issue #43'''

        input_filename = "ex2.sam"
        output_filename = "pysam_uncompressed.bam"
        reference_filename = "uncompressed.bam"

        self.checkEcho(input_filename,
                       reference_filename,
                       output_filename,
                       "r", "wb0")

        self.checkEcho(input_filename,
                       reference_filename,
                       output_filename,
                       "r", "wbu")

    def testEmptyBAM(self):
        samfile = pysam.Samfile(os.path.join(BAM_DATADIR, "empty.bam"),
                                "rb")
        self.assertEqual(samfile.mapped, 0)
        self.assertEqual(samfile.unmapped, 0)
        self.assertEqual(samfile.nocoordinate, 0)

    def testEmptyWithHeaderBAM(self):
        self.assertRaises(
            ValueError,
            pysam.Samfile,
            os.path.join(BAM_DATADIR, "example_empty_with_header.bam"),
            "rb")

        samfile = pysam.Samfile(
            os.path.join(BAM_DATADIR, "example_empty_with_header.bam"),
            "rb",
            check_sq=False)
        self.assertEqual(samfile.mapped, 0)
        self.assertEqual(samfile.unmapped, 0)
        self.assertEqual(samfile.nocoordinate, 0)
        self.assertEqual([], list(samfile.fetch()))

    def testOpenFromFilename(self):

        samfile = pysam.AlignmentFile(
            filename=os.path.join(BAM_DATADIR, "ex1.bam"),
            mode="rb")
        self.assertEqual(len(list(samfile.fetch())), 3270)

    def testBAMWithCSIIndex(self):
        '''see issue 116'''
        input_filename = os.path.join(BAM_DATADIR, "ex1_csi.bam")
        samfile = pysam.AlignmentFile(input_filename,
                                      "rb",
                                      check_sq=False)
        samfile.fetch('chr2')

    def test_fetch_by_tid(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"), "rb") as samfile:
            self.assertEqual(len(list(samfile.fetch('chr1'))),
                             len(list(samfile.fetch(tid=0))))
            self.assertEqual(len(list(samfile.fetch('chr2'))),
                             len(list(samfile.fetch(tid=1))))
            self.assertRaises(
                IndexError,
                samfile.fetch,
                tid=2)
            self.assertRaises(
                IndexError,
                samfile.fetch,
                tid=-1)
            self.assertEqual(len(list(samfile.fetch('chr1', start=1000, end=2000))),
                             len(list(samfile.fetch(tid=0, start=1000, end=2000))))

    def test_write_bam_to_unknown_path_fails(self):
        '''see issue 116'''
        input_filename = os.path.join(BAM_DATADIR, "ex1.bam")
        with pysam.AlignmentFile(input_filename) as inf:
            self.assertRaises(IOError,
                              pysam.AlignmentFile,
                              "missing_directory/new_file.bam",
                              "wb",
                              template=inf)
        

class TestAutoDetect(unittest.TestCase):

    def testSAM(self):
        """test SAM autodetection."""

        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, "ex3.sam")) as inf:
            self.assertFalse(inf.is_bam)
            self.assertFalse(inf.is_cram)

            self.assertRaises(ValueError, inf.fetch, 'chr1')

    def testBAM(self):
        """test BAM autodetection."""

        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, "ex3.bam")) as inf:
            self.assertTrue(inf.is_bam)
            self.assertFalse(inf.is_cram)
            self.assertEqual(len(list(inf.fetch('chr1'))), 1)
            self.assertEqual(len(list(inf.fetch('chr2'))), 3)

    def testCRAM(self):
        """test CRAM autodetection."""

        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, "ex3.cram")) as inf:
            self.assertFalse(inf.is_bam)
            self.assertTrue(inf.is_cram)
            self.assertEqual(len(list(inf.fetch('chr1'))), 1)
            self.assertEqual(len(list(inf.fetch('chr2'))), 3)


##################################################
#
# Random access iterator tests
#
##################################################
class TestIteratorRowBAM(unittest.TestCase):

    filename = os.path.join(BAM_DATADIR, "ex2.bam")
    mode = "rb"
    reference_filename = None

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            self.filename,
            self.mode,
            reference_filename=self.reference_filename,
        )

    def tearDown(self):
        self.samfile.close()

    def checkRange(self, rnge):
        '''compare results from iterator with those from samtools.'''
        ps = list(self.samfile.fetch(region=rnge))
        sa = force_str(
            pysam.samtools.view(
                self.filename,
                rnge,
                raw=True)).splitlines(True)
        self.assertEqual(
            len(ps), len(sa),
            "unequal number of results for range %s: %i != %i" %
            (rnge, len(ps), len(sa)))
        # check if the same reads are returned and in the same order
        for line, (a, b) in enumerate(list(zip(ps, sa))):
            d = b.split("\t")
            self.assertEqual(
                a.query_name, d[0],
                "line %i: read id mismatch: %s != %s" %
                (line, a.reference_id, d[0]))
            self.assertEqual(
                a.reference_start,
                int(d[3]) - 1,
                "line %i: read position mismatch: %s != %s, \n%s\n%s\n" %
                (line, a.reference_start, int(d[3]) - 1,
                 str(a), str(d)))
            qual = d[10]
            self.assertEqual(
                pysam.qualities_to_qualitystring(a.query_qualities),
                qual,
                "line %i: quality mismatch: %s != %s, \n%s\n%s\n" %
                (line, pysam.qualities_to_qualitystring(a.query_qualities), qual,
                 str(a), str(d)))

    def testIteratePerContig(self):
        '''check random access per contig'''
        for contig in self.samfile.references:
            self.checkRange(contig)

    def testIterateRanges(self):
        '''check random access per range'''
        for contig, length in zip(self.samfile.references,
                                  self.samfile.lengths):
            for start in range(1, length, 90):
                # this includes empty ranges
                self.checkRange("%s:%i-%i" %
                                (contig, start, start + 90))


class TestIteratorRowAllBAM(unittest.TestCase):

    filename = os.path.join(BAM_DATADIR, "ex2.bam")
    mode = "rb"

    def setUp(self):
        self.samfile = pysam.AlignmentFile(
            self.filename,
            self.mode)

    def testIterate(self):
        '''compare results from iterator with those from samtools.'''
        ps = list(self.samfile.fetch())
        sa = pysam.samtools.view(self.filename,
                                 raw=True).splitlines()
        self.assertEqual(
            len(ps), len(sa),
            "unequal number of results: %i != %i" %
            (len(ps), len(sa)))
        # check if the same reads are returned
        for line, pair in enumerate(list(zip(ps, sa))):
            data = force_str(pair[1]).split("\t")
            self.assertEqual(
                pair[0].query_name,
                data[0],
                "read id mismatch in line %i: %s != %s" %
                (line, pair[0].reference_id, data[0]))

    def tearDown(self):
        self.samfile.close()


class TestIteratorRowCRAM(TestIteratorRowBAM):
    filename = os.path.join(BAM_DATADIR, "ex2.cram")
    mode = "rc"


class TestIteratorRowCRAMWithReferenceFilename(TestIteratorRowCRAM):
    reference_filename = os.path.join(BAM_DATADIR, "ex1.fa")


# needs to be implemented
# class TestAlignedSegmentFromSamWithoutHeader(TestAlignedSegmentFromBam):
#
#     def setUp(self):
#         self.samfile=pysam.AlignmentFile( "ex7.sam","r" )
#         self.reads=list(self.samfile.fetch())


class TestFloatTagBug(unittest.TestCase):

    '''see issue 71'''

    def testFloatTagBug(self):
        '''a float tag before another exposed a parsing bug in bam_aux_get.

        Fixed in 0.1.19
        '''
        samfile = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "tag_bug.bam"))
        read = next(samfile.fetch(until_eof=True))
        self.assertTrue(('XC', 1) in read.tags)
        self.assertEqual(read.opt('XC'), 1)


class TestLargeFieldBug(unittest.TestCase):

    '''see issue 100'''

    def testLargeFileBug(self):
        '''when creating a read with a large entry in the tag field
        causes an error:
            NotImplementedError: tags field too large
        '''
        samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "issue100.bam"))
        read = next(samfile.fetch(until_eof=True))
        new_read = pysam.AlignedSegment()
        new_read.tags = read.tags
        self.assertEqual(new_read.tags, read.tags)


class TestClipping(unittest.TestCase):

    def testClipping(self):

        self.samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "softclip.bam"),
            "rb")

        for read in self.samfile:

            if read.query_name == "r001":
                self.assertEqual(read.query_sequence, 'AAAAGATAAGGATA')
                self.assertEqual(read.query_alignment_sequence, 'AGATAAGGATA')
                self.assertEqual(pysam.qualities_to_qualitystring(read.query_qualities),
                                 None)
                self.assertEqual(
                    pysam.qualities_to_qualitystring(
                        read.query_alignment_qualities),
                    None)

            elif read.query_name == "r002":

                self.assertEqual(read.query_sequence, 'GCCTAAGCTAA')
                self.assertEqual(read.query_alignment_sequence, 'AGCTAA')
                self.assertEqual(
                    pysam.qualities_to_qualitystring(read.query_qualities),
                    '01234567890')
                self.assertEqual(
                    pysam.qualities_to_qualitystring(
                        read.query_alignment_qualities),
                    '567890')

            elif read.query_name == "r003":

                self.assertEqual(read.query_sequence, 'GCCTAAGCTAA')
                self.assertEqual(read.query_alignment_sequence, 'GCCTAA')
                self.assertEqual(
                    pysam.qualities_to_qualitystring(read.query_qualities),
                    '01234567890')
                self.assertEqual(
                    pysam.qualities_to_qualitystring(
                        read.query_alignment_qualities),
                    '012345')

            elif read.query_name == "r004":

                self.assertEqual(read.query_sequence, 'TAGGC')
                self.assertEqual(read.query_alignment_sequence, 'TAGGC')
                self.assertEqual(
                    pysam.qualities_to_qualitystring(read.query_qualities),
                    '01234')
                self.assertEqual(
                    pysam.qualities_to_qualitystring(
                        read.query_alignment_qualities),
                    '01234')


class TestUnmappedReadsRetrieval(unittest.TestCase):

    def test_fetch_from_sam_with_until_eof_reads_unmapped_reads(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex5.sam"),
                                 "rb") as samfile:
            self.assertEqual(len(list(samfile.fetch(until_eof=True))), 2)

    def test_fetch_from_bam_with_until_eof_reads_unmapped_reads(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex5.bam"),
                                 "rb") as samfile:
            self.assertEqual(len(list(samfile.fetch(until_eof=True))), 2)

    def test_fetch_with_asterisk_only_returns_unmapped_reads(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "test_mapped_unmapped.bam"),
                                 "rb") as samfile:
            self.assertEqual(len(list(samfile.fetch(region="*"))), 4)

    def test_fetch_with_asterisk_only_returns_unmapped_reads_by_contig(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "test_mapped_unmapped.bam"),
                                 "rb") as samfile:
            self.assertEqual(len(list(samfile.fetch(contig="*"))), 4)


class TestContextManager(unittest.TestCase):

    def testManager(self):
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam'),
                                 'rb') as samfile:
            samfile.fetch()
        self.assertEqual(samfile.closed, True)


class TestMultiThread(unittest.TestCase):

    def testSingleThreadEqualsMultithread(self):
        input_bam = os.path.join(BAM_DATADIR, 'ex1.bam')
        single_thread_out = get_temp_filename("tmp_single.bam")
        multi_thread_out = get_temp_filename("tmp_multi.bam")
        with pysam.AlignmentFile(input_bam,
                                 'rb') as samfile:
            reads = [r for r in samfile]
            with pysam.AlignmentFile(single_thread_out,
                                     mode='wb',
                                     template=samfile,
                                     threads=1) as single_out:
                [single_out.write(r) for r in reads]
            with pysam.AlignmentFile(multi_thread_out,
                                     mode='wb',
                                     template=samfile,
                                     threads=2) as multi_out:
                [single_out.write(r) for r in reads]
        with pysam.AlignmentFile(input_bam) as inp, \
            pysam.AlignmentFile(single_thread_out) as single, \
            pysam.AlignmentFile(multi_thread_out) as multi:
            for r1, r2, r3 in zip(inp, single, multi):
                assert r1.to_string == r2.to_string == r3.to_string

    def TestNoMultiThreadingWithIgnoreTruncation(self):
        self.assertRaises(
            ValueError, pysam.AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam'),
                                            threads=2,
                                            ignore_truncation=True)
        )


class TestExceptions(unittest.TestCase):

    def setUp(self):
        self.samfile = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"),
                                           "rb")

    def testMissingFile(self):

        self.assertRaises(
            IOError, pysam.AlignmentFile, "exdoesntexist.bam", "rb")
        self.assertRaises(
            IOError, pysam.AlignmentFile, "exdoesntexist.sam", "r")
        self.assertRaises(
            IOError, pysam.AlignmentFile, "exdoesntexist.bam", "r")
        self.assertRaises(
            IOError, pysam.AlignmentFile, "exdoesntexist.sam", "rb")

    def testBadContig(self):
        self.assertRaises(ValueError, self.samfile.fetch, "chr88")

    def testMeaninglessCrap(self):
        self.assertRaises(ValueError, self.samfile.fetch, "skljf")

    def testBackwardsOrderNewFormat(self):
        self.assertRaises(ValueError, self.samfile.fetch, 'chr1', 100, 10)

    def testBackwardsOrderOldFormat(self):
        self.assertRaises(ValueError, self.samfile.fetch, region="chr1:100-10")

    def testOutOfRangeNegativeNewFormat(self):
        self.assertRaises(ValueError, self.samfile.fetch, "chr1", 5, -10)
        self.assertRaises(ValueError, self.samfile.fetch, "chr1", 5, 0)
        self.assertRaises(ValueError, self.samfile.fetch, "chr1", -5, -10)

        self.assertRaises(ValueError, self.samfile.count, "chr1", 5, -10)
        self.assertRaises(ValueError, self.samfile.count, "chr1", 5, 0)
        self.assertRaises(ValueError, self.samfile.count, "chr1", -5, -10)

    def testOutOfRangeNegativeOldFormat(self):
        self.assertRaises(ValueError, self.samfile.fetch, region="chr1:-5-10")
        self.assertRaises(ValueError, self.samfile.fetch, region="chr1:-5-0")
        self.assertRaises(ValueError, self.samfile.fetch, region="chr1:-5--10")

        self.assertRaises(ValueError, self.samfile.count, region="chr1:-5-10")
        self.assertRaises(ValueError, self.samfile.count, region="chr1:-5-0")
        self.assertRaises(ValueError, self.samfile.count, region="chr1:-5--10")

    def testOutOfRangNewFormat(self):
        self.assertRaises(
            ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999)
        self.assertRaises(
            ValueError, self.samfile.count, "chr1", 9999999999, 99999999999)

    def testOutOfRangeLargeNewFormat(self):
        self.assertRaises(ValueError, self.samfile.fetch, "chr1",
                          9999999999999999999999999999999, 9999999999999999999999999999999999999999)
        self.assertRaises(ValueError, self.samfile.count, "chr1",
                          9999999999999999999999999999999, 9999999999999999999999999999999999999999)

    def testOutOfRangeLargeOldFormat(self):
        self.assertRaises(
            ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999")
        self.assertRaises(
            ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999")

    def testZeroToZero(self):
        '''see issue 44'''
        self.assertEqual(len(list(self.samfile.fetch('chr1', 0, 0))), 0)

    def tearDown(self):
        self.samfile.close()


class TestWrongFormat(unittest.TestCase):

    '''test cases for opening files not in bam/sam format.'''

    def testOpenSamAsBam(self):
        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          os.path.join(BAM_DATADIR, 'ex1.sam'),
                          'rb')

    def testOpenBamAsSam(self):
        # test fails, needs to be implemented.
        # sam.fetch() fails on reading, not on opening
        # self.assertRaises(ValueError, pysam.AlignmentFile,
        #                  os.path.join(BAM_DATADIR, 'ex1.bam'),
        #                  'r')
        pass

    def testOpenFastaAsSam(self):
        # test fails, needs to be implemented.
        # sam.fetch() fails on reading, not on opening
        # self.assertRaises( ValueError, pysam.AlignmentFile, 'ex1.fa', 'r' )
        pass

    def testOpenFastaAsBam(self):
        self.assertRaises(ValueError,
                          pysam.AlignmentFile,
                          os.path.join(BAM_DATADIR, 'ex1.fa'),
                          'rb')


class TestRegionParsing(unittest.TestCase):

    def test_dash_in_chr(self):
        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, "example_dash_in_chr.bam")) as inf:
            self.assertEqual(len(list(inf.fetch(contig="chr-1"))), 1)
            self.assertEqual(len(list(inf.fetch(contig="chr2"))), 1)
            self.assertEqual(len(list(inf.fetch(region="chr-1"))), 1)
            self.assertEqual(len(list(inf.fetch(region="chr2"))), 1)


class TestDeNovoConstruction(unittest.TestCase):

    '''check BAM/SAM file construction using ex6.sam

    (note these are +1 coordinates):

    read_28833_29006_6945	99	chr1	33	20	10M1D25M	=	200	167	AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG	<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<	NM:i:1	RG:Z:L1
    read_28701_28881_323b	147	chr2	88	30	35M	=	500	412	ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA	<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<	MF:i:18	RG:Z:L2
    '''  # noqa

    header = {'HD': {'VN': '1.0'},
              'SQ': [{'LN': 1575, 'SN': 'chr1'},
                     {'LN': 1584, 'SN': 'chr2'}], }

    bamfile = os.path.join(BAM_DATADIR, "ex6.bam")
    samfile = os.path.join(BAM_DATADIR, "ex6.sam")

    def setUp(self):

        header = pysam.AlignmentHeader.from_dict(self.header)

        a = pysam.AlignedSegment(header)
        a.query_name = "read_28833_29006_6945"
        a.query_sequence = "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
        a.flag = 99
        a.reference_id = 0
        a.reference_start = 32
        a.mapping_quality = 20
        a.cigartuples = ((0, 10), (2, 1), (0, 25))
        a.next_reference_id = 0
        a.next_reference_start = 199
        a.template_length = 167
        a.query_qualities = pysam.qualitystring_to_array(
            "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<")
        a.tags = (("NM", 1),
                  ("RG", "L1"))

        b = pysam.AlignedSegment(header)
        b.query_name = "read_28701_28881_323b"
        b.query_sequence = "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
        b.flag = 147
        b.reference_id = 1
        b.reference_start = 87
        b.mapping_quality = 30
        b.cigartuples = ((0, 35), )
        b.next_reference_id = 1
        b.next_reference_start = 499
        b.template_length = 412
        b.query_qualities = pysam.qualitystring_to_array(
            "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<")
        b.tags = (("MF", 18),
                  ("RG", "L2"))

        self.reads = (a, b)

    # TODO
    # def testSAMWholeFile(self):

    #     tmpfilename = "tmp_%i.sam" % id(self)

    #     outfile = pysam.AlignmentFile(tmpfilename,
    #                             "wh",
    #                             header=self.header)

    #     for x in self.reads:
    #         outfile.write(x)
    #     outfile.close()
    #     self.assertTrue(checkBinaryEqual(tmpfilename, self.samfile),
    #                     "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))

    #     os.unlink(tmpfilename)

    def test_pass_if_reads_binary_equal(self):
        '''check if individual reads are binary equal.'''
        infile = pysam.AlignmentFile(self.bamfile, "rb")

        references = list(infile)
        for denovo, reference in zip(references, self.reads):
            checkFieldEqual(self, reference, denovo)
            self.assertEqual(reference.compare(denovo), 0)

    # TODO
    # def testSAMPerRead(self):
    #     '''check if individual reads are binary equal.'''
    #     infile = pysam.AlignmentFile(self.samfile, "r")

    #     others = list(infile)
    #     for denovo, other in zip(others, self.reads):
    #         checkFieldEqual(self, other, denovo)
    #         self.assertEqual(other.compare(denovo), 0)

    def testBAMWholeFile(self):

        tmpfilename = "tmp_%i.bam" % id(self)

        outfile = pysam.AlignmentFile(tmpfilename, "wb", header=self.header)

        for x in self.reads:
            outfile.write(x)
        outfile.close()

        self.assertTrue(
            checkGZBinaryEqual(tmpfilename, self.bamfile),
            "mismatch when construction BAM file, see %s %s" %
            (tmpfilename, self.bamfile))

        os.unlink(tmpfilename)


class TestEmptyHeader(unittest.TestCase):
    '''see issue 84.'''

    def testEmptyHeader(self):
        s = pysam.AlignmentFile(os.path.join(BAM_DATADIR,
                                             'example_empty_header.bam'))
        self.assertEqual(s.header.to_dict(), {'SQ': [{'LN': 1000, 'SN': 'chr1'}]})

    def test_bam_without_seq_in_header(self):
        s = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "example_no_seq_in_header.bam"))
        self.assertTrue("SQ" in s.header.to_dict())
        self.assertTrue("@SQ" in str(s.header))

    def test_bam_without_seq_with_null_bytes_in_header(self):
        s = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "example_no_seq_in_header_null_bytes.bam"))
        self.assertTrue("SQ" in s.header.to_dict())
        self.assertTrue("@SQ" in str(s.header))


class TestMismatchingHeader(unittest.TestCase):
    '''see issue 716.'''

    def testMismatchingHeader(self):
        # Note: no chr2
        header = {
            'SQ': [{'SN': 'chr1', 'LN': 1575}],
            'PG': [{'ID': 'bwa', 'PN': 'bwa', 'VN': '0.7.15', 'CL': 'bwa mem xx -'}],
        }

        dest = get_temp_filename("tmp_ex3.bam")
        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, 'ex3.bam')) as inf:
            with pysam.AlignmentFile(dest, mode="wb", header=header) as outf:
                for read in inf:
                    if read.reference_name == "chr1":
                        outf.write(read)
                    else:
                        self.assertRaises(ValueError,
                                          outf.write,
                                          read)
        os.unlink(dest)


class TestHeaderWithProgramOptions(unittest.TestCase):

    '''see issue 39.'''

    def testHeader(self):
        s = pysam.AlignmentFile(os.path.join(BAM_DATADIR,
                                             'rg_with_tab.bam'))
        self.assertEqual(
            s.header.to_dict(),
            {'SQ': [{'LN': 1575, 'SN': 'chr1'},
                    {'LN': 1584, 'SN': 'chr2'}],
             'PG': [{'PN': 'bwa',
                     'ID': 'bwa',
                     'VN': '0.7.9a-r786',
                     'CL': 'bwa mem -p -t 8 -M -R '
                     '@RG ID:None SM:None /mnt/data/hg19.fa '
                     '/mnt/analysis/default-0.fastq'}]})


class TestTruncatedBAM(unittest.TestCase):

    '''see pull request 50.'''

    def testTruncatedBam2(self):
        self.assertRaises(IOError,
                          pysam.AlignmentFile,
                          os.path.join(BAM_DATADIR, 'ex2_truncated.bam'))

    def testTruncatedBamIterator(self):
        s = pysam.AlignmentFile(os.path.join(BAM_DATADIR, 'ex2_truncated.bam'),
                                ignore_truncation=True)

        def iterall(x):
            return len([a for a in x])
        self.assertRaises(IOError, iterall, s)


COMPARE_BTAG = [100, 1, 91, 0, 7, 101, 0, 201, 96, 204,
                0, 0, 87, 109, 0, 7, 97, 112, 1, 12, 78,
                197, 0, 7, 100, 95, 101, 202, 0, 6, 0, 1,
                186, 0, 84, 0, 244, 0, 0, 324, 0, 107, 195,
                101, 113, 0, 102, 0, 104, 3, 0, 101, 1, 0,
                212, 6, 0, 0, 1, 0, 74, 1, 11, 0, 196, 2,
                197, 103, 0, 108, 98, 2, 7, 0, 1, 2, 194,
                0, 180, 0, 108, 0, 203, 104, 16, 5, 205,
                0, 0, 0, 1, 1, 100, 98, 0, 0, 204, 6, 0,
                79, 0, 0, 101, 7, 109, 90, 265, 1, 27, 10,
                109, 102, 9, 0, 292, 0, 110, 0, 0, 102,
                112, 0, 0, 84, 100, 103, 2, 81, 126, 0, 2,
                90, 0, 15, 96, 15, 1, 0, 2, 0, 107, 92, 0,
                0, 101, 3, 98, 15, 102, 13, 116, 116, 90, 93,
                198, 0, 0, 0, 199, 92, 26, 495, 100, 5, 0,
                100, 5, 209, 0, 92, 107, 90, 0, 0, 0, 0, 109,
                194, 7, 94, 200, 0, 40, 197, 0, 11, 0, 0, 112,
                110, 6, 4, 200, 28, 0, 196, 0, 203, 1, 129,
                0, 0, 1, 0, 94, 0, 1, 0, 107, 5, 201, 3, 3, 100,
                0, 121, 0, 7, 0, 1, 105, 306, 3, 86, 8, 183, 0,
                12, 163, 17, 83, 22, 0, 0, 1, 8, 109, 103, 0, 0,
                295, 0, 200, 16, 172, 3, 16, 182, 3, 11, 0, 0,
                223, 111, 103, 0, 5, 225, 0, 95]


class TestBTagSam(unittest.TestCase):

    '''see issue 81.'''

    compare = [COMPARE_BTAG,
               [-100, 200, -300, -400],
               [-100, 12],
               [12, 15],
               [-1.0, 5.0, 2.5]]

    filename = os.path.join(BAM_DATADIR, 'example_btag.sam')

    read0 = [('RG', 'QW85I'),
             ('PG', 'tmap'),
             ('MD', '140'),
             ('NM', 0),
             ('AS', 140),
             ('FZ', array.array('H', COMPARE_BTAG)),
             ('XA', 'map2-1'),
             ('XS', 53),
             ('XT', 38),
             ('XF', 1),
             ('XE', 0)]

    def testReadTags(self):

        s = pysam.AlignmentFile(self.filename)
        for x, read in enumerate(s):
            tags = read.tags
            if x == 0:
                self.assertEqual(tags, self.read0)

            fz = list(dict(tags)["FZ"])
            self.assertEqual(fz, self.compare[x])
            self.assertEqual(list(read.opt("FZ")), self.compare[x])
            self.assertEqual(tags, read.get_tags())
            for tag, value in tags:
                self.assertEqual(value, read.get_tag(tag))

    def testReadWriteTags(self):

        s = pysam.AlignmentFile(self.filename)
        for read in s:
            before = read.tags
            read.tags = before
            self.assertEqual(read.tags, before)

            read.set_tags(before)
            self.assertEqual(read.tags, before)

            for tag, value in before:
                read.set_tag(tag, value)
                self.assertEqual(value, read.get_tag(tag))


class TestBTagBam(TestBTagSam):
    filename = os.path.join(BAM_DATADIR, 'example_btag.bam')


class TestDoubleFetchBAM(unittest.TestCase):
    '''check if two iterators on the same bamfile are independent.'''

    filename = os.path.join(BAM_DATADIR, 'ex1.bam')
    mode = "rb"

    def testDoubleFetch(self):

        with pysam.AlignmentFile(self.filename, self.mode) as samfile1:
            for a, b in zip(samfile1.fetch(multiple_iterators=True),
                            samfile1.fetch(multiple_iterators=True)):
                self.assertEqual(a.compare(b), 0)

    def testDoubleFetchWithRegionTrueTrue(self):

        with pysam.AlignmentFile(self.filename, self.mode) as samfile1:
            contig, start, stop = 'chr2', 200, 3000000
            # just making sure the test has something to catch
            self.assertTrue(len(list(samfile1.fetch(contig, start, stop))) > 0)

            for a, b in zip(samfile1.fetch(contig, start, stop,
                                           multiple_iterators=True),
                            samfile1.fetch(contig, start, stop,
                                           multiple_iterators=True)):
                self.assertEqual(a.compare(b), 0)

    def testDoubleFetchWithRegionFalseTrue(self):
        with pysam.AlignmentFile(self.filename, self.mode) as samfile1:
            contig, start, stop = 'chr2', 200, 3000000
            # just making sure the test has something to catch
            self.assertTrue(len(list(samfile1.fetch(contig, start, stop))) > 0)
        
            for a, b in zip(samfile1.fetch(contig, start, stop,
                                           multiple_iterators=False),
                            samfile1.fetch(contig, start, stop,
                                           multiple_iterators=True)):
                self.assertEqual(a.compare(b), 0)

    def testDoubleFetchWithRegionTrueFalse(self):
        with pysam.AlignmentFile(self.filename, self.mode) as samfile1:
            contig, start, stop = 'chr2', 200, 3000000
            # just making sure the test has something to catch
            self.assertTrue(len(list(samfile1.fetch(contig, start, stop))) > 0)
        
            for a, b in zip(samfile1.fetch(contig, start, stop,
                                           multiple_iterators=True),
                            samfile1.fetch(contig, start, stop,
                                           multiple_iterators=False)):
                self.assertEqual(a.compare(b), 0)
                
    def testDoubleFetchUntilEOF(self):

        with pysam.AlignmentFile(self.filename, self.mode) as samfile1:

            for a, b in zip(samfile1.fetch(until_eof=True),
                            samfile1.fetch(until_eof=True,
                                           multiple_iterators=True)):
                self.assertEqual(a.compare(b), 0)


class TestDoubleFetchCRAM(TestDoubleFetchBAM):
    filename = os.path.join(BAM_DATADIR, 'ex2.cram')
    mode = "rc"


class TestDoubleFetchCRAMWithReference(TestDoubleFetchBAM):
    filename = os.path.join(BAM_DATADIR, 'ex2.cram')
    mode = "rc"
    reference_filename = os.path.join(BAM_DATADIR, 'ex1.fa')


class TestRemoteFileFTP(unittest.TestCase):

    '''test remote access.

    '''

    # Need to find an ftp server without password on standard
    # port.

    url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
    region = "1:1-1000"

    def testFTPView(self):
        return
        if not check_url(self.url):
            return

        result = pysam.samtools.view(self.url, self.region)
        self.assertEqual(len(result), 36)

    def testFTPFetch(self):
        return
        if not check_url(self.url):
            return

        samfile = pysam.AlignmentFile(self.url, "rb")
        result = list(samfile.fetch(region=self.region))
        self.assertEqual(len(result), 36)


class TestRemoteFileHTTP(unittest.TestCase):

    url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
    region = "chr1:1-1000"
    local = os.path.join(BAM_DATADIR, "ex1.bam")

    def testView(self):
        if not check_url(self.url):
            return

        samfile_local = pysam.AlignmentFile(self.local, "rb")
        ref = list(samfile_local.fetch(region=self.region))

        result = pysam.samtools.view(self.url, self.region)
        self.assertEqual(len(result.splitlines()), len(ref))

    def testFetch(self):
        if not check_url(self.url):
            return

        with pysam.AlignmentFile(self.url, "rb") as samfile:
            result = list(samfile.fetch(region=self.region))

        with pysam.AlignmentFile(self.local, "rb") as samfile_local:
            ref = list(samfile_local.fetch(region=self.region))

        self.assertEqual(len(ref), len(result))
        for x, y in zip(result, ref):
            self.assertEqual(x.compare(y), 0)

    def testFetchAll(self):
        if not check_url(self.url):
            return

        with pysam.AlignmentFile(self.url, "rb") as samfile:
            result = list(samfile.fetch())

        with pysam.AlignmentFile(self.local, "rb") as samfile_local:
            ref = list(samfile_local.fetch())

        self.assertEqual(len(ref), len(result))
        for x, y in zip(result, ref):
            self.assertEqual(x.compare(y), 0)


class TestLargeOptValues(unittest.TestCase):

    ints = (65536, 214748, 2147484, 2147483647)
    floats = (65536.0, 214748.0, 2147484.0)

    def check(self, samfile):

        i = samfile.fetch()
        for exp in self.ints:
            rr = next(i)
            obs = rr.opt("ZP")
            self.assertEqual(exp, obs,
                             "expected %s, got %s\n%s" %
                             (str(exp), str(obs), str(rr)))

        for exp in [-x for x in self.ints]:
            rr = next(i)
            obs = rr.opt("ZP")
            self.assertEqual(exp, obs,
                             "expected %s, got %s\n%s" %
                             (str(exp), str(obs), str(rr)))

        for exp in self.floats:
            rr = next(i)
            obs = rr.opt("ZP")
            self.assertEqual(exp, obs,
                             "expected %s, got %s\n%s" %
                             (str(exp), str(obs), str(rr)))

        for exp in [-x for x in self.floats]:
            rr = next(i)
            obs = rr.opt("ZP")
            self.assertEqual(exp, obs, "expected %s, got %s\n%s" %
                             (str(exp), str(obs), str(rr)))

    def testSAM(self):
        samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex10.sam"),
            "r")
        self.check(samfile)

    def testBAM(self):
        samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex10.bam"),
            "rb")
        self.check(samfile)


class TestCountCoverage(unittest.TestCase):

    samfilename = os.path.join(BAM_DATADIR, "ex1.bam")
    fastafilename = os.path.join(BAM_DATADIR, "ex1.fa")

    def setUp(self):

        self.fastafile = pysam.Fastafile(self.fastafilename)
        self.tmpfilename = get_temp_filename(".bam")

        with pysam.AlignmentFile(self.samfilename) as inf:
            with pysam.AlignmentFile(
                    self.tmpfilename,
                    'wb',
                    template=inf) as outf:
                for ii, read in enumerate(inf.fetch()):
                    # if ii % 2 == 0: # setting BFUNMAP makes no sense...
                        #read.flag = read.flag | 0x4
                    if ii % 3 == 0:
                        read.flag = read.flag | 0x100
                    if ii % 5 == 0:
                        read.flag = read.flag | 0x200
                    if ii % 7 == 0:
                        read.flag = read.flag | 0x400
                    outf.write(read)
        pysam.samtools.index(self.tmpfilename)

    def tearDown(self):
        self.fastafile.close()
        os.unlink(self.tmpfilename)
        os.unlink(self.tmpfilename + ".bai")

    def count_coverage_python(self,
                              bam, chrom, start, stop,
                              read_callback,
                              quality_threshold=15):
        stop = min(stop, bam.get_reference_length(chrom))
        l = stop - start
        count_a = array.array('L', [0] * l)
        count_c = array.array('L', [0] * l)
        count_g = array.array('L', [0] * l)
        count_t = array.array('L', [0] * l)
        for p in bam.pileup(chrom, start, stop,
                            truncate=True,
                            stepper='nofilter',
                            min_base_quality=quality_threshold,
                            ignore_overlaps=False):
            rpos = p.reference_pos - start
            for read in p.pileups:
                if not read.is_del and not read.is_refskip and \
                   read_callback(read.alignment):
                    try:
                        if read.alignment.query_qualities[read.query_position] \
                           >= quality_threshold:
                            letter = read.alignment.query[read.query_position]
                            if letter == 'A':
                                count_a[rpos] += 1
                            elif letter == 'C':
                                count_c[rpos] += 1
                            elif letter == 'G':
                                count_g[rpos] += 1
                            elif letter == 'T':
                                count_t[rpos] += 1
                    except IndexError:
                        pass
        return count_a, count_c, count_g, count_t

    def test_count_coverage_with_coordinates_works(self):

        with pysam.AlignmentFile(self.samfilename) as inf:
            c = inf.count_coverage("chr1")
            self.assertEqual(len(c[0]), inf.get_reference_length("chr1"))
            self.assertEqual(len(c[0]), 1575)

            c = inf.count_coverage("chr1", 100)
            self.assertEqual(len(c[0]), inf.get_reference_length("chr1") - 100)

            c = inf.count_coverage("chr1", 100, 200)
            self.assertEqual(len(c[0]), 200 - 100)
            
            c = inf.count_coverage("chr1", None, 200)
            self.assertEqual(len(c[0]), 200)

            c = inf.count_coverage("chr1", None, inf.get_reference_length("chr1") + 10000)
            self.assertEqual(len(c[0]), inf.get_reference_length("chr1"))

    def test_count_coverage_without_coordinates_fails(self):
        with pysam.AlignmentFile(self.samfilename) as inf:
            self.assertRaises(TypeError, inf.count_coverage)

    def test_count_coverage_wrong_coordinates_fails(self):
        with pysam.AlignmentFile(self.samfilename) as inf:
            self.assertRaises(ValueError, inf.count_coverage, "chr1", 200, 100)
            self.assertRaises(KeyError, inf.count_coverage, "chrUnknown", 100, 200)
            
    def test_counting_the_same_region_works(self):

        with pysam.AlignmentFile(self.samfilename) as inf:
            c1 = inf.count_coverage("chr1")
            c2 = inf.count_coverage("chr1")
            self.assertEqual(c1, c2)
            
    def test_count_coverage_counts_as_expected(self):
        chrom = 'chr1'
        start = 0
        stop = 1000

        with pysam.AlignmentFile(self.samfilename) as inf:
            manual_counts = self.count_coverage_python(
                inf, chrom, start, stop,
                lambda read: True,
                quality_threshold=0)

            fast_counts = inf.count_coverage(
                chrom, start, stop,
                read_callback=lambda read: True,
                quality_threshold=0)

        self.assertEqual(list(fast_counts[0]), list(manual_counts[0]))
        self.assertEqual(list(fast_counts[1]), list(manual_counts[1]))
        self.assertEqual(list(fast_counts[2]), list(manual_counts[2]))
        self.assertEqual(list(fast_counts[3]), list(manual_counts[3]))

    def test_count_coverage_quality_filter(self):
        chrom = 'chr1'
        start = 0
        stop = 1000
        with pysam.AlignmentFile(self.samfilename) as inf:
            manual_counts = self.count_coverage_python(
                inf, chrom, start, stop,
                lambda read: True,
                quality_threshold=0)
            fast_counts = inf.count_coverage(
                chrom, start, stop,
                read_callback=lambda read: True,
                quality_threshold=15)
            
        # we filtered harder, should be less
        for i in range(4):
            for r in range(start, stop):
                self.assertTrue(fast_counts[i][r] <= manual_counts[i][r])

    def test_count_coverage_read_callback(self):
        chrom = 'chr1'
        start = 0
        stop = 1000
        with pysam.AlignmentFile(self.samfilename) as inf:
            manual_counts = self.count_coverage_python(
                inf, chrom, start, stop,
                lambda read: read.flag & 0x10,
                quality_threshold=0)
            fast_counts = inf.count_coverage(
                chrom, start, stop,
                read_callback=lambda read: True,
                quality_threshold=0)
            
            for i in range(4):
                for r in range(start, stop):
                    self.assertTrue(fast_counts[i][r] >= manual_counts[i][r])
                    
            fast_counts = inf.count_coverage(
                chrom, start, stop,
                read_callback=lambda read: read.flag & 0x10,
                quality_threshold=0)

        self.assertEqual(fast_counts[0], manual_counts[0])
        self.assertEqual(fast_counts[1], manual_counts[1])
        self.assertEqual(fast_counts[2], manual_counts[2])
        self.assertEqual(fast_counts[3], manual_counts[3])

    def test_count_coverage_read_all(self):

        chrom = 'chr1'
        start = 0
        stop = 1000

        def filter(read):
            return not (read.flag & (0x4 | 0x100 | 0x200 | 0x400))

        with pysam.AlignmentFile(self.tmpfilename) as samfile:
            fast_counts = samfile.count_coverage(
                chrom, start, stop,
                read_callback='all',
                # read_callback = lambda read: ~(read.flag & (0x4 | 0x100 | 0x200 | 0x400)),
                quality_threshold=0)
            manual_counts = samfile.count_coverage(
                chrom, start, stop,
                read_callback=lambda read: not(
                    read.flag & (0x4 | 0x100 | 0x200 | 0x400)),
                quality_threshold=0)

        self.assertEqual(fast_counts[0], manual_counts[0])
        self.assertEqual(fast_counts[1], manual_counts[1])
        self.assertEqual(fast_counts[2], manual_counts[2])
        self.assertEqual(fast_counts[3], manual_counts[3])

    def test_count_coverage_nofilter(self):

        with pysam.AlignmentFile(self.samfilename) as inf:
            with pysam.AlignmentFile(
                    self.tmpfilename, 'wb', template=inf) as outf:
        
                for ii, read in enumerate(inf.fetch()):
                    # if ii % 2 == 0: # setting BFUNMAP makes no sense...
                    # read.flag = read.flag | 0x4
                    if ii % 3 == 0:
                        read.flag = read.flag | 0x100
                    if ii % 5 == 0:
                        read.flag = read.flag | 0x200
                    if ii % 7 == 0:
                        read.flag = read.flag | 0x400
                    outf.write(read)
        
        pysam.samtools.index(self.tmpfilename)
        chr = 'chr1'
        start = 0
        stop = 1000

        with pysam.AlignmentFile(self.tmpfilename) as inf:
            fast_counts = inf.count_coverage(chr, start, stop,
                                             read_callback='nofilter',
                                             quality_threshold=0)

            manual_counts = self.count_coverage_python(inf, chr, start, stop,
                                                       read_callback=lambda x: True,
                                                       quality_threshold=0)

        self.assertEqual(fast_counts[0], manual_counts[0])
        self.assertEqual(fast_counts[1], manual_counts[1])
        self.assertEqual(fast_counts[2], manual_counts[2])
        self.assertEqual(fast_counts[3], manual_counts[3])
        

class TestFindIntrons(unittest.TestCase):
    samfilename = os.path.join(BAM_DATADIR, "ex_spliced.bam")

    def setUp(self):
        self.samfile = pysam.AlignmentFile(self.samfilename)

    def tearDown(self):
        self.samfile.close()

    def test_total(self):
        all_read_counts = self.samfile.count()
        splice_sites = self.samfile.find_introns(self.samfile.fetch())
        # there is a single unspliced read in there
        self.assertEqual(sum(splice_sites.values()), all_read_counts - 1)

    def test_first(self):
        reads = list(self.samfile.fetch())[:10]
        splice_sites = self.samfile.find_introns(reads)
        starts = [14792 + 38 - 1]
        stops = [14792 + 38 + 140 - 1]
        self.assertEqual(len(splice_sites), 1)
        self.assertTrue((starts[0], stops[0]) in splice_sites)
        # first one is the unspliced read
        self.assertEqual(splice_sites[(starts[0], stops[0])], 9)

    def test_all(self):
        reads = list(self.samfile.fetch())
        splice_sites = self.samfile.find_introns(reads)
        should = collections.Counter({
            (14829, 14969): 33,
            (15038, 15795): 24,
            (15947, 16606): 3,
            (16765, 16857): 9,
            (16765, 16875): 1,
            (17055, 17232): 19,
            (17055, 17605): 3,
            (17055, 17914): 1,
            (17368, 17605): 7,
        })
        self.assertEqual(should, splice_sites)


class TestLogging(unittest.TestCase):

    '''test around bug issue 42,

    failed in versions < 0.4
    '''

    def check(self, bamfile, log):

        if log:
            logger = logging.getLogger('franklin')
            logger.setLevel(logging.INFO)
            formatter = logging.Formatter(
                '%(asctime)s %(levelname)s %(message)s')
            log_hand = logging.FileHandler('log.txt')
            log_hand.setFormatter(formatter)
            logger.addHandler(log_hand)

        with pysam.AlignmentFile(bamfile, 'rb') as bam:
            cols = bam.pileup()
        self.assertTrue(True)

    def tearDown(self):
        if os.path.exists('log.txt'):
            os.unlink('log.txt')

    def testFail1(self):
        self.check(os.path.join(BAM_DATADIR, "ex9_fail.bam"),
                   False)
        self.check(os.path.join(BAM_DATADIR, "ex9_fail.bam"),
                   True)

    def testNoFail1(self):
        self.check(os.path.join(BAM_DATADIR, "ex9_nofail.bam"),
                   False)
        self.check(os.path.join(BAM_DATADIR, "ex9_nofail.bam"),
                   True)

    def testNoFail2(self):
        self.check(os.path.join(BAM_DATADIR, "ex9_nofail.bam"),
                   True)
        self.check(os.path.join(BAM_DATADIR, "ex9_nofail.bam"),
                   True)

# TODOS
# 1. finish testing all properties within pileup objects
# 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
# 3. check: presence of sequence


class TestAlignmentFileUtilityFunctions(unittest.TestCase):

    def testCount(self):

        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, "ex1.bam"),
                "rb") as samfile:

            for contig in ("chr1", "chr2"):
                for start in range(0, 2000, 100):
                    end = start + 1
                    self.assertEqual(
                        len(list(samfile.fetch(contig, start, end))),
                        samfile.count(contig, start, end),
                        'number mismatch for %s:%i-%i %i != %i' % (
                            contig, start, end,
                            len(list(samfile.fetch(contig, start, end))),
                            samfile.count(contig, start, end)))

                    # test empty intervals
                    self.assertEqual(
                        len(list(samfile.fetch(contig, start, start))),
                        samfile.count(contig, start, start),
                        'number mismatch for %s:%i-%i %i != %i' % (
                            contig, start, start,
                            len(list(samfile.fetch(contig, start, start))),
                            samfile.count(contig, start, start)))

                    # test half empty intervals
                    self.assertEqual(len(list(samfile.fetch(contig, start))),
                                     samfile.count(contig, start))

                    self.assertEqual(
                        len(list(samfile.fetch(contig, start))),
                        samfile.count(contig, start),
                        'number mismatch for %s:%i %i != %i' % (
                            contig, start,
                            len(list(samfile.fetch(contig, start))),
                            samfile.count(contig, start)))

    def testMate(self):
        '''test mate access.'''

        with open(os.path.join(BAM_DATADIR, "ex1.sam"), "rb") as inf:
            readnames = [x.split(b"\t")[0] for x in inf.readlines()]
        if sys.version_info[0] >= 3:
            readnames = [name.decode('ascii') for name in readnames]

        counts = collections.defaultdict(int)
        for x in readnames:
            counts[x] += 1

        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"),
                                 "rb") as samfile:

            for read in samfile.fetch():
                if not read.is_paired:
                    self.assertRaises(ValueError, samfile.mate, read)
                elif read.mate_is_unmapped:
                    self.assertRaises(ValueError, samfile.mate, read)
                else:
                    if counts[read.query_name] == 1:
                        self.assertRaises(ValueError, samfile.mate, read)
                    else:
                        mate = samfile.mate(read)
                        self.assertEqual(read.query_name, mate.query_name)
                        self.assertEqual(read.is_read1, mate.is_read2)
                        self.assertEqual(read.is_read2, mate.is_read1)
                        self.assertEqual(
                            read.reference_start, mate.next_reference_start)
                        self.assertEqual(
                            read.next_reference_start, mate.reference_start)

    def testIndexStats(self):
        '''test if total number of mapped/unmapped reads is correct.'''

        with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "ex1.bam"),
                                 "rb") as samfile:
            self.assertEqual(samfile.mapped, 3235)
            self.assertEqual(samfile.unmapped, 35)
            self.assertEqual(samfile.nocoordinate, 0)


class TestMappedUnmapped(unittest.TestCase):
    filename = "test_mapped_unmapped.bam"

    def test_counts_of_mapped_and_unmapped_are_correct(self):

        with pysam.AlignmentFile(os.path.join(BAM_DATADIR,
                                              self.filename)) as inf:
            unmapped_flag = 0
            unmapped_nopos = 0
            mapped_flag = 0
            for x in inf.fetch(until_eof=True):
                if x.is_unmapped:
                    if x.reference_id < 0:
                        unmapped_nopos += 1
                    else:
                        unmapped_flag += 1
                else:
                    mapped_flag += 1

            self.assertEqual(inf.mapped, mapped_flag)
            self.assertEqual(inf.unmapped, unmapped_flag + unmapped_nopos)

            inf.reset()
            self.assertEqual(inf.count(),
                             inf.mapped + unmapped_flag)

            inf.reset()
            self.assertEqual(inf.count(until_eof=True),
                             inf.mapped + unmapped_flag + unmapped_nopos)

            inf.reset()
            self.assertEqual(inf.count(read_callback="all"),
                             inf.mapped)

            inf.reset()
            self.assertEqual(inf.count(until_eof=True, read_callback="all"),
                             inf.mapped)

    def test_counts_of_mapped_and_unmapped_are_correct_per_chromosome(self):

        with pysam.AlignmentFile(os.path.join(BAM_DATADIR,
                                              self.filename)) as inf:

            counts = inf.get_index_statistics()

            counts_contigs = [x.contig for x in counts]
            self.assertEqual(sorted(counts_contigs),
                             sorted(inf.references))

            for contig in inf.references:
                unmapped_flag = 0
                unmapped_nopos = 0
                mapped_flag = 0
                for x in inf.fetch(contig=contig):
                    if x.is_unmapped:
                        unmapped_flag += 1
                    else:
                        mapped_flag += 1

                cc = [c for c in counts if c.contig == contig][0]
                self.assertEqual(cc.mapped, mapped_flag)
                self.assertEqual(cc.unmapped, unmapped_flag)
                self.assertEqual(cc.total, mapped_flag + unmapped_flag)


class TestSamtoolsProxy(unittest.TestCase):

    '''tests for sanity checking access to samtools functions.'''

    def testIndex(self):
        self.assertRaises(IOError, pysam.samtools.index, "missing_file")

    def testView(self):
        # note that view still echos "open: No such file or directory"
        self.assertRaises(pysam.SamtoolsError,
                          pysam.samtools.view,
                          "missing_file")

    def testSort(self):
        self.assertRaises(pysam.SamtoolsError,
                          pysam.samtools.sort,
                          "missing_file")


class TestAlignmentFileIndex(unittest.TestCase):

    def testIndex(self):
        samfile = pysam.AlignmentFile(
            os.path.join(BAM_DATADIR, "ex1.bam"),
            "rb")
        index = pysam.IndexedReads(samfile)
        index.build()
        reads = collections.defaultdict(int)

        for read in samfile:
            reads[read.query_name] += 1

        for qname, counts in reads.items():
            found = list(index.find(qname))
            self.assertEqual(len(found), counts)
            for x in found:
                self.assertEqual(x.query_name, qname)


class TestExplicitIndex(unittest.TestCase):

    def testExplicitIndexBAM(self):
        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, "explicit_index.bam"),
                "rb",
                filepath_index=os.path.join(BAM_DATADIR, 'ex1.bam.bai')) as samfile:
            samfile.fetch("chr1")

    def testExplicitIndexCRAM(self):
        with pysam.AlignmentFile(
                os.path.join(BAM_DATADIR, "explicit_index.cram"),
                "rc",
                filepath_index=os.path.join(BAM_DATADIR, 'ex1.cram.crai')) as samfile:
            samfile.fetch("chr1")

    def testRemoteExplicitIndexBAM(self):
        if not check_url(
                "http://genserv.anat.ox.ac.uk/downloads/pysam/test/noindex.bam"):
            return

        with pysam.AlignmentFile(
                "http://genserv.anat.ox.ac.uk/downloads/pysam/test/noindex.bam",
                "rb",
                filepath_index=os.path.join(BAM_DATADIR, 'ex1.bam.bai')) as samfile:
            samfile.fetch("chr1")


class TestVerbosity(unittest.TestCase):

    '''test if setting/getting of verbosity works.'''

    def testVerbosity(self):
        self.assertEqual(pysam.get_verbosity(), 3)
        old = pysam.set_verbosity(0)
        self.assertEqual(pysam.get_verbosity(), 0)
        new = pysam.set_verbosity(old)
        self.assertEqual(new, 0)
        self.assertEqual(pysam.get_verbosity(), 3)


class TestSanityCheckingBAM(unittest.TestCase):

    mode = "wb"

    def check_write(self, read):

        fn = "tmp_test_sanity_check.bam"
        names = ["chr1"]
        lengths = [10000]
        with pysam.AlignmentFile(
                fn,
                self.mode,
                reference_names=names,
                reference_lengths=lengths) as outf:
            outf.write(read)

        if os.path.exists(fn):
            os.unlink(fn)

    def test_empty_read_gives_value_error(self):
        read = pysam.AlignedSegment()
        self.check_write(read)


class TestHeader1000Genomes(unittest.TestCase):

    '''see issue 110'''
    bamfile = "http://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/phase3_EX_or_LC_only_alignment/data/HG00104/alignment/HG00104.chrom11.ILLUMINA.bwa.GBR.low_coverage.20130415.bam"  # noqa
    bambase = "HG00104.chrom11.ILLUMINA.bwa.GBR.low_coverage.20130415.bam"  # noqa

    def testRead(self):

        if not check_url(self.bamfile):
            return

        f = pysam.AlignmentFile(self.bamfile, "rb")
        data = f.header.copy()
        self.assertTrue(data)

    def tearDown(self):
        if os.path.exists(self.bambase + ".bai"):
            os.unlink(self.bambase + ".bai")


class TestLargeCigar(unittest.TestCase):

    def setUp(self):
        self.read_length = 70000
        self.header = pysam.AlignmentHeader.from_references(
            ["chr1", "chr2"],
            [self.read_length * 2, self.read_length * 2])

    def build_read(self):
        '''build an example read.'''

        a = pysam.AlignedSegment(self.header)
        l = self.read_length
        a.query_name = "read_12345"
        a.query_sequence = "A" * (l + 1)
        a.flag = 0
        a.reference_id = 0
        a.reference_start = 20
        a.mapping_quality = 20
        a.cigarstring = "1M1D" * l + "1M"
        self.assertEqual(len(a.cigartuples), 2 * l + 1)
        a.next_reference_id = 0
        a.next_reference_start = 0
        a.template_length = l
        a.query_qualities = pysam.qualitystring_to_array("1") * (l + 1)
        return a

    def check_read(self, read, mode="bam"):
        fn = get_temp_filename("tmp_largecigar.{}".format(mode))
        fn_reference = get_temp_filename("tmp_largecigar.fa")

        nrows = int(self.read_length * 2 / 80)

        s = "\n".join(["A" * 80 for x in range(nrows)])
        with open(fn_reference, "w") as outf:
            outf.write(">chr1\n{seq}\n>chr2\n{seq}\n".format(
                seq=s))

        if mode == "bam":
            write_mode = "wb"
        elif mode == "sam":
            write_mode = "w"
        elif mode == "cram":
            write_mode = "wc"

        with pysam.AlignmentFile(fn, write_mode,
                                 header=self.header,
                                 reference_filename=fn_reference) as outf:
            outf.write(read)
        with pysam.AlignmentFile(fn) as inf:
            ref_read = next(inf)

        if mode == "cram":
            # in CRAM, the tag field is kept, while it is emptied by the BAM/SAM reader
            self.assertEqual(read.cigarstring, ref_read.cigarstring)
        else:
            self.assertEqual(read, ref_read)

        os.unlink(fn)
        os.unlink(fn_reference)

    def test_reading_writing_sam(self):
        read = self.build_read()
        self.check_read(read, mode="sam")

    def test_reading_writing_bam(self):
        read = self.build_read()
        self.check_read(read, mode="bam")

    @unittest.skip("fails on linux - https issue?")
    def test_reading_writing_cram(self):
        # The following test fails with htslib 1.9, but worked with previous versions.
        # Error is:
        # [W::find_file_url] Failed to open reference "https://www.ebi.ac.uk/ena/cram/md5/ac9fac7c3e9c476f74f1d0e47abc8be2": Input/output error
        # Error can be reproduced using samtools 1.9 command line.
        # Could be a conda configuration issue, see
        # https://github.com/bioconda/bioconda-recipes/issues/9056
        return
        read = self.build_read()
        self.check_read(read, mode="cram")
        
# SAM writing fails, as query length is 0
# class TestSanityCheckingSAM(TestSanityCheckingSAM):
#     mode = "w"

if __name__ == "__main__":
    # build data files
    print("building data files")
    subprocess.call("make -C %s" % BAM_DATADIR, shell=True)
    print("starting tests")
    unittest.main()
    print("completed tests")
back to top