Source code for annular.market.tulipa.utils
import logging
from pathlib import Path
import duckdb
import numpy as np
import pandas as pd
from annular.utils import Sense
[docs]
def strip_timezone(timestamp: pd.Series | pd.DatetimeIndex | list) -> pd.Series | pd.DatetimeIndex:
"""Convert an input to pd.Series or pd.Index and strip timezone.
Args:
timestamp: The input to process.
"""
# Safety check b/c repo is not completely type-checked
if not isinstance(timestamp, (pd.Series, pd.DatetimeIndex, list, tuple)):
msg = f"timestamp {timestamp} is of wrong type, expected one of: pd.Series, pd.DatetimeIndex, list)"
raise TypeError(msg)
if isinstance(timestamp, pd.Series):
return timestamp.dt.tz_localize(None) if timestamp.dt.tz else timestamp
if isinstance(timestamp, (list, tuple)):
timestamp = pd.to_datetime(timestamp) # returns a pd.Index
return timestamp.tz_localize(None) if timestamp.tz else timestamp
[docs]
def create_tulipa_input_data(
    generator_configs: dict[str, dict],
    timeseries_data: pd.DataFrame,
    db_file: str | None = None,
    csv_folder: str | Path | None = None,
) -> None:
    """Creates the Tulipa input from the given arguments.

    Populates a `duckdb` database with the input tables for Tulipa.
    This creates one producer asset per generator in `generator_configs` plus one
    consumer asset named `bid_manager`, and one flow from every producer to
    `bid_manager`. The availability profiles of the generators are read from
    `timeseries_data`.

    Args:
        generator_configs: Mapping from generator name to its configuration. The keys
            `installed_capacity` and `marginal_cost_linear` are required; the optional
            key `availability_factor` names a column of `timeseries_data`.
        timeseries_data: DataFrame containing all timeseries data for generators. The
            year of its first index entry is used as milestone/commission year and its
            length as the number of timesteps.
        db_file: A path to duckdb database. By default, an in-memory database is used
            and not persisted, in which case `csv_folder` should be specified.
        csv_folder: If given, all tables are saved as csv files in this location.
            The folder is created if it does not exist yet.

    Raises:
        IndexError: If `timeseries_data` is empty (there is no first index entry).
        KeyError: If a generator configuration lacks a required key.

    Notes:
        The intended use of this function is to transform supply-side input data
        for an annular model to a Tulipa model. The `bid_manager` asset is created
        because we attach bids later to an (arbitrary) existing consumer asset,
        but the supply-side data do not have any demand assets.
    """
    year = timeseries_data.index[0].year
    simulation_duration = len(timeseries_data)
    bid_manager = "bid_manager"
    db_file = ":memory:" if db_file is None else db_file
    # The profile name depends only on the column and the year, so generators
    # sharing a column share a profile. Remember what was written to avoid
    # inserting the same profile rows more than once.
    stored_profiles: set[str] = set()

    with duckdb.connect(db_file) as con:
        con.execute("CREATE OR REPLACE TABLE asset (asset TEXT, type TEXT, capacity REAL)")
        con.execute(f"""CREATE OR REPLACE TABLE asset_both (asset TEXT, milestone_year INT DEFAULT {year},
            commission_year INT DEFAULT {year}, initial_units REAL DEFAULT 1.0)""")
        con.execute(f"CREATE OR REPLACE TABLE asset_commission (asset TEXT, commission_year INT DEFAULT {year})")
        con.execute(f"""CREATE OR REPLACE TABLE asset_milestone (asset TEXT, milestone_year INT DEFAULT {year},
            peak_demand REAL DEFAULT 0.0)""")
        con.execute("CREATE OR REPLACE TABLE flow (from_asset TEXT, to_asset TEXT)")
        con.execute(f"""CREATE OR REPLACE TABLE flow_both (from_asset TEXT, to_asset TEXT,
            milestone_year INT DEFAULT {year}, commission_year INT DEFAULT {year})""")
        con.execute(f"""CREATE OR REPLACE TABLE flow_commission (from_asset TEXT, to_asset TEXT,
            commission_year INT DEFAULT {year})""")
        con.execute(f"""CREATE OR REPLACE TABLE flow_milestone (from_asset TEXT, to_asset TEXT,
            milestone_year INT DEFAULT {year}, operational_cost REAL)""")
        con.execute(f"""CREATE OR REPLACE TABLE assets_profiles (asset TEXT, commission_year INT DEFAULT {year},
            profile_name TEXT, profile_type TEXT)""")
        con.execute(f"""CREATE OR REPLACE TABLE profiles_rep_periods (profile_name TEXT, year INT DEFAULT {year},
            rep_period INT DEFAULT 1, timestep INT, value REAL)""")

        # Below we use prepared statements because the data comes from external source
        # Generated by TulipaClustering (notice profiles_rep_periods was created directly instead of profiles above)
        con.execute(
            """CREATE OR REPLACE TABLE rep_periods_data AS
            (SELECT ? AS year, 1 AS rep_period, ? AS num_timesteps, 1.0 AS resolution)""",
            [year, simulation_duration],
        )
        con.execute(
            """CREATE OR REPLACE TABLE rep_periods_mapping AS
            (SELECT ? AS year, 1 AS period, 1 AS rep_period, 1.0 AS weight)""",
            [year],
        )
        con.execute(
            """CREATE OR REPLACE TABLE timeframe_data AS
            (SELECT ? AS year, 1 AS period, ? AS num_timesteps)""",
            [year, simulation_duration],
        )
        con.execute(
            """CREATE OR REPLACE TABLE year_data AS
            (SELECT ? AS year, ? AS length, TRUE AS is_milestone)""",
            [year, simulation_duration],
        )

        # Bid Manager
        con.execute("INSERT INTO asset (asset, type, capacity) VALUES (?, 'consumer', 0.0)", [bid_manager])
        con.execute("INSERT INTO asset_both (asset) VALUES (?)", [bid_manager])
        con.execute("INSERT INTO asset_commission (asset) VALUES (?)", [bid_manager])
        con.execute("INSERT INTO asset_milestone (asset) VALUES (?)", [bid_manager])

        # Generators
        for asset_name, gen_config in generator_configs.items():
            capacity = gen_config["installed_capacity"]
            operational_cost = gen_config["marginal_cost_linear"]
            con.execute(
                "INSERT INTO asset (asset, type, capacity) VALUES (?, ?, ?)", [asset_name, "producer", capacity]
            )
            con.execute("INSERT INTO asset_both (asset) VALUES (?)", [asset_name])
            con.execute("INSERT INTO asset_commission (asset) VALUES (?)", [asset_name])
            con.execute("INSERT INTO asset_milestone (asset) VALUES (?)", [asset_name])
            con.execute("INSERT INTO flow (from_asset, to_asset) VALUES (?, ?)", [asset_name, bid_manager])
            con.execute("INSERT INTO flow_commission (from_asset, to_asset) VALUES (?, ?)", [asset_name, bid_manager])
            con.execute(
                "INSERT INTO flow_milestone (from_asset, to_asset, operational_cost) VALUES (?, ?, ?)",
                [asset_name, bid_manager, operational_cost],
            )

            profile_column = gen_config.get("availability_factor", None)
            if profile_column and profile_column in timeseries_data.columns:
                profile_name = f"{profile_column}-availability-{year}"
                if profile_name not in stored_profiles:
                    profile_df = timeseries_data[[profile_column]].rename(columns={profile_column: "value"})
                    # 1-based timestep: whole hours elapsed since the first index entry.
                    timesteps_since_start = profile_df.index - profile_df.index.min()
                    timesteps = 1 + timesteps_since_start.components.hours + 24 * timesteps_since_start.components.days
                    profile_df["timestep"] = timesteps.to_numpy()
                    profile_df["profile_name"] = profile_name
                    # `profile_df` is not a table: DuckDB looks the name up among the
                    # Python variables of this function, so the local must keep this name.
                    con.execute("INSERT INTO profiles_rep_periods BY NAME (FROM profile_df)")
                    stored_profiles.add(profile_name)
                con.execute(
                    "INSERT INTO assets_profiles (asset, profile_name, profile_type) VALUES (?, ?, 'availability')",
                    [asset_name, profile_name],
                )

        if not csv_folder:
            return

        export_dir = Path(csv_folder)
        export_dir.mkdir(parents=True, exist_ok=True)
        for tbl in con.execute("SHOW TABLES").fetchall():
            tbl_name = tbl[0]
            # The target path is embedded in the SQL text as a string literal, so
            # single quotes in the path have to be doubled.
            filename = str(export_dir / f"{tbl_name}.csv").replace("'", "''")
            con.execute(f"COPY {tbl_name} TO '{filename}' (FORMAT CSV, HEADER)")
    return
