Source code for voxcity.geoprocessor.raster.core

import warnings
import numpy as np
from typing import List, Tuple, Dict, Any
from shapely.geometry import Polygon, box

from ..utils import initialize_geod, calculate_distance, normalize_to_one_meter
from voxcity.utils.projector import GridGeom


[docs] def apply_operation(arr: np.ndarray, meshsize: float) -> np.ndarray: """ Applies a sequence of operations to an array based on a mesh size to normalize and discretize values. 1) Divide by meshsize, 2) +0.5, 3) floor, 4) rescale by meshsize """ step1 = arr / meshsize step2 = step1 + 0.5 step3 = np.floor(step2) return step3 * meshsize
[docs] def translate_array(input_array: np.ndarray, translation_dict: Dict[Any, Any]) -> np.ndarray: """ Translate values in an array using a dictionary mapping (vectorized). Any value not found in the mapping is replaced with an empty string. Returns an object-dtype ndarray preserving the input shape. """ if not isinstance(input_array, np.ndarray): input_array = np.asarray(input_array) getter = np.vectorize(lambda v: translation_dict.get(v, ''), otypes=[object]) return getter(input_array)
[docs] def group_and_label_cells(array: np.ndarray) -> np.ndarray: """ Convert non-zero numbers in a 2D numpy array to sequential IDs starting from 1. """ result = array.copy() unique_values = sorted(set(array.flatten()) - {0}) value_to_id = {value: idx + 1 for idx, value in enumerate(unique_values)} for value in unique_values: result[array == value] = value_to_id[value] return result
[docs] def process_grid_optimized(grid_bi: np.ndarray, dem_grid: np.ndarray) -> np.ndarray: """ Optimized version that computes per-building averages without allocating huge arrays when building IDs are large and sparse. """ result = dem_grid.copy() if np.any(grid_bi != 0): if grid_bi.dtype.kind == 'f': grid_bi_int = np.nan_to_num(grid_bi, nan=0).astype(np.int64) else: grid_bi_int = grid_bi.astype(np.int64) flat_ids = grid_bi_int.ravel() flat_dem = dem_grid.ravel() nz_mask = flat_ids != 0 if np.any(nz_mask): ids_nz = flat_ids[nz_mask] vals_nz = flat_dem[nz_mask] unique_ids, inverse_idx = np.unique(ids_nz, return_inverse=True) sums = np.bincount(inverse_idx, weights=vals_nz) counts = np.bincount(inverse_idx) counts[counts == 0] = 1 means = sums / counts result.ravel()[nz_mask] = means[inverse_idx] return result - np.min(result)
[docs] def process_grid(grid_bi: np.ndarray, dem_grid: np.ndarray) -> np.ndarray: """ Safe version that tries optimization first, then falls back to original method. """ try: return process_grid_optimized(grid_bi, dem_grid) except Exception as e: print(f"Optimized process_grid failed: {e}, using original method") unique_ids = np.unique(grid_bi[grid_bi != 0]) result = dem_grid.copy() for id_num in unique_ids: mask = (grid_bi == id_num) avg_value = np.mean(dem_grid[mask]) result[mask] = avg_value return result - np.min(result)
[docs] def calculate_grid_size( side_1: np.ndarray, side_2: np.ndarray, u_vec: np.ndarray, v_vec: np.ndarray, meshsize: float ) -> Tuple[Tuple[int, int], Tuple[float, float]]: """ Calculate grid size and adjusted mesh size based on input parameters. Returns ((nx, ny), (dx, dy)) """ dist_side_1_m = np.linalg.norm(side_1) / (np.linalg.norm(u_vec) + 1e-12) dist_side_2_m = np.linalg.norm(side_2) / (np.linalg.norm(v_vec) + 1e-12) grid_size_0 = max(1, int(dist_side_1_m / meshsize + 0.5)) grid_size_1 = max(1, int(dist_side_2_m / meshsize + 0.5)) adjusted_mesh_size_0 = dist_side_1_m / grid_size_0 adjusted_mesh_size_1 = dist_side_2_m / grid_size_1 return (grid_size_0, grid_size_1), (adjusted_mesh_size_0, adjusted_mesh_size_1)
[docs] def create_coordinate_mesh( origin: np.ndarray, grid_size: Tuple[int, int], adjusted_meshsize: Tuple[float, float], u_vec: np.ndarray, v_vec: np.ndarray ) -> np.ndarray: """ Create a coordinate mesh based on input parameters. Returns array of shape (coord_dim, ny, nx) """ x = np.linspace(0, grid_size[0], grid_size[0]) y = np.linspace(0, grid_size[1], grid_size[1]) xx, yy = np.meshgrid(x, y) cell_coords = origin[:, np.newaxis, np.newaxis] + \ xx[np.newaxis, :, :] * adjusted_meshsize[0] * u_vec[:, np.newaxis, np.newaxis] + \ yy[np.newaxis, :, :] * adjusted_meshsize[1] * v_vec[:, np.newaxis, np.newaxis] return cell_coords
[docs] def create_cell_polygon( origin: np.ndarray, i: int, j: int, adjusted_meshsize: Tuple[float, float], u_vec: np.ndarray, v_vec: np.ndarray ): """ Create a polygon representing a grid cell. """ bottom_left = origin + i * adjusted_meshsize[0] * u_vec + j * adjusted_meshsize[1] * v_vec bottom_right = origin + (i + 1) * adjusted_meshsize[0] * u_vec + j * adjusted_meshsize[1] * v_vec top_right = origin + (i + 1) * adjusted_meshsize[0] * u_vec + (j + 1) * adjusted_meshsize[1] * v_vec top_left = origin + i * adjusted_meshsize[0] * u_vec + (j + 1) * adjusted_meshsize[1] * v_vec return Polygon([bottom_left, bottom_right, top_right, top_left])
[docs] def compute_grid_geometry(rectangle_vertices, meshsize: float) -> GridGeom | None: """ Compute full grid geometry from rectangle vertices and mesh size. Returns a :class:`~voxcity.utils.projector.GridGeom` dict with keys: origin, side_1, side_2, u_vec, v_vec, grid_size, adj_mesh, meshsize_m — or *None* if inputs are insufficient. """ if rectangle_vertices is None or len(rectangle_vertices) < 4: return None v0 = np.array(rectangle_vertices[0]) v1 = np.array(rectangle_vertices[1]) v3 = np.array(rectangle_vertices[3]) side_1 = v1 - v0 side_2 = v3 - v0 geod = initialize_geod() dist_1 = calculate_distance(geod, v0[0], v0[1], v1[0], v1[1]) dist_2 = calculate_distance(geod, v0[0], v0[1], v3[0], v3[1]) u_vec = normalize_to_one_meter(side_1, dist_1) v_vec = normalize_to_one_meter(side_2, dist_2) grid_size, adj_mesh = calculate_grid_size(side_1, side_2, u_vec, v_vec, meshsize) return { "origin": v0, "side_1": side_1, "side_2": side_2, "u_vec": u_vec, "v_vec": v_vec, "grid_size": grid_size, "adj_mesh": adj_mesh, "meshsize_m": float(meshsize), }
[docs] def compute_cell_center_coords(rectangle_vertices, meshsize: float) -> dict: """ Compute (lon, lat) coordinates for each grid cell center. Works correctly for both axis-aligned and rotated rectangles. Cell (i, j) centre = origin + (i+0.5)*dx*u_vec + (j+0.5)*dy*v_vec. Returns a dict with all keys from :func:`compute_grid_geometry` plus: - ``lons`` : ndarray of shape *grid_size* with cell-centre longitudes - ``lats`` : ndarray of shape *grid_size* with cell-centre latitudes Returns *None* when *rectangle_vertices* is insufficient. """ geom = compute_grid_geometry(rectangle_vertices, meshsize) if geom is None: return None origin = geom["origin"] u_vec = geom["u_vec"] v_vec = geom["v_vec"] nx, ny = geom["grid_size"] dx, dy = geom["adj_mesh"] ii, jj = np.meshgrid(np.arange(nx) + 0.5, np.arange(ny) + 0.5, indexing="ij") lons = origin[0] + ii * dx * u_vec[0] + jj * dy * v_vec[0] lats = origin[1] + ii * dx * u_vec[1] + jj * dy * v_vec[1] return {**geom, "lons": lons, "lats": lats}
[docs] def compute_grid_shape(rectangle_vertices, meshsize: float) -> Tuple[int, int]: """ Compute the grid dimensions (rows, cols) for a given rectangle and mesh size. This is useful when you need to know the output grid shape without actually creating the grid (e.g., for pre-allocating arrays or fallback shapes). Args: rectangle_vertices: List of 4 vertices [(lon, lat), ...] defining the rectangle. meshsize: Grid cell size in meters. Returns: Tuple of (grid_size_0, grid_size_1) representing grid dimensions. """ geom = compute_grid_geometry(rectangle_vertices, meshsize) if geom is None: return (1, 1) return geom["grid_size"]
[docs] def normalize_gdf_crs(gdf, assume_epsg: int = 4326): """Return *gdf* re-projected to *assume_epsg*. If the GeoDataFrame has no CRS, EPSG:4326 is assumed with a warning. """ if gdf.crs is None: warnings.warn( f"GeoDataFrame has no CRS. Assuming EPSG:{assume_epsg}.", UserWarning, stacklevel=2, ) return gdf.set_crs(epsg=assume_epsg) if gdf.crs.to_epsg() != assume_epsg: return gdf.to_crs(epsg=assume_epsg) return gdf
[docs] def bbox_from_vertices(rectangle_vertices: List[Tuple[float, float]]) -> "box": """Return a shapely box covering *rectangle_vertices* (lon, lat) pairs.""" lons = [c[0] for c in rectangle_vertices] lats = [c[1] for c in rectangle_vertices] return box(min(lons), min(lats), max(lons), max(lats))