import logging
from itertools import groupby, product
from operator import itemgetter
from pathlib import Path
from typing import Iterable
import numpy as np
import pandas as pd
import pyomo.environ as pyo
from cronian.generators import add_all_generators
from annular.utils import Sense, negate_supply_bids_for_summarize
[docs]
logger = logging.getLogger(__name__)
[docs]
ALL_BID_COLUMNS: tuple = (
"satellite",
"exclusive_group_id",
"profile_block_id",
"sense",
"timestamp",
"quantity",
"acceptance_ratio",
"price",
)
[docs]
BID_IDX_COLUMNS = ALL_BID_COLUMNS[:5]
[docs]
BID_COLUMN_NAMES = ALL_BID_COLUMNS[5:]
[docs]
def create_market_model(
satellite_bids: pd.DataFrame,
generator_configs: dict[str, dict],
timeseries_data: pd.DataFrame,
snapshots: Iterable[pd.Timestamp],
) -> pyo.AbstractModel:
"""Create the central market clearing model.
Args:
satellite_bids: MultiIndex DataFrame of bids for all satellites.
generator_configs: dict of configurations defining of all generators.
timeseries_data: DataFrame containing all timeseries_data.
snapshots: snapshots/timesteps for which the market model is created.
Returns:
Pyomo model representing the market-clearing optimization problem.
"""
model = pyo.AbstractModel(name="Market-model")
model.time = pyo.Set(initialize=snapshots, ordered=True)
# Supply
add_all_generators(model, generator_configs, timeseries_data)
create_generation_cost_expression(model)
# Demand
encode_block_bids_into_model(model, satellite_bids)
constrain_power_balance(model)
# Objective
set_objective_as_social_welfare_maximization(model)
return model
[docs]
def encode_block_bids_into_model(model: pyo.Model, satellite_bids: pd.DataFrame) -> None:
"""Create gross surplus expression and set mutual exclusivity constraint.
This takes bids in the following format:
+-----------+--------------------+------------------+-------+-----------+----------+------------------+-------+
| satellite | exclusive_group_id | profile_block_id | sense | timestamp | quantity | acceptance_ratio | price |
+-----------+--------------------+------------------+-------+-----------+----------+------------------+-------+
| ... | ... | ... | ... | ... | ... | ... | ... |
+-----------+--------------------+------------------+-------+-----------+----------+------------------+-------+
where:
- `satellite`: indicates which satellite this bid originates from
- `exclusive_group_id`: ID of which exclusive group this bid belongs to
- `profile_block_id`: ID of which profile block this bid belongs to
- `sense`: indicates whether the bid is a demand or a supply bid
- `timestamp`: timestamp for this bid
- `quantity`: quantity for this bid
- `acceptance_ratio`: minimum acceptance ratio between 0 and 1
- `price`: price for this bid
and (satellite, exclusive_group_id, profile_block_id, sense, timestamp) are its Index.
Multiple bids sharing the same `profile_block_id` will be encoded to be met
together, i.e., all-or-none.
If multiple profiles share the same `exclusive_group_id`, at most one of
them will be satisfied.
Every bid must have a `profile_block_id` and `exclusive_group_id`. If a bid
does not make use of profile block or exclusive group functionality, the
`exclusive_group_id` must be unique, while `profile_block_id` can be any value.
Args:
model: Pyomo model to which the gross surplus expression is added.
satellite_bids: MultiIndex DataFrame of bids for all satellites.
"""
# Index bid by `(satellite, group, profile, time)`
model.bid_idx = pyo.Set(initialize=satellite_bids.index.values)
# Each satellite may have different numbers of groups,
# so index each group explicitly by `(satellite, group)`
unique_groups = satellite_bids.index.droplevel(["profile_block_id", "sense", "timestamp"]).unique()
model.exclusive_group = pyo.Set(initialize=unique_groups)
# Each group may have a different number of profiles,
# so index each profile explicitly by `(satellite, group, profile)`
unique_profiles = satellite_bids.index.droplevel(["sense", "timestamp"]).unique()
model.profile = pyo.Set(initialize=unique_profiles)
# To efficiently only have constraints for each existing profile index, record which profiles exist in a group.
profiles_per_group = {group: list(profiles) for group, profiles in groupby(unique_profiles, itemgetter(0, 1))}
# Set quantities as parameter in the model
model.quantity = pyo.Param(model.bid_idx, initialize=satellite_bids["quantity"])
model.bid_price = pyo.Param(model.bid_idx, initialize=satellite_bids["price"])
# Create binary decision variables for all (possible) profiles within exclusive groups.
model.profile_choice = pyo.Var(model.profile, domain=pyo.Binary)
# All profiles within an exclusive group are mutually exclusive: at most one can be active.
@model.Constraint(model.exclusive_group)
def mutual_exclusivity(model, satellite, group):
return pyo.quicksum(model.profile_choice[profile] for profile in profiles_per_group[(satellite, group)]) <= 1
# Intermediate: demand met expression
@model.Expression(model.bid_idx)
def demand_met(model, satellite, group, profile, sense, t):
if sense == Sense.DEMAND:
return model.quantity[satellite, group, profile, sense, t] * model.profile_choice[satellite, group, profile]
return 0
# Intermediate: supply used expression
@model.Expression(model.bid_idx)
def supply_used(model, satellite, group, profile, sense, t):
if sense == Sense.SUPPLY:
return model.quantity[satellite, group, profile, sense, t] * model.profile_choice[satellite, group, profile]
return 0
# Define gross surplus, i.e., total 'value' of all dispatched power from demand bids.
@model.Expression()
def gross_surplus(model):
return pyo.quicksum(
quantity
* model.bid_price[satellite, group, profile, sense, t]
* model.profile_choice[satellite, group, profile]
for (satellite, group, profile, sense, t), quantity in satellite_bids["quantity"].items()
if sense == Sense.DEMAND
)
@model.Expression()
def supply_bids_cost(model):
return pyo.quicksum(
quantity
* model.bid_price[satellite, group, profile, sense, t]
* model.profile_choice[satellite, group, profile]
for (satellite, group, profile, sense, t), quantity in satellite_bids["quantity"].items()
if sense == Sense.SUPPLY
)
[docs]
def constrain_power_balance(model):
"""Constrain that total demand must equal total supply at all times.
Assumes `model.gen_power`, `model.demand_met`, and `model.supply_used` have been defined.
Args:
model: Pyomo model to which the power balance constraint is added.
"""
@model.Constraint(model.time)
def power_balance(model, time):
generation_total = pyo.quicksum(model.gen_power[gen, time] for gen in model.gens)
supply_used_total = pyo.quicksum(
model.supply_used[satellite, group, profile, sense, t]
for satellite, group, profile, sense, t in model.bid_idx
if t == time
)
demand_met_total = pyo.quicksum(
model.demand_met[satellite, group, profile, sense, t]
for satellite, group, profile, sense, t in model.bid_idx
if t == time
)
constraint_result = (generation_total + supply_used_total) == demand_met_total
# if all parts are numeric, the constraint result is not the expected Pyomo object
if constraint_result is True:
return pyo.Constraint.Feasible
return constraint_result
[docs]
def create_generation_cost_expression(model: pyo.AbstractModel) -> None:
"""Create generation cost expression to be used in the objective function.
Args:
model: Pyomo model to which the generation cost expression is added.
"""
@model.Expression()
def generators_cost(model):
quadratic_cost = pyo.quicksum(
(model.gen_marginal_cost_quadratic[g]) * model.gen_power[g, t] ** 2
for g in model.gens
for t in model.time
if model.gen_marginal_cost_quadratic[g] != 0 # only add if nonzero
)
linear_cost = pyo.quicksum(
model.gen_marginal_cost_linear[g] * model.gen_power[g, t]
for g in model.gens
for t in model.time
if model.gen_marginal_cost_linear[g] != 0 # only add if nonzero
)
return quadratic_cost + linear_cost
[docs]
def set_objective_as_social_welfare_maximization(model: pyo.AbstractModel) -> None:
"""Set the objective function as social welfare maximization.
Social welfare is defined as: gross surplus - generation cost.
Args:
model: Pyomo model to which the objective is added.
"""
@model.Objective(sense=pyo.maximize)
def objective(model):
"""Social Welfare Maximization."""
return model.gross_surplus - (model.generators_cost + model.supply_bids_cost)
[docs]
def get_market_clearing_price_as_dual(model_instance: pyo.ConcreteModel) -> pd.DataFrame:
"""Get market price as the dual of the power balance constraint.
Note: Market price might be determined by satellite bids.
Args:
model_instance: Solved pyomo instance of the market model (LP)
Returns:
DataFrame representing the market clearing price for each timestamp.
"""
# Note: duals are negative for maximization problems. So we multiply by -1 to get the correct price.
return pd.DataFrame(
{"market_price": [-1 * model_instance.dual[model_instance.power_balance[t]] for t in model_instance.time]},
index=pd.Index(model_instance.time, name="timestamp"),
)
[docs]
def fix_integer_variables(model_instance: pyo.Model) -> pyo.ConcreteModel:
"""Fix all integer variables of the solved model instance to their current values.
Args:
model_instance: Solved pyomo instance of the market model (MILP).
Returns:
Pyomo model instance with integer/binary variables fixed to their
current values from the solved MILP instance (making it an LP).
"""
model_instance = model_instance.clone()
for model_var in model_instance.component_data_objects(pyo.Var, active=True):
if not model_var.is_continuous():
model_var.fixed = True
return model_instance
[docs]
def run_market_model(model: pyo.AbstractModel, output_path: Path) -> tuple[np.ndarray, np.ndarray]:
"""Run market model and return market_price and scheduled_bids.
Args:
model: Pyomo model to run/solve.
output_path: path to save the results to.
Returns:
tuple: market clearing price and bids scheduled by the market.
Raises:
RuntimeError: if the optimization problem is infeasible or unbounded.
"""
model_instance_milp = model.create_instance()
solver = pyo.SolverFactory("gurobi")
results = solver.solve(model_instance_milp, tee=False)
logger.info("Accepted bids:")
for profile_idx in model_instance_milp.profile:
if pyo.value(model_instance_milp.profile_choice[profile_idx]) == 1:
logger.info("* %s", profile_idx)
if results.solver.termination_condition != pyo.TerminationCondition.optimal:
raise RuntimeError(f"Solver did not converge. Termination condition: {results.solver.termination_condition}")
# Fix integer variables to their current values, making the model an LP
model_instance_lp = fix_integer_variables(model_instance_milp)
# Add suffix to import duals
model_instance_lp.dual = pyo.Suffix(direction=pyo.Suffix.IMPORT)
# Re-solve as LP to get duals
solver.solve(model_instance_lp, tee=False)
market_price = get_market_clearing_price_as_dual(model_instance_lp)
generator_dispatch = extract_generator_dispatch(model_instance_lp)
scheduled_bids = extract_scheduled_bids(model_instance_lp)
logger.debug("Inside market_model, market price: %s", market_price)
logger.debug("scheduled_bids: %s", scheduled_bids)
logger.debug("generator_dispatch: %s", generator_dispatch)
market_results = market_results_to_df(generator_dispatch, market_price, scheduled_bids)
market_results.to_csv(output_path, mode="a", header=not output_path.exists())
return market_price.to_numpy().flatten(), scheduled_bids.to_numpy().flatten()
[docs]
def market_results_to_df(
generator_dispatch: pd.DataFrame, market_price: pd.DataFrame, scheduled_bids: pd.DataFrame
) -> pd.DataFrame:
"""Prepare the relevant market clearing results that should be logged.
The market results are returned as one row per timestamp. Columns are market
price, dispatch per generator, and scheduled bids per satellite. I.e.,
+-----------+--------------+-------------+-------------+-----+-------------+-------------+-----+
| timestamp | market_price | Generator 1 | Generator 2 | ... | Satellite 1 | Satellite 2 | ... |
+-----------+--------------+-------------+-------------+-----+-------------+-------------+-----+
| ... | ... | ... | ... | | ... | ... | |
+-----------+--------------+-------------+-------------+-----+-------------+-------------+-----+
with 'Generator N' and 'Satellite N' replaced by their actual provided names.
Args:
generator_dispatch: dataframe of generator dispatch per generator and timestamp.
market_price: dataframe of market clearing price per timestamp.
scheduled_bids: dataframe of scheduled amount per bid table entry.
Returns:
combined table of market price, generator dispatch and quantity supplied/consumed by satellite, per timestamp.
"""
# Explicitly select the "scheduled" column, so it doesn't become a column multi-index level after unstacking
scheduled_bids = negate_supply_bids_for_summarize(scheduled_bids)
scheduled = scheduled_bids["scheduled"].unstack(level="satellite") # unstack each satellite as a separate column
scheduled_summed = scheduled.groupby(
level="timestamp"
).sum() # sum per timestamp, dropping group/block id, and sense
unstacked_generator_dispatch = generator_dispatch["dispatch"].unstack(level="generator")
logger.debug("scheduled_per_satellite: %s", scheduled_summed)
# concatenate the three time-indexed dataframes next to each other, sharing the time index
market_results = pd.concat([market_price, unstacked_generator_dispatch, scheduled_summed], axis=1)
return market_results
[docs]
def validate_bids(bids: pd.DataFrame) -> None:
"""Validate that bids have format in line with assumptions in market model.
Args:
bids: data frame with bids.
Raises:
Errors depending on validation check.
"""
try:
_validate_dataframe_format(bids)
_validate_nonnegative_quantity(bids["quantity"])
_validate_sense(bids.index.get_level_values("sense").to_numpy())
_validate_acceptance_ratio(bids)
_validate_single_price_per_profile(bids)
except Exception as exc:
msg = f"Invalid bids: {bids}"
raise RuntimeError(msg) from exc
[docs]
def _validate_acceptance_ratio(bids: pd.DataFrame):
# Some bids may be submitted with quantity=0 and acceptance ratio=0
positive_bid_mask = bids["quantity"] > 0
acceptance_ratio = bids.loc[positive_bid_mask, "acceptance_ratio"]
if np.min(acceptance_ratio) < 1 or np.max(acceptance_ratio) > 1:
msg = "Invalid acceptance_ratio. Market can only handle acceptance ratio of 1."
raise NotImplementedError(msg)
[docs]
def _validate_nonnegative_quantity(quantity: pd.Series | np.ndarray):
if np.min(quantity) < 0:
msg = "Submitted quantities must be non-negative."
raise ValueError(msg)
[docs]
def _validate_sense(sense: np.ndarray):
if not set(sense) <= {Sense.SUPPLY, Sense.DEMAND}:
msg = f"Sense must be either of {Sense.SUPPLY} or {Sense.DEMAND}."
raise ValueError(msg)
[docs]
def _validate_single_price_per_profile(bids):
profile_levels = ["satellite", "exclusive_group_id", "profile_block_id", "sense"]
prices_per_profile = bids.groupby(level=profile_levels)["price"].nunique()
if (prices_per_profile > 1).any():
bad_profiles = prices_per_profile[prices_per_profile > 1].index.tolist()
msg = f"Each profile must have a single price, but these profiles have multiple: {bad_profiles}"
raise ValueError(msg)