Source code for voxcity.geoprocessor.network

import contextily as ctx
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import LineString, Polygon
import shapely.ops as ops
import networkx as nx
import osmnx as ox
import os
import shapely
from shapely.geometry import Point
from shapely.ops import transform
import pyproj
from pyproj import Transformer
from joblib import Parallel, delayed

from .raster import grid_to_geodataframe

def vectorized_edge_values(G, polygons_gdf, value_col='value'):
    """
    Compute length-weighted average polygon values along each graph edge.

    Edges are converted to LineStrings, intersected with the polygons in a
    metric CRS (EPSG:3857), and the polygon values are averaged per edge
    using the length of each intersected segment as the weight.

    Parameters
    ----------
    G : networkx.MultiDiGraph
        Graph whose edges either carry a ``'geometry'`` attribute or whose
        end nodes carry ``'x'``/``'y'`` attributes. Coordinates are tagged
        as EPSG:4326 by this function.
    polygons_gdf : geopandas.GeoDataFrame
        Polygons with a defined CRS and a numeric column ``value_col``.
    value_col : str, default='value'
        Column of ``polygons_gdf`` holding the values to average.

    Returns
    -------
    dict
        Maps ``(u, v, k)`` to the length-weighted average value. Edges that
        intersect no polygon with a non-NaN value are absent from the dict.
        An empty dict is returned for an empty graph or when nothing
        intersects.
    """
    # Collect edge keys and geometries; the list position is the edge id.
    edge_keys = []
    geometries = []
    for u, v, k, data in G.edges(keys=True, data=True):
        if 'geometry' in data:
            edge_geom = data['geometry']
        else:
            # No stored geometry: straight line between the end nodes.
            start_node = G.nodes[u]
            end_node = G.nodes[v]
            edge_geom = LineString([
                (start_node['x'], start_node['y']),
                (end_node['x'], end_node['y']),
            ])
        edge_keys.append((u, v, k))
        geometries.append(edge_geom)

    # An empty record set has no geometry column, so building a
    # GeoDataFrame with a CRS from it would fail; nothing to compute anyway.
    if not edge_keys:
        return {}

    # Only the id and the geometry take part in the overlay; (u, v, k) is
    # recovered from edge_keys afterwards.
    edges_gdf = gpd.GeoDataFrame(
        {'edge_id': np.arange(len(edge_keys)), 'geometry': geometries},
        crs="EPSG:4326",
    )

    # Polygons without a value cannot contribute to the average.
    valid_polygons = polygons_gdf[polygons_gdf[value_col].notna()]

    # Project both layers straight to Web Mercator for length computation.
    edges_3857 = edges_gdf.to_crs(epsg=3857)
    polys_3857 = valid_polygons.to_crs(epsg=3857)

    intersected = gpd.overlay(edges_3857, polys_3857, how='intersection')
    if intersected.empty or 'edge_id' not in intersected.columns:
        return {}

    # Sum length and length*value per edge, then divide once.
    seg_length = intersected.geometry.length
    totals = pd.DataFrame({
        'edge_id': intersected['edge_id'].to_numpy(),
        'seg_length': seg_length.to_numpy(),
        'weighted_val': (seg_length * intersected[value_col]).to_numpy(),
    }).groupby('edge_id').sum()

    has_length = totals['seg_length'] > 0
    averages = (totals['weighted_val'] / totals['seg_length']).where(has_length, np.nan)

    return {edge_keys[int(edge_id)]: val for edge_id, val in averages.items()}
