Source code for annular.satellite_model.multi_profile_strategy

import logging
from collections.abc import Container
from itertools import product
from numbers import Real
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyomo.environ as pyo
import yaml
from cronian.base_model import create_optimization_model
from cronian.prosumers import build_prosumer_model
from cronian.results import extract_prosumer_dispatch
from cronian.validate import validate_prosumer

from annular.market_model import ALL_BID_COLUMNS, BID_COLUMN_NAMES, BID_IDX_COLUMNS
from annular.optimization_model_manager import OptimizationModelManager
from annular.tariffs import TariffManager
from annular.utils import Sense, patch_expected_price, read_csv_with_utc_timestamps
from .satellite_model import SatelliteModel

[docs] logger = logging.getLogger(__name__)
[docs] class MultiProfileBiddingStrategy(SatelliteModel): def __init__( self, demands: pd.DataFrame, base_forecast: pd.DataFrame, carrier_prices: pd.DataFrame, cronian_config: dict, tariff_folder: Path | str | None = None, tariff_categories: dict[str, str | Real] | None = None, floor_price: int = 0, horizon_size: int = 48, cronian_storage_model: str = "simple", forecast_scaling_factor: float = 1.5, **kwargs, ): """A 24-hour bidding strategy to bid using multiple profiles based on multiple scenarios. See Also: annular.tariffs.TariffManager Args: demands: Demand values per timestamp, with different flexibility as separate columns named 'flex+N'. base_forecast: Electricity price forecast based on which multiple forecast scenarios will be generated by adjusting with the `forecast_scaling_factor`. carrier_prices: Price timeseries for any energy carrier other than electricity such as methane, hydrogen, etc. cronian_config: Cronian configuration describing the prosumer's demand and assets for building its optimization model. tariff_folder: Path to the folder with tariff data; passed to a TariffManager. tariff_categories: Dictionary that indicates the tariff category the satellite is subject to. Keys are categories, and values are the levels in that category. floor_price: Minimum price to bid at, defaults to 0. Used as price for supply bids. horizon_size: Full length of the horizon to use for an optimization iteration, i.e., bidding window + look ahead period, in number of snapshots. forecast_scaling_factor: Factor to scale up/down the electricity price forecast from the base scenario to create multiple forecast scenarios. cronian_storage_model: Type of storage model (`simple` or `complex`) to use in the optimization model if prosumer has storage assets. kwargs: Any other keyword arguments are passed to the initialization of the base class. """ super().__init__(**kwargs) if forecast_scaling_factor <= 0: raise ValueError("`forecast_scaling_factor` cannot be less than or equal to 0.") if horizon_size < 24: raise ValueError("`horizon_size` must be at least 24") if floor_price > self.ceiling_price: raise ValueError( f"`floor_price` may not be higher than ceiling price: {floor_price} > {self.ceiling_price}" ) # Explicitly cast to float to prevent pandas type warnings if originally given as int base_forecast["e_price"] = base_forecast["e_price"].astype(float)
[docs] self.base_forecast = base_forecast
[docs] self.demands = demands
[docs] self.carrier_prices = carrier_prices
[docs] self.cronian_config = cronian_config
[docs] self.floor_price = floor_price
[docs] self.horizon_size = horizon_size
[docs] self.forecast_scaling_factor = forecast_scaling_factor
[docs] self.storage_model = cronian_storage_model
validate_prosumer(self.cronian_config, self.demands)
[docs] self.cur_timestamp_idx = 0
[docs] self.manager = OptimizationModelManager(solver=pyo.SolverFactory("gurobi"))
if tariff_folder and tariff_categories: self.tariffs = TariffManager.from_folder(Path(tariff_folder), tariff_categories) elif tariff_folder and not tariff_categories: raise ValueError("`tariff_categories` must be specified if `tariff_folder` is given.") else: self.tariffs = TariffManager() # Initialize max electricity withdrawal that is used to model capacity tariffs over rolling horizons.
[docs] self.max_electricity_withdrawal_so_far = 0.0
@classmethod
[docs] def from_data_files( cls, demands_path: Path | str, forecasts_path: Path | str, carrier_prices_path: Path | str, cronian_config_path: Path | str, *args, **kwargs, ): """Initialize a MultiProfileBiddingStrategy by loading data and configuration from files. Args: demands_path: Path to csv file with demand values per timestamp, with different flexibility as separate columns named 'flex+N'. forecasts_path: Path to csv file containing the electricity price forecast based on which multiple forecast scenarios will be generated by adjusting with the `forecast_scaling_factor`. carrier_prices_path: Path to csv file containing price timeseries for any energy carrier other than electricity such as methane, hydrogen, etc. cronian_config_path: Path to the Cronian configuration describing the prosumer's demand and assets for building its optimization model. *args: All further arguments are passed as-is to `MultiProfileBiddingStrategy.__init__()` **kwargs: All further keyword arguments are passed as-is to `MultiProfileBiddingStrategy.__init__()` """ with open(cronian_config_path) as f: cronian_config = yaml.safe_load(f)["Prosumers"] demands = read_csv_with_utc_timestamps(demands_path) base_forecast = read_csv_with_utc_timestamps(forecasts_path) carrier_prices = read_csv_with_utc_timestamps(carrier_prices_path) return cls(demands, base_forecast, carrier_prices, cronian_config, *args, **kwargs)
@property
[docs] def bidding_window(self) -> pd.Index: """Returns current bidding window as a pandas Index of timestamps.""" return self._get_horizon(length=24)
[docs] def determine_bids(self, select_scenarios: Container[int] | None = None) -> pd.DataFrame: """Determine bids based on multiple forecast price scenarios. Different forecast price scenarios are made as variation of the given electricity_price_forecast. An optimization is run based on each of these scenarios, to determine the corresponding demand profile. The set of these profiles is submitted in a single exclusive group, all bid for at ceiling price. Returns: A collection of bids covering the current bidding window. """ if len(self.bidding_window) == 0: # Return a valid-to-unpack empty dataframe since the run has ended. return pd.DataFrame( data=np.zeros((1, len(BID_COLUMN_NAMES))), columns=BID_COLUMN_NAMES, index=pd.MultiIndex.from_tuples( [(0, 0, Sense.DEMAND, self.demands.index[-1])], names=BID_IDX_COLUMNS[1:] ), ) horizon = self._get_horizon() self.manager.set_model(self._create_model(horizon)) forecast_scenarios = self.make_forecast_scenarios( self.base_forecast.loc[self.bidding_window], self.forecast_scaling_factor, select_scenarios ) power_profiles = { scenario_name: self.determine_power_profile(scenario_forecast, scenario_name == "base_forecast") for scenario_name, scenario_forecast in forecast_scenarios.items() } bids_table = self._make_bids(power_profiles) if self.output_path: bids_csv_file_name = self.output_path / "bids.csv" bids_table.to_csv(bids_csv_file_name, mode="a", header=not bids_csv_file_name.exists()) return bids_table
[docs] def meet_demand(self, market_price: np.ndarray | None, demand_met: np.ndarray | None) -> None: """Process the amount of demand that was met. Solve the cronian model again, now with the electric power fixed to the values given from the central market. Record dispatch of assets and update the state of charge for any storage assets. Args: market_price: Price of electricity as provided per timestep. demand_met: Amount of demand that was met at the market price per timestep. """ if demand_met is None and market_price is None: return # Shortcut for initialization elif demand_met is None or market_price is None: raise ValueError("Both `demand_met` and `market_price` must be specified.") self.manager.set_model(self._create_model(self.bidding_window)) instance_data = {"e_price": dict(zip(self.bidding_window, market_price))} self.manager.instantiate(data=instance_data) assert self.manager.model_instance is not None # Force type of `model_instance` electric_power = getattr(self.manager.model_instance, f"{self.cronian_config['id']}_electric_power") for t, demand in zip(self.bidding_window, demand_met): electric_power[t].fix(-demand) # Demand is positive in the central market model, but negative in Cronian. self.manager.solve() if self.output_path is not None and self.manager.model_instance is not None: dispatch_csv_file_name = self.output_path / "dispatch.csv" dispatch_df = extract_prosumer_dispatch(self.manager.model_instance, self.cronian_config) dispatch_df.to_csv(dispatch_csv_file_name, mode="a", header=not dispatch_csv_file_name.exists()) self._update_internal_state(self.bidding_window) if "capacity_charge_yearly" in self.tariffs: self.max_electricity_withdrawal_so_far = pyo.value(self.manager.model_instance.max_electricity_withdrawal) # Advance the current timestamp index by 24 hours for the next bidding period self.cur_timestamp_idx += 24
[docs] def _update_internal_state(self, horizon: pd.Index) -> None: """Store intermediate values to carry over to the next rolling horizon. Specifically, these values are the energy levels of storage assets if any. Args: horizon: Current optimization horizon. """ model_instance = self.manager.model_instance for name, asset in self.cronian_config["assets"].items(): if asset["behavior_type"] != "storage": continue storage_energy = getattr(model_instance, f"{self.cronian_config['id']}_{name}_energy") # Update "initial energy" in place with the last value, as it will be used automatically at model creation. storage_final_energy = storage_energy[horizon[-1]].value asset["initial_energy"] = storage_final_energy if storage_final_energy is not None else 0
[docs] def _get_horizon(self, length: int | None = None) -> pd.Index: """Select the relevant horizon index values at the current timestamp index. Args: length: length of the desired horizon in number of timestamps. Uses `self.horizon_size` when `None` is given. Returns: Pandas Index object of the selected horizon values. """ if length is None: length = self.horizon_size end_idx = self.cur_timestamp_idx + length horizon = pd.Index(self.demands.index[self.cur_timestamp_idx : end_idx]) return horizon
@staticmethod
[docs] def make_forecast_scenarios( base_forecast: pd.DataFrame, forecast_scaling_factor: float, select: Container[int] | None = None ) -> dict[str, pd.DataFrame]: """Create different scenarios from a base forecast. The different scenarios are created by either scaling the base forecast according to some profile, or by scaling a fixed profile according to a numerical property of the base forecast. The following scenarios are created: - scale_up: `base_forecast_values * forecast_scaling_factor` - scale_down: `base_forecast_values / forecast_scaling_factor` - scale_up_increasing: `base_forecast_values * linear_increase` - scale_down_increasing: `base_forecast_values * linear_decrease` - peak_mid: `base_forecast_values * peak_shape_scaling` - valley_mid: `base_forecast_values * valley_shape_scaling` - day_period_boost: `base_forecast_values * boost_day_period` - morning_evening_boost: `base_forecast_values * boost_morning_evening` - base_min_day_period_boost: `base_forecast_min * boost_day_period` - base_min_morning_evening_boost: `base_forecast_min * boost_morning_evening` - base_max_day_period_boost: `base_forecast_max * boost_day_period` - base_max_morning_evening_boost: `base_forecast_max * boost_morning_evening` Args: base_forecast: Electricity price forecast used as a base for generating different scenarios of electricity price variations. forecast_scaling_factor: Factor to scale the base forecast to create different scenarios. select: optional collection of index integers for which scenarios to select from the provided collection. Returns: Electricity price forecast scenarios. Raises: ValueError: If the length of the forecast is not 24. """ forecast_length = len(base_forecast) if forecast_length != 24: raise ValueError("Forecast length must be 24 to represent a full day with hourly resolution.") forecast_scenarios = {"base_forecast": base_forecast.copy()} base_forecast_values: np.ndarray = base_forecast["e_price"].to_numpy() # 1. Linear increase and decrease linear_increase: np.ndarray = np.linspace(1.0, forecast_scaling_factor, forecast_length) linear_decrease: np.ndarray = np.linspace(forecast_scaling_factor, 1.0, forecast_length) # 2. Peak and valley scaling using Gaussian-like functions with slope controlled by `forecast_scaling_factor`. x: np.ndarray = np.linspace(-1, 1, forecast_length) peak_shape_scaling = 1 + np.exp(-forecast_scaling_factor * x**2) valley_shape_scaling = 1 - np.exp(-forecast_scaling_factor * x**2) # 3. Scale up specific periods that can capture different electricity usage patterns. time_index = np.arange(forecast_length) day_period = (time_index >= 8) & (time_index < 19) # Between 08:00 and 18:59 boost_day_period, boost_morning_evening = np.ones((2, forecast_length)) # Split a 2xN into two separate rows boost_day_period[day_period] = forecast_scaling_factor boost_morning_evening[~day_period] = forecast_scaling_factor base_forecast_min = np.full(forecast_length, base_forecast_values.min()) base_forecast_max = np.full(forecast_length, base_forecast_values.max()) scenario_variants = { "scale_up": base_forecast_values * forecast_scaling_factor, "scale_down": base_forecast_values / forecast_scaling_factor, "scale_up_increasing": base_forecast_values * linear_increase, "scale_down_increasing": base_forecast_values * linear_decrease, "peak_mid": base_forecast_values * peak_shape_scaling, "valley_mid": base_forecast_values * valley_shape_scaling, "day_period_boost": base_forecast_values * boost_day_period, "morning_evening_boost": base_forecast_values * boost_morning_evening, "base_min_day_period_boost": base_forecast_min * boost_day_period, "base_min_morning_evening_boost": base_forecast_min * boost_morning_evening, "base_max_day_period_boost": base_forecast_max * boost_day_period, "base_max_morning_evening_boost": base_forecast_max * boost_morning_evening, } select = range(len(scenario_variants)) if select is None else select for idx, (scenario_name, electricity_price) in enumerate(scenario_variants.items()): if idx not in select: continue # skip un-selected scenarios scenario_forecast = base_forecast.copy() # Ensure the electricity price column is of float type to avoid Pandas warnings. scenario_forecast["e_price"] = scenario_forecast["e_price"].astype(float) scenario_forecast.loc[:, "e_price"] = electricity_price forecast_scenarios[scenario_name] = scenario_forecast return forecast_scenarios
@staticmethod
[docs] def plot_forecast_scenarios(forecast_scenarios: dict[str, pd.DataFrame], output_path) -> None: """Plot the electricity price forecast scenarios. Args: forecast_scenarios: Dictionary of forecast scenarios. output_path: Path where the plot will be saved. """ plt.figure(figsize=(10, 6)) for scenario_name, forecast_df in forecast_scenarios.items(): plt.plot(forecast_df.index, forecast_df["e_price"], label=scenario_name) plt.title("Electricity price forecast") plt.xlabel("Timestamps") plt.ylabel("Electricity Price (EUR/MWh)") plt.legend(title="Forecast Scenarios", loc="upper right") plt.grid() plt.savefig(output_path / "forecast_scenarios.png", dpi=300, bbox_inches="tight")
[docs] def determine_power_profile(self, scenario_forecast: pd.DataFrame, no_supply_to_grid: bool = False) -> list[float]: """Determine electricity consumption/generation profile based on a given scenario. Args: scenario_forecast: DataFrame containing forecasted electricity prices. no_supply_to_grid: Boolean to enforce a demand-only connection to the grid. Returns: Optimized electricity power profile for the given forecast price. """ horizon = self._get_horizon() expected_price = patch_expected_price(self.base_forecast, horizon, scenario_forecast) instance_data = {"e_price": dict(zip(horizon, expected_price))} instance_data |= self._get_tariff_initialization() self.manager.instantiate(data=instance_data) assert self.manager.model_instance is not None # Force type of `model_instance` if no_supply_to_grid: electric_power = getattr(self.manager.model_instance, f"{self.cronian_config['id']}_electric_power") @self.manager.model_instance.Constraint(self.manager.model_instance.time) def no_supply_to_grid_constraint(model, t): """Add a constraint to force the grid connection to be one-way only.""" return electric_power[t] <= 0 self.manager.solve() electric_power = getattr(self.manager.model_instance, f"{self.cronian_config['id']}_electric_power") # In the model instance defined through cronian, `electric_power` is positive when electricity is supplied to # the grid, and negative when electricity is consumed from the grid. In the market model, demand bids are # positive and supply bids are negative. Thus, we need to negate the quantities when passing them from the # model_instance to the bids table. power_profile = [-electric_power[t].value for t in list(self.manager.model_instance.time)[:24]] return power_profile
[docs] def _get_tariff_initialization(self) -> dict[str, dict]: """Prepare initialization for tariff-related parameters and constraints. Returns: Nested dictionary with initialization information for Pyomo model with tariff-related parameters and constraints. Only contains the information for tariffs that are present in the TariffManager. """ tariff_data = {} if "capacity_charge_yearly" in self.tariffs and "contracted_transport_limit" in self.tariffs: logger.info("Capacity Charge initialized.") capacity_tariff = self.tariffs.fetch_value(name="capacity_charge_yearly") tariff_data["capacity_tariff"] = {None: capacity_tariff} contracted_transport_limit = self.tariffs.fetch_value(name="contracted_transport_limit") tariff_data["contracted_transport_limit"] = {None: contracted_transport_limit} if "volumetric" in self.tariffs: logger.info("Volumetric tariff initialized.") horizon = self._get_horizon() tou_volumetric_tariff = self.tariffs.fetch_timeseries(name="volumetric", timestamps=horizon) tariff_data["tou_volumetric_tariff"] = dict(zip(horizon, tou_volumetric_tariff)) return tariff_data
[docs] def _make_bids(self, profiles: dict[str, list[float]]) -> pd.DataFrame: """Create a bids table to submit given profiles as an exclusive group. Actual bid entries are only made for timestamps with non-zero demand. Args: profiles: dictionary of lists of demand values for each timestamp. Returns: pd.DataFrame table of the constructed bids. """ bid_tuples: list[tuple] = [] for idx, profile in enumerate(profiles.values()): bid_tuples.extend( (1, idx, Sense.DEMAND, time, quantity, 1, self.ceiling_price) for time, quantity in zip(self.bidding_window, profile) if quantity > 0 ) bid_tuples.extend( (1, idx, Sense.SUPPLY, time, -quantity, 1, self.floor_price) for time, quantity in zip(self.bidding_window, profile) if quantity < 0 ) # Ensure at least one bid is submitted if len(bid_tuples) == 0: bid_tuples.append((0, 0, 1, self.bidding_window[0], 0, 0, 0)) return pd.DataFrame.from_records(bid_tuples, columns=ALL_BID_COLUMNS[1:], index=BID_IDX_COLUMNS[1:])
[docs] def _create_model(self, horizon: pd.Index) -> pyo.AbstractModel: """Create a model instance using `cronian`. Args: horizon: Timestamps spanning the optimization horizon currently of interest. Returns: Abstract model for this satellite in the current optimization horizon. """ model = create_optimization_model(None, self.carrier_prices.loc[horizon], len(horizon)) model.name = f"Optimization model of satellite--{self.cronian_config['id']}" horizon_demands = self.demands.loc[horizon] model = build_prosumer_model(model, self.cronian_config, horizon_demands, len(horizon), self.storage_model) model.e_price = pyo.Param(model.time, mutable=True, default=0, domain=pyo.Reals) if "capacity_charge_yearly" in self.tariffs: model = self._define_max_electricity_withdrawal(model) model = self._add_contracted_capacity_limit_constraint(model) model = self._add_cost_objective(model) return model
[docs] def _add_cost_objective(self, model: pyo.AbstractModel) -> pyo.AbstractModel: """Create a cost minimization or revenue maximization objective. Depending on the sign of the `electric_power` variable, the objective minimizes electricity consumption cost or maximizes revenues from electricity generated locally. It also minimizes the cost of consuming other externally priced energy carriers such as methane, biomass, etc. Args: model: Pyomo model to add the cost objective to. Returns: The given model object, now with `cost_objective` added. """ agent_id = self.cronian_config["id"] if "capacity_charge_yearly" in self.tariffs: model.capacity_tariff = pyo.Param() if "volumetric" in self.tariffs: model.tou_volumetric_tariff = pyo.Param(model.time) def prosumer_cost_rule(model): """Construct objective function.""" # Electricity cost (Prosumer electric power is negative for consumption, positive for production.) e_cost = -1 * sum(model.e_price[t] * getattr(model, f"{agent_id}_electric_power")[t] for t in model.time) # Network tariff cost tariff_cost = 0 if "capacity_charge_yearly" in self.tariffs: tariff_cost += model.max_electricity_withdrawal * model.capacity_tariff if "volumetric" in self.tariffs: tariff_cost += sum( model.tou_volumetric_tariff[t] * getattr(model, f"{agent_id}_total_electricity_withdrawn_from_grid")[t] for t in model.time ) # Consumption cost for externally priced carriers (e.g., methane, hydrogen, biomass, etc.) other_carriers_cost = 0 assets = self.cronian_config.get("assets", {}) for energy_carrier, (asset_name, asset_data) in product(self.carrier_prices.columns, assets.items()): asset_behavior = asset_data.get("behavior_type") asset_inputs = asset_data["input"] if isinstance(asset_data["input"], list) else [asset_data["input"]] if energy_carrier not in map(str.lower, asset_inputs) or asset_behavior == "storage": # Cost only applies to external inputs. continue other_carriers_cost += sum( getattr(model, f"{agent_id}_{asset_name}_{energy_carrier}_consumption")[t] * getattr(model, f"{energy_carrier}_price")[t] for t in model.time ) return e_cost + other_carriers_cost + tariff_cost model.cost_objective = pyo.Objective(rule=prosumer_cost_rule, sense=pyo.minimize) return model
[docs] def _define_max_electricity_withdrawal(self, model: pyo.AbstractModel) -> pyo.AbstractModel: """Define maximum electricity withdrawal for modeling capacity tariffs. This function defines the maximum electricity withdrawal in the current optimization horizon to which a capacity tariff charge is applied. Directly computing max withdrawal as `max (withdrawals in current horizon, previous maximum withdrawal)` will lead to a nonlinear optimization problem due to the max() operator. We linearize this by introducing a variable representing the maximum power withdrawal and enforcing the following constraints on it: (1) It must be greater than or equal to the instantaneous withdrawals in the current horizon. (2) It must be greater than or equal to the recorded maximum withdrawal from previous horizons. Args: model: Pyomo model to add the constraint to. Returns: The given model object, now with `capacity_tariff_constraint` added. """ # Introduce a new variable for maximum withdrawal model.max_electricity_withdrawal = pyo.Var(within=pyo.NonNegativeReals) # Add constraints to link the max_electricity_withdrawal variable to actual withdrawal in the current horizon def max_withdrawal_in_actual_rule(model, t): return ( model.max_electricity_withdrawal >= (getattr(model, f"{self.cronian_config['id']}_total_electricity_withdrawn_from_grid")[t]) ) model.max_withdrawal_in_actual_constraint = pyo.Constraint(model.time, rule=max_withdrawal_in_actual_rule) # Add constraints to link the max_electricity_withdrawal variable to previous withdrawal in earlier horizons def max_withdrawal_in_previous_rule(model): return model.max_electricity_withdrawal >= self.max_electricity_withdrawal_so_far model.max_withdrawal_in_previous_constraint = pyo.Constraint(rule=max_withdrawal_in_previous_rule) return model
[docs] def _add_contracted_capacity_limit_constraint(self, model: pyo.AbstractModel) -> pyo.AbstractModel: """Constrain max electricity withdrawal based on contracted transport limit. NOTE: We assume that the unit of the contracted_transport_limit is MW. Args: model: Pyomo model to add the constraint to. Returns: The given model object, now with `max_withdrawal_in_previous_constraint` added. """ model.contracted_transport_limit = pyo.Param() def max_withdrawal_limit_rule(model, t): return ( getattr(model, f"{self.cronian_config['id']}_total_electricity_withdrawn_from_grid")[t] <= model.contracted_transport_limit ) model.max_withdrawal_limit_constraint = pyo.Constraint(model.time, rule=max_withdrawal_limit_rule) return model