Raw File
Plots.py
# Copyright (C) 2016 Antoine Carme <Antoine.Carme@Laposte.net>
# All rights reserved.

# This file is part of the Python Automatic Forecasting (PyAF) library and is made available under
# the terms of the 3 Clause BSD license

import pandas as pd
import numpy as np

from io import BytesIO
import base64


SIGNAL_COLOR='green'
FORECAST_COLOR='blue'
RESIDUE_COLOR='red'
COMPONENT_COLOR='navy'
SHADED_COLOR='turquoise'
UPPER_COLOR='grey'
LOWER_COLOR='black'


def add_patched_legend(ax , names):
    # matplotlib does not like labels starting with '_'
    patched_names = []
    for name in names:
        # remove leading '_' => here, this is almost OK: no signal transformation
        patched_name = name[2:] if(name.startswith('__')) else name
        patched_name = patched_name[1:] if(patched_name.startswith('_')) else patched_name
        patched_names = patched_names + [ patched_name ]
    ax.legend(patched_names)

def decomp_plot(df, time, signal, estimator, residue, name = None, format='png', max_length = 1000) :
    assert(df.shape[0] > 0)
    assert(df.shape[1] > 0)
    assert(time in df.columns)
    assert(signal in df.columns)
    assert(estimator in df.columns)
    assert(residue in df.columns)


    import matplotlib
    # print("MATPLOTLIB_BACKEND",  matplotlib.get_backend())
    # matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    df1 = df.tail(max_length);
    if(name is not None):
        plt.switch_backend('Agg')
    fig, axs = plt.subplots(ncols=2, figsize=(32, 16))
    lColor = COMPONENT_COLOR;
    if(name is not None and name.endswith("Forecast")):
        lColor = FORECAST_COLOR;
    df1.plot.line(time, [signal, estimator, residue],
                  color=[SIGNAL_COLOR, lColor, RESIDUE_COLOR],
                  ax=axs[0] , grid = True, legend=False)
    add_patched_legend(axs[0] , [signal, estimator, residue])
    residues =  df1[residue].values

    import scipy.stats as scistats
    resid = residues[~np.isnan(residues)]
    scistats.probplot(resid, dist="norm", plot=axs[1])

    if(name is not None):
        plt.switch_backend('Agg')
        fig.savefig(name + '_decomp_output.' + format)
        plt.close(fig)

def decomp_plot_as_png_base64(df, time, signal, estimator, residue, name = None, max_length = 1000) :
    assert(df.shape[0] > 0)
    assert(df.shape[1] > 0)
    assert(time in df.columns)
    assert(signal in df.columns)
    assert(estimator in df.columns)
    assert(residue in df.columns)

    import matplotlib
    # matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    plt.switch_backend('Agg')
    df1 = df.tail(max_length);
    fig, axs = plt.subplots(ncols=2, figsize=(16, 8))
    lColor = COMPONENT_COLOR;
    if(name is not None and name.endswith("Forecast")):
        lColor = FORECAST_COLOR;
    df1.plot.line(time, [signal, estimator, residue],
                  color=[SIGNAL_COLOR, lColor, RESIDUE_COLOR],
                  ax=axs[0] , grid = True, legend = False)
    add_patched_legend(axs[0] , [signal, estimator, residue])
    residues =  df1[residue].values

    import scipy.stats as scistats
    resid = residues[~np.isnan(residues)]
    scistats.probplot(resid, dist="norm", plot=axs[1])

    figfile = BytesIO()
    fig.savefig(figfile, format='png')
    figfile.seek(0)  # rewind to beginning of file
    figdata_png = base64.b64encode(figfile.getvalue())
    plt.close(fig)
    return figdata_png.decode('utf8')
    

def prediction_interval_plot(df, time, signal, estimator, lower, upper, name = None, format='png', max_length = 1000) :
    assert(df.shape[0] > 0)
    assert(df.shape[1] > 0)
    assert(time in df.columns)
    assert(signal in df.columns)
    assert(estimator in df.columns)
    assert(lower in df.columns)
    assert(upper in df.columns)


    df1 = df.tail(max_length).copy();
    lMin = np.mean(df1[signal]) -  np.std(df1[signal]) * 3;
    lMax = np.mean(df1[signal]) +  np.std(df1[signal]) * 3;
    df1[lower] = df1[lower].apply(lambda x : x if (np.isnan(x) or x >= lMin) else np.nan);
    df1[upper] = df1[upper].apply(lambda x : x if (np.isnan(x) or x <= lMax) else np.nan);

    # last value of the signal
    lLastSignalPos = df1[signal].dropna().tail(1).index[0];
    lEstimtorValue = df1[estimator][lLastSignalPos];
    df1.loc[lLastSignalPos , lower] = lEstimtorValue;
    df1.loc[lLastSignalPos , upper] = lEstimtorValue;

    import matplotlib
    # matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    if(name is not None):
        plt.switch_backend('Agg')
    fig, axs = plt.subplots(ncols=1, figsize=(16, 8))
    df1.plot.line(time, [signal, estimator, lower, upper],
                  color=[SIGNAL_COLOR, FORECAST_COLOR, LOWER_COLOR, UPPER_COLOR],
                  ax=axs, grid = True, legend=False)
    add_patched_legend(axs , [signal, estimator, lower, upper])

    x = df1[time];
    type1 = np.dtype(x)
    if(type1.kind == 'M'):
        x = x.apply(lambda t : t.date());
    axs.fill_between(x.values, df1[lower], df1[upper], color=SHADED_COLOR, alpha=.2)

    if(name is not None):
        plt.switch_backend('Agg')
        fig.savefig(name + '_prediction_intervals_output.' + format)
        plt.close(fig)
    

