Source code for annular.coupling

"""coupling.py: core model coupling functionality using MUSCLE3.

This file contains the creation of the coupling structure, and the MUSCLE3
instance definitions for both the central market model and the (arbitrary)
number of satellite models.

The standard coupling structure is one central market model linked to many
individual satellite models. The market model receives bid tables from the
satellite models, and after market clearing sends back the amount of satisfied
power and cleared market price.
"""

import logging
from pathlib import Path
from typing import Iterable, Mapping

import numpy as np
import pandas as pd
import yaml
from cronian.configuration import load_configurations_subfolder
from libmuscle import Instance
from libmuscle.runner import run_simulation
from more_itertools import chunked
from ymmsl import Configuration, Operator, Settings

from .coupling_components import (
    compact_bids_to_msg,
    compact_market_info_to_msg,
    extract_bids_from_msg,
    extract_market_info_from_msg,
    get_coupling_setup,
)
from .market_model import create_market_model, run_market_model, validate_bids
from .satellite_model import strategies
from .utils import negate_supply_bids_for_summarize, read_csv_with_utc_timestamps

[docs] logger = logging.getLogger(__name__)
[docs] def satellite_model() -> None: """A simple satellite model to determine bids.""" instance = Instance( { Operator.F_INIT: ["market_info_in"], Operator.O_F: ["bids_out"], } ) strategy = None while instance.reuse_instance(): # F_INIT settings = instance.list_settings() coupling_level_settings = ["generator_configs", "timeseries_data_path", "satellite_configs"] common_settings = {k: instance.get_setting(k) for k in settings if k not in coupling_level_settings} config_file = Path(instance.get_setting("config", "str")) if strategy is None: with open(config_file) as f: strategy_name = yaml.safe_load(f)["strategy"] strategy = strategies[strategy_name].from_yaml(Path(config_file), **common_settings) msg = instance.receive("market_info_in") market_price, scheduled_bids, t_cur = extract_market_info_from_msg(msg) timestamp = pd.Timestamp.fromtimestamp(t_cur) logger.info("Market_price and scheduled_bids for satellite %s at timestep: %s", config_file.stem, timestamp) logger.info("market_price: %s, scheduled_bids: %s", market_price, scheduled_bids) strategy.meet_demand(market_price, scheduled_bids) # O_F bids = strategy.determine_bids() bids_msg = compact_bids_to_msg(bids, timestamp=t_cur) instance.send("bids_out", bids_msg)
[docs] def central_market_model() -> None: """The central market model.""" instance = Instance( { Operator.O_I: ["market_info_out[]"], Operator.S: ["bids_in[]"], } ) while instance.reuse_instance(): # F_INIT base_folder = Path(instance.get_setting("simulation_config_file", "str")).parent output_path = Path(instance.get_setting("results_folder", "str")) / "market.csv" start_hour = instance.get_setting("start_hour", "int") end_hour = start_hour + instance.get_setting("num_hours", "int") rolling_horizon_step = instance.get_setting("rolling_horizon_step", "int") timeseries_data = read_csv_with_utc_timestamps( base_folder / instance.get_setting("timeseries_data_path", "str") ) snapshots = timeseries_data.index[start_hour:end_hour] generator_configs = load_configurations_subfolder( base_folder / instance.get_setting("generator_configs", "str"), "Generators" ) satellite_configs = list((base_folder / instance.get_setting("satellite_configs", "str")).iterdir()) satellites = [config.stem for config in satellite_configs] market_price = None scheduled_bids: Mapping[str, np.ndarray | None] = {satellite: None for satellite in satellites} for window in chunked(snapshots, n=rolling_horizon_step): snapshot = window[0] utc_timestamp = snapshot.timestamp() # O_I for slot, satellite in enumerate(satellites): cur_state_msg = compact_market_info_to_msg(market_price, scheduled_bids[satellite], utc_timestamp) instance.send("market_info_out", cur_state_msg, slot=slot) logger.info("Sent to satellite %s:", satellite) logger.info("market price: %s", market_price) logger.info("scheduled bids: %s", scheduled_bids[satellite]) # S bids_per_satellite = {} for slot, satellite in enumerate(satellites): msg = instance.receive("bids_in", slot=slot) bids = extract_bids_from_msg(msg) bids_per_satellite[satellite] = bids logger.info("Bids recieved from satellite %s for timestep %s", satellite, snapshot) logger.info(bids) satellite_bids = pd.concat(bids_per_satellite, names=["satellite"]) validate_bids(satellite_bids) model = create_market_model(satellite_bids, generator_configs, timeseries_data, window) market_price, all_scheduled_bids = run_market_model(model, output_path) satellite_bids["scheduled"] = all_scheduled_bids logger.debug("Market model has been run.") logger.debug("market_price: %s", market_price) logger.debug("satellite_bids: %s", satellite_bids) scheduled_bids = summarize_scheduled_bids(satellite_bids, window) logger.info("scheduled_bids: %s", scheduled_bids) #### One final "iteration" to finish the process by forcing satellite to meet demand # O_I utc_timestamp += 1 # some 'fake' next timestep for slot, satellite in enumerate(satellites): cur_state_msg = compact_market_info_to_msg(market_price, scheduled_bids[satellite], utc_timestamp) instance.send("market_info_out", cur_state_msg, slot=slot) logger.info("Sending to satellite %s:", satellite) logger.info("market price: %s", market_price) logger.info("scheduled bids: %s", scheduled_bids[satellite]) # S for slot in range(len(satellite_configs)): _ = instance.receive("bids_in", slot=slot) logger.info("done")
[docs] def summarize_scheduled_bids(bids: pd.DataFrame, window: Iterable[pd.Timestamp]) -> dict[str, np.ndarray]: """Summarize explicitly scheduled quantity per bid to amounts per timestamp. Bids do not have to be submitted for the entire bidding window, but scheduled bids do have to be reported per satellite model per timestamp. Args: bids: Bids table with a `scheduled` column to be summarized. window: Bidding window over which to summarize. Returns: A dictionary with satellite names as keys, and an array of scheduled quantity as values. This array is the same length as the given window, with `0` value if no bid is scheduled. """ bids = negate_supply_bids_for_summarize(bids) satellites = bids.index.get_level_values("satellite").unique() # Explicitly create an index with entries for all timestamps of the current window new_index = pd.MultiIndex.from_product([satellites, window], names=["satellite", "timestamp"]) summed = bids["scheduled"].groupby(level=["satellite", "timestamp"]).sum() summed = summed.reindex(index=new_index, fill_value=0) return {satellite: summed.loc[satellite].to_numpy() for satellite in satellites}
[docs] def prepare_results_folder(config_file: Path, results_path: Path) -> dict: """Prepare the results folder to contain copies of the settings files to be used. Set up a unique folder for the model coupling run being executed. This folder will be the default location for any generated output by central or satellite models. Args: config_file: Initial configuration file from which settings for the model coupling run will be loaded. results_path: Path where a new folder for the results can be created. Returns: Finalized settings dictionary for the model coupling run. Raises: RuntimeError: if the given `results_path` already contains a results folder for this configuration file. """ # Load the settings section of the given config file as regular yaml with open(config_file) as f: settings = yaml.safe_load(f)["settings"] # Arrange results are stored in a clean, config-specific folder results_folder = results_path / config_file.stem if results_folder.exists(): raise RuntimeError(f"Results folder already exists: {results_folder}") results_folder.mkdir(parents=True) # create fresh output path settings["results_folder"] = str(results_folder) return settings
[docs] def run(config_file: Path, results_path: Path = Path("results/")) -> None: """Run the coupled simulation. Args: config_file: configuration file for the simulation to run. results_path: location where a folder can be created for output files. Defaults to "results/". """ logging.getLogger("yatiml").setLevel(logging.WARNING) settings = prepare_results_folder(config_file, results_path) satellite_configs_path = config_file.parent / settings["satellite_configs"] satellite_config_files = list(satellite_configs_path.iterdir()) num_satellites = len(satellite_config_files) # Add the paths to the satellite configs to settings. # This means, for each satellite, the respective config file is dispatched # when called by `instance.get_settings("config"). # The index in brackets needs to start at 0, and corresponds # to the satellite IDs maintained by muscle3. # At the time of writing, this was an undocumented feature in muscle3. settings |= {f"satellite[{i}].config": str(filepath) for i, filepath in enumerate(satellite_config_files)} settings["simulation_config_file"] = str(config_file) model = get_coupling_setup(config_file.stem, num_satellites) implementations = {"central": central_market_model, "satellite": satellite_model} configuration = Configuration(model, Settings(settings)) # And run the coupled simulation! run_simulation(configuration, implementations)