"""
Module for downloading and processing OpenStreetMap data.
This module provides functionality to download and process building footprints, land cover,
and other geographic features from OpenStreetMap. It handles downloading data via the Overpass API,
processing the responses, and converting them to standardized GeoJSON format with proper properties.
The module includes functions for:
- Converting OSM JSON to GeoJSON format
- Processing building footprints with height information
- Handling land cover classifications
- Managing coordinate systems and projections
- Processing roads and other geographic features
"""
import requests
import time
from shapely.geometry import Polygon, shape, mapping
from shapely.ops import transform
import pyproj
from collections import defaultdict
import requests
import json
from shapely.geometry import shape, mapping, Polygon, LineString, Point, MultiPolygon
from shapely.ops import transform
import pyproj
import pandas as pd
import geopandas as gpd
from ..errors import DownloaderError
__all__ = [
"load_gdf_from_openstreetmap",
"load_land_cover_gdf_from_osm",
"load_tree_gdf_from_osm",
"OVERPASS_ENDPOINTS",
"tag_osm_key_value_mapping",
"classification_mapping",
]
# Default Overpass API endpoints
OVERPASS_ENDPOINTS = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
"https://overpass.openstreetmap.ru/api/interpreter",
]
def _fetch_overpass_with_retry(query, timeout=60, max_retries=5, initial_delay=5.0,
backoff_factor=2.0, endpoints=None):
"""Fetch data from Overpass API with retry logic and exponential backoff.
This function tries multiple Overpass API endpoints and retries on transient
failures with exponential backoff delays between attempts.
Args:
query (str): The Overpass QL query to execute
timeout (int): Request timeout in seconds. Defaults to 30.
max_retries (int): Maximum number of retry attempts per full endpoint cycle.
Defaults to 3.
initial_delay (float): Initial delay in seconds before first retry.
Defaults to 5.0 seconds.
backoff_factor (float): Multiplier for delay between retries (exponential backoff).
Defaults to 2.0 (delays: 5s, 10s, 20s, ...).
endpoints (list, optional): List of Overpass API endpoint URLs to try.
Defaults to OVERPASS_ENDPOINTS.
Returns:
dict: Parsed JSON response from Overpass API containing 'elements' key
Raises:
RuntimeError: If all endpoints and retries are exhausted without success
"""
if endpoints is None:
endpoints = OVERPASS_ENDPOINTS
headers = {"User-Agent": "voxcity/ci (https://github.com/voxcity)"}
last_error = None
for retry in range(max_retries):
if retry > 0:
delay = initial_delay * (backoff_factor ** (retry - 1))
print(f" Retry {retry}/{max_retries - 1}: waiting {delay:.1f}s before next attempt...")
time.sleep(delay)
for overpass_url in endpoints:
try:
response = requests.get(
overpass_url,
params={"data": query},
headers=headers,
timeout=timeout
)
# Check for rate limiting (HTTP 429) or server errors (5xx)
if response.status_code == 429:
last_error = Exception(f"Rate limited (HTTP 429) from {overpass_url}")
continue
if response.status_code >= 500:
last_error = Exception(f"Server error (HTTP {response.status_code}) from {overpass_url}")
continue
if response.status_code != 200:
last_error = Exception(f"HTTP {response.status_code} from {overpass_url}")
continue
# Some servers return HTML/plain text on rate-limit; guard JSON parsing
content_type = response.headers.get("Content-Type", "")
if "json" not in content_type.lower():
# Attempt JSON anyway; if fails, try next endpoint
try:
data = response.json()
except Exception as e:
last_error = e
continue
else:
data = response.json()
# Validate structure
if not isinstance(data, dict) or "elements" not in data:
last_error = Exception(f"Malformed Overpass response from {overpass_url}")
continue
# Success!
return data
except requests.exceptions.Timeout:
last_error = Exception(f"Timeout after {timeout}s from {overpass_url}")
continue
except requests.exceptions.ConnectionError as e:
last_error = Exception(f"Connection error from {overpass_url}: {e}")
continue
except Exception as e:
last_error = e
continue
raise DownloaderError(
f"Failed to fetch OSM data from Overpass endpoints after {max_retries} attempts. "
f"Last error: {last_error}"
)
def osm_json_to_geojson(osm_data):
"""
Convert OSM JSON data to GeoJSON format with proper handling of complex relations.
Args:
osm_data (dict): OSM JSON data from Overpass API
Returns:
dict: GeoJSON FeatureCollection
"""
features = []
# Create a mapping of node IDs to their coordinates
nodes = {}
ways = {}
# First pass: index all nodes and ways
for element in osm_data['elements']:
if element['type'] == 'node':
nodes[element['id']] = (element['lon'], element['lat'])
elif element['type'] == 'way':
ways[element['id']] = element
# Second pass: generate features
for element in osm_data['elements']:
if element['type'] == 'node' and 'tags' in element and element['tags']:
# Convert POI nodes to Point features
feature = {
'type': 'Feature',
'properties': {
'id': element['id'],
'type': 'node',
'tags': element.get('tags', {})
},
'geometry': {
'type': 'Point',
'coordinates': [element['lon'], element['lat']]
}
}
features.append(feature)
elif element['type'] == 'way' and 'nodes' in element:
# Skip ways that are part of relations - we'll handle those in relation processing
if is_part_of_relation(element['id'], osm_data):
continue
# Process standalone way
coords = get_way_coords(element, nodes)
if not coords or len(coords) < 2:
continue
# Determine if it's a polygon or a line
is_polygon = is_way_polygon(element)
# Make sure polygons have valid geometry (closed loop with at least 4 points)
if is_polygon:
# For closed ways, make sure first and last coordinates are the same
if coords[0] != coords[-1]:
coords.append(coords[0])
# Check if we have enough coordinates for a valid polygon (at least 4)
if len(coords) < 4:
# Not enough coordinates for a polygon, convert to LineString
is_polygon = False
feature = {
'type': 'Feature',
'properties': {
'id': element['id'],
'type': 'way',
'tags': element.get('tags', {})
},
'geometry': {
'type': 'Polygon' if is_polygon else 'LineString',
'coordinates': [coords] if is_polygon else coords
}
}
features.append(feature)
elif element['type'] == 'relation' and 'members' in element and 'tags' in element:
tags = element.get('tags', {})
# Process multipolygon relations
if tags.get('type') == 'multipolygon' or any(key in tags for key in ['natural', 'water', 'waterway']):
# Group member ways by role
members_by_role = {'outer': [], 'inner': []}
for member in element['members']:
if member['type'] == 'way' and member['ref'] in ways:
role = member['role']
if role not in ['outer', 'inner']:
role = 'outer' # Default to outer if role not specified
members_by_role[role].append(member['ref'])
# Skip if no outer members
if not members_by_role['outer']:
continue
# Create rings from member ways
outer_rings = create_rings_from_ways(members_by_role['outer'], ways, nodes)
inner_rings = create_rings_from_ways(members_by_role['inner'], ways, nodes)
# Skip if no valid outer rings
if not outer_rings:
continue
# Create feature based on number of outer rings
if len(outer_rings) == 1:
# Single polygon with possible inner rings
feature = {
'type': 'Feature',
'properties': {
'id': element['id'],
'type': 'relation',
'tags': tags
},
'geometry': {
'type': 'Polygon',
'coordinates': [outer_rings[0]] + inner_rings
}
}
else:
# MultiPolygon
# Each outer ring forms a polygon, and we assign inner rings to each polygon
# This is a simplification - proper assignment would check for containment
multipolygon_coords = []
for outer_ring in outer_rings:
polygon_coords = [outer_ring]
# For simplicity, assign all inner rings to the first polygon
# A more accurate implementation would check which outer ring contains each inner ring
if len(multipolygon_coords) == 0:
polygon_coords.extend(inner_rings)
multipolygon_coords.append(polygon_coords)
feature = {
'type': 'Feature',
'properties': {
'id': element['id'],
'type': 'relation',
'tags': tags
},
'geometry': {
'type': 'MultiPolygon',
'coordinates': multipolygon_coords
}
}
features.append(feature)
return {
'type': 'FeatureCollection',
'features': features
}
def is_part_of_relation(way_id, osm_data):
"""Check if a way is part of any relation in the OSM data.
Args:
way_id (int): The ID of the way to check
osm_data (dict): OSM JSON data containing elements
Returns:
bool: True if the way is part of a relation, False otherwise
"""
for element in osm_data['elements']:
if element['type'] == 'relation' and 'members' in element:
for member in element['members']:
if member['type'] == 'way' and member['ref'] == way_id:
return True
return False
def is_way_polygon(way):
"""Determine if a way should be treated as a polygon based on OSM tags and geometry.
A way is considered a polygon if:
1. It forms a closed loop (first and last nodes are the same)
2. It has tags indicating it represents an area (building, landuse, etc.)
Args:
way (dict): OSM way element with nodes and tags
Returns:
bool: True if the way should be treated as a polygon, False otherwise
"""
# Check if the way is closed (first and last nodes are the same)
if 'nodes' in way and way['nodes'][0] == way['nodes'][-1]:
# Check for tags that indicate this is an area
if 'tags' in way:
tags = way['tags']
if 'building' in tags or ('area' in tags and tags['area'] == 'yes'):
return True
if any(k in tags for k in ['landuse', 'natural', 'water', 'leisure', 'amenity']):
return True
return False
def get_way_coords(way, nodes):
"""Extract coordinates for a way from its node references.
Args:
way (dict): OSM way element containing node references
nodes (dict): Dictionary mapping node IDs to their coordinates
Returns:
list: List of coordinate pairs [(lon, lat), ...] for the way,
or empty list if any nodes are missing
"""
coords = []
if 'nodes' not in way:
return coords
for node_id in way['nodes']:
if node_id in nodes:
coords.append(nodes[node_id])
else:
# Missing node - skip this way
return []
return coords
def create_rings_from_ways(way_ids, ways, nodes):
"""Create continuous rings by connecting ways that share nodes.
This function handles complex relations by:
1. Connecting ways that share end nodes
2. Handling reversed way directions
3. Closing rings when possible
4. Converting node references to coordinates
Args:
way_ids (list): List of way IDs that make up the ring(s)
ways (dict): Dictionary mapping way IDs to way elements
nodes (dict): Dictionary mapping node IDs to coordinates
Returns:
list: List of rings, where each ring is a list of coordinate pairs [(lon, lat), ...]
forming a closed polygon with at least 4 points
"""
if not way_ids:
return []
# Extract node IDs for each way
way_nodes = {}
for way_id in way_ids:
if way_id in ways and 'nodes' in ways[way_id]:
way_nodes[way_id] = ways[way_id]['nodes']
# If we have no valid ways, return empty list
if not way_nodes:
return []
# Connect the ways to form rings
rings = []
unused_ways = set(way_nodes.keys())
while unused_ways:
# Start a new ring with the first unused way
current_way_id = next(iter(unused_ways))
unused_ways.remove(current_way_id)
# Get the first and last node IDs of the current way
current_nodes = way_nodes[current_way_id]
if not current_nodes:
continue
# Start building a ring with the nodes of the first way
ring_nodes = list(current_nodes)
# Try to connect more ways to complete the ring
connected = True
while connected and unused_ways:
connected = False
# Get the first and last nodes of the current ring
first_node = ring_nodes[0]
last_node = ring_nodes[-1]
# Try to find a way that connects to either end of our ring
for way_id in list(unused_ways):
nodes_in_way = way_nodes[way_id]
if not nodes_in_way:
unused_ways.remove(way_id)
continue
# Check if this way connects at the start of our ring
if nodes_in_way[-1] == first_node:
# This way connects to the start of our ring (reversed)
ring_nodes = nodes_in_way[:-1] + ring_nodes
unused_ways.remove(way_id)
connected = True
break
elif nodes_in_way[0] == first_node:
# This way connects to the start of our ring
ring_nodes = list(reversed(nodes_in_way))[:-1] + ring_nodes
unused_ways.remove(way_id)
connected = True
break
# Check if this way connects at the end of our ring
elif nodes_in_way[0] == last_node:
# This way connects to the end of our ring
ring_nodes.extend(nodes_in_way[1:])
unused_ways.remove(way_id)
connected = True
break
elif nodes_in_way[-1] == last_node:
# This way connects to the end of our ring (reversed)
ring_nodes.extend(list(reversed(nodes_in_way))[1:])
unused_ways.remove(way_id)
connected = True
break
# Check if the ring is closed (first node equals last node)
if ring_nodes and ring_nodes[0] == ring_nodes[-1] and len(ring_nodes) >= 4:
# Convert node IDs to coordinates
ring_coords = []
for node_id in ring_nodes:
if node_id in nodes:
ring_coords.append(nodes[node_id])
else:
# Missing node - skip this ring
ring_coords = []
break
if ring_coords and len(ring_coords) >= 4:
rings.append(ring_coords)
else:
# Try to close the ring if it's almost complete
if ring_nodes and len(ring_nodes) >= 3 and ring_nodes[0] != ring_nodes[-1]:
ring_nodes.append(ring_nodes[0])
# Convert node IDs to coordinates
ring_coords = []
for node_id in ring_nodes:
if node_id in nodes:
ring_coords.append(nodes[node_id])
else:
# Missing node - skip this ring
ring_coords = []
break
if ring_coords and len(ring_coords) >= 4:
rings.append(ring_coords)
return rings
[docs]
def load_gdf_from_openstreetmap(rectangle_vertices, floor_height=3.0):
"""Download and process building footprint data from OpenStreetMap.
This function:
1. Downloads building data using the Overpass API
2. Processes complex relations and their members
3. Extracts height information and other properties
4. Converts features to a GeoDataFrame with standardized properties
Args:
rectangle_vertices (list): List of (lon, lat) coordinates defining the bounding box
Returns:
geopandas.GeoDataFrame: GeoDataFrame containing building footprints with properties:
- geometry: Polygon or MultiPolygon
- height: Building height in meters
- levels: Number of building levels
- min_height: Minimum height (for elevated structures)
- building_type: Type of building
- And other OSM tags as properties
"""
# Create a bounding box from the rectangle vertices
min_lon = min(v[0] for v in rectangle_vertices)
max_lon = max(v[0] for v in rectangle_vertices)
min_lat = min(v[1] for v in rectangle_vertices)
max_lat = max(v[1] for v in rectangle_vertices)
# Enhanced Overpass API query with recursive member extraction
overpass_query = f"""
[out:json];
(
way["building"]({min_lat},{min_lon},{max_lat},{max_lon});
way["building:part"]({min_lat},{min_lon},{max_lat},{max_lon});
relation["building"]({min_lat},{min_lon},{max_lat},{max_lon});
way["tourism"="artwork"]["area"="yes"]({min_lat},{min_lon},{max_lat},{max_lon});
relation["tourism"="artwork"]["area"="yes"]({min_lat},{min_lon},{max_lat},{max_lon});
);
(._; >;); // Recursively get all nodes, ways, and relations within relations
out geom;
"""
# Fetch data from Overpass API with retry logic
data = _fetch_overpass_with_retry(overpass_query, timeout=60)
# Build a mapping from (type, id) to element
id_map = {}
for element in data['elements']:
id_map[(element['type'], element['id'])] = element
def is_underground_construction(element):
"""Check if an element represents an underground construction.
Underground constructions are identified by explicit markers or
a combination of indicators suggesting the structure is entirely underground.
Definite underground indicators (any one is sufficient):
- location tag = 'underground'
- tunnel tag = 'yes' combined with negative layer or level
Probable underground (requires multiple indicators):
- layer tag with strongly negative value (< -1) combined with level < 0
- tunnel = 'yes' with location hints
This conservative approach avoids excluding:
- Buildings with basements (layer=-1) that have above-ground portions
- Partially underground structures on slopes
- Buildings using layer for rendering order
Args:
element: OSM element with tags
Returns:
bool: True if the element is underground, False otherwise
"""
tags = element.get('tags', {})
# Definite indicator: explicit underground location
if tags.get('location') == 'underground':
return True
# Parse layer value
layer_value = None
layer = tags.get('layer')
if layer is not None:
try:
layer_value = float(layer)
except (ValueError, TypeError):
pass
# Parse level value (take the first/primary level if multiple)
level_value = None
level = tags.get('level')
if level is not None:
try:
# Level can be a single value or a range like "-5" or "-2;-1"
level_str = str(level).split(';')[0].split(',')[0].strip()
level_value = float(level_str)
except (ValueError, TypeError):
pass
has_tunnel = tags.get('tunnel') == 'yes'
# Tunnel structures with negative layer or level are underground
if has_tunnel:
if (layer_value is not None and layer_value < 0) or \
(level_value is not None and level_value < 0):
return True
# Strongly negative layer (< -1) combined with negative level
# indicates a deeply underground structure, not just a basement
if layer_value is not None and layer_value < -1:
if level_value is not None and level_value < 0:
return True
# Very deep layer without above-ground levels specified
if level_value is None and layer_value <= -3:
return True
# Check for underground-only building levels
# If building:levels:underground exists but no building:levels, it's fully underground
underground_levels = tags.get('building:levels:underground')
above_ground_levels = tags.get('building:levels')
if underground_levels is not None and above_ground_levels is None:
# Has only underground levels specified, likely fully underground
if has_tunnel or (layer_value is not None and layer_value < 0):
return True
return False
# Process the response and create features list
features = []
def process_coordinates(geometry):
"""Helper function to process and reverse coordinate pairs.
Args:
geometry: List of coordinate pairs to process
Returns:
list: Processed coordinate pairs with reversed order
"""
return [coord for coord in geometry] # Keep original order since already (lon, lat)
def get_height_from_properties(properties, floor_height=3.0):
"""Helper function to extract height from properties.
Args:
properties: Dictionary of feature properties
Returns:
float: Extracted or calculated height value
"""
height = properties.get('height', properties.get('building:height', None))
if height is not None:
try:
return float(height)
except ValueError:
pass
# Infer from floors when available
floors_candidates = [
properties.get('building:levels'),
properties.get('levels'),
properties.get('num_floors')
]
for floors in floors_candidates:
if floors is None:
continue
try:
floors_val = float(floors)
if floors_val > 0:
return float(floor_height) * floors_val
except ValueError:
continue
return 0 # Default height if no valid height found
def extract_properties(element, floor_height=3.0):
"""Helper function to extract and process properties from an element.
Args:
element: OSM element containing tags and properties
Returns:
dict: Processed properties dictionary
"""
properties = element.get('tags', {})
# Get height (now using the helper function)
height = get_height_from_properties(properties, floor_height=floor_height)
# Get min_height and min_level
min_height = properties.get('min_height', '0')
min_level = properties.get('building:min_level', properties.get('min_level', '0'))
try:
min_height = float(min_height)
except ValueError:
min_height = 0
levels = properties.get('building:levels', properties.get('levels', None))
try:
levels = float(levels) if levels is not None else None
except ValueError:
levels = None
# Extract additional properties, including those relevant to artworks
extracted_props = {
"id": element['id'],
"height": height,
"min_height": min_height,
"confidence": -1.0,
"is_inner": False,
"levels": levels,
"height_source": "explicit" if properties.get('height') or properties.get('building:height')
else "levels" if (levels is not None) or (properties.get('num_floors') is not None)
else "default",
"min_level": min_level if min_level != '0' else None,
"building": properties.get('building', 'no'),
"building_part": properties.get('building:part', 'no'),
"building_material": properties.get('building:material'),
"building_colour": properties.get('building:colour'),
"roof_shape": properties.get('roof:shape'),
"roof_material": properties.get('roof:material'),
"roof_angle": properties.get('roof:angle'),
"roof_colour": properties.get('roof:colour'),
"roof_direction": properties.get('roof:direction'),
"architect": properties.get('architect'),
"start_date": properties.get('start_date'),
"name": properties.get('name'),
"name:en": properties.get('name:en'),
"name:es": properties.get('name:es'),
"email": properties.get('email'),
"phone": properties.get('phone'),
"wheelchair": properties.get('wheelchair'),
"tourism": properties.get('tourism'),
"artwork_type": properties.get('artwork_type'),
"area": properties.get('area'),
"layer": properties.get('layer')
}
# Remove None values to keep the properties clean
return {k: v for k, v in extracted_props.items() if v is not None}
def create_polygon_feature(coords, properties, is_inner=False):
"""Helper function to create a polygon feature.
Args:
coords: List of coordinate pairs defining the polygon
properties: Dictionary of feature properties
is_inner: Boolean indicating if this is an inner ring
Returns:
dict: GeoJSON Feature object or None if invalid
"""
if len(coords) >= 4:
properties = properties.copy()
properties["is_inner"] = is_inner
return {
"type": "Feature",
"properties": properties,
"geometry": {
"type": "Polygon",
"coordinates": [process_coordinates(coords)]
}
}
return None
# Process each element, handling relations and their way members
for element in data['elements']:
if element['type'] == 'way':
# Skip underground constructions
if is_underground_construction(element):
continue
if 'geometry' in element:
coords = [(node['lon'], node['lat']) for node in element['geometry']]
properties = extract_properties(element, floor_height=floor_height)
feature = create_polygon_feature(coords, properties)
if feature:
features.append(feature)
elif element['type'] == 'relation':
# Skip underground constructions
if is_underground_construction(element):
continue
properties = extract_properties(element, floor_height=floor_height)
# Process each member of the relation
for member in element['members']:
if member['type'] == 'way':
# Look up the way in id_map
way = id_map.get(('way', member['ref']))
if way and 'geometry' in way:
coords = [(node['lon'], node['lat']) for node in way['geometry']]
is_inner = member['role'] == 'inner'
member_properties = properties.copy()
member_properties['member_id'] = way['id'] # Include id of the way
feature = create_polygon_feature(coords, member_properties, is_inner)
if feature:
feature['properties']['role'] = member['role']
features.append(feature)
# Convert features list to GeoDataFrame
if not features:
return gpd.GeoDataFrame()
geometries = []
properties_list = []
for feature in features:
geometries.append(shape(feature['geometry']))
properties_list.append(feature['properties'])
gdf = gpd.GeoDataFrame(properties_list, geometry=geometries, crs="EPSG:4326")
return gdf
# Classification mapping defines the land cover/use classes and their associated tags
# The numbers (0-13) represent class codes used in the system
classification_mapping = {
11: {'name': 'Road', 'tags': ['highway', 'road', 'path', 'track', 'street']},
12: {'name': 'Building', 'tags': ['building', 'house', 'apartment', 'commercial_building', 'industrial_building']},
10: {'name': 'Developed space', 'tags': ['industrial', 'retail', 'commercial', 'residential', 'construction', 'railway', 'parking', 'islet', 'island']},
0: {'name': 'Bareland', 'tags': ['quarry', 'brownfield', 'bare_rock', 'scree', 'shingle', 'rock', 'sand', 'desert', 'landfill', 'beach']},
1: {'name': 'Rangeland', 'tags': ['grass', 'meadow', 'grassland', 'heath', 'garden', 'park']},
2: {'name': 'Shrub', 'tags': ['scrub', 'shrubland', 'bush', 'thicket']},
3: {'name': 'Agriculture land', 'tags': ['farmland', 'orchard', 'vineyard', 'plant_nursery', 'greenhouse_horticulture', 'flowerbed', 'allotments', 'cropland']},
4: {'name': 'Tree', 'tags': ['wood', 'forest', 'tree', 'tree_row', 'tree_canopy']},
5: {'name': 'Moss and lichen', 'tags': ['moss', 'lichen', 'tundra_vegetation']},
6: {'name': 'Wet land', 'tags': ['wetland', 'marsh', 'swamp', 'bog', 'fen', 'flooded_vegetation']},
7: {'name': 'Mangrove', 'tags': ['mangrove', 'mangrove_forest', 'mangrove_swamp']},
8: {'name': 'Water', 'tags': ['water', 'reservoir', 'basin', 'bay', 'ocean', 'sea', 'lake']},
9: {'name': 'Snow and ice', 'tags': ['glacier', 'snow', 'ice', 'snowfield', 'ice_shelf']},
13: {'name': 'No Data', 'tags': ['unknown', 'no_data', 'clouds', 'undefined']}
}
# Maps classification tags to specific OSM key-value pairs
# '*' means match any value for that key
tag_osm_key_value_mapping = {
# Road
'highway': {'highway': '*'},
'road': {'highway': '*'},
'path': {'highway': 'path'},
'track': {'highway': 'track'},
'street': {'highway': '*'},
# Building
'building': {'building': '*'},
'house': {'building': 'house'},
'apartment': {'building': 'apartments'},
'commercial_building': {'building': 'commercial'},
'industrial_building': {'building': 'industrial'},
# Developed space
'industrial': {'landuse': 'industrial'},
'retail': {'landuse': 'retail'},
'commercial': {'landuse': 'commercial'},
'residential': {'landuse': 'residential'},
'construction': {'landuse': 'construction'},
'railway': {'landuse': 'railway'},
'parking': {'amenity': 'parking'},
'islet': {'place': 'islet'},
'island': {'place': 'island'},
# Bareland
'quarry': {'landuse': 'quarry'},
'brownfield': {'landuse': 'brownfield'},
'bare_rock': {'natural': 'bare_rock'},
'scree': {'natural': 'scree'},
'shingle': {'natural': 'shingle'},
'rock': {'natural': 'rock'},
'sand': {'natural': 'sand'},
'desert': {'natural': 'desert'},
'landfill': {'landuse': 'landfill'},
'beach': {'natural': 'beach'},
# Rangeland
'grass': {'landuse': 'grass'},
'meadow': {'landuse': 'meadow'},
'grassland': {'natural': 'grassland'},
'heath': {'natural': 'heath'},
'garden': {'leisure': 'garden'},
'park': {'leisure': 'park'},
# Shrub
'scrub': {'natural': 'scrub'},
'shrubland': {'natural': 'scrub'},
'bush': {'natural': 'scrub'},
'thicket': {'natural': 'scrub'},
# Agriculture land
'farmland': {'landuse': 'farmland'},
'orchard': {'landuse': 'orchard'},
'vineyard': {'landuse': 'vineyard'},
'plant_nursery': {'landuse': 'plant_nursery'},
'greenhouse_horticulture': {'landuse': 'greenhouse_horticulture'},
'flowerbed': {'landuse': 'flowerbed'},
'allotments': {'landuse': 'allotments'},
'cropland': {'landuse': 'farmland'},
# Tree
'wood': {'natural': 'wood'},
'forest': {'landuse': 'forest'},
'tree': {'natural': 'tree'},
'tree_row': {'natural': 'tree_row'},
'tree_canopy': {'natural': 'tree_canopy'},
# Moss and lichen
'moss': {'natural': 'fell'},
'lichen': {'natural': 'fell'},
'tundra_vegetation': {'natural': 'fell'},
# Wet land
'wetland': {'natural': 'wetland'},
'marsh': {'wetland': 'marsh'},
'swamp': {'wetland': 'swamp'},
'bog': {'wetland': 'bog'},
'fen': {'wetland': 'fen'},
'flooded_vegetation': {'natural': 'wetland'},
# Mangrove
'mangrove': {'natural': 'wetland', 'wetland': 'mangrove'},
'mangrove_forest': {'natural': 'wetland', 'wetland': 'mangrove'},
'mangrove_swamp': {'natural': 'wetland', 'wetland': 'mangrove'},
# Water
'water': {'natural': 'water'},
'reservoir': {'landuse': 'reservoir'},
'basin': {'landuse': 'basin'},
'bay': {'natural': 'bay'},
'ocean': {'natural': 'water', 'water': 'ocean'},
'sea': {'natural': 'water', 'water': 'sea'},
'lake': {'natural': 'water', 'water': 'lake'},
# Snow and ice
'glacier': {'natural': 'glacier'},
'snow': {'natural': 'glacier'},
'ice': {'natural': 'glacier'},
'snowfield': {'natural': 'glacier'},
'ice_shelf': {'natural': 'glacier'},
# No Data
'unknown': {'FIXME': '*'},
'no_data': {'FIXME': '*'},
'clouds': {'natural': 'cloud'},
'undefined': {'FIXME': '*'}
}
def get_classification(tags):
"""Determine the land cover/use classification based on OSM tags.
This function maps OSM tags to standardized land cover classes using:
1. A hierarchical classification system (codes 0-13)
2. Tag matching patterns for different feature types
3. Special cases for roads, water bodies, etc.
Args:
tags (dict): Dictionary of OSM tags (key-value pairs)
Returns:
tuple: (classification_code, classification_name) where:
- classification_code (int): Numeric code (0-13) for the land cover class
- classification_name (str): Human-readable name of the class
Or (None, None) if no matching classification is found
"""
# Iterate through each classification code and its associated info
for code, info in classification_mapping.items():
# Check each tag associated with this classification
for tag in info['tags']:
osm_mappings = tag_osm_key_value_mapping.get(tag)
if osm_mappings:
# Check if the feature's tags match any of the OSM key-value pairs
for key, value in osm_mappings.items():
if key in tags:
if value == '*' or tags[key] == value:
return code, info['name']
# Special case for islets and islands
if tag in ['islet', 'island'] and tags.get('place') == tag:
return code, info['name']
# Special case for roads mapped as areas
if 'area:highway' in tags:
return 11, 'Road'
return None, None
[docs]
def load_land_cover_gdf_from_osm(rectangle_vertices_ori):
"""Load and classify land cover data from OpenStreetMap.
This function:
1. Downloads land cover features using the Overpass API
2. Classifies features based on OSM tags
3. Handles special cases like roads with width information
4. Projects geometries for accurate buffering
5. Creates a standardized GeoDataFrame with classifications
Args:
rectangle_vertices_ori (list): List of (lon, lat) coordinates defining the area
Returns:
geopandas.GeoDataFrame: GeoDataFrame with:
- geometry: Polygon or MultiPolygon features
- class: Land cover classification name
- Additional properties from OSM tags
"""
# Close the rectangle polygon by adding first vertex at the end
rectangle_vertices = rectangle_vertices_ori.copy()
rectangle_vertices.append(rectangle_vertices_ori[0])
# Instead of using poly:"lat lon lat lon...", use area coordinates
min_lat = min(lat for lon, lat in rectangle_vertices)
max_lat = max(lat for lon, lat in rectangle_vertices)
min_lon = min(lon for lon, lat in rectangle_vertices)
max_lon = max(lon for lon, lat in rectangle_vertices)
# Initialize dictionary to store OSM keys and their allowed values
osm_keys_values = defaultdict(list)
# Build mapping of OSM keys to their possible values from classification mapping
for info in classification_mapping.values():
tags = info['tags']
for tag in tags:
osm_mappings = tag_osm_key_value_mapping.get(tag)
if osm_mappings:
for key, value in osm_mappings.items():
if value == '*':
osm_keys_values[key] = ['*'] # Match all values
else:
if osm_keys_values[key] != ['*'] and value not in osm_keys_values[key]:
osm_keys_values[key].append(value)
# Build Overpass API query parts for each key-value pair
query_parts = []
for key, values in osm_keys_values.items():
if values:
if values == ['*']:
# Query for any value of this key using bounding box
query_parts.append(f'way["{key}"]({min_lat},{min_lon},{max_lat},{max_lon});')
query_parts.append(f'relation["{key}"]({min_lat},{min_lon},{max_lat},{max_lon});')
else:
# Remove duplicate values
values = list(set(values))
# Build regex pattern for specific values
values_regex = '|'.join(values)
query_parts.append(f'way["{key}"~"^{values_regex}$"]({min_lat},{min_lon},{max_lat},{max_lon});')
query_parts.append(f'relation["{key}"~"^{values_regex}$"]({min_lat},{min_lon},{max_lat},{max_lon});')
# Combine query parts into complete Overpass query
query_body = "\n ".join(query_parts)
query = (
"[out:json];\n"
"(\n"
f" {query_body}\n"
");\n"
"out body;\n"
">;\n"
"out skel qt;"
)
# Fetch data from Overpass API with retry logic
print("Fetching data from Overpass API...")
data = _fetch_overpass_with_retry(query, timeout=60)
# Convert OSM data to GeoJSON format using our custom converter instead of json2geojson
print("Converting data to GeoJSON format...")
geojson_data = osm_json_to_geojson(data)
# Create shapely polygon from rectangle vertices (in lon,lat order)
rectangle_polygon = Polygon(rectangle_vertices)
# Calculate center point for projection
center_lat = sum(lat for lon, lat in rectangle_vertices) / len(rectangle_vertices)
center_lon = sum(lon for lon, lat in rectangle_vertices) / len(rectangle_vertices)
# Set up coordinate reference systems for projection
wgs84 = pyproj.CRS('EPSG:4326') # Standard lat/lon
# Albers Equal Area projection centered on area of interest
aea = pyproj.CRS(proj='aea', lat_1=rectangle_polygon.bounds[1], lat_2=rectangle_polygon.bounds[3], lat_0=center_lat, lon_0=center_lon)
# Create transformers for projecting coordinates
project = pyproj.Transformer.from_crs(wgs84, aea, always_xy=True).transform
project_back = pyproj.Transformer.from_crs(aea, wgs84, always_xy=True).transform
def is_underground_construction(tags):
"""Check if a feature represents an underground construction.
Underground constructions are identified by explicit markers or
a combination of indicators suggesting the structure is entirely underground.
Args:
tags: Dictionary of OSM tags
Returns:
bool: True if the feature is underground, False otherwise
"""
# Definite indicator: explicit underground location
if tags.get('location') == 'underground':
return True
# Parse layer value
layer_value = None
layer = tags.get('layer')
if layer is not None:
try:
layer_value = float(layer)
except (ValueError, TypeError):
pass
# Parse level value (take the first/primary level if multiple)
level_value = None
level = tags.get('level')
if level is not None:
try:
# Level can be a single value or a range like "-5" or "-2;-1"
level_str = str(level).split(';')[0].split(',')[0].strip()
level_value = float(level_str)
except (ValueError, TypeError):
pass
has_tunnel = tags.get('tunnel') == 'yes'
# Tunnel structures with negative layer or level are underground
if has_tunnel:
if (layer_value is not None and layer_value < 0) or \
(level_value is not None and level_value < 0):
return True
# Strongly negative layer (< -1) combined with negative level
# indicates a deeply underground structure, not just a basement
if layer_value is not None and layer_value < -1:
if level_value is not None and level_value < 0:
return True
# Very deep layer without above-ground levels specified
if level_value is None and layer_value <= -3:
return True
# Check for underground-only building levels
# If building:levels:underground exists but no building:levels, it's fully underground
underground_levels = tags.get('building:levels:underground')
above_ground_levels = tags.get('building:levels')
if underground_levels is not None and above_ground_levels is None:
# Has only underground levels specified, likely fully underground
if has_tunnel or (layer_value is not None and layer_value < 0):
return True
return False
# Lists to store geometries and properties for GeoDataFrame
geometries = []
properties = []
for feature in geojson_data['features']:
# Convert feature geometry to shapely object
geom = shape(feature['geometry'])
if not (geom.is_valid and geom.intersects(rectangle_polygon)):
continue
# Get classification for feature
tags = feature['properties'].get('tags', {})
classification_code, classification_name = get_classification(tags)
if classification_code is None:
continue
# Skip underground buildings
if classification_code == 12 and is_underground_construction(tags):
continue
# Special handling for roads
if classification_code == 11:
highway_value = tags.get('highway', '')
# Skip minor paths and walkways
if highway_value in ['footway', 'path', 'pedestrian', 'steps', 'cycleway', 'bridleway']:
continue
# Determine road width for buffering
width_value = tags.get('width')
lanes_value = tags.get('lanes')
buffer_distance = None
# Calculate buffer distance based on width or number of lanes
if width_value is not None:
try:
width_meters = float(width_value)
buffer_distance = width_meters / 2
except ValueError:
pass
elif lanes_value is not None:
try:
num_lanes = float(lanes_value)
width_meters = num_lanes * 3.0 # 3m per lane
buffer_distance = width_meters / 2
except ValueError:
pass
else:
# Default road width
buffer_distance = 2.5 # 5m total width
if buffer_distance is None:
continue
# Buffer line features to create polygons
if geom.geom_type in ['LineString', 'MultiLineString']:
# Project to planar CRS, buffer, and project back
geom_proj = transform(project, geom)
buffered_geom_proj = geom_proj.buffer(buffer_distance)
buffered_geom = transform(project_back, buffered_geom_proj)
# Clip to rectangle
geom = buffered_geom.intersection(rectangle_polygon)
else:
continue
# Skip empty geometries
if geom.is_empty:
continue
# Add geometries and properties
if geom.geom_type == 'Polygon':
geometries.append(geom)
properties.append({'class': classification_name})
elif geom.geom_type == 'MultiPolygon':
for poly in geom.geoms:
geometries.append(poly)
properties.append({'class': classification_name})
# Create GeoDataFrame
gdf = gpd.GeoDataFrame(properties, geometry=geometries, crs="EPSG:4326")
return gdf
[docs]
def load_tree_gdf_from_osm(rectangle_vertices, default_top_height=10.0, default_trunk_height=4.0,
default_crown_diameter=None, default_crown_ratio=0.6,
include_polygons=True):
"""Download and process individual tree data and tree land cover polygons from OpenStreetMap.
This function downloads tree point data (natural=tree) and optionally tree land cover
polygons (natural=wood, landuse=forest, natural=tree_row) from OpenStreetMap and creates
a GeoDataFrame compatible with VoxCity's tree canopy processing.
For individual trees, it extracts height and crown diameter information from OSM tags
when available, or uses default values when not specified.
For tree polygons (forests, woods), it assigns default height values and the polygon
geometry is preserved for rasterization during canopy grid creation.
OSM tags used for tree properties:
- height, est_height: Tree height in meters
- diameter_crown: Crown diameter in meters
- circumference: Trunk circumference (used to estimate crown if diameter_crown missing)
- genus, species: Tree species information (stored as properties)
- leaf_type: broadleaved/needleleaved (stored as property)
- leaf_cycle: deciduous/evergreen (stored as property)
Crown diameter estimation priority (for point trees only):
1. Use diameter_crown tag if available
2. Estimate from circumference tag (trunk circumference × 15 / π)
3. Use default_crown_diameter if specified
4. Estimate from tree height (height × default_crown_ratio)
Args:
rectangle_vertices (list): List of (lon, lat) coordinates defining the bounding box.
Should be 4 vertices forming a rectangle.
default_top_height (float): Default tree top height in meters when not specified in OSM.
Defaults to 10.0 meters.
default_trunk_height (float): Default trunk height (height to bottom of canopy) in meters.
This is the height where the canopy starts. Defaults to 4.0 meters.
default_crown_diameter (float, optional): Default crown diameter in meters. If None,
crown diameter is estimated from tree height using default_crown_ratio.
Only used for Point geometries.
default_crown_ratio (float): Ratio of crown diameter to tree height, used when
crown diameter cannot be determined from OSM tags and default_crown_diameter is None.
Defaults to 0.6 (e.g., a 10m tall tree would have a 6m crown diameter).
include_polygons (bool): If True, also download tree land cover polygons (forests,
woods, tree_rows). Defaults to True. Set to False to only get individual trees.
Returns:
geopandas.GeoDataFrame: GeoDataFrame containing tree features with columns:
- geometry: Point or Polygon geometry (lon, lat)
- geometry_type: 'point' for individual trees, 'polygon' for forest/wood areas
- tree_id: Unique identifier for each feature
- top_height: Height to the top of the tree canopy in meters
- bottom_height: Height to the bottom of the canopy (trunk height) in meters
- crown_diameter: Diameter of the tree crown in meters (0 for polygons)
- genus: Tree genus if available
- species: Tree species if available
- leaf_type: Leaf type (broadleaved/needleleaved) if available
- leaf_cycle: Leaf cycle (deciduous/evergreen) if available
- osm_id: Original OSM element ID
- osm_type: OSM element type ('node', 'way', 'relation')
Example:
>>> vertices = [(-73.99, 40.75), (-73.98, 40.75), (-73.98, 40.76), (-73.99, 40.76)]
>>> # Get both individual trees and forest polygons
>>> tree_gdf = load_tree_gdf_from_osm(vertices)
>>> # Get only individual trees
>>> tree_gdf = load_tree_gdf_from_osm(vertices, include_polygons=False)
>>> # Customize defaults: 15m tall trees, 5m trunk
>>> tree_gdf = load_tree_gdf_from_osm(vertices, default_top_height=15.0,
... default_trunk_height=5.0)
Note:
- Individual trees have Point geometry with crown_diameter for ellipsoid rendering
- Tree polygons have Polygon geometry and are rasterized as flat canopy areas
- Tree polygon types: natural=wood, landuse=forest, natural=tree_row
- Crown diameter estimation from trunk circumference uses an empirical ratio
- The function uses multiple Overpass API endpoints for reliability
"""
# Create a bounding box from the rectangle vertices
min_lon = min(v[0] for v in rectangle_vertices)
max_lon = max(v[0] for v in rectangle_vertices)
min_lat = min(v[1] for v in rectangle_vertices)
max_lat = max(v[1] for v in rectangle_vertices)
# Build Overpass query - always get individual trees, optionally get polygons
if include_polygons:
# Query for individual trees AND tree land cover polygons
overpass_query = f"""
[out:json];
(
node["natural"="tree"]({min_lat},{min_lon},{max_lat},{max_lon});
way["natural"="wood"]({min_lat},{min_lon},{max_lat},{max_lon});
way["landuse"="forest"]({min_lat},{min_lon},{max_lat},{max_lon});
way["natural"="tree_row"]({min_lat},{min_lon},{max_lat},{max_lon});
relation["natural"="wood"]({min_lat},{min_lon},{max_lat},{max_lon});
relation["landuse"="forest"]({min_lat},{min_lon},{max_lat},{max_lon});
);
out body;
>;
out skel qt;
"""
else:
# Query for individual trees only
overpass_query = f"""
[out:json];
(
node["natural"="tree"]({min_lat},{min_lon},{max_lat},{max_lon});
);
out body;
"""
# Fetch data from Overpass API with retry logic
data = _fetch_overpass_with_retry(overpass_query, timeout=60)
# Build index of nodes and ways for polygon reconstruction
nodes = {}
ways = {}
for element in data['elements']:
if element['type'] == 'node':
nodes[element['id']] = (element.get('lon'), element.get('lat'))
elif element['type'] == 'way':
ways[element['id']] = element
# Process tree elements
trees = []
tree_id = 1
def extract_height_from_tags(tags, default_height):
"""Extract height from OSM tags."""
for height_key in ['height', 'est_height']:
if height_key in tags:
try:
height_str = tags[height_key]
height_str = height_str.replace('m', '').replace('M', '').strip()
return float(height_str)
except (ValueError, AttributeError):
continue
return default_height
def extract_species_info(tags):
"""Extract species information from OSM tags."""
return {
'genus': tags.get('genus', None),
'species': tags.get('species', None),
'leaf_type': tags.get('leaf_type', None),
'leaf_cycle': tags.get('leaf_cycle', None),
}
# Process individual tree nodes
for element in data['elements']:
if element['type'] != 'node':
continue
tags = element.get('tags', {})
# Skip nodes that are not trees (they might be way nodes)
if tags.get('natural') != 'tree':
continue
lon = element.get('lon')
lat = element.get('lat')
if lon is None or lat is None:
continue
height = extract_height_from_tags(tags, default_top_height)
# Extract crown diameter from OSM tags
crown_diameter = None
if 'diameter_crown' in tags:
try:
crown_str = tags['diameter_crown']
crown_str = crown_str.replace('m', '').replace('M', '').strip()
crown_diameter = float(crown_str)
except (ValueError, AttributeError):
pass
# If no crown diameter, try to estimate from trunk circumference
if crown_diameter is None and 'circumference' in tags:
try:
circ_str = tags['circumference']
circ_str = circ_str.replace('m', '').replace('M', '').strip()
circumference = float(circ_str)
trunk_diameter = circumference / 3.14159
crown_diameter = trunk_diameter * 15
except (ValueError, AttributeError):
pass
# If still no crown diameter, use default or estimate from height
if crown_diameter is None:
if default_crown_diameter is not None:
crown_diameter = default_crown_diameter
else:
crown_diameter = height * default_crown_ratio
# Set bottom height
bottom_height = default_trunk_height
if bottom_height >= height:
bottom_height = height * 0.4
species_info = extract_species_info(tags)
trees.append({
'tree_id': tree_id,
'geometry_type': 'point',
'top_height': height,
'bottom_height': bottom_height,
'crown_diameter': crown_diameter,
'genus': species_info['genus'],
'species': species_info['species'],
'leaf_type': species_info['leaf_type'],
'leaf_cycle': species_info['leaf_cycle'],
'osm_id': element['id'],
'osm_type': 'node',
'geometry': Point(lon, lat)
})
tree_id += 1
# Process tree polygon features (ways and relations) if requested
if include_polygons:
for element in data['elements']:
if element['type'] == 'way':
tags = element.get('tags', {})
# Check if this is a tree-related polygon
is_tree_polygon = (
tags.get('natural') in ['wood', 'tree_row'] or
tags.get('landuse') == 'forest'
)
if not is_tree_polygon:
continue
# Reconstruct polygon from way nodes
node_ids = element.get('nodes', [])
coords = []
for node_id in node_ids:
if node_id in nodes:
coord = nodes[node_id]
if coord[0] is not None and coord[1] is not None:
coords.append(coord)
# Need at least 4 points for a valid polygon (closed ring)
if len(coords) < 4:
continue
# Ensure the polygon is closed
if coords[0] != coords[-1]:
coords.append(coords[0])
try:
polygon_geom = Polygon(coords)
if not polygon_geom.is_valid:
polygon_geom = polygon_geom.buffer(0)
if polygon_geom.is_empty or not polygon_geom.is_valid:
continue
except Exception:
continue
height = extract_height_from_tags(tags, default_top_height)
bottom_height = default_trunk_height
if bottom_height >= height:
bottom_height = height * 0.4
species_info = extract_species_info(tags)
trees.append({
'tree_id': tree_id,
'geometry_type': 'polygon',
'top_height': height,
'bottom_height': bottom_height,
'crown_diameter': 0, # Not applicable for polygons
'genus': species_info['genus'],
'species': species_info['species'],
'leaf_type': species_info['leaf_type'],
'leaf_cycle': species_info['leaf_cycle'],
'osm_id': element['id'],
'osm_type': 'way',
'geometry': polygon_geom
})
tree_id += 1
elif element['type'] == 'relation':
tags = element.get('tags', {})
# Check if this is a tree-related relation
is_tree_polygon = (
tags.get('natural') in ['wood', 'tree_row'] or
tags.get('landuse') == 'forest'
)
if not is_tree_polygon:
continue
# Process relation members to create polygon(s)
members = element.get('members', [])
outer_ways = []
inner_ways = []
for member in members:
if member.get('type') == 'way':
way_id = member.get('ref')
role = member.get('role', 'outer')
if way_id in ways:
way_coords = []
for node_id in ways[way_id].get('nodes', []):
if node_id in nodes:
coord = nodes[node_id]
if coord[0] is not None and coord[1] is not None:
way_coords.append(coord)
if way_coords:
if role == 'inner':
inner_ways.append(way_coords)
else:
outer_ways.append(way_coords)
# Try to create polygon from outer rings
if not outer_ways:
continue
try:
# For simplicity, create MultiPolygon from outer rings
# (proper ring merging would require more complex logic)
polygons = []
for outer_coords in outer_ways:
if len(outer_coords) >= 4:
if outer_coords[0] != outer_coords[-1]:
outer_coords.append(outer_coords[0])
try:
poly = Polygon(outer_coords)
if not poly.is_valid:
poly = poly.buffer(0)
if poly.is_valid and not poly.is_empty:
polygons.append(poly)
except Exception:
continue
if not polygons:
continue
# Merge all polygons into one (or use MultiPolygon)
if len(polygons) == 1:
relation_geom = polygons[0]
else:
from shapely.ops import unary_union
relation_geom = unary_union(polygons)
if relation_geom.is_empty:
continue
except Exception:
continue
height = extract_height_from_tags(tags, default_top_height)
bottom_height = default_trunk_height
if bottom_height >= height:
bottom_height = height * 0.4
species_info = extract_species_info(tags)
trees.append({
'tree_id': tree_id,
'geometry_type': 'polygon',
'top_height': height,
'bottom_height': bottom_height,
'crown_diameter': 0, # Not applicable for polygons
'genus': species_info['genus'],
'species': species_info['species'],
'leaf_type': species_info['leaf_type'],
'leaf_cycle': species_info['leaf_cycle'],
'osm_id': element['id'],
'osm_type': 'relation',
'geometry': relation_geom
})
tree_id += 1
# Create GeoDataFrame
if not trees:
# Return empty GeoDataFrame with correct columns
return gpd.GeoDataFrame(
columns=['tree_id', 'geometry_type', 'top_height', 'bottom_height', 'crown_diameter',
'genus', 'species', 'leaf_type', 'leaf_cycle', 'osm_id', 'osm_type', 'geometry'],
crs="EPSG:4326"
)
gdf = gpd.GeoDataFrame(trees, crs="EPSG:4326")
# Report counts
point_count = len(gdf[gdf['geometry_type'] == 'point'])
polygon_count = len(gdf[gdf['geometry_type'] == 'polygon'])
print(f" Downloaded {point_count} individual trees and {polygon_count} tree polygons from OSM")
return gdf