"""Build a TulipaEnergy model with bids."""
import logging
from pathlib import Path
from typing import Any, Tuple
import duckdb
from _duckdb import DuckDBPyConnection
from .config import _TulipaConfig
[docs]
# Module-level logger; its name is this module's `__name__`.
logger = logging.getLogger(__name__)
# Tables of the TulipaEnergyModel that are modified in this module
[docs]
# `build_model_with_bids` copies each table listed here to `backup_<name>`
# before changing it. `postprocess_tulipa_model` later renames the changed
# table to `annular_<name>` and renames the backup back to `<name>`.
TABLES_TO_MODIFY = [
    # profiles and time resolution
    "profiles_rep_periods",
    "rep_periods_data",
    "year_data",
    # assets
    "asset",
    "asset_milestone",
    "asset_commission",
    "asset_both",
    # flows
    "flow",
    "flow_milestone",
    "flow_commission",
    # links between assets and profiles / partitions
    "assets_profiles",
    "assets_rep_periods_partitions",
]
[docs]
def execute(con: DuckDBPyConnection, query: str, params: dict[str, Any]) -> None:
    """Execute a duckdb query with the subset of parameters it references.

    A shared parameter dictionary can be reused across several queries: only
    the entries whose ``$key`` placeholder occurs in `query` are passed on to
    the connection.

    A key counts as referenced only when ``$key`` is a complete placeholder.
    The key ``year`` is therefore not selected by a query that merely contains
    ``$year_end``.

    Args:
        con: connection on which the query is executed.
        query: SQL text using ``$name`` placeholders.
        params: candidate values keyed by placeholder name, without the
            leading ``$``.
    """
    import re

    # Collect the full name of every `$identifier` placeholder in the query so
    # that a key which is only a prefix of a longer placeholder is not matched.
    referenced = set(re.findall(r"\$(\w+)", query))
    used = {key: value for key, value in params.items() if key in referenced}
    con.execute(query, used)
[docs]
def read_csv_folder(con: DuckDBPyConnection, input_data_folder: Path) -> None:
    """Read the files in a folder into a duckdb connection.

    One table is created per file. The table name is the file stem with every
    ``-`` replaced by ``_``. Sub-directories are skipped.

    Args:
        con: connection in which the tables are created.
        input_data_folder: folder holding the csv files; may be relative.
    """
    for file in input_data_folder.iterdir():
        if not file.is_file():
            continue
        tbl_name = file.stem.replace("-", "_")
        # `iterdir` yields paths that already start with `input_data_folder`.
        # Joining the folder again would duplicate it for relative folders.
        # Single quotes are doubled so the path stays a valid SQL literal.
        csv_path = str(file).replace("'", "''")
        con.execute(f"CREATE TABLE {tbl_name} AS (SELECT * FROM read_csv('{csv_path}'))")