def get_network_values(
    grid,
    rectangle_vertices=None,
    meshsize=None,
    voxcity=None,
    value_name='value',
    **kwargs
):
    """
    Compute average grid values along an OpenStreetMap street network.

    Downloads the street network for the given rectangle with OSMnx,
    computes the average grid value along each edge via
    ``vectorized_edge_values``, stores it as an edge attribute, and
    optionally plots the result on a static matplotlib figure with a
    contextily basemap.

    Parameters
    ----------
    grid : array-like or geopandas.GeoDataFrame
        Either a grid array (converted with ``grid_to_geodataframe``) or a
        ready GeoDataFrame of polygons. A GeoDataFrame must hold its values
        in a column named ``'value'``. If it has no CRS, EPSG:4326 is
        assumed; the caller's object is not modified.
    rectangle_vertices : list of tuples, optional
        (lon, lat) vertices in EPSG:4326. The bounding box is read from
        vertices 0, 1 and 2, so the order (SW, NW, NE, SE) is expected.
        Optional if `voxcity` is provided.
    meshsize : float, optional
        Grid cell size, used only when `grid` is array-like. Optional if
        `voxcity` is provided.
    voxcity : object, optional
        Object from which missing values are derived:
        ``extras['rectangle_vertices']``, ``voxels.meta.meshsize`` and
        ``voxels.meta.bounds`` (west, south, east, north).
    value_name : str, default='value'
        Name of the edge attribute / output column for the computed values.
    **kwargs : dict
        - network_type : str, default='walk'
        - vis_graph : bool, default=True
        - colormap : str, default='viridis'
        - vmin, vmax : float, optional
        - edge_width : float, default=1
        - fig_size : tuple, default=(15,15)
        - zoom : int, default=16
        - basemap_style : contextily provider, default=CartoDB.Positron
        - save_path : str, optional
            Path to save the edge GeoDataFrame as a GeoPackage.

    Returns
    -------
    tuple (networkx.MultiDiGraph, geopandas.GeoDataFrame)
        The graph with the computed edge attribute, and an EPSG:4326
        GeoDataFrame with columns ``u``, ``v``, ``key``, ``geometry`` and
        ``value_name`` (NaN for edges without a value).

    Raises
    ------
    ValueError
        If `rectangle_vertices` is neither given nor derivable from
        `voxcity`.
    """
    # Fill in missing geometry parameters from the VoxCity object.
    if voxcity is not None:
        derived_rv = None
        derived_meshsize = None

        extras = getattr(voxcity, "extras", None)
        if isinstance(extras, dict):
            derived_rv = extras.get("rectangle_vertices")

        voxels = getattr(voxcity, "voxels", None)
        meta = getattr(voxels, "meta", None) if voxels is not None else None
        if meta is not None:
            derived_meshsize = getattr(meta, "meshsize", None)
            if derived_rv is None:
                bounds = getattr(meta, "bounds", None)
                if bounds is not None:
                    west, south, east, north = bounds
                    derived_rv = [(west, south), (west, north), (east, north), (east, south)]

        if rectangle_vertices is None:
            rectangle_vertices = derived_rv
        if meshsize is None:
            meshsize = derived_meshsize

    # Validate before any download or plotting setup.
    if rectangle_vertices is None:
        raise ValueError("rectangle_vertices must be provided, either directly or via `voxcity`.")

    defaults = {
        'network_type': 'walk',
        'vis_graph': True,
        'colormap': 'viridis',
        'vmin': None,
        'vmax': None,
        'edge_width': 1,
        'fig_size': (15,15),
        'zoom': 16,
        'basemap_style': ctx.providers.CartoDB.Positron,
        'save_path': None
    }
    settings = {**defaults, **kwargs}

    # Build the polygon layer if the caller passed a raw grid.
    if isinstance(grid, gpd.GeoDataFrame):
        polygons_gdf = grid
    else:
        polygons_gdf = grid_to_geodataframe(grid, rectangle_vertices, meshsize)
    if polygons_gdf.crs is None:
        # Non-inplace so a caller-supplied GeoDataFrame is left untouched.
        polygons_gdf = polygons_gdf.set_crs(epsg=4326)

    # Bounding box from the rectangle vertices.
    north, south = rectangle_vertices[1][1], rectangle_vertices[0][1]
    east, west = rectangle_vertices[2][0], rectangle_vertices[0][0]
    bbox = (west, south, east, north)

    G = ox.graph.graph_from_bbox(
        bbox=bbox,
        network_type=settings['network_type'],
        simplify=True
    )

    # Attach the per-edge averages to the graph.
    edge_values = vectorized_edge_values(G, polygons_gdf, value_col="value")
    nx.set_edge_attributes(G, edge_values, name=value_name)

    # Build the edge GeoDataFrame; edges without a value get NaN.
    edges_with_values = []
    for u, v, k, data in G.edges(data=True, keys=True):
        if 'geometry' in data:
            geom = data['geometry']
        else:
            start_node = G.nodes[u]
            end_node = G.nodes[v]
            geom = LineString([(start_node['x'], start_node['y']),
                               (end_node['x'], end_node['y'])])
        edges_with_values.append({
            'u': u,
            'v': v,
            'key': k,
            'geometry': geom,
            value_name: data.get(value_name, np.nan)
        })
    edge_gdf = gpd.GeoDataFrame(edges_with_values, crs="EPSG:4326")

    if settings['save_path']:
        edge_gdf.to_file(settings['save_path'], driver="GPKG")

    if settings['vis_graph']:
        edge_gdf_web = edge_gdf.to_crs(epsg=3857)
        fig, ax = plt.subplots(figsize=settings['fig_size'])
        edge_gdf_web.plot(
            column=value_name,
            ax=ax,
            cmap=settings['colormap'],
            legend=True,
            vmin=settings['vmin'],
            vmax=settings['vmax'],
            linewidth=settings['edge_width'],
            legend_kwds={'label': value_name, 'shrink': 0.5}
        )
        ctx.add_basemap(ax, source=settings['basemap_style'], zoom=settings['zoom'])
        ax.set_axis_off()
        plt.show()

    return G, edge_gdf
