Source code for annular.market.tulipa.build_model

"""Build a TulipaEnergy model with bids."""

import logging
from pathlib import Path
from typing import Any, Tuple

import duckdb
from _duckdb import DuckDBPyConnection

from .config import _TulipaConfig

[docs] logger = logging.getLogger(__name__)
# Tables of the TulipaEnergyModel that are modified in this module
[docs] TABLES_TO_MODIFY = [ "profiles_rep_periods", "rep_periods_data", "year_data", "asset", "asset_milestone", "asset_commission", "asset_both", "flow", "flow_milestone", "flow_commission", "assets_profiles", "assets_rep_periods_partitions", ]
[docs] def validate_input_data(con: DuckDBPyConnection) -> None: """Validate input data for using TulipaEnergyModel as market model.""" try: n_rep_periods = con.execute("SELECT COUNT(*) FROM rep_periods_data").fetchone()[0] if n_rep_periods > 1: msg = f"""Using Tulipa as market model is only allowed with a single representative period. Found: {n_rep_periods}""" raise RuntimeError(msg) except duckdb.CatalogException as exc: msg = "Missing table in input data: rep_periods_data" raise RuntimeError(msg) from exc return
[docs] def execute(con: DuckDBPyConnection, query: str, params: dict[str, Any]) -> None: """Execute duckdb query with parameters. Filters a dictionary of parameters based on occurrence of the key in the query string. """ used = {param_key: param_val for param_key, param_val in params.items() if f"${param_key}" in query} con.execute(query, used)
[docs] def read_csv_folder(con: DuckDBPyConnection, input_data_folder: Path) -> None: """Read csv files in a folder into a duckdb connection.""" for file in input_data_folder.iterdir(): filepath = input_data_folder / file tbl_name = (file.stem).replace("-", "_") con.execute(f"CREATE TABLE {tbl_name} AS (SELECT * FROM read_csv('{str(filepath)}'))")
[docs] def convert_bids_table(con: duckdb.DuckDBPyConnection) -> None: """Convert annular_bids to bids.""" con.execute("CREATE OR REPLACE SEQUENCE bid_id START 1") con.execute(""" CREATE OR REPLACE TABLE bids AS ( WITH cte_grouped_demand_bids AS MATERIALIZED ( SELECT satellite, exclusive_group_id, profile_block_id, 1.0 AS peak_demand, -MAX(price) AS operational_cost, 'demand' as bid_sense, 'consumer' as type, FROM annular_bids WHERE sense = 'demand' GROUP BY satellite, exclusive_group_id, profile_block_id ), cte_grouped_supply_bids AS MATERIALIZED ( SELECT satellite, exclusive_group_id, profile_block_id, 1.0 AS peak_demand, MAX(price) AS operational_cost, 'supply' as bid_sense, 'producer' as type, FROM annular_bids WHERE sense = 'supply' GROUP BY satellite, exclusive_group_id, profile_block_id ), cte_indexed_bids AS MATERIALIZED ( SELECT nextval('bid_id') AS bid_id, cte_grouped_demand_bids.*, FROM cte_grouped_demand_bids UNION BY NAME SELECT nextval('bid_id') AS bid_id, cte_grouped_supply_bids.*, FROM cte_grouped_supply_bids ) SELECT bid_id, cte_indexed_bids.*, 'bid' || bid_id::VARCHAR AS asset, "type", peak_demand, '==' AS consumer_balance_sense, true AS unit_commitment, true AS unit_commitment_integer, 'basic' AS unit_commitment_method, bid_sense, FROM cte_indexed_bids ) """)
[docs] def define_bid_manager(con: duckdb.DuckDBPyConnection) -> str: """Select a consumer as bid manager. Args: con: connection to database containing data for TulipaEnergyModel. Notes: We assume that all bids are attached to a single consumer, and that it doesn't matter which one. In other words, it is always possible to "move" energy around independent of which consumer is the bid manager. This is particularly important in the case of supply bids. If the total bid is negative (i.e., a net supply), then we assume that the bid-manager has the possibility of supplying the excess energy to the rest of grid. This consumer acts like a "Bid Manager" For reproducibility, ORDER BY asset makes sure that for the same input data, the same asset is chosen as the bid manager. """ try: bid_manager = con.execute("SELECT asset from asset WHERE type = 'consumer' ORDER BY asset LIMIT 1").df() return bid_manager["asset"].values[0] except IndexError: msg = """No consumer asset defined in data. There needs to be at least one consumer asset.""" raise RuntimeError(msg)
[docs] def insert_bids_into_assets( con: duckdb.DuckDBPyConnection, year: int, num_timesteps: int, ) -> None: """Insert bids and constraints into asset tables.""" query_args = {"year": year, "num_timesteps": num_timesteps} con.execute(""" INSERT INTO asset BY NAME ( SELECT asset, "type", 1.0 as capacity, consumer_balance_sense, -- ignored by supply bids 1.0 as min_operating_point, unit_commitment, unit_commitment_integer, unit_commitment_method, FROM bids ) """) query = """ INSERT INTO asset_milestone BY NAME ( SELECT asset, $year AS milestone_year, peak_demand, -- ignored by supply bids FROM bids )""" execute(con, query, query_args) query = """ INSERT INTO asset_commission BY NAME ( SELECT asset, $year AS commission_year FROM bids ) """ execute(con, query, query_args) query = """ INSERT INTO asset_both BY NAME ( SELECT asset, $year AS commission_year, $year AS milestone_year, 1.0 AS initial_units FROM bids ) """ execute(con, query, query_args) # This changes the var_units_on variable to have a full-horizon partition (forcing profile blocks to be consistent) query = """ INSERT INTO assets_rep_periods_partitions BY NAME ( SELECT asset, $year AS year, 1 AS rep_period, 'uniform' AS specification, $num_timesteps AS partition, FROM bids )""" execute(con, query, query_args)
[docs] def insert_bids_into_flows(con: duckdb.DuckDBPyConnection, year: int, bid_manager: str) -> None: """Insert bids into flow tables.""" query_args = {"bid_manager": bid_manager, "year": year} # Flow from bid_manager to the demand bid asset query = """INSERT INTO flow BY NAME ( SELECT $bid_manager AS from_asset, asset AS to_asset, FROM bids WHERE bid_sense = 'demand')""" execute(con, query, query_args) query = """INSERT INTO flow_milestone BY NAME ( SELECT $bid_manager AS from_asset, asset AS to_asset, $year AS milestone_year, operational_cost, FROM bids WHERE bid_sense = 'demand' )""" execute(con, query, query_args) query = """INSERT INTO flow_commission BY NAME ( SELECT $bid_manager AS from_asset, asset AS to_asset, $year AS commission_year, FROM bids WHERE bid_sense = 'demand' )""" execute(con, query, query_args) # Flow from supply bid to bid_manager query = """INSERT INTO flow BY NAME ( SELECT asset AS from_asset, $bid_manager AS to_asset, FROM bids WHERE bid_sense = 'supply' )""" execute(con, query, query_args) query = """INSERT INTO flow_milestone BY NAME ( SELECT asset AS from_asset, $bid_manager AS to_asset, $year AS milestone_year, operational_cost, FROM bids WHERE bid_sense = 'supply' )""" execute(con, query, query_args) query = """INSERT INTO flow_commission BY NAME ( SELECT asset AS from_asset, $bid_manager AS to_asset, $year AS commission_year, FROM bids WHERE bid_sense = 'supply' )""" execute(con, query, query_args)
[docs] def insert_bids_into_profiles(con: duckdb.DuckDBPyConnection, year: int) -> None: """Insert bids into profiles tables.""" con.execute( """INSERT INTO assets_profiles BY NAME ( SELECT asset, $year AS commission_year, 'bid-' || asset || '-' || bid_sense AS profile_name, IF(bid_sense = 'demand', 'demand', 'availability') AS profile_type, FROM bids )""", {"year": year}, )
[docs] def add_bid_loops(con: duckdb.DuckDBPyConnection, year: int) -> None: """Add loops for the bids/UC trick for the demand bids.""" # Add loops for the bids/UC trick for the demand bids con.execute( """INSERT INTO flow BY NAME ( SELECT asset AS from_asset, asset AS to_asset FROM bids WHERE bid_sense = 'demand' )""", ) con.execute( """INSERT INTO flow_milestone BY NAME ( SELECT asset AS from_asset, asset AS to_asset, $year AS milestone_year FROM bids WHERE bid_sense = 'demand' )""", {"year": year}, ) con.execute( """INSERT INTO flow_commission BY NAME ( SELECT asset AS from_asset, asset AS to_asset, $year AS commission_year FROM bids WHERE bid_sense = 'demand' )""", {"year": year}, )
[docs] def insert_bid_profiles_into_rep_periods( con: duckdb.DuckDBPyConnection, num_timesteps: int, year: int, timestep_start: int, ) -> None: """Create complete profiles for the bids. Notes: We assume that the bids always have the format YYYY-MM-DD HH:mm:SS and we ignore everything except HH. """ query_args = {"year": year, "num_timesteps": num_timesteps, "timestep_start": timestep_start} con.execute( """INSERT INTO profiles_rep_periods BY NAME ( WITH cte_complete_empty_profile AS ( SELECT timestep FROM GENERATE_SERIES(1, $num_timesteps) s(timestep) ), cte_complete_empty_bids_profiles AS ( SELECT bid_id, 'bid-' || asset || '-' || bid_sense AS profile_name, bid_sense, $year AS year, 1 AS rep_period, -- we hardcode the rep_period instead of cross joining, because we assume only one timestep FROM bids CROSS JOIN cte_complete_empty_profile ), cte_demand_bids_profiles AS ( SELECT bids.bid_id, extract('hour' from annular_bids.timestamp) + 1 - $timestep_start + 1 AS timestep, -- +1 for 0-based timestamp, +1 for shift -- annular_bids.quantity / bids.peak_demand AS value, annular_bids.quantity AS value, -- modified because peak_demand is set at 1 FROM annular_bids LEFT JOIN bids ON bids.satellite = annular_bids.satellite AND bids.exclusive_group_id = annular_bids.exclusive_group_id AND bids.profile_block_id = annular_bids.profile_block_id WHERE annular_bids.sense = 'demand' ), cte_supply_bids_profiles AS ( SELECT bids.bid_id, extract('hour' from annular_bids.timestamp) + 1 - $timestep_start + 1 AS timestep, -- +1 for 0-based timestamp, +1 for shift -- annular_bids.quantity / bids.peak_demand AS value, annular_bids.quantity AS value, -- modified because peak_demand is set at 1 FROM annular_bids LEFT JOIN bids ON bids.satellite = annular_bids.satellite AND bids.exclusive_group_id = annular_bids.exclusive_group_id AND bids.profile_block_id = annular_bids.profile_block_id WHERE annular_bids.sense = 'supply' ) SELECT cte_empty.* EXCLUDE (cte_empty.bid_id, cte_empty.bid_sense), IF( cte_empty.bid_sense = 'demand', COALESCE(cte_demand_bids_profiles.value, 0.0), COALESCE(cte_supply_bids_profiles.value, 0.0) ) AS value, FROM cte_complete_empty_bids_profiles AS cte_empty LEFT JOIN cte_demand_bids_profiles ON cte_empty.bid_id = cte_demand_bids_profiles.bid_id AND cte_empty.timestep = cte_demand_bids_profiles.timestep LEFT JOIN cte_supply_bids_profiles ON cte_empty.bid_id = cte_supply_bids_profiles.bid_id AND cte_empty.timestep = cte_supply_bids_profiles.timestep )""", query_args, )
[docs] def build_model_with_bids(config: _TulipaConfig) -> Tuple[str, int]: """Build tulipa energy model with bids. Extend TulipaEnergyModel defined in `config.input_data_folder` by adding satellite bids. Args: config: `_TulipaConfig` with configuration data. """ with duckdb.connect(config.db_path) as con: read_csv_folder(con, config.input_data_folder) validate_input_data(con) # Create assets_rep_periods_partitions if necessary con.execute(""" CREATE TABLE IF NOT EXISTS assets_rep_periods_partitions (asset VARCHAR , year INTEGER , rep_period INTEGER , specification VARCHAR , partition VARCHAR )""") # Subset profiles_rep_periods 'timestep' to the start and end times timestep_window = con.execute( """SELECT MIN(timestep) as t_start, MAX(timestep) as t_end FROM annular_window""" ).df() # cast to Python int; np.int64 cannot be passed to DuckDB timestep_start = int(timestep_window["t_start"].values[0]) timestep_end = int(timestep_window["t_end"].values[0]) for tbl_name in TABLES_TO_MODIFY: con.execute(f"""DROP TABLE IF EXISTS annular_{tbl_name}""") con.execute(f"""CREATE OR REPLACE TABLE backup_{tbl_name} AS FROM {tbl_name}""") # Constraining the existing profiles into the window and renumbering the window con.execute(f""" CREATE OR REPLACE TABLE profiles_rep_periods AS ( WITH original_data AS ( SELECT * FROM profiles_rep_periods ) SELECT * EXCLUDE (timestep), timestep + 1 - {timestep_start} AS timestep FROM original_data WHERE timestep BETWEEN {timestep_start} AND {timestep_end} ORDER BY profile_name, year, rep_period, timestep )""") # Reindex the timestep to be 1:length(window) (+ 24?) num_timesteps = timestep_end - timestep_start + 1 tbl_dict = {"rep_periods_data": "num_timesteps", "year_data": "length"} for tbl_name, col_name in tbl_dict.items(): con.execute(f"""UPDATE {tbl_name} SET {col_name} = {num_timesteps}""") # TODO: (LATER) update the storage level years = con.execute("""SELECT DISTINCT milestone_year AS year from asset_milestone""").df() if len(years) > 1: msg = f"Using Tulipa as market model is only allowed with a single year. Found: {years}" raise RuntimeError(msg) year = int(years["year"].values[0]) convert_bids_table(con) bid_manager = define_bid_manager(con) # Add columns that might not exist for table_name, column_name, column_type in [ ("asset", "capacity", "DOUBLE"), ("asset", "consumer_balance_sense", "VARCHAR"), ("asset", "min_operating_point", "DOUBLE"), ("asset", "unit_commitment", "BOOLEAN"), ("asset", "unit_commitment_integer", "BOOLEAN"), ("asset", "unit_commitment_method", "VARCHAR"), ("asset_milestone", "peak_demand", "DOUBLE"), ("flow_milestone", "operational_cost", "DOUBLE"), ]: con.execute(f""" ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS {column_name} {column_type} """) # Integrate bids into the model insert_bids_into_assets(con, year, num_timesteps) insert_bids_into_flows(con, year, bid_manager) insert_bids_into_profiles(con, year) add_bid_loops(con, year) insert_bid_profiles_into_rep_periods(con, num_timesteps, year, timestep_start) # TODO: move this into a method of the config? if config.debug_input_folder is not None: logger.debug("Writing tables to csv for debugging.") dir = Path(config.communication_folder) / config.debug_input_folder dir.mkdir(exist_ok=True, parents=True) for row in con.execute("SELECT table_name FROM duckdb_tables()").fetchall(): table_name = row[0] path = dir / f"{table_name}.csv" con.execute( f"COPY (FROM {table_name} ORDER BY 1) TO '{str(path)}' (HEADER, DELIMITER ',')", ) return bid_manager, timestep_start
[docs] def postprocess_tulipa_model( db_path: Path | str, bid_manager: str, timestep_start: int, ) -> None: """Postprocess results from TulipaEnergyModel.""" with duckdb.connect(db_path) as con: query_args = {"bid_manager": bid_manager, "timestep_start": timestep_start} con.execute( """ CREATE OR REPLACE TABLE annular_market_price AS WITH cte_bids_manager AS ( SELECT dual_balance_consumer, time_block_start FROM cons_balance_consumer WHERE asset = $bid_manager ) SELECT timeseries.timestamp, cons.dual_balance_consumer AS market_price, FROM cte_bids_manager AS cons LEFT JOIN annular_timeseries_data AS timeseries ON cons.time_block_start + $timestep_start - 1 = timeseries.timestep ORDER BY cons.time_block_start """, query_args, ) con.execute( """ CREATE OR REPLACE TABLE annular_scheduled_bids AS SELECT annular_bids.satellite, annular_bids.exclusive_group_id, annular_bids.profile_block_id, annular_bids.timestamp, SUM(var_flow.solution) AS scheduled, FROM annular_bids LEFT JOIN bids ON annular_bids.satellite = bids.satellite AND annular_bids.exclusive_group_id = bids.exclusive_group_id AND annular_bids.profile_block_id = bids.profile_block_id LEFT JOIN annular_timeseries_data AS timeseries ON annular_bids.timestamp = timeseries.timestamp LEFT JOIN var_flow ON ((var_flow.from_asset = $bid_manager AND var_flow.to_asset = bids.asset) OR (var_flow.to_asset = $bid_manager AND var_flow.from_asset = bids.asset)) AND var_flow.time_block_start + $timestep_start - 1 = timeseries.timestep GROUP BY annular_bids.satellite, annular_bids.exclusive_group_id, annular_bids.profile_block_id, annular_bids.timestamp """, query_args, ) # undo modification to input in db for tbl_name in TABLES_TO_MODIFY: con.execute(f"ALTER TABLE {tbl_name} RENAME TO annular_{tbl_name}") con.execute(f"ALTER TABLE backup_{tbl_name} RENAME TO {tbl_name}")