[docs]
def convert_bids_table(con: duckdb.DuckDBPyConnection) -> None:
    """Convert annular_bids to bids.

    Creates (or replaces) the sequence `bid_id`, starting at 1, and the table
    `bids`. The rows of `annular_bids` are grouped by `satellite`,
    `exclusive_group_id` and `profile_block_id`, separately for
    `sense = 'demand'` and `sense = 'supply'`. Each group becomes one bid:

    - demand bids get type `consumer` and `operational_cost = -MAX(price)`;
    - supply bids get type `producer` and `operational_cost = MAX(price)`;
    - `peak_demand` is fixed at 1.0;
    - `bid_id` is drawn from the sequence and the asset name is
      `'bid' || bid_id`;
    - the unit-commitment columns are constants (`true`, `true`, `'basic'`)
      and `consumer_balance_sense` is `'=='`.

    Args:
        con: connection to a database that contains the table `annular_bids`.
    """
    # Restart the numbering so that bid ids begin at 1 on every conversion.
    con.execute("CREATE OR REPLACE SEQUENCE bid_id START 1")
    # NOTE(review): the final SELECT lists `bid_id`, `"type"`, `peak_demand`
    # and `bid_sense` explicitly although `cte_indexed_bids.*` already contains
    # them. Confirm how DuckDB names the duplicated columns in `bids`.
    con.execute("""
CREATE OR REPLACE TABLE bids AS (
WITH cte_grouped_demand_bids AS MATERIALIZED (
SELECT
satellite, exclusive_group_id, profile_block_id,
1.0 AS peak_demand,
-MAX(price) AS operational_cost,
'demand' as bid_sense,
'consumer' as type,
FROM annular_bids
WHERE sense = 'demand'
GROUP BY satellite, exclusive_group_id, profile_block_id
), cte_grouped_supply_bids AS MATERIALIZED (
SELECT
satellite, exclusive_group_id, profile_block_id,
1.0 AS peak_demand,
MAX(price) AS operational_cost,
'supply' as bid_sense,
'producer' as type,
FROM annular_bids
WHERE sense = 'supply'
GROUP BY satellite, exclusive_group_id, profile_block_id
), cte_indexed_bids AS MATERIALIZED (
SELECT
nextval('bid_id') AS bid_id,
cte_grouped_demand_bids.*,
FROM cte_grouped_demand_bids
UNION BY NAME
SELECT
nextval('bid_id') AS bid_id,
cte_grouped_supply_bids.*,
FROM cte_grouped_supply_bids
)
SELECT
bid_id,
cte_indexed_bids.*,
'bid' || bid_id::VARCHAR AS asset,
"type",
peak_demand,
'==' AS consumer_balance_sense,
true AS unit_commitment,
true AS unit_commitment_integer,
'basic' AS unit_commitment_method,
bid_sense,
FROM cte_indexed_bids
)
""")
[docs]
def define_bid_manager(con: duckdb.DuckDBPyConnection) -> str:
    """Select a consumer as bid manager.

    Args:
        con: connection to database containing data for TulipaEnergyModel.

    Returns:
        The name of the first asset of type `consumer`, in `ORDER BY asset`
        order.

    Raises:
        RuntimeError: if the `asset` table contains no consumer.

    Notes:
        We assume that all bids are attached to a single
        consumer, and that it doesn't matter which one. In other words, it is
        always possible to "move" energy around independent of which consumer is
        the bid manager. This is particularly important in the case of supply
        bids. If the total bid is negative (i.e., a net supply), then we assume
        that the bid-manager has the possibility of supplying the excess energy
        to the rest of grid.

        This consumer acts like a "Bid Manager"

        For reproducibility, ORDER BY asset makes sure that for the same input
        data, the same asset is chosen as the bid manager.
    """
    row = con.execute(
        "SELECT asset from asset WHERE type = 'consumer' ORDER BY asset LIMIT 1"
    ).fetchone()
    # `fetchone` returns None when the query produced no row. Testing for that
    # directly keeps unrelated errors from being reported as "no consumer".
    if row is None:
        msg = "No consumer asset defined in data. There needs to be at least one\nconsumer asset."
        raise RuntimeError(msg)
    return row[0]
[docs]
def insert_bids_into_assets(
    con: duckdb.DuckDBPyConnection,
    year: int,
    num_timesteps: int,
) -> None:
    """Register every bid as an asset in the asset tables.

    Rows are taken from the `bids` table and inserted into `asset`,
    `asset_milestone`, `asset_commission`, `asset_both` and
    `assets_rep_periods_partitions`, in that order.

    Args:
        con: connection to the database holding `bids` and the asset tables.
        year: used as milestone year, commission year and partition year.
        num_timesteps: size of the uniform partition written for each bid.
    """
    # The statement for `asset` has no placeholders and is sent directly.
    con.execute("""
INSERT INTO asset BY NAME (
SELECT
asset,
"type",
1.0 as capacity,
consumer_balance_sense, -- ignored by supply bids
1.0 as min_operating_point,
unit_commitment,
unit_commitment_integer,
unit_commitment_method,
FROM bids
)
""")

    shared_params = {"year": year, "num_timesteps": num_timesteps}
    parametrised_inserts = (
        """
INSERT INTO asset_milestone BY NAME (
SELECT
asset,
$year AS milestone_year,
peak_demand, -- ignored by supply bids
FROM bids
)""",
        """
INSERT INTO asset_commission BY NAME (
SELECT asset, $year AS commission_year FROM bids
)
""",
        """
INSERT INTO asset_both BY NAME (
SELECT asset,
$year AS commission_year,
$year AS milestone_year,
1.0 AS initial_units
FROM bids
)
""",
        # This changes the var_units_on variable to have a full-horizon
        # partition (forcing profile blocks to be consistent)
        """
INSERT INTO assets_rep_periods_partitions BY NAME (
SELECT
asset,
$year AS year,
1 AS rep_period,
'uniform' AS specification,
$num_timesteps AS partition,
FROM bids
)""",
    )
    # `execute` forwards only the parameters each statement references.
    for statement in parametrised_inserts:
        execute(con, statement, shared_params)
