Source code for annular.coupling_components

"""coupling_components.py: Utilities for coupling using MUSCLE3."""

import logging

import numpy as np
import pandas as pd
from libmuscle import Grid, Message
from ymmsl import Component, Conduit, Model, Ports

from annular.market_model import BID_COLUMN_NAMES, BID_IDX_COLUMNS

# Module-level logger, named after this module via ``__name__``.
[docs] logger = logging.getLogger(__name__)
def compact_market_info_to_msg(
    price: np.ndarray | None, scheduled_bids: np.ndarray | None, timestamp: float
) -> Message:
    """Pack market prices and scheduled bids into a MUSCLE3 message.

    The two payload arguments must either both be given or both be ``None``.
    When both are ``None`` the message still carries the "price" and
    "scheduled_bids" keys, each mapped to ``None``.

    Args:
        price: A numpy array with market prices, or None.
        scheduled_bids: A numpy array with the scheduled bids for a satellite,
            or None.
        timestamp: Floating point value to serve as timestamp for model
            coordination.

    Returns:
        A muscle3 message whose data attribute is a dictionary with the keys
        "price" and "scheduled_bids".

    Raises:
        ValueError: If exactly one of `price` and `scheduled_bids` is None.
    """
    absent = (price is None, scheduled_bids is None)

    # Nothing to send: forward explicit None placeholders.
    if all(absent):
        return Message(timestamp=timestamp, data={"price": None, "scheduled_bids": None})

    # Only one of the two is missing: the inputs are inconsistent.
    if any(absent):
        raise ValueError("Both price and scheduled_bids should be None, or neither should be.")

    return Message(
        timestamp=timestamp,
        data={
            "price": Grid(price),
            "scheduled_bids": Grid(scheduled_bids),
        },
    )
def extract_market_info_from_msg(msg: Message) -> tuple[np.ndarray | None, np.ndarray | None, float]:
    """Unpack market prices and scheduled bids from a MUSCLE3 message.

    Args:
        msg: muscle3 Message containing a dictionary with keys "price" and
            "scheduled_bids" for a single satellite.

    Returns:
        tuple: the market price and the scheduled bids as numpy arrays,
        followed by the message timestamp. If the "price" entry is not a
        Grid, both arrays are returned as None.

    Raises:
        ValueError: If the message carries no data at all.
    """
    payload = msg.data
    if payload is None:
        raise ValueError("Invalid Message: data is None")

    if isinstance(payload["price"], Grid):
        # Message contents are read-only while callers modify the arrays
        # afterwards, so hand out copies.
        return (
            payload["price"].array.copy(),
            payload["scheduled_bids"].array.copy(),
            msg.timestamp,
        )

    return None, None, msg.timestamp
def compact_bids_to_msg(bids: pd.DataFrame, timestamp: float) -> Message:
    """Flatten a DataFrame of (block) bids into a MUSCLE3 Message.

    The bids are expected in the following format:

    +--------------------+------------------+-------+-----------+----------+------------------+-------+
    | exclusive_group_id | profile_block_id | sense | timestamp | quantity | acceptance_ratio | price |
    +--------------------+------------------+-------+-----------+----------+------------------+-------+
    |        ...         |       ...        |  ...  |    ...    |   ...    |       ...        |  ...  |
    +--------------------+------------------+-------+-----------+----------+------------------+-------+

    where:

    - `exclusive_group_id`: ID of which exclusive group this bid belongs to
    - `profile_block_id`: ID of which profile block this bid belongs to
    - `sense`: indicates whether the bid is a demand or a supply bid
    - `timestamp`: timestamp for this bid
    - `quantity`: quantity for this bid
    - `acceptance_ratio`: minimum acceptance ratio between 0 and 1
    - `price`: price for this bid

    and (exclusive_group_id, profile_block_id, sense, timestamp) are its Index.

    Every bid must have a `profile_block_id`, `exclusive_group_id` and a
    `sense`. If a bid does not make use of profile block or exclusive group
    functionality, the `exclusive_group_id` must be unique, while
    `profile_block_id` can be any value.

    In the resulting message the table is stored as a dictionary: every column
    name and every index level name becomes a key, and the matching values are
    stored under that key as a Grid.

    Args:
        bids: Dataframe table of the bids from a satellite model.
        timestamp: Floating point value to serve as timestamp for model
            coordination.

    Returns:
        MUSCLE3 message object containing the bids table information in a
        dictionary.
    """
    payload = {}

    # Regular columns first.
    for column in bids.columns:
        payload[column] = Grid(bids[column].to_numpy(), None)

    # Then every level of the index, stored under the level's name.
    for name in bids.index.names:
        values = bids.index.get_level_values(name)
        if name == "timestamp":
            # `.astype(int) / 1e9` is equivalent to calling `.timestamp()` on
            # each entry, but operates on the whole array at once.
            # NOTE(review): presumably relies on nanosecond resolution of the
            # timestamp level - confirm for other datetime units.
            values = values.astype(np.int64) / 1e9
        payload[name] = Grid(values.to_numpy(), None)

    return Message(timestamp, data=payload)
