Revision d88a462b3e5d4a972c75df5d1a68db01e06ef780 authored by Fabian Hofmann on 06 October 2023, 08:52:40 UTC, committed by GitHub on 06 October 2023, 08:52:40 UTC
1 parent b36a22f
Raw File
test_lopf_multiinvest.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Jul  2 10:21:16 2021.

@author: fabian
"""

import pandas as pd
import pytest
from conftest import optimize
from numpy.testing import assert_array_almost_equal as equal
from pandas import IndexSlice as idx

import pypsa
from pypsa.descriptors import get_activity_mask

MULTIINVEST_APIS = ["linopy", "native"]
kwargs = dict(multi_investment_periods=True)


@pytest.fixture
def n():
    n = pypsa.Network(snapshots=range(10))
    n.investment_periods = [2020, 2030, 2040, 2050]
    n.add("Carrier", "gencarrier")
    n.madd("Bus", [1, 2])

    for i, period in enumerate(n.investment_periods):
        factor = (10 + i) / 10
        n.madd(
            "Generator",
            [f"gen1-{period}", f"gen2-{period}"],
            bus=[1, 2],
            lifetime=30,
            build_year=period,
            capital_cost=[100 / factor, 100 * factor],
            marginal_cost=[i + 2, i + 1],
            p_nom_extendable=True,
            carrier="gencarrier",
        )

    for i, period in enumerate(n.investment_periods):
        n.add(
            "Line",
            f"line-{period}",
            bus0=1,
            bus1=2,
            length=1,
            build_year=period,
            lifetime=40,
            capital_cost=30 + i,
            x=0.0001,
            s_nom_extendable=True,
        )

    load = range(100, 100 + len(n.snapshots))
    load = pd.DataFrame({"load1": load, "load2": load}, index=n.snapshots)
    n.madd(
        "Load",
        ["load1", "load2"],
        bus=[1, 2],
        p_set=load,
    )

    return n


@pytest.fixture
def n_sus(n):
    # only keep generators which are getting more expensiv and push generator
    # capital cost, so that sus are activated
    n.mremove("Generator", n.generators.query('bus == "1"').index)
    n.generators.capital_cost *= 5

    for i, period in enumerate(n.investment_periods):
        factor = (10 + i) / 10
        n.add(
            "StorageUnit",
            f"sto1-{period}",
            bus=1,
            lifetime=30,
            build_year=period,
            capital_cost=10 / factor,
            marginal_cost=i,
            p_nom_extendable=True,
        )
    return n


@pytest.fixture
def n_sts(n):
    # only keep generators which are getting more expensiv and push generator
    # capital cost, so that sus are activated
    n.mremove("Generator", n.generators.query('bus == "1"').index)
    n.generators.capital_cost *= 5

    n.add("Bus", "1 battery")

    n.add(
        "Store",
        "sto1-2020",
        bus="1 battery",
        e_nom_extendable=True,
        e_initial=20,
        build_year=2020,
        lifetime=30,
        capital_cost=0.1,
    )

    n.add(
        "Link", "bus2 battery charger", bus0=1, bus1="1 battery", p_nom_extendable=True
    )

    n.add(
        "Link",
        "My bus2 battery discharger",
        bus0="1 battery",
        bus1=1,
        p_nom_extendable=True,
    )

    return n


def test_single_to_multi_level_snapshots():
    n = pypsa.Network(snapshots=range(2))
    years = [2030, 2040]
    n.investment_periods = years
    assert isinstance(n.snapshots, pd.MultiIndex)
    equal(n.snapshots.levels[0], years)


def test_investment_period_values():
    sns = pd.MultiIndex.from_product([[2020, 2030, 2040], [1, 2, 3]])
    n = pypsa.Network(snapshots=sns)

    with pytest.raises(ValueError):
        n.investment_periods = [2040, 2030, 2020]

    with pytest.raises(ValueError):
        n.investment_periods = ["2020", "2030", "2040"]

    with pytest.raises(NotImplementedError):
        n.investment_periods = [2020]

    n = pypsa.Network(snapshots=range(2))
    with pytest.raises(ValueError):
        n.investment_periods = ["2020", "2030", "2040"]


def test_active_assets(n):
    active_gens = n.get_active_assets("Generator", 2030)[lambda ds: ds].index
    assert (active_gens == ["gen1-2020", "gen2-2020", "gen1-2030", "gen2-2030"]).all()

    active_gens = n.get_active_assets("Generator", 2050)[lambda ds: ds].index
    assert (
        active_gens
        == [
            "gen1-2030",
            "gen2-2030",
            "gen1-2040",
            "gen2-2040",
            "gen1-2050",
            "gen2-2050",
        ]
    ).all()


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_tiny_with_default(api):
    n = pypsa.Network(snapshots=range(2))
    n.investment_periods = [2020, 2030]
    n.add("Bus", 1)
    n.add("Generator", 1, bus=1, p_nom_extendable=True, capital_cost=10)
    n.add("Load", 1, bus=1, p_set=100)
    status, _ = optimize(n, api, **kwargs)
    assert status == "ok"
    assert n.generators.p_nom_opt.item() == 100


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_tiny_with_build_year(api):
    n = pypsa.Network(snapshots=range(2))
    n.investment_periods = [2020, 2030]
    n.add("Bus", 1)
    n.add(
        "Generator", 1, bus=1, p_nom_extendable=True, capital_cost=10, build_year=2020
    )
    n.add("Load", 1, bus=1, p_set=100)
    status, _ = optimize(n, api, **kwargs)
    assert status == "ok"
    assert n.generators.p_nom_opt.item() == 100


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_tiny_infeasible(api):
    n = pypsa.Network(snapshots=range(2))
    n.investment_periods = [2020, 2030]
    n.add("Bus", 1)
    n.add(
        "Generator", 1, bus=1, p_nom_extendable=True, capital_cost=10, build_year=2030
    )
    n.add("Load", 1, bus=1, p_set=100)
    with pytest.raises(ValueError):
        status, cond = optimize(n, api, **kwargs)


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network(n, api):
    status, cond = optimize(n, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    assert (n.generators_t.p.loc[[2020, 2030, 2040], "gen1-2050"] == 0).all()
    assert (n.generators_t.p.loc[[2050], "gen1-2020"] == 0).all()

    assert (n.lines_t.p0.loc[[2020, 2030, 2040], "line-2050"] == 0).all()


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_snapshot_subset(n, api):
    status, cond = optimize(n, api, n.snapshots[:20], **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    assert (n.generators_t.p.loc[[2020, 2030, 2040], "gen1-2050"] == 0).all()
    assert (n.generators_t.p.loc[[2050], "gen1-2020"] == 0).all()

    assert (n.lines_t.p0.loc[[2020, 2030, 2040], "line-2050"] == 0).all()


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_storage_noncyclic(n_sus, api):
    n_sus.storage_units["state_of_charge_initial"] = 200
    n_sus.storage_units["cyclic_state_of_charge"] = False
    n_sus.storage_units["state_of_charge_initial_per_period"] = False

    status, cond = optimize(n_sus, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    soc = n_sus.storage_units_t.state_of_charge
    p = n_sus.storage_units_t.p
    assert round((soc + p).loc[idx[2020, 0], "sto1-2020"], 4) == 200
    assert soc.loc[idx[2040, 9], "sto1-2020"] == 0


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_storage_noncyclic_per_period(n_sus, api):
    n_sus.storage_units["state_of_charge_initial"] = 200
    n_sus.storage_units["cyclic_state_of_charge"] = False
    n_sus.storage_units["state_of_charge_initial_per_period"] = True

    status, cond = optimize(n_sus, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    assert (n_sus.storage_units_t.p.loc[[2020, 2030, 2040], "sto1-2050"] == 0).all()
    assert (n_sus.storage_units_t.p.loc[[2050], "sto1-2020"] == 0).all()

    soc_initial = (n_sus.storage_units_t.state_of_charge + n_sus.storage_units_t.p).loc[
        idx[:, 0], :
    ]
    soc_initial = soc_initial.droplevel("timestep")
    assert soc_initial.loc[2020, "sto1-2020"] == 200
    assert soc_initial.loc[2030, "sto1-2020"] == 200
    assert soc_initial.loc[2040, "sto1-2040"] == 200


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_storage_cyclic(n_sus, api):
    n_sus.storage_units["cyclic_state_of_charge"] = True
    n_sus.storage_units["cyclic_state_of_charge_per_period"] = False

    status, cond = optimize(n_sus, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    soc = n_sus.storage_units_t.state_of_charge
    p = n_sus.storage_units_t.p
    assert (
        soc.loc[idx[2040, 9], "sto1-2020"] == (soc + p).loc[idx[2020, 0], "sto1-2020"]
    )
    assert (
        soc.loc[idx[2050, 9], "sto1-2030"] == (soc + p).loc[idx[2030, 0], "sto1-2030"]
    )


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_storage_cyclic_per_period(n_sus, api):
    # Watch out breaks with xarray version 2022.06.00 !
    n_sus.storage_units["cyclic_state_of_charge"] = True
    n_sus.storage_units["cyclic_state_of_charge_per_period"] = True

    status, cond = optimize(n_sus, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    soc = n_sus.storage_units_t.state_of_charge
    p = n_sus.storage_units_t.p
    assert (
        soc.loc[idx[2020, 9], "sto1-2020"] == (soc + p).loc[idx[2020, 0], "sto1-2020"]
    )


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_store_noncyclic(n_sts, api):
    n_sts.stores["e_cyclic"] = False
    n_sts.stores["e_initial_per_period"] = False

    status, cond = optimize(n_sts, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    assert (n_sts.stores_t.p.loc[[2050], "sto1-2020"] == 0).all()

    e_initial = (n_sts.stores_t.e + n_sts.stores_t.p).loc[idx[:, 0], :]
    e_initial = e_initial.droplevel("timestep")
    assert e_initial.loc[2020, "sto1-2020"] == 20


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_store_noncyclic_per_period(n_sts, api):
    n_sts.stores["e_cyclic"] = False
    n_sts.stores["e_initial_per_period"] = True

    status, cond = optimize(n_sts, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    assert (n_sts.stores_t.p.loc[[2050], "sto1-2020"] == 0).all()

    e_initial = (n_sts.stores_t.e + n_sts.stores_t.p).loc[idx[:, 0], :]
    e_initial = e_initial.droplevel("timestep")
    assert e_initial.loc[2020, "sto1-2020"] == 20
    assert e_initial.loc[2030, "sto1-2020"] == 20

    # lifetime is over here
    assert e_initial.loc[2050, "sto1-2020"] == 0


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_store_cyclic(n_sts, api):
    n_sts.stores["e_cyclic"] = True
    n_sts.stores["e_cyclic_per_period"] = False

    status, cond = optimize(n_sts, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    assert (n_sts.stores_t.p.loc[[2050], "sto1-2020"] == 0).all()

    e = n_sts.stores_t.e
    p = n_sts.stores_t.p
    assert e.loc[idx[2040, 9], "sto1-2020"] == (e + p).loc[idx[2020, 0], "sto1-2020"]


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_simple_network_store_cyclic_per_period(n_sts, api):
    # Watch out breaks with xarray version 2022.06.00 !
    n_sts.stores["e_cyclic"] = True
    n_sts.stores["e_cyclic_per_period"] = True

    status, cond = optimize(n_sts, api, **kwargs)
    assert status == "ok"
    assert cond == "optimal"

    assert (n_sts.stores_t.p.loc[[2050], "sto1-2020"] == 0).all()

    e = n_sts.stores_t.e
    p = n_sts.stores_t.p
    assert e.loc[idx[2020, 9], "sto1-2020"] == (e + p).loc[idx[2020, 0], "sto1-2020"]


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_global_constraint_primary_energy_storage(n_sus, api):
    c = "StorageUnit"
    n_sus.add("Carrier", "emitting_carrier", co2_emissions=100)
    n_sus.df(c)["state_of_charge_initial"] = 200
    n_sus.df(c)["cyclic_state_of_charge"] = False
    n_sus.df(c)["state_of_charge_initial_per_period"] = False
    n_sus.df(c)["carrier"] = "emitting_carrier"

    n_sus.add("GlobalConstraint", name="co2limit", type="primary_energy", constant=3000)

    status, cond = optimize(n_sus, api, **kwargs)

    active = get_activity_mask(n_sus, c)
    soc_end = n_sus.pnl(c).state_of_charge.where(active).ffill().iloc[-1]
    soc_diff = n_sus.df(c).state_of_charge_initial - soc_end
    emissions = n_sus.df(c).carrier.map(n_sus.carriers.co2_emissions)
    assert round(soc_diff @ emissions, 0) == 3000


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_global_constraint_primary_energy_store(n_sts, api):
    c = "Store"
    n_sts.add("Carrier", "emitting_carrier", co2_emissions=100)
    n_sts.df(c)["e_initial"] = 200
    n_sts.df(c)["e_cyclic"] = False
    n_sts.df(c)["e_initial_per_period"] = False

    n_sts.buses.loc["1 battery", "carrier"] = "emitting_carrier"

    n_sts.add("GlobalConstraint", name="co2limit", type="primary_energy", constant=3000)

    status, cond = optimize(n_sts, api, **kwargs)

    active = get_activity_mask(n_sts, c)
    soc_end = n_sts.pnl(c).e.where(active).ffill().iloc[-1]
    soc_diff = n_sts.df(c).e_initial - soc_end
    emissions = n_sts.df(c).carrier.map(n_sts.carriers.co2_emissions)
    assert round(soc_diff @ emissions, 0) == 3000


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_global_constraint_transmission_expansion_limit(n, api):
    n.add(
        "GlobalConstraint",
        "expansion_limit",
        type="transmission_volume_expansion_limit",
        constant=100,
        sense="==",
        carrier_attribute="AC",
    )

    status, cond = optimize(n, api, **kwargs)
    assert n.lines.s_nom_opt.sum() == 100

    # when only optimizing the first 10 snapshots the contraint must hold for
    # the 2020 period
    status, cond = optimize(n, api, n.snapshots[:10], **kwargs)
    assert n.lines.loc["line-2020", "s_nom_opt"] == 100

    n.global_constraints["investment_period"] = 2030
    status, cond = optimize(n, api, **kwargs)
    assert n.lines.s_nom_opt[["line-2020", "line-2030"]].sum() == 100


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_global_constraint_transmission_cost_limit(n, api):
    n.add(
        "GlobalConstraint",
        "expansion_limit",
        type="transmission_expansion_cost_limit",
        constant=1000,
        sense="==",
        carrier_attribute="AC",
    )

    status, cond = optimize(n, api, **kwargs)
    assert round(n.lines.eval("s_nom_opt * capital_cost").sum(), 2) == 1000

    # when only optimizing the first 10 snapshots the contraint must hold for
    # the 2020 period
    status, cond = optimize(n, api, n.snapshots[:10], **kwargs)
    assert round(n.lines.eval("s_nom_opt * capital_cost")["line-2020"].sum(), 2) == 1000

    n.global_constraints["investment_period"] = 2030
    status, cond = optimize(n, api, **kwargs)
    lines = n.lines.loc[["line-2020", "line-2030"]]
    assert round(lines.eval("s_nom_opt * capital_cost").sum(), 2) == 1000


@pytest.mark.parametrize("api", ["native", "linopy"])
def test_global_constraint_bus_tech_limit(n, api):
    n.add(
        "GlobalConstraint",
        "expansion_limit",
        type="tech_capacity_expansion_limit",
        constant=300,
        sense="==",
        carrier_attribute="gencarrier",
        investment_period=2020,
    )

    status, cond = optimize(n, api, **kwargs)
    assert round(n.generators.p_nom_opt[["gen1-2020", "gen2-2020"]], 1).sum() == 300

    n.global_constraints["bus"] = 1
    status, cond = optimize(n, api, **kwargs)
    assert n.generators.at["gen1-2020", "p_nom_opt"] == 300

    # make the constraint non-binding and check that the shadow price is zero
    n.global_constraints.sense = "<="
    status, cond = optimize(n, api, **kwargs)
    assert n.global_constraints.at["expansion_limit", "mu"] == 0


@pytest.mark.parametrize("api", ["linopy"])
def test_nominal_constraint_bus_carrier_expansion_limit(n, api):
    n.buses.at["1", "nom_max_gencarrier"] = 100
    status, cond = optimize(n, api, **kwargs)
    gen1s = [f"gen1-{period}" for period in n.investment_periods]
    assert round(n.generators.p_nom_opt[gen1s], 0).sum() == 100
    n.buses.drop(["nom_max_gencarrier"], inplace=True, axis=1)

    n.buses.at["1", "nom_max_gencarrier_2020"] = 100
    status, cond = optimize(n, api, **kwargs)
    assert n.generators.at["gen1-2020", "p_nom_opt"] == 100
    n.buses.drop(["nom_max_gencarrier_2020"], inplace=True, axis=1)

    # make the constraint non-binding and check that the shadow price is zero
    n.buses.at["1", "nom_min_gencarrier_2020"] = 100
    status, cond = optimize(n, api, **kwargs)
    assert (n.model.constraints["Bus-nom_min_gencarrier_2020"].dual).item() == 0


@pytest.mark.parametrize("api", MULTIINVEST_APIS)
def test_max_growth_constraint(n, api):
    # test generator grow limit
    gen_carrier = n.generators.carrier.unique()[0]
    n.carriers.at[gen_carrier, "max_growth"] = 218
    status, cond = optimize(n, api, **kwargs)
    assert all(n.generators.p_nom_opt.groupby(n.generators.build_year).sum() <= 218)


@pytest.mark.parametrize("api", ["linopy"])
def test_max_relative_growth_constraint(n, api):
    # test generator relative grow limit
    gen_carrier = n.generators.carrier.unique()[0]
    n.carriers.at[gen_carrier, "max_growth"] = 218
    n.carriers.at[gen_carrier, "max_relative_growth"] = 1.5
    status, cond = optimize(n, api, **kwargs)
    built_per_period = n.generators.p_nom_opt.groupby(n.generators.build_year).sum()
    assert all(built_per_period - built_per_period.shift(fill_value=0) * 1.5 <= 218)
back to top