Source code for voxcity.geoprocessor.heights
"""
Height extraction and complement utilities for building footprints.
"""
from typing import List, Dict
import numpy as np
import geopandas as gpd
import pandas as pd
from shapely.errors import GEOSException
from shapely.geometry import shape
from rtree import index
import rasterio
from pyproj import Transformer, CRS
from ..utils.logging import get_logger
_logger = get_logger(__name__)
[docs]
def extract_building_heights_from_gdf(gdf_0: gpd.GeoDataFrame, gdf_1: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Extract building heights from one GeoDataFrame and apply them to another based on spatial overlap.
"""
gdf_primary = gdf_0.copy()
gdf_ref = gdf_1.copy()
if 'height' not in gdf_primary.columns:
gdf_primary['height'] = 0.0
if 'height' not in gdf_ref.columns:
gdf_ref['height'] = 0.0
count_0 = 0
count_1 = 0
count_2 = 0
spatial_index = index.Index()
for i, geom in enumerate(gdf_ref.geometry):
if geom.is_valid:
spatial_index.insert(i, geom.bounds)
for idx_primary, row in gdf_primary.iterrows():
if row['height'] <= 0 or pd.isna(row['height']):
count_0 += 1
geom = row.geometry
overlapping_height_area = 0
overlapping_area = 0
potential_matches = list(spatial_index.intersection(geom.bounds))
for ref_idx in potential_matches:
if ref_idx >= len(gdf_ref):
continue
ref_row = gdf_ref.iloc[ref_idx]
try:
if geom.intersects(ref_row.geometry):
overlap_area = geom.intersection(ref_row.geometry).area
overlapping_height_area += ref_row['height'] * overlap_area
overlapping_area += overlap_area
except GEOSException:
try:
fixed_ref_geom = ref_row.geometry.buffer(0)
if geom.intersects(fixed_ref_geom):
overlap_area = geom.intersection(fixed_ref_geom).area
overlapping_height_area += ref_row['height'] * overlap_area
overlapping_area += overlap_area
except Exception:
_logger.warning("Failed to fix polygon")
continue
if overlapping_height_area > 0:
count_1 += 1
new_height = overlapping_height_area / overlapping_area
gdf_primary.at[idx_primary, 'height'] = new_height
else:
count_2 += 1
gdf_primary.at[idx_primary, 'height'] = np.nan
if count_0 > 0:
_logger.info("For %d of these building footprints without height, values from the complementary source were assigned.", count_1)
_logger.info("For %d of these building footprints without height, no data exist in complementary data.", count_2)
return gdf_primary
[docs]
def complement_building_heights_from_gdf(gdf_0, gdf_1, primary_id='id', ref_id='id'):
"""
Vectorized approach with GeoPandas to compute weighted heights and add non-intersecting buildings.
Returns a single combined GeoDataFrame.
"""
gdf_primary = gdf_0.copy()
gdf_ref = gdf_1.copy()
if 'height' not in gdf_primary.columns:
gdf_primary['height'] = 0.0
if 'height' not in gdf_ref.columns:
gdf_ref['height'] = 0.0
gdf_primary = gdf_primary.rename(columns={'height': 'height_primary'})
gdf_ref = gdf_ref.rename(columns={'height': 'height_ref'})
intersect_gdf = gpd.overlay(gdf_primary, gdf_ref, how='intersection')
intersect_gdf['intersect_area'] = intersect_gdf.area
intersect_gdf['height_area'] = intersect_gdf['height_ref'] * intersect_gdf['intersect_area']
group_cols = {
'height_area': 'sum',
'intersect_area': 'sum'
}
grouped = intersect_gdf.groupby(f'{primary_id}_1').agg(group_cols)
grouped['weighted_height'] = grouped['height_area'] / grouped['intersect_area']
gdf_primary = gdf_primary.merge(grouped['weighted_height'],
left_on=primary_id,
right_index=True,
how='left')
zero_or_nan_mask = (gdf_primary['height_primary'] == 0) | (gdf_primary['height_primary'].isna())
valid_weighted_height_mask = zero_or_nan_mask & gdf_primary['weighted_height'].notna()
gdf_primary.loc[valid_weighted_height_mask, 'height_primary'] = gdf_primary.loc[valid_weighted_height_mask, 'weighted_height']
gdf_primary['height_primary'] = gdf_primary['height_primary'].fillna(np.nan)
sjoin_gdf = gpd.sjoin(gdf_ref, gdf_primary, how='left', predicate='intersects')
non_intersect_mask = sjoin_gdf[f'{primary_id}_right'].isna()
non_intersect_ids = sjoin_gdf[non_intersect_mask][f'{ref_id}_left'].unique()
gdf_ref_non_intersect = gdf_ref[gdf_ref[ref_id].isin(non_intersect_ids)]
gdf_ref_non_intersect = gdf_ref_non_intersect.rename(columns={'height_ref': 'height'})
gdf_primary = gdf_primary.rename(columns={'height_primary': 'height'})
if 'weighted_height' in gdf_primary.columns:
gdf_primary.drop(columns='weighted_height', inplace=True)
final_gdf = pd.concat([gdf_primary, gdf_ref_non_intersect], ignore_index=True)
count_total = len(gdf_primary)
count_0 = len(gdf_primary[zero_or_nan_mask])
count_1 = len(gdf_primary[valid_weighted_height_mask])
count_2 = count_0 - count_1
count_3 = len(gdf_ref_non_intersect)
count_4 = count_3
height_mask = gdf_ref_non_intersect['height'].notna() & (gdf_ref_non_intersect['height'] > 0)
count_5 = len(gdf_ref_non_intersect[height_mask])
count_6 = count_4 - count_5
final_height_mask = final_gdf['height'].notna() & (final_gdf['height'] > 0)
count_7 = len(final_gdf[final_height_mask])
count_8 = len(final_gdf)
if count_0 > 0:
_logger.info("%d of the total %d building footprints from base data source did not have height data.", count_0, count_total)
_logger.info("For %d of these building footprints without height, values from complementary data were assigned.", count_1)
_logger.info("For the rest %d, no data exists in complementary data.", count_2)
_logger.info("Footprints of %d buildings were added from the complementary source.", count_3)
_logger.info("Of these %d additional building footprints, %d had height data while %d had no height data.", count_4, count_5, count_6)
_logger.info("In total, %d buildings had height data out of %d total building footprints.", count_7, count_8)
return final_gdf
[docs]
def extract_building_heights_from_geotiff(geotiff_path, gdf):
"""
Extract building heights from a GeoTIFF raster for building footprints in a GeoDataFrame.
"""
gdf = gdf.copy()
count_0 = 0
count_1 = 0
count_2 = 0
with rasterio.open(geotiff_path) as src:
transformer = Transformer.from_crs(CRS.from_epsg(4326), src.crs, always_xy=True)
mask_condition = (gdf.geometry.geom_type == 'Polygon') & ((gdf.get('height', 0) <= 0) | gdf.get('height').isna())
buildings_to_process = gdf[mask_condition]
count_0 = len(buildings_to_process)
for idx, row in buildings_to_process.iterrows():
coords = list(row.geometry.exterior.coords)
transformed_coords = [transformer.transform(lon, lat) for lon, lat in coords]
polygon = shape({"type": "Polygon", "coordinates": [transformed_coords]})
try:
masked_data, _ = rasterio.mask.mask(src, [polygon], crop=True, all_touched=True)
heights = masked_data[0][masked_data[0] != src.nodata]
if len(heights) > 0:
count_1 += 1
gdf.at[idx, 'height'] = float(np.mean(heights))
else:
count_2 += 1
gdf.at[idx, 'height'] = np.nan
except ValueError as e:
_logger.warning("Error processing building at index %s: %s", idx, e)
gdf.at[idx, 'height'] = None
if count_0 > 0:
_logger.info("%d of the total %d building footprint from OSM did not have height data.", count_0, len(gdf))
_logger.info("For %d of these building footprints without height, values from complementary data were assigned.", count_1)
_logger.info("For %d of these building footprints without height, no data exist in complementary data.", count_2)
return gdf