[docs]
def write_inputs_to_db(
    connection: duckdb.DuckDBPyConnection,
    bids: pd.DataFrame,
    timeseries_data: pd.DataFrame,
    window_df: pd.DataFrame,
) -> None:
    """Store the three input DataFrames as tables in DuckDB.

    The tables `annular_bids`, `annular_timeseries_data` and `annular_window`
    are created from `bids`, `timeseries_data` and `window_df` respectively.
    The statements use plain `CREATE TABLE`, not `CREATE OR REPLACE`.

    Args:
        connection: Open DuckDB connection.
        bids: DataFrame with bids.
        timeseries_data: DataFrame with timestamp and timestep columns.
        window_df: DataFrame with timestamp and timestep columns for the window.
    """
    logger.debug("Writing to duckdb table")
    # The SQL refers to the DataFrames by the names of this function's parameters,
    # so the statements must run in this frame and the names must not change.
    # Fails on pandas 3.0.0 due to https://github.com/duckdb/duckdb/issues/18297
    create_statements = (
        "CREATE TABLE annular_bids AS FROM bids",
        "CREATE TABLE annular_timeseries_data AS FROM timeseries_data",
        "CREATE TABLE annular_window AS FROM window_df",
    )
    for statement in create_statements:
        connection.execute(statement)
