https://github.com/deniztop/LABL
02 May 2025, 11:37:58 UTC
Tip revision: 67f0256ccd5a1469bf33da1a860f1c845c89b330 authored by deniztop on 23 October 2021, 09:34:30 UTC
Add files via upload
wavelets4.py
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import os
import pathlib
import pywt
import gc
from scipy import stats
from scipy import optimize
import scaleogram as scg

def plotSignal(zeroPoint, hours, signal, title):
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 18), sharex=True)
    for ax in (ax1, ax2, ax3):
        ax.xaxis.set_tick_params(which='both', labelbottom=True)
    
    fig.subplots_adjust(hspace=0.4)
    time = np.array(hours)
    time = time - zeroPoint
    ax1.plot(time, signal)
    ax1.set_title(title)
    ax1.set_xticks(np.arange(0, time[-1], 24))
    ax1.set_xlabel(f"Time [hours] since {zeroPoint} hours")

    plt.sca(ax2) #making ax2 current to prevent calls to plt from leaking to other axes inside cws
    ax2 = scg.cws(time, signal, scales=scales, wavelet=wavelet,
            ax=ax2, cmap="jet", cbar=None, yaxis='period',
            title=f'CWT using {wavelet}', xlabel=f"Time [hours] since {zeroPoint} hours")

    '''For ridge analysis we do a cwt manually'''
    coefs, scales_freq = pywt.cwt(signal, scales, wavelet, sampling_period=(1/samplesPerHour))

    scales_period = 1./scales_freq
    values = np.abs(coefs)
    maxIndices = np.argmax(values, axis=0)
    maxValues = np.max(values, axis=0)

    ridge = np.array([scales_period[index] for index in maxIndices])

    maxValue = max(maxValues)
    minValue = min(maxValues)

    """
    COI implementation courtesy of:

    Copyright (c) 2019 Alexandre Sauve <http://github.com/alsauve/scaleogram>

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to
    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
    of the Software, and to permit persons to whom the Software is furnished to do
    so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
    """

    # convert the wavelet scales frequency into time domain periodicity
    scales_coi = scales_period
    max_coi  = scales_coi[-1]
    dt = time[1]-time[0]

    # produce the line and the curve delimiting the COI masked area
    xmesh = np.concatenate([time, [time[-1]+dt]])
    mid = int(len(xmesh)/2)
    time0 = np.abs(xmesh[0:mid+1]-xmesh[0])
    ymask = np.zeros(len(xmesh), dtype=np.float16)
    ymhalf= ymask[0:mid+1]  # compute the left part of the mask
    ws    = np.argsort(scales_period) # ensure np.interp() works
    minscale, maxscale = sorted(ax2.get_ylim())
    ymhalf[:] = np.interp(time0,
            scales_period[ws], scales_coi[ws])
    yborder = np.zeros(len(xmesh)) + maxscale
    ymhalf[time0 > max_coi]   = maxscale
    
    # complete the right part of the mask by symmetry
    ymask[-mid:] = ymhalf[0:mid][::-1]

    # plot the mask and forward user parameters
    ax3.plot(xmesh, ymask)
    coikw = {
        'alpha': 0.5,  # must be numeric; Matplotlib rejects a string alpha
        'hatch':'/',
    }
    ax3.fill_between(xmesh, yborder, ymask, **coikw)

    '''
    End COI Implementation
    '''

    passing = []
    for i, r in enumerate(ridge):
        border = yborder[i]
        mask = ymask[i]
        if mask < r < border:
            continue
        else:
            passing.append(i)
    passing = np.array(passing)
    
    if passing.size > 0:
        ridge = ridge[passing]
        time = time[passing]
        maxValues = maxValues[passing]
        
        intercept, slope = np.polynomial.polynomial.polyfit(time, ridge, 1, w=maxValues)

        threshold = minValue + (maxValue - minValue) * percentCutoff #cut off the bottom portion based on %
        passing = np.where(maxValues > threshold)
        trimmedRidge = ridge[passing]
        trimmedTime = time[passing]
        trimmedValues = maxValues[passing]
        ax2.invert_yaxis()
        ax2.set_yticks(np.arange(smallestScaleInSampleUnits / samplesPerHour, largestScaleInSampleUnits / samplesPerHour, 1))
        ax2.set_ylabel("Period [hours]")
        ax2.plot(trimmedTime, trimmedRidge, color='white', alpha=0.7)
        ax3.scatter(trimmedTime, trimmedRidge, s=10, color='blue')

        minY = np.min(ridge)
        maxY = np.max(ridge)
        ax3.set_ylim(minY, maxY)
        ax3.plot(time, line_func(time, slope, intercept))
        ax3.set_title(f'Fitted Ridge Line: y = {slope:.6f}*x + {intercept:.6f}')
        ax3.set_yticks(np.arange(smallestScaleInSampleUnits / samplesPerHour, largestScaleInSampleUnits / samplesPerHour, 1))
        ax3.set_xlabel(f"Time [hours] since {zeroPoint} hours")
        ax3.set_ylabel("Period [hours]")

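        #ensure that the two lower figures share y-axes
        try:
            ax2.get_shared_y_axes().join(ax2, ax3)
        except AttributeError:
            # newer Matplotlib releases no longer expose join() here;
            # note that sharey() also copies ax2's current y-limits onto ax3
            ax3.sharey(ax2)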

        for ax in (ax1, ax2, ax3):
            ax.set_xlim(0, math.ceil(hours[-1]))
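    else:
        print(f'{title} failed, no data lies in valid region')
        # nothing passed the COI filter: return empty arrays so the
        # return statement below does not raise UnboundLocalError
        trimmedTime = trimmedRidge = trimmedValues = np.array([])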

    return trimmedTime, trimmedRidge, trimmedValues

def line_func(x, a, b):
    return a*x + b 

def floatInputOrDefault(default_value):
    userIn = input()
    return float(userIn) if userIn else default_value

def signalFromFile(filepath):
    strippedPath = 'stripped.temp'
    with open(filepath) as infile, open(strippedPath, 'w') as outfile:
        for line in infile:
            if not line.strip(): continue #skip
            outfile.write(line)

    data = np.genfromtxt(strippedPath,dtype='str',delimiter=",", skip_header=2)
    os.remove(strippedPath)
    signal = [float(x) for x in data[:,4]]
    hours = [float(x) * 24.0 for x in data[:,3]]
    return hours, signal

WAVELET_STEP = 0
ANALYSIS_STEP = 1
TEST_STEP = 2
RUN_STEP = 3

'''Default configuration'''
defaultConfiguration = {"bandwidthParameter":.7, "centralFrequency":1, "samplesPerHour":(60/4), "zeroPointHours":10, "largestPeriodOfInterestInHours":32, "smallestPeriodOfInterestHours":16, "samplesPerScaleIncrement":1, "percentCutoff":.25}
configuration = defaultConfiguration
firstStep = WAVELET_STEP
if len(sys.argv) > 1:
    configFile = sys.argv[1]
    if not os.path.exists(configFile):
        print(f'User provided configuration file {configFile} did not exist.')
        sys.exit()

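    # the file is written with repr(configuration) below, so a literal parser is
    # enough and avoids executing arbitrary code from the config file
    import ast
    with open(configFile, 'r') as f:
        configuration = ast.literal_eval(f.read())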
    print(f'User provided configuration: {configuration}')
    firstStep = TEST_STEP

'''
The following parameters are as per:
https://github.com/alsauve/scaleogram/blob/master/doc/scale-to-frequency.ipynb
and https://pywavelets.readthedocs.io/en/latest/ref/cwt.html#complex-morlet-wavelets
'''
bandwidthParameter = configuration["bandwidthParameter"]
centralFrequency = configuration["centralFrequency"]
wavelet = f'cmor{bandwidthParameter}-{centralFrequency}' # becomes "cmorB-C" e.g. cmor1-1
def waveletConfig():
    global bandwidthParameter
    global centralFrequency
    global wavelet
    print('1 of 4: Wavelet configuration step. We will configure the bandwidth parameter and central frequency for the mother wavelet. \n See: https://github.com/alsauve/scaleogram/blob/master/doc/scale-to-frequency.ipynb \n Or: https://pywavelets.readthedocs.io/en/latest/ref/cwt.html#complex-morlet-wavelets')
    print(f'bandwidth parameter [defaulted to {bandwidthParameter}]:')
    bandwidthParameter = floatInputOrDefault(bandwidthParameter) #aka B
    print(f'central frequency [defaulted to {centralFrequency}]:')
    centralFrequency = floatInputOrDefault(centralFrequency) #aka C
    wavelet = f'cmor{bandwidthParameter}-{centralFrequency}' # becomes "cmorB-C" e.g. cmor1-1
    print(f'Using wavelet: {wavelet}')
    return ANALYSIS_STEP

'''
The following parameters indicate the range of scales we're interested in. The 'centralFrequency' above
 represents the frequency at which the wavelet is most sensitive at scale 1.0. Subsequent scales will have
 the wavelet scaled up and will capture longer periods.
'''
samplesPerHour = configuration["samplesPerHour"]
zeroPointHours = configuration["zeroPointHours"]
largestPeriodOfInterestInHours = configuration["largestPeriodOfInterestInHours"]
smallestPeriodOfInterestHours = configuration["smallestPeriodOfInterestHours"]
largestScaleInSampleUnits = largestPeriodOfInterestInHours * samplesPerHour * centralFrequency
smallestScaleInSampleUnits = smallestPeriodOfInterestHours * samplesPerHour * centralFrequency
samplesPerScaleIncrement = configuration["samplesPerScaleIncrement"]
percentCutoff = configuration["percentCutoff"]
scales = np.arange(1, largestScaleInSampleUnits, samplesPerScaleIncrement)
def analysisConfig():
    global samplesPerHour
    global zeroPointHours
    global smallestPeriodOfInterestHours
    global largestPeriodOfInterestInHours
    global largestScaleInSampleUnits
    global smallestScaleInSampleUnits # also read by plotSignal for the y ticks
    global samplesPerScaleIncrement
    global scales
    global percentCutoff
    print('2 of 4: Analysis configuration step. We will configure the parameters to be used in the CWT analysis. These will determine the wavelet scales and ridge line.')
    print(f'Data points in a given hour. For example, samples at 4 minute intervals would yield 15 samples in an hour. [defaulted to {samplesPerHour}]:')
    samplesPerHour = floatInputOrDefault(samplesPerHour) #60 minutes per hour and the data is at 4 minute intervals
    print(f'The point, in hours, at which to place the zero hour mark. [defaulted to {zeroPointHours}]:')
    zeroPointHours = floatInputOrDefault(zeroPointHours)
    print(f'Period of shortest wave to match against, in hours. This should be smaller than the smallest expected signal. [defaulted to {smallestPeriodOfInterestHours}]:')
    smallestPeriodOfInterestHours = floatInputOrDefault(smallestPeriodOfInterestHours)
    smallestScaleInSampleUnits = smallestPeriodOfInterestHours * samplesPerHour * centralFrequency
    print(f'Period of the longest wave to match against, in hours. This should exceed the largest expected signal. [defaulted to {largestPeriodOfInterestInHours}]:')
    largestPeriodOfInterestInHours = floatInputOrDefault(largestPeriodOfInterestInHours)
    largestScaleInSampleUnits = largestPeriodOfInterestInHours * samplesPerHour * centralFrequency
    print(f'Given those parameters the largest period in units of samples (recall: {samplesPerHour} in an hour) that we will analyse is {largestScaleInSampleUnits}.')
    print(f'Select the increment from {smallestScaleInSampleUnits} to {largestScaleInSampleUnits}. The smaller the increment, the higher the resolution but the slower the analysis. [defaulted to {samplesPerScaleIncrement}]:')
    samplesPerScaleIncrement = floatInputOrDefault(samplesPerScaleIncrement)
    scales = np.arange(smallestScaleInSampleUnits, largestScaleInSampleUnits, samplesPerScaleIncrement)
    print(f'Analysis will look at {len(scales)} total scales. From {scales[0]} to {scales[-1]}')
    print(f'Finally, below which percent threshold, expressed as a decimal [0..1], should the coefficients for the ridge line be discarded? [defaulted to {percentCutoff}]:')
    percentCutoff = floatInputOrDefault(percentCutoff)
    return TEST_STEP

def testStep():
    print('3 of 4: Test step. Would you like to run a test to validate your inputs? [y/n]:')
    desire = str(input())
    if (desire == 'n'):
        return RUN_STEP
    elif (desire == 'y'): 
        signal = []
        hours = []
        print('Full path to sample data file:')
        filename = str(input())
        hours, signal = signalFromFile(filename)
        plotSignal(zeroPointHours, hours, signal, filename)
        plt.show()
        plt.close()

        print('Were you happy with the result? A response of no will restart the configuration steps [y/n]:')
        desire = ''
        while not (desire == 'n' or desire == 'y'):
            desire = str(input())
            if (desire == 'n'):
                return WAVELET_STEP
            elif (desire == 'y'):
                return RUN_STEP
            else:
                print("Please input 'y' or 'n'")
    else:
        print("Please input 'y' or 'n'")
        return TEST_STEP

steps = [waveletConfig, analysisConfig, testStep]

currentStep = firstStep
while currentStep != RUN_STEP:
    gc.collect()
    currentStep = steps[currentStep]()

configuration["bandwidthParameter"] = bandwidthParameter
configuration["centralFrequency"] = centralFrequency
configuration["samplesPerHour"] = samplesPerHour
configuration["zeroPointHours"] = zeroPointHours
configuration["largestPeriodOfInterestInHours"] = largestPeriodOfInterestInHours
configuration["smallestPeriodOfInterestHours"] = smallestPeriodOfInterestHours
configuration["samplesPerScaleIncrement"] = samplesPerScaleIncrement
configuration["percentCutoff"] = percentCutoff
print(f'Would you like to save the current configuration: {configuration}? [y/n]')
desire = ''
while not (desire == 'n' or desire == 'y'):
    desire = str(input())
    if (desire == 'n'):
        print('Not saved.')
    elif (desire == 'y'):
        print('Enter a filename:')
        fname = f'{str(input())}.cfg'
        with open(fname, 'w') as f:
            f.write(repr(configuration))
        print(f'Wrote configuration to {fname}. You may pass it in to future runs')
    else:
        print("Please input 'y' or 'n'")

pwd = os.getcwd()
outdir = f'{pwd}/scaleograms'
pathlib.Path(outdir).mkdir(parents=True, exist_ok=True) 
print(f'Results will be in {outdir}')

print('Running full analysis.')
print('Provide the path to the folder containing ONLY your data files:')
folderpath = ''
while not os.path.exists(folderpath):
    folderpath = str(input())
    if not os.path.exists(folderpath):
        print(f'Folder: ({folderpath}) does not appear to exist. Try again.')

files = [os.path.join(folderpath, f) for f in os.listdir(folderpath) if os.path.isfile(os.path.join(folderpath, f))]
print(f'Found {len(files)} files...')
for filepath in files:
    print(f'Processing {filepath}...')
    gc.collect()
    hours, signal = signalFromFile(filepath)
    filename = os.path.splitext(os.path.basename(filepath))[0]

    time, ridge, coef = plotSignal(zeroPointHours, hours, signal, filename)

    plt.savefig(f'{outdir}/{filename}_scaleogram.png')
    plt.close()
    gc.collect()

    zipped = np.array(list(zip(time, ridge, coef)))

    np.savetxt(f'{outdir}/{filename}_ridge.txt', zipped, delimiter=',')
print('Finished')
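
The ridge step in plotSignal (take the per-column maximum of |coefs|, then fit a straight line through the ridge weighted by those maxima) can be tried in isolation. The sketch below is a minimal illustration under stated assumptions, not the script's own code: it replaces the pywt.cwt output with a synthetic magnitude matrix, and every name in it (periods, magnitude, true_slope, true_intercept and so on) is hypothetical.

```python
import numpy as np

# Hypothetical synthetic setup; none of these names come from wavelets4.py.
periods = np.linspace(16.0, 32.0, 161)   # candidate periods [hours], 0.1 h apart
time = np.arange(0.0, 120.0, 0.25)       # sample times [hours]

# Assumed ground truth: a period that drifts linearly with time.
true_slope, true_intercept = 0.02, 22.0
true_period = true_intercept + true_slope * time

# Stand-in for np.abs(coefs): rows are periods, columns are time samples.
# Each column holds a Gaussian bump centred on the drifting period, with a
# slowly varying (always positive) amplitude.
amplitude = 1.0 + 0.5 * np.sin(2 * np.pi * time / 120.0)
magnitude = amplitude * np.exp(
    -0.5 * ((periods[:, None] - true_period[None, :]) / 1.5) ** 2
)

# Ridge: for every time sample, the period with the largest magnitude.
max_indices = np.argmax(magnitude, axis=0)
max_values = np.max(magnitude, axis=0)
ridge = periods[max_indices]

# Weighted straight-line fit; this polyfit returns coefficients from the
# lowest degree upwards, i.e. (intercept, slope).
intercept, slope = np.polynomial.polynomial.polyfit(time, ridge, 1, w=max_values)

print(f"fitted ridge: period = {slope:.4f} * t + {intercept:.4f}")
```

Because the ridge is read off a discrete period grid, the fitted slope and intercept only approximate the assumed values; a finer grid (a smaller scale increment in the script's terms) tightens the agreement.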

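The script turns periods in hours into wavelet scales with period * samplesPerHour * centralFrequency. The following sketch spells out that arithmetic and its inverse for the default configuration. It assumes the centre frequency of a cmorB-C wavelet equals C, so that a scale corresponds to a period of scale / (samplesPerHour * C); the helper names period_to_scale and scale_to_period are hypothetical and do not exist in the script or in PyWavelets.

```python
import numpy as np

def period_to_scale(period_hours, samples_per_hour, central_frequency):
    """Scale (in sample units) whose matched period is period_hours."""
    return period_hours * samples_per_hour * central_frequency

def scale_to_period(scale, samples_per_hour, central_frequency):
    """Inverse mapping: period in hours matched by a given scale."""
    return scale / (samples_per_hour * central_frequency)

# Values taken from the script's default configuration.
samples_per_hour = 60 / 4      # one sample every 4 minutes
central_frequency = 1.0

smallest_scale = period_to_scale(16, samples_per_hour, central_frequency)
largest_scale = period_to_scale(32, samples_per_hour, central_frequency)

# Same construction as in analysisConfig, with an increment of 1 sample.
scales = np.arange(smallest_scale, largest_scale, 1)
periods = scale_to_period(scales, samples_per_hour, central_frequency)

print(f"{len(scales)} scales from {scales[0]} to {scales[-1]}")
print(f"periods from {periods[0]:.3f} h to {periods[-1]:.3f} h")
```

Since np.arange excludes its stop value, the longest analysed period falls just short of the configured upper bound.
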