[docs]
def insert_bids_into_flows(con: duckdb.DuckDBPyConnection, year: int, bid_manager: str) -> None:
    """Connect every bid to the bid manager through a flow.

    Demand bids get a flow from the bid manager to the bid asset; supply bids
    get a flow from the bid asset to the bid manager. Each flow is written to
    `flow`, `flow_milestone` (with the bid's `operational_cost`) and
    `flow_commission`.

    Args:
        con: connection to the database holding `bids` and the flow tables.
        year: used as milestone year and commission year.
        bid_manager: name of the asset on the other end of every bid flow.
    """
    shared_params = {"bid_manager": bid_manager, "year": year}

    # Flow from bid_manager to the demand bid asset
    demand_statements = (
        """INSERT INTO flow BY NAME (
SELECT
$bid_manager AS from_asset,
asset AS to_asset,
FROM bids WHERE bid_sense = 'demand')""",
        """INSERT INTO flow_milestone BY NAME (
SELECT
$bid_manager AS from_asset,
asset AS to_asset,
$year AS milestone_year,
operational_cost,
FROM bids WHERE bid_sense = 'demand'
)""",
        """INSERT INTO flow_commission BY NAME (
SELECT
$bid_manager AS from_asset,
asset AS to_asset,
$year AS commission_year,
FROM bids WHERE bid_sense = 'demand'
)""",
    )

    # Flow from supply bid to bid_manager
    supply_statements = (
        """INSERT INTO flow BY NAME (
SELECT
asset AS from_asset,
$bid_manager AS to_asset,
FROM bids WHERE bid_sense = 'supply'
)""",
        """INSERT INTO flow_milestone BY NAME (
SELECT
asset AS from_asset,
$bid_manager AS to_asset,
$year AS milestone_year,
operational_cost,
FROM bids WHERE bid_sense = 'supply'
)""",
        """INSERT INTO flow_commission BY NAME (
SELECT
asset AS from_asset,
$bid_manager AS to_asset,
$year AS commission_year,
FROM bids WHERE bid_sense = 'supply'
)""",
    )

    # Demand statements run first, then supply; `execute` drops the
    # parameters a statement does not reference.
    for statement in (*demand_statements, *supply_statements):
        execute(con, statement, shared_params)
[docs]
def insert_bids_into_profiles(con: duckdb.DuckDBPyConnection, year: int) -> None:
    """Link every bid asset to its profile in `assets_profiles`.

    The profile is named `'bid-' || asset || '-' || bid_sense`. Its type is
    `demand` for demand bids and `availability` for all other bids.

    Args:
        con: connection to the database holding `bids` and `assets_profiles`.
        year: used as commission year of every inserted row.
    """
    statement = """INSERT INTO assets_profiles BY NAME (
SELECT
asset,
$year AS commission_year,
'bid-' || asset || '-' || bid_sense AS profile_name,
IF(bid_sense = 'demand', 'demand', 'availability') AS profile_type,
FROM
bids
)"""
    params = {"year": year}
    con.execute(statement, params)
[docs]
def add_bid_loops(con: duckdb.DuckDBPyConnection, year: int) -> None:
    """Add loops for the bids/UC trick for the demand bids.

    For every demand bid a flow whose `from_asset` and `to_asset` are both the
    bid asset is inserted into `flow`, `flow_milestone` and `flow_commission`.

    Args:
        con: connection to the database holding `bids` and the flow tables.
        year: used as milestone year and commission year of the loops.
    """
    loop_flow = """INSERT INTO flow BY NAME (
SELECT asset AS from_asset,
asset AS to_asset
FROM bids
WHERE bid_sense = 'demand'
)"""
    loop_milestone = """INSERT INTO flow_milestone BY NAME (
SELECT asset AS from_asset,
asset AS to_asset,
$year AS milestone_year
FROM bids
WHERE bid_sense = 'demand'
)"""
    loop_commission = """INSERT INTO flow_commission BY NAME (
SELECT asset AS from_asset,
asset AS to_asset,
$year AS commission_year
FROM bids WHERE bid_sense = 'demand'
)"""

    # The plain flow rows have no placeholder, so no parameters are passed.
    con.execute(loop_flow)
    for statement in (loop_milestone, loop_commission):
        con.execute(statement, {"year": year})