[docs]
def read_outputs_from_db(
    connection: duckdb.DuckDBPyConnection,
    snapshots: pd.DatetimeIndex,
    bid_idx_cols: list[str],
    bids: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Read output data from DuckDB tables.

    Reads the tables `annular_market_price` and `annular_scheduled_bids`,
    restricts the prices to `snapshots` and the scheduled bids to the rows
    matching the submitted `bids`. No timezone conversion is performed here.

    Args:
        connection: Open DuckDB connection.
        snapshots: DatetimeIndex to filter market_price rows (the bidding window).
        bid_idx_cols: Column names to use for setting the index.
        bids: DataFrame with the submitted bids; must contain `bid_idx_cols`
            as columns. It is not modified.

    Returns:
        Tuple of (market_price, scheduled_bids) DataFrames. `market_price` is
        indexed by `timestamp`, `scheduled_bids` by `bid_idx_cols`.

    Raises:
        KeyError: If a requested snapshot or bid is missing from the tables.
    """
    logger.info("Reading annular_market_price table")
    market_price = connection.sql("FROM annular_market_price").df()
    market_price = market_price.set_index("timestamp").loc[snapshots, :]

    logger.info("Reading annular_scheduled_bids table")
    scheduled_bids = connection.sql("FROM annular_scheduled_bids").df()
    scheduled_bids = scheduled_bids.set_index(bid_idx_cols)

    # Build the lookup index on a new frame rather than in place, so the
    # caller's `bids` keeps its columns and the function can be called again.
    submitted_index = bids.set_index(bid_idx_cols).index
    scheduled_bids = scheduled_bids.loc[submitted_index, :]
    return market_price, scheduled_bids
[docs]
def convert_data_for_tulipa(
    bids: pd.DataFrame,
    timeseries_data: pd.DataFrame,
    snapshots: pd.DatetimeIndex,
    bidding_window: pd.DatetimeIndex,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DatetimeIndex, pd.DataFrame]:
    """Convert data so that Tulipa can use them.

    In particular

    - Strip timezone from inputs to prepare for database storage
    - Reset all indexes
    - Create 1-based time index that is used in Tulipa
    - Denote `sense` column with strings "demand" and "supply" instead of numerics

    To avoid mutation, input dataframes are copied.

    Args:
        bids: MultiIndex DataFrame with a 'timestamp' level.
        timeseries_data: DataFrame with DatetimeIndex. The index may carry any
            name (or none); it becomes the `timestamp` column.
        snapshots: Optimization window as a DatetimeIndex of timesteps.
        bidding_window: timestamps for bidding window.

    Returns:
        Tuple of (bids, timeseries_data, bidding_window, window_df).
        `bidding_window` is the input bidding window with its timezone stripped.
        window_df is a correspondence between pd.Datetime timestamps in Annular and
        integer timesteps in Tulipa, subset to the optimization window.
    """
    logger.debug("Preparing data for Tulipa")
    bids = bids.copy().reset_index()
    # NOTE: The index does not have a standardized name (and may be unnamed), so
    # name it "timestamp" before turning it into a column. `reset_index` returns
    # a new frame, leaving the caller's data untouched.
    timeseries_data = timeseries_data.rename_axis("timestamp").reset_index()

    bids["timestamp"] = strip_timezone(bids["timestamp"])
    timeseries_data["timestamp"] = strip_timezone(timeseries_data["timestamp"])
    optimization_window = strip_timezone(snapshots)
    bidding_window = strip_timezone(bidding_window)

    # Hours elapsed since the first timestamp, 1-based. The *total* seconds must be
    # used: `Timedelta.seconds` is only the within-day component, which would make
    # the timestep wrap back to 1 after every 24 hours.
    elapsed = timeseries_data["timestamp"] - timeseries_data["timestamp"].min()
    timeseries_data["timestep"] = 1 + (elapsed.dt.total_seconds() / 3600).round().astype("int64")

    optimization_window_df = (
        timeseries_data.loc[:, ["timestamp", "timestep"]]
        .set_index("timestamp")
        .loc[optimization_window, :]
        .reset_index()
    )
    bids["sense"] = np.where(bids["sense"] == Sense.SUPPLY, "supply", "demand")
    return bids, timeseries_data, bidding_window, optimization_window_df