import logging
from collections import namedtuple
from operator import itemgetter
from pathlib import Path
import numpy as np
import pandas as pd
from cronian.feasible_consumption import parse_flex_amount
from more_itertools import bucket
from annular.utils import read_csv_with_utc_timestamps
from .satellite_model import SatelliteModel
[docs]
logger = logging.getLogger(__name__)
[docs]
class SimpleMultiHourBiddingStrategy(SatelliteModel):
def __init__(
self,
demands_path: str,
forecasts_path: str,
floor_price: int = 0,
bid_margin: float = 0.05,
horizon_size: int = 48,
**kwargs,
):
"""Simple multi-hour bidding strategy to bid for the lowest expected price at the cheapest expected hour.
Demand is specified at its deadline. Any specified flexibility means it
can be satisfied in any of the `n` earlier timesteps.
Args:
demands_path (str): Path to csv file with demand values per timestamp,
with different flexibility as
separate columns named 'flex+N'.
forecasts_path (str): Path to csv file with electricity price
forecast information. If multiple forecasts columns are given,
each is used in parallel to determine the bid curves. If
only a single forecast is given, it will be adjusted using the
`bid_curve_resolution` parameter to generate multiple forecasts
in place.
floor_price (int): Minimum price to bid at, defaults to 0.
bid_margin (float): [0, ...) Amount of margin as a fraction to bid over the
lowest found price within the look-ahead window. Example: 0.05
is interpreted as bidding 5% on top of the lowest found price.
horizon_size (int): full length of the horizon to use for an optimization
iteration, i.e., bidding window + look ahead period, in number
of snapshots.
kwargs: Any other keyword arguments are passed to the initialization of
the base class.
"""
super().__init__(**kwargs)
[docs]
self._last_bids: list[Bid] = []
[docs]
self.cur_timestamp_idx = 0
[docs]
self.floor_price = floor_price
[docs]
self.bid_margin = bid_margin
[docs]
self.horizon_size = horizon_size
[docs]
self.demands = read_csv_with_utc_timestamps(Path(demands_path))
[docs]
self.forecasts = read_csv_with_utc_timestamps(Path(forecasts_path))
# Assumption: no flexibility beyond look-ahead horizon
assert max(parse_flex_amount(d) for d in self.demands.columns) + self.rolling_horizon_step <= self.horizon_size
[docs]
def determine_bids(self) -> pd.DataFrame:
"""Determine when in the next bidding window we bid, and at what price.
A bid is roughly determined as follows for each listed demand quantity:
- Find the time of the cheapest expected price in the bidding window,
i.e., the period for which bids have to be submitted.
- If the flexibility extends _beyond_ the current bidding window, find
the cheapest expected price within the available flexibility, and
place a bid at that price + margin.
- If there is no flexibility beyond the current bidding window, bid at
the ceiling price instead.
Returns:
A collection of bids covering the next bid window.
"""
logger.debug("Determining bids")
bidding_window = self._get_horizon(self.rolling_horizon_step)
if len(bidding_window) == 0:
# Return a valid-to-unpack empty dataframe since the run has ended.
return pd.DataFrame(
data=np.zeros((1, 2)),
columns=["quantity", "price"],
index=pd.MultiIndex.from_tuples(
[(1, 1, self.demands.index[-1])], names=["exclusive_group_id", "profile_block_id", "timestamp"]
),
)
horizon_forecast = self.forecasts.loc[self._get_horizon()]
bids = []
group_idx = AutoIncrement()
for demand_name, demand_column in self.demands.items():
if demand_name in {"base", "flex+0"}: # base case: bid at ceiling price
bids.extend(
Bid(group_idx(), 0, time, q, self.ceiling_price, demand_name, time_idx)
for time_idx, (time, q) in enumerate(demand_column.loc[bidding_window].items())
if q != 0
)
continue
flexibility = parse_flex_amount(demand_name)
# Look at any demand that may be satisfied within current bidding window
# I.e., rolling horizon step size + maximum amount of flexibility
demand_horizon = self._get_horizon(self.rolling_horizon_step + flexibility)
for time_idx, q in enumerate(demand_column.loc[demand_horizon]):
if q == 0:
continue
bid_time, bid_price = self._make_bid(time_idx, flexibility, horizon_forecast)
bids.append(Bid(group_idx(), 0, bid_time, q, bid_price, demand_name, time_idx))
# Remember which bids were made for when meeting demand later.
self._last_bids = bids
# Make sure there's at least a 0-bid for every timestamp, which don't have to be remembered
timestamps_with_bids = {bid.timestamp for bid in bids}
bids = bids + [
Bid(group_idx(), 0, bid_time, 0, 0, 0, 0)
for bid_time in bidding_window
if bid_time not in timestamps_with_bids
]
bids.sort(key=itemgetter(2)) # sort by timestamp
bids_table = pd.DataFrame.from_records(
bids, columns=Bid._fields, index=["exclusive_group_id", "profile_block_id", "timestamp"]
)
if self.output_path:
bids_csv_file_name = self.output_path / "bids.csv"
bids_table.to_csv(bids_csv_file_name, mode="a", header=not bids_csv_file_name.exists())
# Don't return any additional columns that are only used for internal bookkeeping
return bids_table[["quantity", "price"]]
[docs]
def meet_demand(self, market_price: np.ndarray | None, demand_met: np.ndarray | None) -> None:
"""Update the internal state to record the amount of demand that was met.
Ensures all demand met is removed from the 'demand yet to be satisfied',
and advances the internal 'clock' by one rolling horizon step.
Args:
market_price: Price of electricity as provided per timestep.
demand_met: Amount of demand that was met at the market price per timestep.
Raises:
AssertionError: if any of the provided demand is under- or over-used.
ValueError: if any mandatory demand has not been satisfied by the given demand.
"""
logger.debug("Meeting demand")
if demand_met is None:
return
demand_horizon = self._get_horizon()
# Group by timestamp of the bid
groups = bucket(self._last_bids, key=lambda x: x.timestamp)
for timestamp, price, demand in zip(demand_horizon, market_price, demand_met):
# Sort bids per timestamp from highest to lowest bid_price
group = sorted(groups[timestamp], key=lambda x: x.price, reverse=True)
if not group:
continue
for bid in group:
if abs(demand) <= 1e-8 or bid.price < price:
break
self.demands.at[demand_horizon[bid.time_idx], bid.demand_name] -= min(demand, bid.quantity)
demand -= min(demand, bid.quantity)
assert abs(demand) <= 1e-8
# Confirm that all mandatory demand has been met
if self.demands.loc[demand_horizon[: self.rolling_horizon_step]].sum(axis=1).sum() > 0:
raise ValueError("Demand with deadline in this bidding window has not been met")
if self.output_path:
# Filter for non-zero supplied demand
horizon = self._get_horizon()
supplied_demand = [
(t, price, demand) for t, price, demand in zip(horizon, market_price, demand_met) if demand > 1e-8
]
# Save dispatch output
dispatch_csv_file_name = self.output_path / "dispatch.csv"
pd.DataFrame.from_records(supplied_demand, columns=["timestamp", "price", "demand"]).to_csv(
dispatch_csv_file_name, mode="a", header=not dispatch_csv_file_name.exists(), index=False
)
self.cur_timestamp_idx += self.rolling_horizon_step
[docs]
def _get_horizon(self, length: int = None) -> pd.Index:
"""Select the relevant horizon index values at the current timestamp index.
Args:
length: length of the desired horizon in number of timestamps.
Uses `self.horizon_size` when `None` is given.
Returns:
Pandas Index object of the selected horizon values.
"""
if length is None:
length = self.horizon_size
end_idx = self.cur_timestamp_idx + length
horizon = pd.Index(self.demands.index[self.cur_timestamp_idx : end_idx])
return horizon
[docs]
def _make_bid(self, time_idx: int, flexibility: int, prices_forecast: pd.DataFrame) -> tuple[pd.Timestamp, float]:
"""Give bid time and price for demand at given time and flexibility.
Args:
time_idx: integer index of the demand that could be fulfilled within
this bidding window
flexibility: how many timesteps early this demand could be satisfied
prices_forecast: forecast of electricity prices for the whole
horizon in which the demand might be satisfied.
Returns:
tuple: Expected best timestamp for the bid (always
within current bidding window) and the price for the bid.
"""
# What is the earliest time at which can be bid for this demand?
earliest_bid_index = max(0, time_idx - flexibility)
# Determine bid_time:
if time_idx <= self.rolling_horizon_step:
bid_price = self.ceiling_price # bids for demand within bidding window are at ceiling price
else:
# What is the minimum price within the window in which we can consume this demand?
bid_price = prices_forecast.iloc[earliest_bid_index : time_idx + 1].min().item()
bid_price *= 1 + self.bid_margin
# Determine bid_price:
# At what time-index in the *current rolling horizon window* are prices cheapest?
tmp = prices_forecast.iloc[earliest_bid_index : self.rolling_horizon_step]
bid_time_idx = tmp["e_price"].argmin()
# Get the actual timestamp value for use in the bids table.
bid_time = prices_forecast.index[earliest_bid_index + bid_time_idx]
return bid_time, bid_price
[docs]
[docs]
[docs]
[docs]
[docs]
[docs]
[docs]
[docs]
Bid = namedtuple(
"Bid",
[
# Bids table index
"exclusive_group_id",
"profile_block_id",
"timestamp",
# Bids table values
"quantity",
"price",
# Bookkeeping: which original demand did this bid come from?
"demand_name",
"time_idx",
],
)
[docs]
class AutoIncrement:
def __init__(self):
"""An auto-incrementing index counter starting at 0."""
[docs]
self.value = -1 # value is incremented before use, so -1 becomes 0 at first call.
[docs]
def __call__(self):
"""When called, increment the value by 1 and return."""
self.value += 1
return self.value