Source code for voxcity.simulator.solar.temporal

"""
Stage 3: Time-series integration.
"""

import os
from datetime import datetime
import pytz
import numpy as np
import matplotlib.pyplot as plt
import numba

from ...models import VoxCity
from ...exporter.obj import grid_to_obj
from .radiation import (
    get_direct_solar_irradiance_map,
    get_diffuse_solar_irradiance_map,
    compute_cumulative_solar_irradiance_faces_masked_timeseries,
    get_building_solar_irradiance,
)
from .sky import (
    generate_tregenza_patches,
    generate_reinhart_patches,
    generate_uniform_grid_patches,
    generate_fibonacci_patches,
    bin_sun_positions_to_patches,
    get_tregenza_patch_index_fast,
)


[docs] def get_solar_positions_astral(times, lon, lat): """ Compute solar azimuth and elevation for given times and location using Astral. Returns a DataFrame indexed by times with columns ['azimuth', 'elevation'] (degrees). """ import pandas as pd from astral import Observer from astral.sun import elevation, azimuth observer = Observer(latitude=lat, longitude=lon) df_pos = pd.DataFrame(index=times, columns=['azimuth', 'elevation'], dtype=float) for t in times: el = elevation(observer=observer, dateandtime=t) az = azimuth(observer=observer, dateandtime=t) df_pos.at[t, 'elevation'] = el df_pos.at[t, 'azimuth'] = az return df_pos
def _configure_num_threads(desired_threads=None, progress=False): try: cores = os.cpu_count() or 4 except Exception: cores = 4 used = desired_threads if desired_threads is not None else cores try: numba.set_num_threads(int(used)) except Exception: pass os.environ.setdefault('MKL_NUM_THREADS', '1') if 'OMP_NUM_THREADS' not in os.environ: os.environ['OMP_NUM_THREADS'] = str(int(used)) if progress: try: print(f"Numba threads: {numba.get_num_threads()} (requested {used})") except Exception: print(f"Numba threads set to {used}") return used def _auto_time_batch_size(n_faces, total_steps, user_value=None): if user_value is not None: return max(1, int(user_value)) if total_steps <= 0: return 1 if n_faces <= 5_000: batches = 2 elif n_faces <= 50_000: batches = 8 elif n_faces <= 200_000: batches = 16 else: batches = 32 batches = min(batches, total_steps) return max(1, total_steps // batches) # ============================================================================= # Sky Patch Optimization for Cumulative Solar Irradiance # ============================================================================= def _aggregate_weather_to_sky_patches( azimuth_arr, elevation_arr, dni_arr, dhi_arr, time_step_hours=1.0, sky_discretization="tregenza", **kwargs ): """ Aggregate weather data (DNI, DHI) into sky patches for efficient cumulative calculation. Instead of computing for each hourly sun position (potentially 8760 per year), this aggregates DNI into sky patches and sums DHI. Ray tracing is then performed once per patch instead of per timestep. Parameters ---------- azimuth_arr : np.ndarray Solar azimuth values in degrees. elevation_arr : np.ndarray Solar elevation values in degrees. dni_arr : np.ndarray Direct Normal Irradiance (W/m²) for each timestep. dhi_arr : np.ndarray Diffuse Horizontal Irradiance (W/m²) for each timestep. time_step_hours : float Duration of each timestep in hours. sky_discretization : str Method: "tregenza", "reinhart", "uniform", "fibonacci". **kwargs : dict Additional parameters (e.g., mf for Reinhart). Returns ------- dict Contains: - 'patch_directions': Unit vectors for each patch (N, 3) - 'patch_cumulative_dni': Cumulative DNI×hours per patch (Wh/m²) - 'patch_solid_angles': Solid angle of each patch (steradians) - 'patch_hours': Number of hours sun was in each patch - 'total_cumulative_dhi': Total DHI×hours sum (Wh/m²) - 'n_patches': Number of patches - 'n_original_timesteps': Original number of timesteps """ # Generate sky patches based on method if sky_discretization.lower() == "tregenza": patches, directions, solid_angles = generate_tregenza_patches() elif sky_discretization.lower() == "reinhart": mf = kwargs.get("reinhart_mf", kwargs.get("mf", 4)) patches, directions, solid_angles = generate_reinhart_patches(mf=mf) elif sky_discretization.lower() == "uniform": n_az = kwargs.get("sky_n_azimuth", kwargs.get("n_azimuth", 36)) n_el = kwargs.get("sky_n_elevation", kwargs.get("n_elevation", 9)) patches, directions, solid_angles = generate_uniform_grid_patches(n_az, n_el) elif sky_discretization.lower() == "fibonacci": n_patches = kwargs.get("sky_n_patches", kwargs.get("n_patches", 145)) patches, directions, solid_angles = generate_fibonacci_patches(n_patches=n_patches) else: raise ValueError(f"Unknown sky discretization method: {sky_discretization}") n_patches = len(patches) cumulative_dni = np.zeros(n_patches, dtype=np.float64) hours_count = np.zeros(n_patches, dtype=np.int32) total_cumulative_dhi = 0.0 n_timesteps = len(azimuth_arr) # Bin each sun position to a patch for i in range(n_timesteps): elev = elevation_arr[i] dhi = dhi_arr[i] # DHI accumulates regardless of sun position (sky-based diffuse) if dhi > 0: total_cumulative_dhi += dhi * time_step_hours # DNI only when sun is above horizon if elev <= 0: continue az = azimuth_arr[i] dni = dni_arr[i] if dni <= 0: continue # Find nearest patch if sky_discretization.lower() == "tregenza": patch_idx = int(get_tregenza_patch_index_fast(float(az), float(elev))) else: # For other methods, find nearest patch by direction elev_rad = np.deg2rad(elev) az_rad = np.deg2rad(az) sun_dir = np.array([ np.cos(elev_rad) * np.cos(az_rad), np.cos(elev_rad) * np.sin(az_rad), np.sin(elev_rad) ]) dots = np.sum(directions * sun_dir, axis=1) patch_idx = int(np.argmax(dots)) if patch_idx >= 0 and patch_idx < n_patches: cumulative_dni[patch_idx] += dni * time_step_hours hours_count[patch_idx] += 1 # Filter to patches that actually have sun exposure active_mask = cumulative_dni > 0 return { 'patches': patches, 'patch_directions': directions, 'patch_cumulative_dni': cumulative_dni, 'patch_solid_angles': solid_angles, 'patch_hours': hours_count, 'active_mask': active_mask, 'n_active_patches': int(np.sum(active_mask)), 'total_cumulative_dhi': total_cumulative_dhi, 'n_patches': n_patches, 'n_original_timesteps': n_timesteps, 'method': sky_discretization, }
[docs] def get_cumulative_global_solar_irradiance( voxcity: VoxCity, df, lon, lat, tz, direct_normal_irradiance_scaling=1.0, diffuse_irradiance_scaling=1.0, **kwargs, ): """ Integrate global horizontal irradiance over a period using EPW data. Returns W/m²·hour accumulation on the ground plane. Parameters ---------- voxcity : VoxCity The VoxCity model. df : pd.DataFrame Weather data with 'DNI' and 'DHI' columns. lon, lat : float Longitude and latitude for solar position calculation. tz : float Timezone offset in hours. direct_normal_irradiance_scaling : float Scaling factor for DNI. diffuse_irradiance_scaling : float Scaling factor for DHI. **kwargs : dict Additional options: - use_sky_patches : bool (default False) If True, use sky patch aggregation for efficiency. DNI is aggregated into sky patches and ray tracing is done once per patch instead of per timestep. - sky_discretization : str (default "tregenza") Sky discretization method: "tregenza", "reinhart", "uniform", "fibonacci" - reinhart_mf : int (default 4) Multiplication factor for Reinhart subdivision. - sky_n_patches : int (default 145) Number of patches for Fibonacci method. - progress_report : bool Print progress information. Returns ------- np.ndarray Cumulative global solar irradiance map (Wh/m²). """ view_point_height = kwargs.get("view_point_height", 1.5) colormap = kwargs.get("colormap", "magma") start_time = kwargs.get("start_time", "01-01 05:00:00") end_time = kwargs.get("end_time", "01-01 20:00:00") desired_threads = kwargs.get("numba_num_threads", None) progress_report = kwargs.get("progress_report", False) use_sky_patches = kwargs.get("use_sky_patches", False) sky_discretization = kwargs.get("sky_discretization", "tregenza") _configure_num_threads(desired_threads, progress=progress_report) if df.empty: raise ValueError("No data in EPW dataframe.") try: start_dt = datetime.strptime(start_time, "%m-%d %H:%M:%S") end_dt = datetime.strptime(end_time, "%m-%d %H:%M:%S") except ValueError as ve: raise ValueError("start_time and end_time must be in format 'MM-DD HH:MM:SS'") from ve df = df.copy() df['hour_of_year'] = (df.index.dayofyear - 1) * 24 + df.index.hour + 1 start_doy = datetime(2000, start_dt.month, start_dt.day).timetuple().tm_yday end_doy = datetime(2000, end_dt.month, end_dt.day).timetuple().tm_yday start_hour = (start_doy - 1) * 24 + start_dt.hour + 1 end_hour = (end_doy - 1) * 24 + end_dt.hour + 1 if start_hour <= end_hour: df_period = df[(df['hour_of_year'] >= start_hour) & (df['hour_of_year'] <= end_hour)] else: df_period = df[(df['hour_of_year'] >= start_hour) | (df['hour_of_year'] <= end_hour)] df_period = df_period[ ((df_period.index.hour != start_dt.hour) | (df_period.index.minute >= start_dt.minute)) & ((df_period.index.hour != end_dt.hour) | (df_period.index.minute <= end_dt.minute)) ] if df_period.empty: raise ValueError("No EPW data in the specified period.") offset_minutes = int(tz * 60) local_tz = pytz.FixedOffset(offset_minutes) df_period_local = df_period.copy() df_period_local.index = df_period_local.index.tz_localize(local_tz) df_period_utc = df_period_local.tz_convert(pytz.UTC) solar_positions = get_solar_positions_astral(df_period_utc.index, lon, lat) # Compute base diffuse map (SVF-based) - used in both methods diffuse_kwargs = kwargs.copy() diffuse_kwargs.update({'show_plot': False, 'obj_export': False}) base_diffuse_map = get_diffuse_solar_irradiance_map( voxcity, diffuse_irradiance=1.0, **diffuse_kwargs ) nx, ny, _ = voxcity.voxels.classes.shape cumulative_map = np.zeros((nx, ny)) mask_map = np.ones((nx, ny), dtype=bool) direct_kwargs = kwargs.copy() direct_kwargs.update({'show_plot': False, 'view_point_height': view_point_height, 'obj_export': False}) # ========================================================================= # Sky Patch Optimization Path # ========================================================================= if use_sky_patches: # Extract arrays for aggregation azimuth_arr = solar_positions['azimuth'].to_numpy() elevation_arr = solar_positions['elevation'].to_numpy() dni_arr = df_period_utc['DNI'].to_numpy() * direct_normal_irradiance_scaling dhi_arr = df_period_utc['DHI'].to_numpy() * diffuse_irradiance_scaling time_step_hours = kwargs.get("time_step_hours", 1.0) # Aggregate weather data into sky patches # Filter kwargs to avoid duplicate parameters sky_kwargs = {k: v for k, v in kwargs.items() if k not in ('sky_discretization', 'time_step_hours')} patch_data = _aggregate_weather_to_sky_patches( azimuth_arr, elevation_arr, dni_arr, dhi_arr, time_step_hours=time_step_hours, sky_discretization=sky_discretization, **sky_kwargs ) if progress_report: print(f"Sky patch optimization: {patch_data['n_original_timesteps']} timesteps → " f"{patch_data['n_active_patches']} active patches ({patch_data['method']})") print(f" Total cumulative DHI: {patch_data['total_cumulative_dhi']:.1f} Wh/m²") # Diffuse component: SVF × total cumulative DHI cumulative_diffuse = base_diffuse_map * patch_data['total_cumulative_dhi'] cumulative_map += np.nan_to_num(cumulative_diffuse, nan=0.0) mask_map &= ~np.isnan(cumulative_diffuse) # Direct component: loop over active patches only active_indices = np.where(patch_data['active_mask'])[0] patches = patch_data['patches'] patch_cumulative_dni = patch_data['patch_cumulative_dni'] for i, patch_idx in enumerate(active_indices): az_deg = patches[patch_idx, 0] el_deg = patches[patch_idx, 1] cumulative_dni_patch = patch_cumulative_dni[patch_idx] # Compute direct transmittance map for this patch direction # Using DNI=1.0 to get transmittance, then multiply by cumulative DNI direct_map = get_direct_solar_irradiance_map( voxcity, az_deg, el_deg, direct_normal_irradiance=1.0, # Get transmittance **direct_kwargs, ) # Accumulate: transmittance × cumulative DNI for this patch patch_contribution = direct_map * cumulative_dni_patch mask_map &= ~np.isnan(patch_contribution) cumulative_map += np.nan_to_num(patch_contribution, nan=0.0) if progress_report and ((i + 1) % max(1, len(active_indices) // 10) == 0 or i == len(active_indices) - 1): pct = (i + 1) * 100.0 / len(active_indices) print(f" Patch {i+1}/{len(active_indices)} ({pct:.1f}%)") # ========================================================================= # Original Per-Timestep Path # ========================================================================= else: for idx, (time_utc, row) in enumerate(df_period_utc.iterrows()): DNI = float(row['DNI']) * direct_normal_irradiance_scaling DHI = float(row['DHI']) * diffuse_irradiance_scaling solpos = solar_positions.loc[time_utc] azimuth_degrees = float(solpos['azimuth']) elevation_degrees = float(solpos['elevation']) direct_map = get_direct_solar_irradiance_map( voxcity, azimuth_degrees, elevation_degrees, direct_normal_irradiance=DNI, **direct_kwargs, ) diffuse_map = base_diffuse_map * DHI global_map = direct_map + diffuse_map mask_map &= ~np.isnan(global_map) cumulative_map += np.nan_to_num(global_map, nan=0.0) if kwargs.get("show_each_timestep", False): vmin = kwargs.get("vmin", 0.0) vmax = kwargs.get("vmax", max(direct_normal_irradiance_scaling, diffuse_irradiance_scaling) * 1000) cmap = plt.cm.get_cmap(kwargs.get("colormap", "viridis")).copy() cmap.set_bad(color="lightgray") plt.figure(figsize=(10, 8)) plt.imshow(global_map, origin="lower", cmap=cmap, vmin=vmin, vmax=vmax) plt.axis("off") plt.colorbar(label="Global Solar Irradiance (W/m²)") plt.show() cumulative_map[~mask_map] = np.nan if kwargs.get("show_plot", True): vmin = kwargs.get("vmin", float(np.nanmin(cumulative_map))) vmax = kwargs.get("vmax", float(np.nanmax(cumulative_map))) cmap = plt.cm.get_cmap(colormap).copy() cmap.set_bad(color="lightgray") plt.figure(figsize=(10, 8)) plt.imshow(cumulative_map, origin="lower", cmap=cmap, vmin=vmin, vmax=vmax) plt.colorbar(label="Cumulative Global Solar Irradiance (W/m²·hour)") plt.axis("off") plt.show() if kwargs.get("obj_export", False): vmin = kwargs.get("vmin", float(np.nanmin(cumulative_map))) vmax = kwargs.get("vmax", float(np.nanmax(cumulative_map))) dem_grid = kwargs.get("dem_grid", voxcity.dem.elevation if voxcity.dem else np.zeros_like(cumulative_map)) output_dir = kwargs.get("output_directory", "output") output_file_name = kwargs.get("output_file_name", "cumulative_global_solar_irradiance") num_colors = kwargs.get("num_colors", 10) alpha = kwargs.get("alpha", 1.0) meshsize = voxcity.voxels.meta.meshsize grid_to_obj( cumulative_map, dem_grid, output_dir, output_file_name, meshsize, view_point_height, colormap_name=colormap, num_colors=num_colors, alpha=alpha, vmin=vmin, vmax=vmax, ) return cumulative_map
[docs] def get_cumulative_building_solar_irradiance( voxcity: VoxCity, building_svf_mesh, weather_df, lon, lat, tz, **kwargs ): """ Cumulative Wh/m² on building faces over a period from weather dataframe. Parameters ---------- voxcity : VoxCity The VoxCity model. building_svf_mesh : trimesh.Trimesh Building mesh with SVF in metadata. weather_df : pd.DataFrame Weather data with 'DNI' and 'DHI' columns. lon, lat : float Longitude and latitude. tz : float Timezone offset in hours. **kwargs : dict Additional options: - use_sky_patches : bool (default False) If True, use sky patch aggregation for efficiency. Reduces computation from N timesteps to M patches (M << N). - sky_discretization : str (default "tregenza") Method: "tregenza", "reinhart", "uniform", "fibonacci" - reinhart_mf : int (default 4) Multiplication factor for Reinhart subdivision. - fast_path : bool (default True) Use optimized Numba kernels. - progress_report : bool Print progress information. Returns ------- trimesh.Trimesh Mesh with cumulative irradiance in metadata (direct, diffuse, global in Wh/m²). """ import numpy as _np from ..common.coordinates import scene_points_to_uv_domain, scene_vectors_to_uv_domain period_start = kwargs.get("period_start", "01-01 00:00:00") period_end = kwargs.get("period_end", "12-31 23:59:59") time_step_hours = float(kwargs.get("time_step_hours", 1.0)) direct_normal_irradiance_scaling = float(kwargs.get("direct_normal_irradiance_scaling", 1.0)) diffuse_irradiance_scaling = float(kwargs.get("diffuse_irradiance_scaling", 1.0)) progress_report = kwargs.get("progress_report", False) fast_path = kwargs.get("fast_path", True) use_sky_patches = kwargs.get("use_sky_patches", False) sky_discretization = kwargs.get("sky_discretization", "tregenza") try: start_dt = datetime.strptime(period_start, "%m-%d %H:%M:%S") end_dt = datetime.strptime(period_end, "%m-%d %H:%M:%S") except ValueError as ve: raise ValueError("Time must be in format 'MM-DD HH:MM:SS'") from ve offset_minutes = int(tz * 60) local_tz = pytz.FixedOffset(offset_minutes) df_period = weather_df[ ((weather_df.index.month > start_dt.month) | ((weather_df.index.month == start_dt.month) & (weather_df.index.day >= start_dt.day) & (weather_df.index.hour >= start_dt.hour))) & ((weather_df.index.month < end_dt.month) | ((weather_df.index.month == end_dt.month) & (weather_df.index.day <= end_dt.day) & (weather_df.index.hour <= end_dt.hour))) ] if df_period.empty: raise ValueError("No weather data in specified period.") df_period_local = df_period.copy() df_period_local.index = df_period_local.index.tz_localize(local_tz) df_period_utc = df_period_local.tz_convert(pytz.UTC) precomputed_solar_positions = kwargs.get("precomputed_solar_positions", None) if precomputed_solar_positions is not None and len(precomputed_solar_positions) == len(df_period_utc.index): solar_positions = precomputed_solar_positions else: solar_positions = get_solar_positions_astral(df_period_utc.index, lon, lat) times_len = len(df_period_utc.index) azimuth_deg_arr = solar_positions['azimuth'].to_numpy() elev_deg_arr = solar_positions['elevation'].to_numpy() # Account for grid rotation rotation_angle = 0 extras = getattr(voxcity, 'extras', None) if isinstance(extras, dict): rotation_angle = extras.get('rotation_angle', 0) az_rad_arr = _np.deg2rad(azimuth_deg_arr - rotation_angle) el_rad_arr = _np.deg2rad(elev_deg_arr) sun_dx_arr = _np.cos(el_rad_arr) * _np.cos(az_rad_arr) sun_dy_arr = _np.cos(el_rad_arr) * _np.sin(az_rad_arr) sun_dz_arr = _np.sin(el_rad_arr) sun_dirs_arr = _np.stack([sun_dx_arr, sun_dy_arr, sun_dz_arr], axis=1).astype(_np.float64) DNI_arr = (df_period_utc['DNI'].to_numpy() * direct_normal_irradiance_scaling).astype(_np.float64) DHI_arr = (df_period_utc['DHI'].to_numpy() * diffuse_irradiance_scaling).astype(_np.float64) sun_above_mask = elev_deg_arr > 0.0 n_faces = len(building_svf_mesh.faces) face_cum_direct = _np.zeros(n_faces, dtype=_np.float64) face_cum_diffuse = _np.zeros(n_faces, dtype=_np.float64) face_cum_global = _np.zeros(n_faces, dtype=_np.float64) voxel_data = voxcity.voxels.classes meshsize = float(voxcity.voxels.meta.meshsize) precomputed_geometry = kwargs.get("precomputed_geometry", None) if precomputed_geometry is not None: face_centers = precomputed_geometry.get("face_centers", building_svf_mesh.triangles_center) face_normals = precomputed_geometry.get("face_normals", building_svf_mesh.face_normals) face_svf = precomputed_geometry.get( "face_svf", building_svf_mesh.metadata['svf'] if ('svf' in building_svf_mesh.metadata) else _np.zeros(n_faces, dtype=_np.float64) ) grid_bounds_real = precomputed_geometry.get("grid_bounds_real", None) boundary_epsilon = precomputed_geometry.get("boundary_epsilon", None) else: face_centers = building_svf_mesh.triangles_center face_normals = building_svf_mesh.face_normals face_svf = building_svf_mesh.metadata['svf'] if ('svf' in building_svf_mesh.metadata) else _np.zeros(n_faces, dtype=_np.float64) grid_bounds_real = None boundary_epsilon = None face_centers = scene_points_to_uv_domain(face_centers) face_normals = scene_vectors_to_uv_domain(face_normals) if grid_bounds_real is None or boundary_epsilon is None: grid_shape = voxel_data.shape grid_bounds_voxel = _np.array([[0, 0, 0], [grid_shape[0], grid_shape[1], grid_shape[2]]], dtype=_np.float64) grid_bounds_real = grid_bounds_voxel * meshsize boundary_epsilon = meshsize * 0.05 hit_values = (0,) inclusion_mode = False tree_k = kwargs.get("tree_k", 0.6) tree_lad = kwargs.get("tree_lad", 1.0) boundary_mask = None instant_kwargs = kwargs.copy() instant_kwargs['obj_export'] = False total_steps = times_len progress_every = max(1, total_steps // 20) face_centers64 = (face_centers if isinstance(face_centers, _np.ndarray) else building_svf_mesh.triangles_center).astype(_np.float64) face_normals64 = (face_normals if isinstance(face_normals, _np.ndarray) else building_svf_mesh.face_normals).astype(_np.float64) face_svf64 = face_svf.astype(_np.float64) x_min, y_min, z_min = grid_bounds_real[0, 0], grid_bounds_real[0, 1], grid_bounds_real[0, 2] x_max, y_max, z_max = grid_bounds_real[1, 0], grid_bounds_real[1, 1], grid_bounds_real[1, 2] # ========================================================================= # Sky Patch Optimization Path for Building Faces # ========================================================================= if use_sky_patches: # Aggregate weather data into sky patches # Filter kwargs to avoid duplicate parameters sky_kwargs = {k: v for k, v in kwargs.items() if k not in ('sky_discretization', 'time_step_hours')} patch_data = _aggregate_weather_to_sky_patches( azimuth_deg_arr, elev_deg_arr, DNI_arr, DHI_arr, time_step_hours=time_step_hours, sky_discretization=sky_discretization, **sky_kwargs ) if progress_report: print(f"Sky patch optimization: {patch_data['n_original_timesteps']} timesteps → " f"{patch_data['n_active_patches']} active patches ({patch_data['method']})") print(f" Faces: {n_faces:,}, Total cumulative DHI: {patch_data['total_cumulative_dhi']:.1f} Wh/m²") # Diffuse component: SVF × total cumulative DHI # (DHI is sky-hemisphere based, so sum over all timesteps) face_cum_diffuse = face_svf64 * patch_data['total_cumulative_dhi'] # Direct component: loop over active patches active_indices = _np.where(patch_data['active_mask'])[0] patches = patch_data['patches'] patch_cumulative_dni = patch_data['patch_cumulative_dni'] # Prepare masks for fast path precomputed_masks = kwargs.get("precomputed_masks", None) if precomputed_masks is not None: vox_is_tree = precomputed_masks.get("vox_is_tree", (voxel_data == -2)) vox_is_opaque = precomputed_masks.get("vox_is_opaque", (voxel_data != 0) & (voxel_data != -2)) att = float(precomputed_masks.get("att", _np.exp(-tree_k * tree_lad * meshsize))) else: vox_is_tree = (voxel_data == -2) vox_is_opaque = (voxel_data != 0) & (~vox_is_tree) att = float(_np.exp(-tree_k * tree_lad * meshsize)) from .radiation import compute_solar_irradiance_for_all_faces_masked for i, patch_idx in enumerate(active_indices): az_deg = float(patches[patch_idx, 0]) el_deg = float(patches[patch_idx, 1]) cumulative_dni_patch = float(patch_cumulative_dni[patch_idx]) # Convert patch direction to sun vector az_rad = _np.deg2rad(az_deg - rotation_angle) el_rad = _np.deg2rad(el_deg) sun_dx = _np.cos(el_rad) * _np.cos(az_rad) sun_dy = _np.cos(el_rad) * _np.sin(az_rad) sun_dz = _np.sin(el_rad) sun_direction = _np.array([sun_dx, sun_dy, sun_dz], dtype=_np.float64) # Compute direct irradiance for this patch direction (using DNI=1 for transmittance) patch_direct, _, _ = compute_solar_irradiance_for_all_faces_masked( face_centers64, face_normals64, face_svf64, sun_direction, 1.0, # DNI = 1 to get cos(incidence) × transmittance 0.0, # No diffuse here vox_is_tree, vox_is_opaque, float(meshsize), att, float(x_min), float(y_min), float(z_min), float(x_max), float(y_max), float(z_max), float(boundary_epsilon) ) # Accumulate: transmittance factor × cumulative DNI for this patch face_cum_direct += _np.nan_to_num(patch_direct, nan=0.0) * cumulative_dni_patch if progress_report and ((i + 1) % max(1, len(active_indices) // 10) == 0 or i == len(active_indices) - 1): pct = (i + 1) * 100.0 / len(active_indices) print(f" Patch {i+1}/{len(active_indices)} ({pct:.1f}%)") # Combine direct and diffuse face_cum_global = face_cum_direct + face_cum_diffuse # Apply boundary mask from SVF boundary_mask = _np.isnan(face_svf64) # ========================================================================= # Original Fast Path (Per-Timestep with Batching) # ========================================================================= elif fast_path: precomputed_masks = kwargs.get("precomputed_masks", None) if precomputed_masks is not None: vox_is_tree = precomputed_masks.get("vox_is_tree", (voxel_data == -2)) vox_is_opaque = precomputed_masks.get("vox_is_opaque", (voxel_data != 0) & (voxel_data != -2)) att = float(precomputed_masks.get("att", _np.exp(-tree_k * tree_lad * meshsize))) else: vox_is_tree = (voxel_data == -2) vox_is_opaque = (voxel_data != 0) & (~vox_is_tree) att = float(_np.exp(-tree_k * tree_lad * meshsize)) time_batch_size = _auto_time_batch_size(n_faces, total_steps, kwargs.get("time_batch_size", None)) if progress_report: print(f"Faces: {n_faces:,}, Timesteps: {total_steps:,}, Batch size: {time_batch_size}") for start in range(0, total_steps, time_batch_size): end = min(start + time_batch_size, total_steps) ch_dir, ch_diff, ch_glob = compute_cumulative_solar_irradiance_faces_masked_timeseries( face_centers64, face_normals64, face_svf64, sun_dirs_arr.astype(_np.float64), DNI_arr.astype(_np.float64), DHI_arr.astype(_np.float64), vox_is_tree, vox_is_opaque, float(meshsize), float(att), float(x_min), float(y_min), float(z_min), float(x_max), float(y_max), float(z_max), float(boundary_epsilon), int(start), int(end), float(time_step_hours) ) face_cum_direct += ch_dir face_cum_diffuse += ch_diff face_cum_global += ch_glob if progress_report: pct = (end * 100.0) / total_steps print(f"Cumulative irradiance: {end}/{total_steps} ({pct:.1f}%)") else: for idx in range(total_steps): DNI = float(DNI_arr[idx]) DHI = float(DHI_arr[idx]) if not sun_above_mask[idx]: if boundary_mask is None: boundary_mask = _np.isnan(face_svf) face_cum_diffuse += _np.nan_to_num(face_svf * DHI) * time_step_hours face_cum_global += _np.nan_to_num(face_svf * DHI) * time_step_hours if progress_report and (((idx + 1) % progress_every == 0) or (idx == total_steps - 1)): pct = (idx + 1) * 100.0 / total_steps print(f"Cumulative irradiance: {idx+1}/{total_steps} ({pct:.1f}%)") continue irr_mesh = get_building_solar_irradiance( voxcity, building_svf_mesh, float(azimuth_deg_arr[idx]), float(elev_deg_arr[idx]), DNI, DHI, show_plot=False, **instant_kwargs ) face_direct = irr_mesh.metadata['direct'] face_diffuse = irr_mesh.metadata['diffuse'] face_global = irr_mesh.metadata['global'] if boundary_mask is None: boundary_mask = _np.isnan(face_global) face_cum_direct += _np.nan_to_num(face_direct) * time_step_hours face_cum_diffuse += _np.nan_to_num(face_diffuse) * time_step_hours face_cum_global += _np.nan_to_num(face_global) * time_step_hours if progress_report and (((idx + 1) % progress_every == 0) or (idx == total_steps - 1)): pct = (idx + 1) * 100.0 / total_steps print(f"Cumulative irradiance: {idx+1}/{total_steps} ({pct:.1f}%)") if boundary_mask is not None: face_cum_direct[boundary_mask] = _np.nan face_cum_diffuse[boundary_mask] = _np.nan face_cum_global[boundary_mask] = _np.nan cumulative_mesh = building_svf_mesh.copy() if not hasattr(cumulative_mesh, 'metadata'): cumulative_mesh.metadata = {} if 'svf' in building_svf_mesh.metadata: cumulative_mesh.metadata['svf'] = building_svf_mesh.metadata['svf'] cumulative_mesh.metadata['direct'] = face_cum_direct cumulative_mesh.metadata['diffuse'] = face_cum_diffuse cumulative_mesh.metadata['global'] = face_cum_global cumulative_mesh.name = "Cumulative Solar Irradiance (Wh/m²)" return cumulative_mesh