[docs]
def insert_bid_profiles_into_rep_periods(
    con: duckdb.DuckDBPyConnection,
    num_timesteps: int,
    year: int,
    timestep_start: int,
) -> None:
    """Create complete profiles for the bids.

    For every bid, one row per timestep in `1..num_timesteps` is inserted into
    `profiles_rep_periods` under the profile name
    `'bid-' || asset || '-' || bid_sense`, with `rep_period` fixed at 1.
    The value is the `quantity` of the matching `annular_bids` row, or 0.0
    when the bid has no row for that timestep.

    Args:
        con: connection to the database holding `bids`, `annular_bids` and
            `profiles_rep_periods`.
        num_timesteps: number of timesteps each profile is filled up to.
        year: year written to every inserted row.
        timestep_start: first timestep of the window; subtracted so that bid
            timesteps are expressed relative to the window start.

    Notes:
        We assume that the bids always have the format YYYY-MM-DD HH:mm:SS and
        we ignore everything except HH.
    """
    # All three values are referenced by the statement below, so the dictionary
    # is passed to the connection unfiltered.
    query_args = {"year": year, "num_timesteps": num_timesteps, "timestep_start": timestep_start}
    # Statement outline: build an empty (bid, timestep) grid, compute the
    # window-relative timestep of every demand and supply row in annular_bids,
    # then left-join both onto the grid and fill the gaps with 0.0.
    con.execute(
        """INSERT INTO profiles_rep_periods BY NAME (
WITH cte_complete_empty_profile AS (
SELECT timestep FROM GENERATE_SERIES(1, $num_timesteps) s(timestep)
), cte_complete_empty_bids_profiles AS (
SELECT
bid_id,
'bid-' || asset || '-' || bid_sense AS profile_name,
bid_sense,
$year AS year,
1 AS rep_period, -- we hardcode the rep_period instead of cross joining, because we assume only one
timestep
FROM bids
CROSS JOIN cte_complete_empty_profile
), cte_demand_bids_profiles AS (
SELECT
bids.bid_id,
extract('hour' from annular_bids.timestamp) + 1
- $timestep_start + 1 AS timestep, -- +1 for 0-based timestamp, +1 for shift
-- annular_bids.quantity / bids.peak_demand AS value,
annular_bids.quantity AS value, -- modified because peak_demand is set at 1
FROM annular_bids
LEFT JOIN bids
ON bids.satellite = annular_bids.satellite
AND bids.exclusive_group_id = annular_bids.exclusive_group_id
AND bids.profile_block_id = annular_bids.profile_block_id
WHERE annular_bids.sense = 'demand'
), cte_supply_bids_profiles AS (
SELECT
bids.bid_id,
extract('hour' from annular_bids.timestamp) + 1
- $timestep_start + 1 AS timestep, -- +1 for 0-based timestamp, +1 for shift
-- annular_bids.quantity / bids.peak_demand AS value,
annular_bids.quantity AS value, -- modified because peak_demand is set at 1
FROM annular_bids
LEFT JOIN bids
ON bids.satellite = annular_bids.satellite
AND bids.exclusive_group_id = annular_bids.exclusive_group_id
AND bids.profile_block_id = annular_bids.profile_block_id
WHERE annular_bids.sense = 'supply'
)
SELECT
cte_empty.* EXCLUDE (cte_empty.bid_id, cte_empty.bid_sense),
IF(
cte_empty.bid_sense = 'demand',
COALESCE(cte_demand_bids_profiles.value, 0.0),
COALESCE(cte_supply_bids_profiles.value, 0.0)
) AS value,
FROM cte_complete_empty_bids_profiles AS cte_empty
LEFT JOIN cte_demand_bids_profiles
ON cte_empty.bid_id = cte_demand_bids_profiles.bid_id
AND cte_empty.timestep = cte_demand_bids_profiles.timestep
LEFT JOIN cte_supply_bids_profiles
ON cte_empty.bid_id = cte_supply_bids_profiles.bid_id
AND cte_empty.timestep = cte_supply_bids_profiles.timestep
)""",
        query_args,
    )
