Raw File
replace_pairs_mod.py
#!/usr/bin/env python

# replace_pairs - replace a (0,N) pair by (Poiss(N/2),1-Poiss(N/2))
# (C) 2020 Boud Roukema GPL-3+
#
# This script is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This script is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details. See <http://www.gnu.org/licenses/>.

import numpy as np
from numpy.random import SeedSequence, default_rng
import sys
import os

def replace_pairs(Noft_in):
    """
    Given a sequence (numpy array) of counts *Noft*, find each consecutive pair of the
    form m1,m2 where m1==0, m2 > 0 and there exists a preceding value m0 > 0.
    Replace the pair by selecting a value from a Poisson distribution
    whose mean is the mean of the pair. Return the modified sequence.

    Return: Noft_out
    """

    debug = False

    n = os.environ.get('FIXED_RNG_SEED')
    if(n):
        fixed_rng_seed = np.bool(n)
    else:
        fixed_rng_seed = False

    seeds = SeedSequence(2027) # 2027 = seed = next prime year
    seed = seeds.spawn(2)
    if(fixed_rng_seed):
        print('Fixed rng seed.')
        rng = default_rng(seed[0])
    else:
        print('Numpy automatic rng seed.')
        rng = default_rng()


    N_Noft = (Noft_in.shape)[0]
    if(N_Noft < 3):
        Noft_out = Noft_in
    else:
        # Create a 3-row array with three offset subsets of the full Noft_in;
        # each column is a successive triple of the original array:
        Noft_triple = np.array([Noft_in[0:N_Noft-2],
                               Noft_in[1:N_Noft-1],
                               Noft_in[2:N_Noft]])

        # Find the indices of triples top element positive and middle
        # element zero:
        i_pairs_todo = np.array(np.where( (Noft_triple[0,:] > 0) &
                                          (Noft_triple[1,:] == 0) &
                                          (Noft_triple[2,:] > 0) ))[0]
        N_todo = (i_pairs_todo.shape)[0]
        mean_replaced = 0.0 # default
        if(debug):
            print('i_pairs_todo = ',i_pairs_todo)

        if(N_todo == 0):
            Noft_out = Noft_in
        else:
            if(debug):
                print('before:\n', Noft_triple)

            # Generate Poisson values to replace the (0,.) pairs;
            # this could, hypothetically, insert a zero in row 1
            # or a negative or zero in row 2.

            # generate the new values
            Noft_triple[1, i_pairs_todo] = (
                np.array(rng.poisson( Noft_triple[2,i_pairs_todo]/2 ),
                         dtype=int) )
            # evaluate their complements
            Noft_triple[2, i_pairs_todo] -= Noft_triple[1, i_pairs_todo]

            # diagonally copy the new values down to the third row, but
            # first avoid copying the [1,0] value;
            if(i_pairs_todo[0] == 0):
                if(N_todo > 1):
                    i_pairs_tocopy = i_pairs_todo[1:N_todo+1] # remove the leftmost position
                    Noft_triple[2,(i_pairs_tocopy-1) ] = Noft_triple[1, i_pairs_tocopy]
            else:
                Noft_triple[2,(i_pairs_todo-1) ] = Noft_triple[1, i_pairs_todo]

            if(debug):
                print('after:\n', Noft_triple)

            Noft_out = np.zeros(N_Noft,dtype=int)
            Noft_out[0] = Noft_triple[0,0]
            Noft_out[1] = Noft_triple[1,0]
            Noft_out[2:N_Noft] = Noft_triple[2,0:N_Noft-2]


            mean_replaced = np.mean(Noft_triple[2,i_pairs_todo]/2)

    return Noft_out, N_todo, mean_replaced


if __name__ == '__main__':

    pass_val = 0 # pass_val value of test: 0 = OK, 1 = fail

    seeds = SeedSequence(2029)
    seed = seeds.spawn(2)
    rng_test = default_rng(seed[0])
    #rng_test = default_rng()

    # Set up an array with Poisson random values which will often
    # be zero, to see how well replace_pairs "fixes" it. The values
    # are increased by a big factor so that the replacement values
    # stay away from zero.
    Noft = 30 * rng_test.poisson(1,14)

    Noft_new, N_todo, mean_replaced = replace_pairs(Noft)

    print("Noft     = ",Noft)
    print("Noft_new = ",Noft_new)
    print("N_todo = ",N_todo)

    print("pass_val value = ",pass_val)
    sys.exit(pass_val)

#end __main__
back to top