import os
import warnings
from typing import List, Tuple, Optional
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping
from shapely.errors import GEOSException
from affine import Affine
from rtree import index
from rasterio import features
from ..utils import (
initialize_geod,
calculate_distance,
normalize_to_one_meter,
convert_format_lat_lon,
)
from ..heights import (
extract_building_heights_from_geotiff,
extract_building_heights_from_gdf,
complement_building_heights_from_gdf,
)
from ..overlap import (
process_building_footprints_by_overlap,
)
from ...downloader.gee import (
get_roi,
save_geotiff_open_buildings_temporal,
)
from .core import calculate_grid_size, create_cell_polygon, compute_grid_geometry, bbox_from_vertices
from ...utils.logging import get_logger
_logger = get_logger(__name__)
# Overlap-ratio thresholds used by _decide_auto_mode to choose the precise
# geometry-intersection rasterization path over the faster rasterio path.
_OVERLAP_HIGH = 0.15 # >15 % overlap → always use precise mode
_OVERLAP_MEDIUM = 0.08 # >8 % overlap in a dense scene → use precise mode
_DENSITY_MEDIUM = 0.15 # density threshold paired with _OVERLAP_MEDIUM
_OVERLAP_LOW_SMALL = 0.05 # >5 % overlap in a small dataset (≤200 buildings)
# Fraction of a grid cell's area that a building must cover to be assigned
# that building's height. Below this the cell is left as ground.
_CELL_INTERSECTION_THRESHOLD = 0.3
def _validate_meshsize(meshsize: float) -> None:
if meshsize < 0.1:
warnings.warn(
f"meshsize={meshsize} is very small (< 0.1 m). Check that you are not passing degrees.",
UserWarning, stacklevel=3,
)
elif meshsize > 1000:
warnings.warn(
f"meshsize={meshsize} is very large (> 1000 m). Check units.",
UserWarning, stacklevel=3,
)
[docs]
def create_building_height_grid_from_gdf_polygon(
gdf: gpd.GeoDataFrame,
meshsize: float,
rectangle_vertices: List[Tuple[float, float]],
overlapping_footprint: any = "auto",
gdf_comp: Optional[gpd.GeoDataFrame] = None,
geotiff_path_comp: Optional[str] = None,
complement_building_footprints: Optional[bool] = None,
complement_height: Optional[float] = None
):
"""
Create a building height grid from GeoDataFrame data within a polygon boundary.
Returns: (building_height_grid, building_min_height_grid, building_id_grid, filtered_buildings)
"""
_validate_meshsize(meshsize)
geom = compute_grid_geometry(rectangle_vertices, meshsize)
origin = geom["origin"]
side_1, side_2 = geom["side_1"], geom["side_2"]
u_vec, v_vec = geom["u_vec"], geom["v_vec"]
grid_size, adjusted_meshsize = geom["grid_size"], geom["adj_mesh"]
plotting_box = bbox_from_vertices(rectangle_vertices)
filtered_gdf = gdf[gdf.geometry.intersects(plotting_box)].copy()
zero_height_count = len(filtered_gdf[filtered_gdf['height'] == 0])
nan_height_count = len(filtered_gdf[filtered_gdf['height'].isna()])
_logger.info(f"{zero_height_count+nan_height_count} of the total {len(filtered_gdf)} building footprint from the base data source did not have height data.")
if gdf_comp is not None:
filtered_gdf_comp = gdf_comp[gdf_comp.geometry.intersects(plotting_box)].copy()
if complement_building_footprints:
filtered_gdf = complement_building_heights_from_gdf(filtered_gdf, filtered_gdf_comp)
else:
filtered_gdf = extract_building_heights_from_gdf(filtered_gdf, filtered_gdf_comp)
elif geotiff_path_comp:
filtered_gdf = extract_building_heights_from_geotiff(geotiff_path_comp, filtered_gdf)
filtered_gdf = process_building_footprints_by_overlap(filtered_gdf, overlap_threshold=0.5)
mode = overlapping_footprint
if mode is None:
mode = "auto"
mode_norm = mode.strip().lower() if isinstance(mode, str) else mode
def _decide_auto_mode(gdf_in) -> bool:
try:
n_buildings = len(gdf_in)
if n_buildings == 0:
return False
num_cells = max(1, int(grid_size[0]) * int(grid_size[1]))
density = float(n_buildings) / float(num_cells)
sample_n = min(800, n_buildings)
idx_rt = index.Index()
geoms = []
areas = []
for i, geom in enumerate(gdf_in.geometry):
g = geom
if not getattr(g, "is_valid", True):
try:
g = g.buffer(0)
except (GEOSException, ValueError):
_logger.debug("Could not repair invalid geometry at index %d", i)
geoms.append(g)
try:
areas.append(g.area)
except (GEOSException, AttributeError):
areas.append(0.0)
try:
idx_rt.insert(i, g.bounds)
except (GEOSException, AttributeError, ValueError):
pass
with_overlap = 0
step = max(1, n_buildings // sample_n)
checked = 0
for i in range(0, n_buildings, step):
if checked >= sample_n:
break
gi = geoms[i]
ai = areas[i] if i < len(areas) else 0.0
if gi is None:
continue
try:
potentials = list(idx_rt.intersection(gi.bounds))
except (GEOSException, AttributeError):
potentials = []
overlapped = False
for j in potentials:
if j == i or j >= len(geoms):
continue
gj = geoms[j]
if gj is None:
continue
try:
if gi.intersects(gj):
inter = gi.intersection(gj)
inter_area = getattr(inter, "area", 0.0)
if inter_area > 0.0:
aj = areas[j] if j < len(areas) else 0.0
ref_area = max(1e-9, min(ai, aj) if ai > 0 and aj > 0 else (ai if ai > 0 else aj))
if (inter_area / ref_area) >= 0.2:
overlapped = True
break
except GEOSException:
continue
if overlapped:
with_overlap += 1
checked += 1
overlap_ratio = (with_overlap / checked) if checked > 0 else 0.0
if overlap_ratio >= _OVERLAP_HIGH:
return True
if overlap_ratio >= _OVERLAP_MEDIUM and density > _DENSITY_MEDIUM:
return True
if n_buildings <= 200 and overlap_ratio >= _OVERLAP_LOW_SMALL:
return True
return False
except Exception:
_logger.warning("Auto-mode overlap detection failed; defaulting to non-precise mode", exc_info=True)
return False
if mode_norm == "auto":
use_precise = _decide_auto_mode(filtered_gdf)
elif mode_norm is True:
use_precise = True
else:
use_precise = False
if use_precise:
return _process_with_geometry_intersection(
filtered_gdf, grid_size, adjusted_meshsize, origin, u_vec, v_vec, complement_height
)
return _process_with_rasterio(
filtered_gdf, grid_size, adjusted_meshsize, origin, u_vec, v_vec,
rectangle_vertices, complement_height
)
def _process_with_geometry_intersection(filtered_gdf, grid_size, adjusted_meshsize, origin, u_vec, v_vec, complement_height):
building_height_grid = np.zeros(grid_size)
building_id_grid = np.zeros(grid_size)
building_min_height_grid = np.empty(grid_size, dtype=object)
for idx_flat in range(building_min_height_grid.size):
building_min_height_grid.flat[idx_flat] = []
building_polygons = []
for idx_b, row in filtered_gdf.iterrows():
polygon = row.geometry
height = row.get('height', None)
if complement_height is not None and (height == 0 or height is None or pd.isna(height)):
height = complement_height
min_height = row.get('min_height', 0)
if pd.isna(min_height):
min_height = 0
is_inner = row.get('is_inner', False)
# Fix: Handle NaN values for is_inner (NaN is truthy, causing buildings to be skipped)
if pd.isna(is_inner):
is_inner = False
feature_id = row.get('id', idx_b)
if not polygon.is_valid:
try:
polygon = polygon.buffer(0)
if not polygon.is_valid:
polygon = polygon.simplify(1e-8)
except (GEOSException, ValueError):
_logger.debug("Could not repair invalid building polygon (id=%s)", feature_id)
bounding_box = polygon.bounds
building_polygons.append((
polygon, bounding_box, height, min_height, is_inner, feature_id
))
idx = index.Index()
for i_b, (poly, bbox, _, _, _, _) in enumerate(building_polygons):
idx.insert(i_b, bbox)
INTERSECTION_THRESHOLD = _CELL_INTERSECTION_THRESHOLD
for i in range(grid_size[0]):
for j in range(grid_size[1]):
cell = create_cell_polygon(origin, i, j, adjusted_meshsize, u_vec, v_vec)
if not cell.is_valid:
cell = cell.buffer(0)
cell_area = cell.area
potential = list(idx.intersection(cell.bounds))
if not potential:
continue
cell_buildings = []
for k in potential:
bpoly, bbox, height, minh, inr, fid = building_polygons[k]
sort_val = height if (height is not None) else -float('inf')
cell_buildings.append((k, bpoly, bbox, height, minh, inr, fid, sort_val))
cell_buildings.sort(key=lambda x: x[-1], reverse=True)
found_intersection = False
all_zero_or_nan = True
for (k, polygon, bbox, height, min_height, is_inner, feature_id, _) in cell_buildings:
try:
minx_p, miny_p, maxx_p, maxy_p = bbox
minx_c, miny_c, maxx_c, maxy_c = cell.bounds
overlap_minx = max(minx_p, minx_c)
overlap_miny = max(miny_p, miny_c)
overlap_maxx = min(maxx_p, maxx_c)
overlap_maxy = min(maxy_p, maxy_c)
if (overlap_maxx <= overlap_minx) or (overlap_maxy <= overlap_miny):
continue
bbox_intersect_area = (overlap_maxx - overlap_minx) * (overlap_maxy - overlap_miny)
if bbox_intersect_area < INTERSECTION_THRESHOLD * cell_area:
continue
if not polygon.is_valid:
polygon = polygon.buffer(0)
if cell.intersects(polygon):
intersection = cell.intersection(polygon)
inter_area = intersection.area
if (inter_area / cell_area) > INTERSECTION_THRESHOLD:
found_intersection = True
if not is_inner:
building_min_height_grid[i, j].append([min_height, height])
building_id_grid[i, j] = feature_id
if (height is not None and not np.isnan(height) and height > 0):
all_zero_or_nan = False
current_height = building_height_grid[i, j]
if (current_height == 0 or np.isnan(current_height) or current_height < height):
building_height_grid[i, j] = height
else:
building_min_height_grid[i, j] = [[0, 0]]
building_height_grid[i, j] = 0
found_intersection = True
all_zero_or_nan = False
break
except (GEOSException, ValueError):
try:
simplified_polygon = polygon.simplify(1e-8)
if simplified_polygon.is_valid:
intersection = cell.intersection(simplified_polygon)
inter_area = intersection.area
if (inter_area / cell_area) > INTERSECTION_THRESHOLD:
found_intersection = True
if not is_inner:
building_min_height_grid[i, j].append([min_height, height])
building_id_grid[i, j] = feature_id
if (height is not None and not np.isnan(height) and height > 0):
all_zero_or_nan = False
if (building_height_grid[i, j] == 0 or
np.isnan(building_height_grid[i, j]) or
building_height_grid[i, j] < height):
building_height_grid[i, j] = height
else:
building_min_height_grid[i, j] = [[0, 0]]
building_height_grid[i, j] = 0
found_intersection = True
all_zero_or_nan = False
break
except GEOSException:
_logger.debug("Skipping geometry intersection at cell (%d,%d)", i, j)
continue
if found_intersection and all_zero_or_nan:
building_height_grid[i, j] = np.nan
return building_height_grid, building_min_height_grid, building_id_grid, filtered_gdf
def _process_with_rasterio(filtered_gdf, grid_size, adjusted_meshsize, origin, u_vec, v_vec, rectangle_vertices, complement_height):
# Build a rasterio Affine that maps (col=u_axis, row=v_axis) → (lon, lat).
# Works for both axis-aligned and rotated rectangles.
du, dv = adjusted_meshsize[0], adjusted_meshsize[1]
transform = Affine(du * u_vec[0], dv * v_vec[0], float(origin[0]),
du * u_vec[1], dv * v_vec[1], float(origin[1]))
filtered_gdf = filtered_gdf.copy()
if complement_height is not None:
mask = (filtered_gdf['height'] == 0) | (filtered_gdf['height'].isna())
filtered_gdf.loc[mask, 'height'] = complement_height
# Preserve existing min_height values; only set default for missing/NaN
if 'min_height' not in filtered_gdf.columns:
filtered_gdf['min_height'] = 0
else:
filtered_gdf['min_height'] = filtered_gdf['min_height'].fillna(0)
if 'is_inner' not in filtered_gdf.columns:
filtered_gdf['is_inner'] = False
else:
try:
filtered_gdf['is_inner'] = filtered_gdf['is_inner'].fillna(False).astype(bool)
except (ValueError, TypeError):
filtered_gdf['is_inner'] = False
if 'id' not in filtered_gdf.columns:
filtered_gdf['id'] = range(len(filtered_gdf))
regular_buildings = filtered_gdf[~filtered_gdf['is_inner']].copy()
regular_buildings = regular_buildings.sort_values('height', ascending=True, na_position='first')
height_raster = np.zeros((grid_size[1], grid_size[0]), dtype=np.float64)
id_raster = np.zeros((grid_size[1], grid_size[0]), dtype=np.float64)
if len(regular_buildings) > 0:
valid_buildings = regular_buildings[regular_buildings.geometry.is_valid].copy()
if len(valid_buildings) > 0:
height_shapes = [(mapping(geom), height) for geom, height in
zip(valid_buildings.geometry, valid_buildings['height'])
if pd.notna(height) and height > 0]
if height_shapes:
height_raster = features.rasterize(
height_shapes,
out_shape=(grid_size[1], grid_size[0]),
transform=transform,
fill=0,
dtype=np.float64
)
id_shapes = [(mapping(geom), id_val) for geom, id_val in
zip(valid_buildings.geometry, valid_buildings['id'])]
if id_shapes:
id_raster = features.rasterize(
id_shapes,
out_shape=(grid_size[1], grid_size[0]),
transform=transform,
fill=0,
dtype=np.float64
)
inner_buildings = filtered_gdf[filtered_gdf['is_inner']].copy()
if len(inner_buildings) > 0:
inner_shapes = [(mapping(geom), 1) for geom in inner_buildings.geometry if geom.is_valid]
if inner_shapes:
inner_mask = features.rasterize(
inner_shapes,
out_shape=(grid_size[1], grid_size[0]),
transform=transform,
fill=0,
dtype=np.uint8
)
height_raster[inner_mask > 0] = 0
id_raster[inner_mask > 0] = 0
building_min_height_grid = np.empty(grid_size, dtype=object)
min_heights_raster = np.zeros((grid_size[1], grid_size[0]), dtype=np.float64)
if len(regular_buildings) > 0:
valid_buildings = regular_buildings[regular_buildings.geometry.is_valid].copy()
if len(valid_buildings) > 0:
min_height_shapes = [(mapping(geom), min_h) for geom, min_h in
zip(valid_buildings.geometry, valid_buildings['min_height'])
if pd.notna(min_h)]
if min_height_shapes:
min_heights_raster = features.rasterize(
min_height_shapes,
out_shape=(grid_size[1], grid_size[0]),
transform=transform,
fill=0,
dtype=np.float64
)
# Rasterio writes (rows=v_axis, cols=u_axis) → shape (ny, nx).
# Transpose to (nx, ny) = (u_axis, v_axis) uv_m layout.
# ascontiguousarray prevents silent performance degradation in Numba JIT.
building_height_grid = np.ascontiguousarray(height_raster.T)
building_id_grid = np.ascontiguousarray(id_raster.T)
min_heights = np.ascontiguousarray(min_heights_raster.T)
flat_bmh = building_min_height_grid.reshape(-1)
flat_height = building_height_grid.reshape(-1)
flat_minh = min_heights.reshape(-1)
for idx in range(flat_bmh.size):
h = flat_height[idx]
if h > 0:
flat_bmh[idx] = [[flat_minh[idx], h]]
else:
flat_bmh[idx] = []
return building_height_grid, building_min_height_grid, building_id_grid, filtered_gdf
[docs]
def create_building_height_grid_from_open_building_temporal_polygon(meshsize, rectangle_vertices, output_dir):
"""
Create a building height grid from OpenBuildings temporal data within a polygon.
"""
_validate_meshsize(meshsize)
roi = get_roi(rectangle_vertices)
os.makedirs(output_dir, exist_ok=True)
geotiff_path = os.path.join(output_dir, "building_height.tif")
save_geotiff_open_buildings_temporal(roi, geotiff_path)
from .raster import create_height_grid_from_geotiff_polygon
building_height_grid = create_height_grid_from_geotiff_polygon(geotiff_path, meshsize, rectangle_vertices)
building_min_height_grid = np.empty(building_height_grid.shape, dtype=object)
flat_bmh = building_min_height_grid.reshape(-1)
flat_height = building_height_grid.reshape(-1)
for idx in range(flat_bmh.size):
h = flat_height[idx]
if h > 0:
flat_bmh[idx] = [[0, h]]
else:
flat_bmh[idx] = []
filtered_buildings = gpd.GeoDataFrame()
building_id_grid = np.zeros_like(building_height_grid, dtype=int)
non_zero_positions = np.nonzero(building_height_grid)
sequence = np.arange(1, len(non_zero_positions[0]) + 1)
building_id_grid[non_zero_positions] = sequence
return building_height_grid, building_min_height_grid, building_id_grid, filtered_buildings