[docs]
def build_model_with_bids(config: _TulipaConfig) -> Tuple[str, int]:
    """Build tulipa energy model with bids.

    Extend TulipaEnergyModel defined in `config.input_data_folder` by
    adding satellite bids.

    The csv inputs are loaded into the database at `config.db_path`. Every
    table in `TABLES_TO_MODIFY` is copied to `backup_<name>`, and the profiles
    are restricted to the timestep window found in `annular_window` and
    renumbered from 1. `annular_bids` is then converted into bid assets, which
    are inserted into the asset, flow and profile tables. When
    `config.debug_input_folder` is set, every table is additionally exported
    as csv below `config.communication_folder`.

    Args:
        config: `_TulipaConfig` with configuration data.

    Returns:
        The name of the bid manager asset and the first timestep of the
        window, in the numbering of the original profiles.

    Raises:
        RuntimeError: if `annular_window` holds no timesteps, if
            `asset_milestone` holds no milestone year or more than one, or
            (from `define_bid_manager`) if there is no consumer asset.
    """
    with duckdb.connect(config.db_path) as con:
        read_csv_folder(con, config.input_data_folder)
        # NOTE(review): `validate_input_data` is neither defined nor imported
        # in the visible part of this module; confirm that it is in scope.
        validate_input_data(con)

        # Create assets_rep_periods_partitions if necessary
        con.execute("""
CREATE TABLE IF NOT EXISTS assets_rep_periods_partitions
(asset VARCHAR
, year INTEGER
, rep_period INTEGER
, specification VARCHAR
, partition VARCHAR
)""")

        # Subset profiles_rep_periods 'timestep' to the start and end times
        window = con.execute(
            """SELECT
MIN(timestep) as t_start,
MAX(timestep) as t_end
FROM annular_window"""
        ).fetchone()
        # Both aggregates are NULL when annular_window has no timesteps. Fail
        # with a clear message instead of a conversion error further down.
        if window is None or window[0] is None or window[1] is None:
            msg = "Table annular_window contains no timesteps; cannot determine the bid window."
            raise RuntimeError(msg)
        # cast to Python int before interpolating into SQL / passing to DuckDB
        timestep_start = int(window[0])
        timestep_end = int(window[1])

        for tbl_name in TABLES_TO_MODIFY:
            # Drop the result of a previous run and keep a copy of the
            # untouched input, to be restored by `postprocess_tulipa_model`.
            con.execute(f"""DROP TABLE IF EXISTS annular_{tbl_name}""")
            con.execute(f"""CREATE OR REPLACE TABLE backup_{tbl_name} AS FROM {tbl_name}""")

        # Constraining the existing profiles into the window and renumbering the window
        con.execute(f"""
CREATE OR REPLACE TABLE profiles_rep_periods AS (
WITH original_data AS (
SELECT * FROM profiles_rep_periods
)
SELECT
* EXCLUDE (timestep),
timestep + 1 - {timestep_start} AS timestep
FROM original_data
WHERE timestep BETWEEN {timestep_start} AND {timestep_end}
ORDER BY profile_name, year, rep_period, timestep
)""")

        # Reindex the timestep to be 1:length(window) (+ 24?)
        num_timesteps = timestep_end - timestep_start + 1
        tbl_dict = {"rep_periods_data": "num_timesteps", "year_data": "length"}
        for tbl_name, col_name in tbl_dict.items():
            con.execute(f"""UPDATE {tbl_name}
SET {col_name} = {num_timesteps}""")
        # TODO: (LATER) update the storage level

        years = con.execute("""SELECT DISTINCT milestone_year AS year from asset_milestone""").df()
        # Without this guard an empty table surfaces as an IndexError below.
        if years.empty:
            msg = "Using Tulipa as market model requires a milestone year, but asset_milestone is empty."
            raise RuntimeError(msg)
        if len(years) > 1:
            msg = f"Using Tulipa as market model is only allowed with a single year. Found: {years}"
            raise RuntimeError(msg)
        year = int(years["year"].values[0])

        convert_bids_table(con)
        bid_manager = define_bid_manager(con)

        # Add columns that might not exist
        for table_name, column_name, column_type in [
            ("asset", "capacity", "DOUBLE"),
            ("asset", "consumer_balance_sense", "VARCHAR"),
            ("asset", "min_operating_point", "DOUBLE"),
            ("asset", "unit_commitment", "BOOLEAN"),
            ("asset", "unit_commitment_integer", "BOOLEAN"),
            ("asset", "unit_commitment_method", "VARCHAR"),
            ("asset_milestone", "peak_demand", "DOUBLE"),
            ("flow_milestone", "operational_cost", "DOUBLE"),
        ]:
            con.execute(f"""
ALTER TABLE {table_name}
ADD COLUMN IF NOT EXISTS {column_name} {column_type}
""")

        # Integrate bids into the model
        insert_bids_into_assets(con, year, num_timesteps)
        insert_bids_into_flows(con, year, bid_manager)
        insert_bids_into_profiles(con, year)
        add_bid_loops(con, year)
        insert_bid_profiles_into_rep_periods(con, num_timesteps, year, timestep_start)

        # TODO: move this into a method of the config?
        if config.debug_input_folder is not None:
            logger.debug("Writing tables to csv for debugging.")
            debug_dir = Path(config.communication_folder) / config.debug_input_folder
            debug_dir.mkdir(exist_ok=True, parents=True)
            for row in con.execute("SELECT table_name FROM duckdb_tables()").fetchall():
                table_name = row[0]
                path = debug_dir / f"{table_name}.csv"
                con.execute(
                    f"COPY (FROM {table_name} ORDER BY 1) TO '{str(path)}' (HEADER, DELIMITER ',')",
                )
    return bid_manager, timestep_start
