"""
Stage 4: High-level workflows & I/O.
"""
from datetime import datetime
import pytz
from ...models import VoxCity
from ...utils.weather import (
get_nearest_epw_from_climate_onebuilding,
read_epw_for_solar_simulation,
)
from .radiation import get_global_solar_irradiance_map, get_building_solar_irradiance
from .temporal import get_cumulative_global_solar_irradiance, get_cumulative_building_solar_irradiance
from ..visibility import get_surface_view_factor
[docs]
def get_global_solar_irradiance_using_epw(
voxcity: VoxCity,
calc_type: str = "instantaneous",
direct_normal_irradiance_scaling: float = 1.0,
diffuse_irradiance_scaling: float = 1.0,
**kwargs,
):
"""
Compute global irradiance from EPW, either instantaneous or cumulative.
"""
# EPW acquisition
download_nearest_epw = kwargs.get("download_nearest_epw", False)
epw_file_path = kwargs.get("epw_file_path", None)
# Extract rectangle_vertices with fallback to voxcity.extras
rectangle_vertices = kwargs.get("rectangle_vertices", None)
if rectangle_vertices is None:
extras = getattr(voxcity, "extras", None)
if isinstance(extras, dict):
rectangle_vertices = extras.get("rectangle_vertices", None)
if download_nearest_epw:
if rectangle_vertices is None:
print("rectangle_vertices is required to download nearest EPW file")
return None
lons = [coord[0] for coord in rectangle_vertices]
lats = [coord[1] for coord in rectangle_vertices]
center_lon = (min(lons) + max(lons)) / 2
center_lat = (min(lats) + max(lats)) / 2
output_dir = kwargs.get("output_dir", "output")
max_distance = kwargs.get("max_distance", kwargs.get("max_distance_km", 100))
epw_file_path, weather_data, metadata = get_nearest_epw_from_climate_onebuilding(
longitude=center_lon,
latitude=center_lat,
output_dir=output_dir,
max_distance=max_distance,
extract_zip=True,
load_data=True,
allow_insecure_ssl=kwargs.get("allow_insecure_ssl", False),
allow_http_fallback=kwargs.get("allow_http_fallback", False),
ssl_verify=kwargs.get("ssl_verify", True),
)
if not download_nearest_epw and not epw_file_path:
raise ValueError("epw_file_path must be provided when download_nearest_epw is False")
# Read EPW
df, lon, lat, tz, elevation_m = read_epw_for_solar_simulation(epw_file_path)
if df.empty:
raise ValueError("No data in EPW file.")
if calc_type == "instantaneous":
calc_time = kwargs.get("calc_time", "01-01 12:00:00")
try:
calc_dt = datetime.strptime(calc_time, "%m-%d %H:%M:%S")
except ValueError as ve:
raise ValueError("calc_time must be in format 'MM-DD HH:MM:SS'") from ve
df_period = df[
(df.index.month == calc_dt.month)
& (df.index.day == calc_dt.day)
& (df.index.hour == calc_dt.hour)
]
if df_period.empty:
raise ValueError("No EPW data at the specified time.")
# Localize and convert to UTC
offset_minutes = int(tz * 60)
local_tz = pytz.FixedOffset(offset_minutes)
df_local = df_period.copy()
df_local.index = df_local.index.tz_localize(local_tz)
df_utc = df_local.tz_convert(pytz.UTC)
from .temporal import get_solar_positions_astral
solar_positions = get_solar_positions_astral(df_utc.index, lon, lat)
DNI = float(df_utc.iloc[0]["DNI"]) * direct_normal_irradiance_scaling
DHI = float(df_utc.iloc[0]["DHI"]) * diffuse_irradiance_scaling
azimuth_degrees = float(solar_positions.iloc[0]["azimuth"])
elevation_degrees = float(solar_positions.iloc[0]["elevation"])
solar_map = get_global_solar_irradiance_map(
voxcity,
azimuth_degrees,
elevation_degrees,
DNI,
DHI,
show_plot=True,
**kwargs,
)
return solar_map
if calc_type == "cumulative":
start_hour = kwargs.get("start_hour", 0)
end_hour = kwargs.get("end_hour", 23)
df_filtered = df[(df.index.hour >= start_hour) & (df.index.hour <= end_hour)]
solar_map = get_cumulative_global_solar_irradiance(
voxcity,
df_filtered,
lon,
lat,
tz,
direct_normal_irradiance_scaling=direct_normal_irradiance_scaling,
diffuse_irradiance_scaling=diffuse_irradiance_scaling,
**kwargs,
)
return solar_map
raise ValueError("calc_type must be 'instantaneous' or 'cumulative'")
[docs]
def get_building_global_solar_irradiance_using_epw(*args, **kwargs):
"""
Compute building-surface irradiance using EPW (instantaneous or cumulative).
"""
voxcity: VoxCity = kwargs.get("voxcity")
if voxcity is None and len(args) > 0 and isinstance(args[0], VoxCity):
voxcity = args[0]
if voxcity is None:
raise ValueError("voxcity (VoxCity) must be provided as first arg or kwarg")
calc_type = kwargs.get("calc_type", "instantaneous")
direct_normal_irradiance_scaling = float(kwargs.get("direct_normal_irradiance_scaling", 1.0))
diffuse_irradiance_scaling = float(kwargs.get("diffuse_irradiance_scaling", 1.0))
building_svf_mesh = kwargs.get("building_svf_mesh", None)
building_id_grid = kwargs.get("building_id_grid", None)
progress_report = kwargs.get("progress_report", False)
fast_path = kwargs.get("fast_path", True)
# Thread configuration
from .temporal import _configure_num_threads
desired_threads = kwargs.get("numba_num_threads", None)
_configure_num_threads(desired_threads, progress=progress_report)
# EPW acquisition
download_nearest_epw = kwargs.get("download_nearest_epw", False)
epw_file_path = kwargs.get("epw_file_path", None)
# Extract rectangle_vertices with fallback to voxcity.extras
rectangle_vertices = kwargs.get("rectangle_vertices", None)
if rectangle_vertices is None:
extras = getattr(voxcity, "extras", None)
if isinstance(extras, dict):
rectangle_vertices = extras.get("rectangle_vertices", None)
if download_nearest_epw:
if rectangle_vertices is None:
print("rectangle_vertices is required to download nearest EPW file")
return None
lons = [coord[0] for coord in rectangle_vertices]
lats = [coord[1] for coord in rectangle_vertices]
center_lon = (min(lons) + max(lons)) / 2
center_lat = (min(lats) + max(lats)) / 2
output_dir = kwargs.get("output_dir", "output")
max_distance = kwargs.get("max_distance", kwargs.get("max_distance_km", 100))
epw_file_path, _weather_data, _metadata = get_nearest_epw_from_climate_onebuilding(
longitude=center_lon,
latitude=center_lat,
output_dir=output_dir,
max_distance=max_distance,
extract_zip=True,
load_data=True,
allow_insecure_ssl=kwargs.get("allow_insecure_ssl", False),
allow_http_fallback=kwargs.get("allow_http_fallback", False),
ssl_verify=kwargs.get("ssl_verify", True),
)
if not download_nearest_epw and not epw_file_path:
raise ValueError("epw_file_path must be provided when download_nearest_epw is False")
# Read EPW
df, lon, lat, tz, _elevation_m = read_epw_for_solar_simulation(epw_file_path)
if df.empty:
raise ValueError("No data in EPW file.")
# SVF for building faces (compute if not provided)
if building_svf_mesh is None:
if progress_report:
print("Processing Sky View Factor for building surfaces...")
svf_kwargs = {
'value_name': 'svf',
'target_values': (0,),
'inclusion_mode': False,
'sky_diffuse': True,
'building_id_grid': building_id_grid,
'progress_report': progress_report,
'fast_path': fast_path,
}
for k in ("N_azimuth", "N_elevation", "tree_k", "tree_lad", "debug"):
if k in kwargs:
svf_kwargs[k] = kwargs[k]
building_svf_mesh = get_surface_view_factor(voxcity, **svf_kwargs)
# Precompute geometry/masks
import numpy as _np
voxel_data = voxcity.voxels.classes
meshsize = voxcity.voxels.meta.meshsize
precomputed_geometry = {}
try:
grid_shape = voxel_data.shape
grid_bounds_voxel = _np.array([[0, 0, 0], [grid_shape[0], grid_shape[1], grid_shape[2]]], dtype=_np.float64)
grid_bounds_real = grid_bounds_voxel * meshsize
boundary_epsilon = meshsize * 0.05
precomputed_geometry = {
'face_centers': building_svf_mesh.triangles_center,
'face_normals': building_svf_mesh.face_normals,
'face_svf': building_svf_mesh.metadata['svf'] if ('svf' in building_svf_mesh.metadata) else None,
'grid_bounds_real': grid_bounds_real,
'boundary_epsilon': boundary_epsilon,
}
except Exception:
precomputed_geometry = {}
tree_k = kwargs.get("tree_k", 0.6)
tree_lad = kwargs.get("tree_lad", 1.0)
precomputed_masks = {
'vox_is_tree': (voxel_data == -2),
'vox_is_opaque': (voxel_data != 0) & (voxel_data != -2),
'att': float(_np.exp(-tree_k * tree_lad * meshsize)),
}
if progress_report:
t_cnt = int(_np.count_nonzero(precomputed_masks['vox_is_tree']))
print(f"Precomputed caches: trees={t_cnt:,}, tree_att_per_voxel={precomputed_masks['att']:.4f}")
result_mesh = None
if calc_type == "instantaneous":
calc_time = kwargs.get("calc_time", "01-01 12:00:00")
try:
calc_dt = datetime.strptime(calc_time, "%m-%d %H:%M:%S")
except ValueError as ve:
raise ValueError("calc_time must be in format 'MM-DD HH:MM:SS'") from ve
df_period = df[
(df.index.month == calc_dt.month) & (df.index.day == calc_dt.day) & (df.index.hour == calc_dt.hour)
]
if df_period.empty:
raise ValueError("No EPW data at the specified time.")
offset_minutes = int(tz * 60)
local_tz = pytz.FixedOffset(offset_minutes)
df_local = df_period.copy()
df_local.index = df_local.index.tz_localize(local_tz)
df_utc = df_local.tz_convert(pytz.UTC)
from .temporal import get_solar_positions_astral
solar_positions = get_solar_positions_astral(df_utc.index, lon, lat)
DNI = float(df_utc.iloc[0]['DNI']) * direct_normal_irradiance_scaling
DHI = float(df_utc.iloc[0]['DHI']) * diffuse_irradiance_scaling
azimuth_degrees = float(solar_positions.iloc[0]['azimuth'])
elevation_degrees = float(solar_positions.iloc[0]['elevation'])
_call_kwargs = kwargs.copy()
_call_kwargs.update({
'progress_report': progress_report,
'fast_path': fast_path,
'precomputed_geometry': precomputed_geometry,
'precomputed_masks': precomputed_masks,
})
result_mesh = get_building_solar_irradiance(
voxcity,
building_svf_mesh,
azimuth_degrees,
elevation_degrees,
DNI,
DHI,
**_call_kwargs
)
elif calc_type == "cumulative":
period_start = kwargs.get("period_start", "01-01 00:00:00")
period_end = kwargs.get("period_end", "12-31 23:59:59")
time_step_hours = float(kwargs.get("time_step_hours", 1.0))
result_mesh = get_cumulative_building_solar_irradiance(
voxcity,
building_svf_mesh,
df,
lon,
lat,
tz,
period_start=period_start,
period_end=period_end,
time_step_hours=time_step_hours,
direct_normal_irradiance_scaling=direct_normal_irradiance_scaling,
diffuse_irradiance_scaling=diffuse_irradiance_scaling,
progress_report=progress_report,
fast_path=fast_path,
precomputed_geometry=precomputed_geometry,
precomputed_masks=precomputed_masks,
)
else:
raise ValueError("calc_type must be 'instantaneous' or 'cumulative'")
# Optional persist
if kwargs.get("save_mesh", False):
mesh_output_path = kwargs.get("mesh_output_path")
if not mesh_output_path:
output_directory = kwargs.get("output_directory", "output")
output_file_name = kwargs.get("output_file_name", f"{calc_type}_solar_irradiance")
mesh_output_path = f"{output_directory}/{output_file_name}.pkl"
save_irradiance_mesh(result_mesh, mesh_output_path)
if progress_report:
print(f"Saved irradiance mesh data to: {mesh_output_path}")
return result_mesh
[docs]
def save_irradiance_mesh(irradiance_mesh, output_file_path):
"""
Persist irradiance mesh to pickle file.
"""
import pickle
import os
os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
with open(output_file_path, 'wb') as f:
pickle.dump(irradiance_mesh, f)
[docs]
def load_irradiance_mesh(input_file_path):
"""
Load irradiance mesh from pickle file.
"""
import pickle
with open(input_file_path, 'rb') as f:
irradiance_mesh = pickle.load(f)
return irradiance_mesh