def prediction_interval_plot_as_png_base64(df, time, signal, estimator, lower, upper, name = None, max_length = 1000) :
    assert(df.shape[0] > 0)
    assert(df.shape[1] > 0)
    assert(time in df.columns)
    assert(signal in df.columns)
    assert(estimator in df.columns)
    assert(lower in df.columns)
    assert(upper in df.columns)


    df1 = df.tail(max_length).copy();
    lMin = np.mean(df1[signal]) -  np.std(df1[signal]) * 3;
    lMax = np.mean(df1[signal]) +  np.std(df1[signal]) * 3;
    df1[lower] = df1[lower].apply(lambda x : x if (np.isnan(x) or x >= lMin) else np.nan);
    df1[upper] = df1[upper].apply(lambda x : x if (np.isnan(x) or x <= lMax) else np.nan);

    # last value of the signal
    lLastSignalPos = df1[signal].dropna().tail(1).index;
    lEstimtorValue = df1[estimator][lLastSignalPos];
    df1.loc[lLastSignalPos , lower] = lEstimtorValue;
    df1.loc[lLastSignalPos , upper] = lEstimtorValue;

    import matplotlib
    # matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    plt.switch_backend('Agg')
    fig, axs = plt.subplots(ncols=1, figsize=(16, 8))
    df1.plot.line(time, [signal, estimator, lower, upper],
                  color=[SIGNAL_COLOR, FORECAST_COLOR, FORECAST_COLOR, FORECAST_COLOR],
                  ax=axs, grid = True, legend=False)
    add_patched_legend(axs , [signal, estimator, lower, upper])

    x = df1[time];
    type1 = np.dtype(x)
    if(type1.kind == 'M'):
        x = x.apply(lambda t : t.date());
    axs.fill_between(x.values, df1[lower], df1[upper], color=SHADED_COLOR, alpha=.5)

    figfile = BytesIO()
    fig.savefig(figfile, format='png')
    plt.close(fig)
    figfile.seek(0)  # rewind to beginning of file
    figdata_png = base64.b64encode(figfile.getvalue())
    return figdata_png.decode('utf8')


def qqplot_residues(df , residue):
    pass

def build_record_label(labels_list):
    out = "<f0>" + str(labels_list[0]);
    i = 1;
    for l in labels_list[1:]:
        out = out + " | <f" + str(i) + "> " + str(l) ;
        i = i + 1;
    return out + "";


def plot_hierarchy(structure , iAnnotations, name):
    import pydot
    graph = pydot.Dot(graph_type='graph', rankdir='LR', fontsize="12.0");
    graph.set_node_defaults(shape='record')
    lLevelsReversed = sorted(structure.keys(), reverse=True);
    for level in  lLevelsReversed:
        color = '#%02x%02x%02x' % (255, 255, 127 + int(128 * (1.0 - (level + 1.0) / len(lLevelsReversed))));
        for col in structure[level].keys():
            lLabel = col if iAnnotations is None else str(iAnnotations[col]);
            if iAnnotations is not None:
                lLabel = build_record_label(iAnnotations[col]);
            node_col = pydot.Node(col, label=lLabel, style="filled", fillcolor=color, fontsize="12.0")
            graph.add_node(node_col);
            for col1 in structure[level][col]:
                lLabel1 = col1
                if iAnnotations is not None:
                    lLabel1 = build_record_label(iAnnotations[col1]);
                color1 = '#%02x%02x%02x' % (255, 255, 128 + int(128 * (1.0 - (level + 2.0) / len(lLevelsReversed))));
                node_col1 = pydot.Node(col1, label=lLabel1, style="filled",
                                       fillcolor=color1, fontsize="12.0")
                graph.add_node(node_col1);
                lEdgeLabel = "";
                if iAnnotations is not None:
                    lEdgeLabel = iAnnotations[col + "_" + col1];
                lEdge = pydot.Edge(node_col, node_col1, color="red", label=lEdgeLabel, fontsize="12.0")
                graph.add_edge(lEdge)
    # print(graph.obj_dict)
    if(name is not None):
        graph.write_png(name);
    else:
        from IPython.display import Image, display
        plot1 = Image(graph.create_png())
        display(plot1)
back to top