[docs]
def postprocess_tulipa_model(
    db_path: Path | str,
    bid_manager: str,
    timestep_start: int,
) -> None:
    """Derive the market results and restore the original input tables.

    Two result tables are (re)created in the database at `db_path`:

    - `annular_market_price`: `dual_balance_consumer` of the bid manager from
      `cons_balance_consumer`, joined to the timestamps of
      `annular_timeseries_data`;
    - `annular_scheduled_bids`: per row of `annular_bids`, the sum of
      `var_flow.solution` over the flows between the bid asset and the bid
      manager at the matching timestep.

    Afterwards every table in `TABLES_TO_MODIFY` is renamed to
    `annular_<name>` and its `backup_<name>` copy is renamed back to `<name>`.

    Args:
        db_path: location of the duckdb database.
        bid_manager: name of the bid manager asset.
        timestep_start: first timestep of the window; added back to map model
            time blocks onto the timesteps of `annular_timeseries_data`.
    """
    shared_params = {"bid_manager": bid_manager, "timestep_start": timestep_start}

    market_price_sql = """
CREATE OR REPLACE TABLE annular_market_price AS
WITH cte_bids_manager AS (
SELECT dual_balance_consumer, time_block_start
FROM cons_balance_consumer
WHERE asset = $bid_manager
)
SELECT
timeseries.timestamp,
cons.dual_balance_consumer AS market_price,
FROM cte_bids_manager AS cons
LEFT JOIN annular_timeseries_data AS timeseries
ON cons.time_block_start + $timestep_start - 1 = timeseries.timestep
ORDER BY cons.time_block_start
"""

    scheduled_bids_sql = """
CREATE OR REPLACE TABLE annular_scheduled_bids AS
SELECT
annular_bids.satellite,
annular_bids.exclusive_group_id,
annular_bids.profile_block_id,
annular_bids.timestamp,
SUM(var_flow.solution) AS scheduled,
FROM annular_bids
LEFT JOIN bids
ON annular_bids.satellite = bids.satellite
AND annular_bids.exclusive_group_id = bids.exclusive_group_id
AND annular_bids.profile_block_id = bids.profile_block_id
LEFT JOIN annular_timeseries_data AS timeseries
ON annular_bids.timestamp = timeseries.timestamp
LEFT JOIN var_flow
ON ((var_flow.from_asset = $bid_manager AND var_flow.to_asset = bids.asset)
OR (var_flow.to_asset = $bid_manager AND var_flow.from_asset = bids.asset))
AND var_flow.time_block_start + $timestep_start - 1 = timeseries.timestep
GROUP BY
annular_bids.satellite,
annular_bids.exclusive_group_id,
annular_bids.profile_block_id,
annular_bids.timestamp
"""

    with duckdb.connect(db_path) as con:
        # Both statements reference both parameters.
        for statement in (market_price_sql, scheduled_bids_sql):
            con.execute(statement, shared_params)

        # undo modification to input in db
        for tbl_name in TABLES_TO_MODIFY:
            con.execute(f"ALTER TABLE {tbl_name} RENAME TO annular_{tbl_name}")
            con.execute(f"ALTER TABLE backup_{tbl_name} RENAME TO {tbl_name}")