def extract_bids_from_msg(msg: Message) -> pd.DataFrame:
    """Reconstruct a DataFrame of (block) bids from a MUSCLE3 Message.

    The `.data` attribute of the incoming message should be a dictionary that
    holds one grid for every name in `BID_COLUMN_NAMES` as well as for the
    keys exclusive_group_id, profile_block_id, sense and timestamp.
    From this data, a DataFrame in the following format is created:

    +--------------------+------------------+-------+-----------+----------+------------------+-------+
    | exclusive_group_id | profile_block_id | sense | timestamp | quantity | acceptance_ratio | price |
    +--------------------+------------------+-------+-----------+----------+------------------+-------+
    |        ...         |       ...        |  ...  |    ...    |   ...    |       ...        |  ...  |
    +--------------------+------------------+-------+-----------+----------+------------------+-------+

    where:

    - `exclusive_group_id`: ID of which exclusive group this bid belongs to
    - `profile_block_id`: ID of which profile block this bid belongs to
    - `sense`: indicates whether the bid is a demand or a supply bid
    - `timestamp`: timestamp for this bid
    - `quantity`: quantity for this bid
    - `acceptance_ratio`: minimum acceptance ratio between 0 and 1
    - `price`: price for this bid

    and (exclusive_group_id, profile_block_id, sense, timestamp) are its Index.

    Every bid must have a `profile_block_id`, `exclusive_group_id` and a
    `sense`. If a bid does not make use of profile block or exclusive group
    functionality, the `exclusive_group_id` must be unique, while
    `profile_block_id` can be any value.

    The values under the "timestamp" key are read as POSIX timestamps in
    seconds and converted to timezone-aware UTC `pd.Timestamp` objects.

    Args:
        msg: MUSCLE3 message object containing the bids table information as a
            dictionary.

    Returns:
        Dataframe table of the bids from a satellite model.

    Raises:
        ValueError: If the message carries no data at all.
    """
    if msg.data is None:
        raise ValueError("Invalid Message: data is None")

    bids = pd.DataFrame()
    logger.debug("msg.data is %s", msg.data)
    logger.debug("msg.data[quantity] is %s", msg.data["quantity"].array)
    for col in BID_COLUMN_NAMES:
        bids[col] = msg.data[col].array

    # `Timestamp.utcfromtimestamp` is deprecated in pandas; the documented
    # replacement `fromtimestamp(..., tz="UTC")` yields the same
    # timezone-aware UTC timestamps on pandas 2.x.
    timestamps = [pd.Timestamp.fromtimestamp(t, tz="UTC") for t in msg.data["timestamp"].array]
    bids.index = pd.MultiIndex.from_arrays(
        [
            msg.data["exclusive_group_id"].array,
            msg.data["profile_block_id"].array,
            msg.data["sense"].array,
            timestamps,
        ],
        names=BID_IDX_COLUMNS[1:],
    )
    return bids
def get_coupling_setup(config_name: str, number_of_satellites: int) -> Model:
    """Build the MUSCLE3 coupling configuration for the energy system network.

    The model consists of a single "central" component and a "satellite"
    component with multiplicity `number_of_satellites`. Market information
    flows from the central component to the satellites, and bids flow back
    from the satellites to the central component.

    Args:
        config_name: name of this run to be used as model name
        number_of_satellites: number of satellites to spin up

    Returns:
        MUSCLE3 Model object with the standard coupling configuration
    """
    central = Component(
        "central",
        "central",
        None,
        Ports(o_i=["market_info_out"], s=["bids_in"]),
    )
    satellite = Component(
        "satellite",
        "satellite",
        [number_of_satellites],
        Ports(f_init=["market_info_in"], o_f=["bids_out"]),
    )

    # central -> satellite: market information
    market_info_link = Conduit("central.market_info_out", "satellite.market_info_in")
    # satellite -> central: bids
    bids_link = Conduit("satellite.bids_out", "central.bids_in")

    return Model(config_name, [central, satellite], [market_info_link, bids_link])