# ------------------------------------------------------------------- # 1) Functions for interpolation, parallelization, and slope # -------------------------------------------------------------------
def interpolate_points_along_line(line, interval):
    """
    Interpolate points along a LineString at a fixed spacing.

    The line is projected from EPSG:4326 to Web Mercator (EPSG:3857),
    sampled every `interval` projected units, and the samples are projected
    back to EPSG:4326.

    Parameters
    ----------
    line : shapely.geometry.LineString
        Edge geometry in EPSG:4326 (lon/lat).
    interval : float
        Spacing between interpolated points, measured in EPSG:3857 units
        (approximately meters). Must be positive.

    Returns
    -------
    list of shapely.geometry.Point
        Points in EPSG:4326. An empty line gives ``[]``; a zero-length line
        gives its first coordinate only; a line shorter than `interval`
        gives its start and end points. Otherwise points start at distance
        0, are spaced by `interval`, and the line's end point is appended
        if the last sample falls short of it.

    Raises
    ------
    ValueError
        If `interval` is not a positive number (zero, negative or NaN) and
        the line is not empty.
    """
    if line.is_empty:
        return []

    # `not interval > 0` also rejects NaN. Without this check a zero interval
    # divides by zero and a negative one indexes an empty distance list.
    if not interval > 0:
        raise ValueError(f"interval must be a positive number, got {interval!r}")

    # Transformers for metric distance calculations
    project = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True).transform
    project_rev = Transformer.from_crs("EPSG:3857", "EPSG:4326", always_xy=True).transform

    line_merc = shapely.ops.transform(project, line)
    length_m = line_merc.length

    if length_m == 0:
        return [Point(line.coords[0])]

    # Too short to sample: return the original end points.
    if length_m < interval:
        return [Point(line.coords[0]), Point(line.coords[-1])]

    num_points = int(length_m // interval)
    dists = [i * interval for i in range(num_points + 1)]

    # Make sure the end of the line is always included.
    if dists[-1] < length_m:
        dists.append(length_m)

    points_merc = [line_merc.interpolate(d) for d in dists]
    return [shapely.ops.transform(project_rev, pt) for pt in points_merc]
def gather_interpolation_points(G, interval=10.0, n_jobs=1):
    """
    Collect interpolation points for every edge of the graph.

    Each edge's geometry (its ``'geometry'`` attribute, or a straight line
    between the end nodes' ``x``/``y`` coordinates) is sampled with
    ``interpolate_points_along_line`` and the points of all edges are
    returned in one GeoDataFrame.

    Parameters
    ----------
    G : networkx.MultiDiGraph
        Graph with ``'geometry'`` edge attributes or ``x``/``y`` node
        attributes.
    interval : float, default=10.0
        Sampling interval passed to ``interpolate_points_along_line``.
    n_jobs : int, default=1
        Number of joblib jobs (threading backend) used to process edges.

    Returns
    -------
    gpd.GeoDataFrame
        EPSG:4326 GeoDataFrame with columns:
        - edge_id: position of the edge in ``G.edges(keys=True, data=True)``
        - index_in_edge: position of the point along its edge
        - geometry: Point geometry
        Empty (with the same columns) when the graph has no edges.
    """
    edges = list(G.edges(keys=True, data=True))

    def edge_points(u, v, data):
        # Sample one edge, falling back to a straight node-to-node line.
        if 'geometry' in data:
            line = data['geometry']
        else:
            start_node = G.nodes[u]
            end_node = G.nodes[v]
            line = LineString([(start_node['x'], start_node['y']),
                               (end_node['x'], end_node['y'])])
        return interpolate_points_along_line(line, interval)

    if edges:
        per_edge = Parallel(n_jobs=n_jobs, backend='threading')(
            delayed(edge_points)(u, v, data) for u, v, _k, data in edges
        )
    else:
        per_edge = []

    # Flatten into three parallel columns and build a single frame, rather
    # than one DataFrame per edge (pd.concat also fails on an empty list).
    edge_ids = []
    positions = []
    points = []
    for edge_id, pts in enumerate(per_edge):
        edge_ids.extend([edge_id] * len(pts))
        positions.extend(range(len(pts)))
        points.extend(pts)

    all_points_df = pd.DataFrame({
        'edge_id': np.asarray(edge_ids, dtype='int64'),
        'index_in_edge': np.asarray(positions, dtype='int64'),
        'geometry': points
    })
    return gpd.GeoDataFrame(all_points_df, geometry='geometry', crs="EPSG:4326")
def fetch_elevations_for_points(points_gdf_3857, dem_gdf_3857, elevation_col='value'):
    """
    Attach the elevation of the nearest DEM polygon to each point.

    Uses ``geopandas.sjoin_nearest`` with both layers in EPSG:3857. When a
    point is equidistant to several DEM polygons (for example on a shared
    cell boundary) the join yields one row per match; only the first match
    is kept so the result has exactly one row per input point.

    Parameters
    ----------
    points_gdf_3857 : gpd.GeoDataFrame
        Interpolation points in EPSG:3857. Not modified.
    dem_gdf_3857 : gpd.GeoDataFrame
        DEM polygons in EPSG:3857 with a ``'geometry'`` column and the
        elevation column.
    elevation_col : str, default='value'
        Name of the elevation column in `dem_gdf_3857`.

    Returns
    -------
    gpd.GeoDataFrame
        The input points plus the join columns, including:
        - elevation: value of `elevation_col` from the nearest DEM polygon
        - dist_to_poly: distance to that polygon
    """
    # Temporary positional id: de-duplication must not rely on the input
    # index being unique.
    row_col = '__point_row__'
    points = points_gdf_3857.copy()
    points[row_col] = np.arange(len(points))

    joined = gpd.sjoin_nearest(
        points,
        dem_gdf_3857[[elevation_col, 'geometry']],
        how='left',
        distance_col='dist_to_poly'
    )

    # Collapse ties to one row per original point, then drop the helper id.
    joined = joined.drop_duplicates(subset=row_col, keep='first').drop(columns=row_col)
    return joined.rename(columns={elevation_col: 'elevation'})
def compute_slope_for_group(df):
    """
    Compute the mean absolute slope between consecutive points of one edge.

    Points are ordered by ``index_in_edge``; for each consecutive pair the
    slope is ``abs(elevation difference) / horizontal distance * 100``. The
    result is the unweighted mean over all usable pairs. A pair is skipped
    when its points coincide (zero horizontal distance) or when either
    elevation is missing (NaN/inf), so a single missing elevation does not
    turn the whole edge into NaN.

    Parameters
    ----------
    df : geopandas.GeoDataFrame
        Points of a single edge with columns:
        - geometry: Point geometries (projected, e.g. EPSG:3857)
        - elevation: elevation values
        - index_in_edge: position along the edge, used for sorting

    Returns
    -------
    float
        Mean slope in percent, or ``np.nan`` if no usable pair exists
        (fewer than two points, all points coincident, or no pair with two
        finite elevations).
    """
    ordered = df.sort_values("index_in_edge")

    xs = np.asarray(ordered.geometry.x.to_numpy(), dtype=float)
    ys = np.asarray(ordered.geometry.y.to_numpy(), dtype=float)
    elevs = np.asarray(ordered["elevation"].to_numpy(), dtype=float)

    # Per-segment horizontal run and vertical rise.
    dx = np.diff(xs)
    dy = np.diff(ys)
    horizontal_dist = np.sqrt(dx**2 + dy**2)
    elev_diff = np.diff(elevs)

    # Keep segments with a positive run and a defined rise.
    usable = (horizontal_dist > 0) & np.isfinite(elev_diff)
    if not usable.any():
        return np.nan

    slopes = (np.abs(elev_diff[usable]) / horizontal_dist[usable]) * 100
    return slopes.mean()
def calculate_edge_slopes_from_join(joined_points_gdf, n_edges):
    """
    Compute the average slope of every edge from elevation-tagged points.

    Points are grouped by ``edge_id`` and each group is reduced with
    ``compute_slope_for_group``. Every edge id in ``range(n_edges)`` is
    guaranteed to appear in the result.

    Parameters
    ----------
    joined_points_gdf : gpd.GeoDataFrame
        Points with columns ``edge_id``, ``index_in_edge``, ``elevation``
        and ``geometry`` (projected CRS).
    n_edges : int
        Total number of edges in the original graph.

    Returns
    -------
    dict
        Maps edge_id to average slope (in %). Edge ids without points, or
        without a computable slope, map to ``np.nan``.
    """
    # With no rows there are no groups to apply over; in that case
    # groupby.apply is expected to return an empty DataFrame, whose
    # to_dict() is keyed by column name. Return the all-NaN mapping directly.
    if len(joined_points_gdf) == 0:
        return {edge_id: np.nan for edge_id in range(n_edges)}

    # Select the needed columns after grouping so the grouping column is
    # not passed to the applied function.
    grouped = joined_points_gdf.groupby("edge_id", group_keys=False)
    results = grouped[["geometry", "elevation", "index_in_edge"]].apply(compute_slope_for_group)

    slope_dict = results.to_dict()

    # Edges that produced no points still need an entry.
    for edge_id in range(n_edges):
        slope_dict.setdefault(edge_id, np.nan)

    return slope_dict
# ------------------------------------------------------------------- # 2) Main function to analyze network slopes # -------------------------------------------------------------------
def analyze_network_slopes(
    dem_grid,
    meshsize,
    value_name='slope',
    interval=10.0,
    n_jobs=1,
    **kwargs
):
    """
    Compute and optionally plot street-network slopes from a DEM grid.

    Steps:
    1. Convert the DEM grid to a GeoDataFrame of elevation polygons
    2. Download the street network from OpenStreetMap
    3. Interpolate points along network edges
    4. Match each point to the nearest DEM polygon's elevation
    5. Compute slopes between consecutive points
    6. Average the slopes per edge
    7. Optionally plot the result on a static matplotlib figure with a
       contextily basemap

    Geometries are kept in EPSG:4326 and reprojected to EPSG:3857 for the
    distance-based steps.

    Parameters
    ----------
    dem_grid : array-like
        DEM grid of elevation values.
    meshsize : float
        Size of each DEM grid cell.
    value_name : str, default='slope'
        Name of the edge attribute / output column holding the slope.
    interval : float, default=10.0
        Spacing between interpolated points along edges.
    n_jobs : int, default=1
        Number of parallel jobs for edge interpolation.
    **kwargs : dict
        - rectangle_vertices : list of (lon, lat), required
            Vertices in EPSG:4326. The bounding box is read from vertices
            0, 1 and 2, so the order (SW, NW, NE, SE) is expected.
        - network_type : str, default='walk'
        - vis_graph : bool, default=True
        - colormap : str, default='viridis'
        - vmin, vmax : float, optional
        - edge_width : float, default=1
        - fig_size : tuple, default=(15,15)
        - zoom : int, default=16
        - basemap_style : contextily provider, default=CartoDB.Positron
        - output_directory : str, optional
            Directory (created if missing) to save the result in.
        - output_file_name : str, default='network_slopes'
            Base name of the saved ``.gpkg`` file.
        - alpha : float, default=1.0
            Transparency of edge lines in the plot.

    Returns
    -------
    tuple (networkx.MultiDiGraph, geopandas.GeoDataFrame)
        - Graph with slope values as edge attributes
        - EPSG:4326 GeoDataFrame with columns ``u``, ``v``, ``key``,
          ``geometry`` and ``value_name``

    Raises
    ------
    ValueError
        If ``rectangle_vertices`` is not supplied in kwargs.

    Notes
    -----
    - Slopes are absolute percentage grades (rise/run * 100).
    - An edge's slope is whatever ``compute_slope_for_group`` returns for
      its points (a mean over consecutive point pairs), or NaN when it
      cannot be computed.
    - If output_directory is specified, results are saved as a GeoPackage.
    """
    # Validate the one required kwarg before doing any other work.
    if kwargs.get('rectangle_vertices') is None:
        raise ValueError("Must supply 'rectangle_vertices' in kwargs.")

    defaults = {
        'rectangle_vertices': None,
        'network_type': 'walk',
        'vis_graph': True,
        'colormap': 'viridis',
        'vmin': None,
        'vmax': None,
        'edge_width': 1,
        'fig_size': (15, 15),
        'zoom': 16,
        'basemap_style': ctx.providers.CartoDB.Positron,
        'output_directory': None,
        'output_file_name': 'network_slopes',
        'alpha': 1.0
    }
    settings = {**defaults, **kwargs}
    rectangle_vertices = settings['rectangle_vertices']

    # 1) Build DEM GeoDataFrame in EPSG:4326
    dem_gdf = grid_to_geodataframe(dem_grid, rectangle_vertices, meshsize)
    if dem_gdf.crs is None:
        dem_gdf = dem_gdf.set_crs(epsg=4326)

    # 2) Download the network for the rectangle's bounding box
    north, south = rectangle_vertices[1][1], rectangle_vertices[0][1]
    east, west = rectangle_vertices[2][0], rectangle_vertices[0][0]
    bbox = (west, south, east, north)

    G = ox.graph.graph_from_bbox(
        bbox=bbox,
        network_type=settings['network_type'],
        simplify=True
    )

    # 3) Interpolate points along edges (EPSG:4326)
    points_gdf_4326 = gather_interpolation_points(G, interval=interval, n_jobs=n_jobs)

    # 4) Reproject DEM + points to EPSG:3857 for distance operations
    dem_gdf_3857 = dem_gdf.to_crs(epsg=3857)
    points_gdf_3857 = points_gdf_4326.to_crs(epsg=3857)

    # 5) Spatial join to get elevations
    joined_points_3857 = fetch_elevations_for_points(points_gdf_3857, dem_gdf_3857, elevation_col='value')

    # 6) Compute slopes per edge; edge ids are positions in this edge list,
    #    matching the enumeration used by gather_interpolation_points.
    edges = list(G.edges(keys=True, data=True))
    slope_dict = calculate_edge_slopes_from_join(joined_points_3857, len(edges))

    # 7) + 8) Assign slopes to G and build the edge records in one pass
    edge_slopes = {}
    edges_with_values = []
    for edge_id, (u, v, k, data) in enumerate(edges):
        slope = slope_dict.get(edge_id, np.nan)
        edge_slopes[(u, v, k)] = slope

        if 'geometry' in data:
            geom = data['geometry']
        else:
            start_node = G.nodes[u]
            end_node = G.nodes[v]
            geom = LineString([(start_node['x'], start_node['y']),
                               (end_node['x'], end_node['y'])])

        edges_with_values.append({
            'u': u,
            'v': v,
            'key': k,
            'geometry': geom,
            value_name: slope
        })

    nx.set_edge_attributes(G, edge_slopes, name=value_name)
    edge_gdf = gpd.GeoDataFrame(edges_with_values, crs="EPSG:4326")

    # 9) Save output if requested
    if settings['output_directory']:
        os.makedirs(settings['output_directory'], exist_ok=True)
        out_path = os.path.join(
            settings['output_directory'],
            f"{settings['output_file_name']}.gpkg"
        )
        edge_gdf.to_file(out_path, driver="GPKG")

    # 10) Visualization
    if settings['vis_graph']:
        # Plot extent = bounds of the analysis rectangle in Web Mercator.
        rectangle_polygon = Polygon(rectangle_vertices)
        rectangle_gdf = gpd.GeoDataFrame(index=[0], crs='epsg:4326', geometry=[rectangle_polygon])
        rectangle_gdf_web = rectangle_gdf.to_crs(epsg=3857)
        minx, miny, maxx, maxy = rectangle_gdf_web.total_bounds

        edge_gdf_web = edge_gdf.to_crs(epsg=3857)
        fig, ax = plt.subplots(figsize=settings['fig_size'])
        edge_gdf_web.plot(
            column=value_name,
            ax=ax,
            cmap=settings['colormap'],
            legend=True,
            vmin=settings['vmin'],
            vmax=settings['vmax'],
            linewidth=settings['edge_width'],
            alpha=settings['alpha'],
            legend_kwds={'label': f"{value_name} (%)"}
        )

        ctx.add_basemap(
            ax,
            source=settings['basemap_style'],
            zoom=settings['zoom'],
            bounds=(minx, miny, maxx, maxy)
        )

        # Clip the view to the rectangle.
        ax.set_xlim(minx, maxx)
        ax.set_ylim(miny, maxy)
        ax.set_axis_off()

        plt.title(f'Network {value_name} Analysis', pad=20)
        plt.show()

    return G, edge_gdf