from typing import Optional, Dict, List, Tuple, Union
from pathlib import Path
import requests
import re
import xml.etree.ElementTree as ET
import json
import zipfile
from .files import safe_extract, safe_rename
from .epw import process_epw
# [docs] (link marker left over from the rendered documentation page; not code)
# --- KML sources and region helpers ---

# Station index (KML) published by Climate.OneBuilding.Org, keyed by dataset name.
_KML_SOURCES: Dict[str, str] = {
    "Africa": "https://climate.onebuilding.org/sources/Region1_Africa_TMYx_EPW_Processing_locations.kml",
    "Asia": "https://climate.onebuilding.org/sources/Region2_Asia_TMYx_EPW_Processing_locations.kml",
    "Japan": "https://climate.onebuilding.org/sources/JGMY_EPW_Processing_locations.kml",
    "India": "https://climate.onebuilding.org/sources/ITMY_EPW_Processing_locations.kml",
    "CSWD": "https://climate.onebuilding.org/sources/CSWD_EPW_Processing_locations.kml",
    "CityUHK": "https://climate.onebuilding.org/sources/CityUHK_EPW_Processing_locations.kml",
    "PHIKO": "https://climate.onebuilding.org/sources/PHIKO_EPW_Processing_locations.kml",
    "South_America": "https://climate.onebuilding.org/sources/Region3_South_America_TMYx_EPW_Processing_locations.kml",
    "Argentina": "https://climate.onebuilding.org/sources/ArgTMY_EPW_Processing_locations.kml",
    "INMET_TRY": "https://climate.onebuilding.org/sources/INMET_TRY_EPW_Processing_locations.kml",
    "AMTUes": "https://climate.onebuilding.org/sources/AMTUes_EPW_Processing_locations.kml",
    "BrazFuture": "https://climate.onebuilding.org/sources/BrazFuture_EPW_Processing_locations.kml",
    "Canada": "https://climate.onebuilding.org/sources/Region4_Canada_TMYx_EPW_Processing_locations.kml",
    "USA": "https://climate.onebuilding.org/sources/Region4_USA_TMYx_EPW_Processing_locations.kml",
    "Caribbean": "https://climate.onebuilding.org/sources/Region4_NA_CA_Caribbean_TMYx_EPW_Processing_locations.kml",
    "Southwest_Pacific": "https://climate.onebuilding.org/sources/Region5_Southwest_Pacific_TMYx_EPW_Processing_locations.kml",
    "Europe": "https://climate.onebuilding.org/sources/Region6_Europe_TMYx_EPW_Processing_locations.kml",
    "Antarctica": "https://climate.onebuilding.org/sources/Region7_Antarctica_TMYx_EPW_Processing_locations.kml",
}

# Region name -> dataset keys to scan for it. Keys that have no entry in
# _KML_SOURCES (e.g. "North_and_Central_America") are skipped on expansion.
_REGION_DATASET_GROUPS: Dict[str, List[str]] = {
    "Africa": ["Africa"],
    "Asia": ["Asia", "Japan", "India", "CSWD", "CityUHK", "PHIKO"],
    "South_America": ["South_America", "Argentina", "INMET_TRY", "AMTUes", "BrazFuture"],
    "North_and_Central_America": ["North_and_Central_America", "Canada", "USA", "Caribbean"],
    "Southwest_Pacific": ["Southwest_Pacific"],
    "Europe": ["Europe"],
    "Antarctica": ["Antarctica"],
}

# Bounding boxes (degrees) used for region auto-detection. A region is either
# a single box or, under the "boxes" key, a list of boxes.
_REGION_BOUNDS: Dict[str, Dict] = {
    "Africa": {"lon_min": -25, "lon_max": 80, "lat_min": -55, "lat_max": 45},
    "Asia": {"lon_min": 20, "lon_max": 180, "lat_min": -10, "lat_max": 80},
    "Japan": {"lon_min": 127, "lon_max": 146, "lat_min": 24, "lat_max": 46},
    "India": {"lon_min": 68, "lon_max": 97, "lat_min": 6, "lat_max": 36},
    "South_America": {"lon_min": -92, "lon_max": -20, "lat_min": -60, "lat_max": 15},
    "Argentina": {"lon_min": -75, "lon_max": -53, "lat_min": -55, "lat_max": -22},
    "North_and_Central_America": {"lon_min": -180, "lon_max": 20, "lat_min": -10, "lat_max": 85},
    "Canada": {"lon_min": -141, "lon_max": -52, "lat_min": 42, "lat_max": 83},
    "USA": {"lon_min": -170, "lon_max": -65, "lat_min": 20, "lat_max": 72},
    "Caribbean": {"lon_min": -90, "lon_max": -59, "lat_min": 10, "lat_max": 27},
    "Southwest_Pacific": {"boxes": [
        {"lon_min": 90, "lon_max": 180, "lat_min": -50, "lat_max": 25},
        {"lon_min": -180, "lon_max": -140, "lat_min": -50, "lat_max": 25},
    ]},
    "Europe": {"lon_min": -75, "lon_max": 60, "lat_min": 25, "lat_max": 85},
    "Antarctica": {"lon_min": -180, "lon_max": 180, "lat_min": -90, "lat_max": -60},
}

_KML_NAMESPACE = {'kml': 'http://earth.google.com/kml/2.1'}
_KML_TIMEOUT_S = 30

# Everything outside tab/LF/CR, printable ASCII, NEL and the upper Latin-1 range.
_XML_DISALLOWED_RE = re.compile(r'[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\xFF]')
_STATION_URL_RE = re.compile(r'URL (https://.*?\.zip)')
# "N 35° 33.00' ... E 139° 46.00'" style degrees / decimal-minutes pair.
_DMS_COORD_RE = re.compile(r"([NS]) (\d+)°\s*(\d+\.\d+)'.*?([EW]) (\d+)°\s*(\d+\.\d+)'")
_TMYX_URL_RE = re.compile(r"(.*_TMYx)(?:\.(\d{4}-\d{4}))?\.zip$")
# Path separators, characters reserved on common file systems, control chars.
_FILENAME_UNSAFE_RE = re.compile(r'[\\/:*?"<>|\x00-\x1f]')


def _region_boxes(bounds: Dict) -> List[Dict[str, float]]:
    """Return the list of bounding boxes for one _REGION_BOUNDS entry."""
    return bounds["boxes"] if "boxes" in bounds else [bounds]


def _in_box(box: Dict[str, float], lon: float, lat: float) -> bool:
    """Return True if (lon, lat) lies inside *box* (edges inclusive)."""
    return box["lon_min"] <= lon <= box["lon_max"] and box["lat_min"] <= lat <= box["lat_max"]


def _box_distance(box: Dict[str, float], lon: float, lat: float) -> float:
    """Return the planar distance in degrees from (lon, lat) to *box* (0 if inside)."""
    lon_dist = max(box["lon_min"] - lon, 0, lon - box["lon_max"])
    lat_dist = max(box["lat_min"] - lat, 0, lat - box["lat_max"])
    return (lon_dist ** 2 + lat_dist ** 2) ** 0.5


def _detect_regions(lon: float, lat: float) -> List[str]:
    """Return the region names whose bounding boxes contain the point.

    If no box contains the point, the three regions with the nearest boxes
    are returned instead, so the result is never empty.
    """
    # A single wrap is applied to longitudes just outside [-180, 180].
    lon_adjusted = lon
    if lon < -180:
        lon_adjusted = lon + 360
    elif lon > 180:
        lon_adjusted = lon - 360

    matching = [
        region_name
        for region_name, bounds in _REGION_BOUNDS.items()
        if any(_in_box(box, lon_adjusted, lat) for box in _region_boxes(bounds))
    ]
    if matching:
        return matching

    ranked = sorted(
        (
            (region_name, min(_box_distance(box, lon_adjusted, lat) for box in _region_boxes(bounds)))
            for region_name, bounds in _REGION_BOUNDS.items()
        ),
        key=lambda item: item[1],
    )
    return [region_name for region_name, _ in ranked[:3]]


def _try_decode(content: bytes) -> str:
    """Decode *content* as UTF-8, falling back to Latin-1.

    Latin-1 maps every byte to a code point, so the fallback cannot fail;
    the additional encodings the previous version listed after it were
    unreachable.
    """
    try:
        return content.decode('utf-8')
    except UnicodeDecodeError:
        return content.decode('latin-1')


def _clean_xml(content: str) -> str:
    """Strip or replace text in *content* that would break XML parsing."""
    # HTML named entities are not defined in XML and abort the parser, so
    # they are replaced together with the literal characters.
    # NOTE(review): the previous code repeated the literal replacements twice;
    # presumably the entity forms were intended — confirm against the KML feeds.
    for needle, plain in (('&ntilde;', 'n'), ('&Ntilde;', 'N'), ('ñ', 'n'), ('Ñ', 'N')):
        content = content.replace(needle, plain)
    return _XML_DISALLOWED_RE.sub('', content)


def _parse_coordinates(point_text: str) -> Optional[Tuple[float, float, float]]:
    """Parse a KML "lon,lat[,elevation]" string.

    Returns:
        (lat, lon, elevation) with elevation defaulting to 0, or None when
        fewer than two fields are present or a field is not numeric.
    """
    fields = point_text.strip().split(',')
    if len(fields) < 2:
        return None
    try:
        lon, lat = float(fields[0]), float(fields[1])
        elevation = float(fields[2]) if len(fields) > 2 else 0.0
    except ValueError:
        return None
    return lat, lon, elevation


def _parse_station_from_description(desc: str,
                                    point_coords: Optional[Tuple[float, float, float]] = None) -> Optional[Dict]:
    """Build a station metadata dict from a placemark description.

    The description must contain a "URL https://...zip" entry. Coordinates
    are read from the degrees/decimal-minutes text when present, otherwise
    from *point_coords* as (lat, lon, elevation).

    Returns:
        The metadata dict, or None if the URL or coordinates are unavailable.
    """
    if not desc:
        return None
    url_match = _STATION_URL_RE.search(desc)
    if not url_match:
        return None

    coord_match = _DMS_COORD_RE.search(desc)
    if coord_match:
        ns, lat_deg, lat_min, ew, lon_deg, lon_min = coord_match.groups()
        lat = float(lat_deg) + float(lat_min) / 60
        lon = float(lon_deg) + float(lon_min) / 60
        if ns == 'S':
            lat = -lat
        if ew == 'W':
            lon = -lon
    elif point_coords:
        lat, lon, _ = point_coords
    else:
        return None

    def extract_value(pattern: str, default: Optional[str] = None) -> Optional[str]:
        match = re.search(pattern, desc)
        return match.group(1) if match else default

    return {
        'url': url_match.group(1),
        'longitude': lon,
        'latitude': lat,
        'elevation': int(extract_value(r'Elevation <b>(-?\d+)</b>', '0')),
        'name': extract_value(r'<b>(.*?)</b>'),
        'wmo': extract_value(r'WMO <b>(\d+)</b>'),
        'climate_zone': extract_value(r'Climate Zone <b>(.*?)</b>'),
        'period': extract_value(r'Period of Record=(\d{4}-\d{4})'),
        'heating_db': extract_value(r'99% Heating DB <b>(.*?)</b>'),
        'cooling_db': extract_value(r'1% Cooling DB <b>(.*?)</b>'),
        'hdd18': extract_value(r'HDD18 <b>(\d+)</b>'),
        'cdd10': extract_value(r'CDD10 <b>(\d+)</b>'),
        # The brace is escaped so it can never be read as a quantifier.
        'time_zone': extract_value(r'Time Zone \{GMT <b>([-+]?\d+\.\d+)</b>'),
    }


def _retry_after_ssl_error(url: str, timeout_s: int, allow_insecure_ssl: bool, allow_http_fallback: bool):
    """Retry a request that failed SSL verification, as far as the flags allow.

    Tries the same URL without certificate verification when
    *allow_insecure_ssl* is set, then plain HTTP when *allow_http_fallback*
    is set and the URL is HTTPS.

    Returns:
        The successful response, or None when no fallback is enabled.

    Raises:
        requests.exceptions.RequestException: if the last permitted attempt fails.
    """
    can_downgrade = allow_http_fallback and url.lower().startswith("https://")
    if allow_insecure_ssl:
        try:
            resp = requests.get(url, timeout=timeout_s, verify=False)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException:
            if not can_downgrade:
                raise
    elif not can_downgrade:
        return None
    resp = requests.get("http://" + url.split("://", 1)[1], timeout=timeout_s)
    resp.raise_for_status()
    return resp


def _turkey_spelling_variants(url: str) -> List[str]:
    """Return *url* with the TUR_Turkey / TUR_Turkiye directory spelling swapped."""
    swapped = []
    if "/TUR_Turkey/" in url:
        swapped.append(url.replace("/TUR_Turkey/", "/TUR_Turkiye/"))
    if "/TUR_Turkiye/" in url:
        swapped.append(url.replace("/TUR_Turkiye/", "/TUR_Turkey/"))
    return swapped


def _candidate_urls(url: str) -> List[str]:
    """Return *url* followed by alternative archive URLs to try, without duplicates."""
    urls = [url]

    def add(candidate: str) -> None:
        if candidate not in urls:
            urls.append(candidate)

    for swapped in _turkey_spelling_variants(url):
        add(swapped)

    tmyx = _TMYX_URL_RE.search(url)
    if tmyx:
        base = tmyx.group(1)
        variants = [
            f"{base}.2009-2023.zip",
            f"{base}.2007-2021.zip",
            f"{base}.zip",
            f"{base}.2004-2018.zip",
        ]
        for variant in variants:
            add(variant)
        for variant in variants:
            for swapped in _turkey_spelling_variants(variant):
                add(swapped)
    return urls


def _try_download_station_zip(original_url: str, timeout_s: int = 30, ssl_verify: Union[bool, str] = True,
                              allow_insecure_ssl: bool = False,
                              allow_http_fallback: bool = False) -> Optional[bytes]:
    """Download a station archive, trying alternative URLs when one is missing.

    Returns:
        The archive bytes from the first URL that succeeds, or None if all fail.

    Raises:
        requests.exceptions.HTTPError: for a non-404 HTTP error on a verified request.
    """
    for candidate in _candidate_urls(original_url):
        try:
            resp = requests.get(candidate, timeout=timeout_s, verify=ssl_verify)
            resp.raise_for_status()
            return resp.content
        except requests.exceptions.SSLError:
            try:
                resp = _retry_after_ssl_error(candidate, timeout_s, allow_insecure_ssl, allow_http_fallback)
            except requests.exceptions.RequestException:
                continue
            if resp is not None:
                return resp.content
        except requests.exceptions.HTTPError as he:
            # Only "not found" moves on to the next candidate; other HTTP
            # errors are surfaced to the caller.
            if getattr(he.response, "status_code", None) != 404:
                raise
        except requests.exceptions.RequestException:
            continue
    return None


def _load_kml_stations(url: str, ssl_verify: Union[bool, str], allow_insecure_ssl: bool,
                       allow_http_fallback: bool) -> List[Dict]:
    """Fetch one KML index and return the stations parsed from it.

    Errors are printed rather than raised; stations parsed before an error
    are still returned.
    """
    stations: List[Dict] = []
    try:
        try:
            response = requests.get(url, timeout=_KML_TIMEOUT_S, verify=ssl_verify)
            response.raise_for_status()
        except requests.exceptions.SSLError:
            response = _retry_after_ssl_error(url, _KML_TIMEOUT_S, allow_insecure_ssl, allow_http_fallback)
            if response is None:
                raise
        content = _clean_xml(_try_decode(response.content))
        try:
            root = ET.fromstring(content.encode('utf-8'))
        except ET.ParseError as e:
            print(f"Error parsing KML file {url}: {e}")
            return stations
        for placemark in root.findall('.//kml:Placemark', _KML_NAMESPACE):
            name = placemark.find('kml:name', _KML_NAMESPACE)
            desc = placemark.find('kml:description', _KML_NAMESPACE)
            point = placemark.find('.//kml:Point/kml:coordinates', _KML_NAMESPACE)
            if desc is None or not desc.text or "Data Source" not in desc.text:
                continue
            point_coords = None
            if point is not None and point.text:
                point_coords = _parse_coordinates(point.text)
            station_data = _parse_station_from_description(desc.text, point_coords)
            if station_data:
                # An empty <name/> element has text None; the name is later
                # used to build a file name, so fall back to a string.
                station_data['name'] = name.text if name is not None and name.text else "Unknown"
                station_data['kml_source'] = url
                stations.append(station_data)
    except requests.exceptions.RequestException as e:
        print(f"Error accessing KML file {url}: {e}")
    except Exception as e:
        print(f"Error processing KML file {url}: {e}")
    return stations


def _scan_kml_sources(sources: Dict[str, str], ssl_verify: Union[bool, str], allow_insecure_ssl: bool,
                      allow_http_fallback: bool) -> List[Dict]:
    """Scan every KML index in *sources* once and return all stations found."""
    all_stations: List[Dict] = []
    scanned_urls = set()
    for region_name, url in sources.items():
        if url in scanned_urls:
            continue
        scanned_urls.add(url)
        print(f"Scanning {region_name}...")
        stations = _load_kml_stations(url, ssl_verify, allow_insecure_ssl, allow_http_fallback)
        all_stations.extend(stations)
        print(f"Found {len(stations)} stations in {region_name}")
    return all_stations


def _add_selection(selection_name: str, out: Dict[str, str]) -> None:
    """Add the KML sources for a region group or single dataset to *out*.

    Raises:
        ValueError: if *selection_name* is neither a region group nor a dataset.
    """
    if selection_name in _REGION_DATASET_GROUPS:
        for key in _REGION_DATASET_GROUPS[selection_name]:
            if key in _KML_SOURCES:
                out[key] = _KML_SOURCES[key]
    elif selection_name in _KML_SOURCES:
        out[selection_name] = _KML_SOURCES[selection_name]
    else:
        valid = sorted(list(_REGION_DATASET_GROUPS.keys()) + list(_KML_SOURCES.keys()))
        raise ValueError(f"Invalid region/dataset: '{selection_name}'. Valid options include: {', '.join(valid)}")


def _select_kml_sources(region: Optional[Union[str, List[str]]], longitude: float, latitude: float) -> Dict[str, str]:
    """Resolve the *region* argument to the dataset-name -> KML URL map to scan."""
    if isinstance(region, str) and region.lower() == "all":
        return dict(_KML_SOURCES)

    selected: Dict[str, str] = {}
    if region is None:
        detected_regions = _detect_regions(longitude, latitude)
        if not detected_regions:
            print("Could not determine region from coordinates. Scanning all regions.")
            return dict(_KML_SOURCES)
        print(f"Auto-detected regions: {', '.join(detected_regions)}")
        names = detected_regions
    elif isinstance(region, str):
        names = [region]
    else:
        names = region
    for selection_name in names:
        _add_selection(selection_name, selected)
    return selected


def _station_file_stem(name: str) -> str:
    """Return a file-name stem for a station name.

    The name comes from remote KML, so path separators and characters that
    are reserved on common file systems are replaced; names without such
    characters map to the same stem as before.
    """
    stem = name.replace(' ', '_').replace(',', '').lower()
    return _FILENAME_UNSAFE_RE.sub('_', stem) or "unknown"


def _extract_epw(archive_bytes: bytes, output_dir: str, file_stem: str) -> Path:
    """Extract the first EPW member of the archive to ``output_dir/<file_stem>.epw``.

    The archive is staged in ``output_dir/temp``; the staged file is removed
    afterwards and the directory too if it ends up empty.

    Raises:
        ValueError: if the archive holds no ``.epw`` member or extraction yields no file.
    """
    temp_dir = Path(output_dir) / "temp"
    temp_dir.mkdir(parents=True, exist_ok=True)
    zip_file = temp_dir / "weather_data.zip"
    zip_file.write_bytes(archive_bytes)
    final_epw = None
    try:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            epw_files = [member for member in zip_ref.namelist() if member.lower().endswith('.epw')]
            if not epw_files:
                raise ValueError("No EPW file found in the downloaded archive")
            # safe_extract / safe_rename are project helpers; they are
            # presumably hardened variants of extract/rename — see .files.
            extracted_epw = safe_extract(zip_ref, epw_files[0], temp_dir)
            final_epw = safe_rename(extracted_epw, Path(output_dir) / f"{file_stem}.epw")
    finally:
        try:
            if zip_file.exists():
                zip_file.unlink()
            if temp_dir.exists() and not any(temp_dir.iterdir()):
                temp_dir.rmdir()
        except Exception as e:
            print(f"Warning: Could not clean up temporary files: {e}")
    if final_epw is None:
        raise ValueError("Failed to extract EPW file")
    return Path(final_epw)


def get_nearest_epw_from_climate_onebuilding(longitude: float, latitude: float, output_dir: str = "./", max_distance: Optional[float] = None,
                                             extract_zip: bool = True, load_data: bool = True, region: Optional[Union[str, List[str]]] = None,
                                             allow_insecure_ssl: bool = False, allow_http_fallback: bool = False,
                                             ssl_verify: Union[bool, str] = True) -> Tuple[Optional[str], Optional["pd.DataFrame"], Optional[Dict]]:
    """
    Download and process EPW weather file from Climate.OneBuilding.Org based on coordinates.

    The station indexes (KML) of the selected regions are scanned, the
    station nearest to the given point is chosen, and its archive is
    downloaded into *output_dir* together with a JSON file holding the
    station metadata. Progress and errors are reported with ``print``.

    Args:
        longitude: Longitude of the point of interest, in degrees.
        latitude: Latitude of the point of interest, in degrees.
        output_dir: Directory for the saved files; created if missing.
        max_distance: Preferred maximum distance to the station, in the unit
            returned by ``haversine_distance`` (reported as km). If no
            station is that close, the closest one is used anyway.
        extract_zip: If True, extract the EPW file from the archive. If
            False, keep the archive itself as ``<station>.zip``; no data can
            be loaded in that case.
        load_data: If True (and the EPW was extracted), load it with
            ``process_epw``.
        region: Region group or dataset name, a list of them, ``"all"``, or
            None to auto-detect the regions from the coordinates.
        allow_insecure_ssl: After an SSL error, retry without certificate
            verification.
        allow_http_fallback: After an SSL error, retry over plain HTTP.
        ssl_verify: Passed to ``requests`` as ``verify``.

    Returns:
        ``(path, dataframe, headers)``. *path* is the saved EPW (or archive)
        path; *dataframe* and *headers* are None unless data was loaded.
        On any error the message is printed and ``(None, None, None)`` is
        returned; the function does not raise.
    """
    try:
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        regions_to_scan = _select_kml_sources(region, longitude, latitude)

        print("Fetching weather station data from Climate.OneBuilding.Org...")
        all_stations = _scan_kml_sources(regions_to_scan, ssl_verify, allow_insecure_ssl, allow_http_fallback)
        print(f"\nTotal stations found: {len(all_stations)}")

        scanned_everything = isinstance(region, str) and region.lower() == "all"
        if not all_stations and not scanned_everything:
            print("No stations found from detected/selected regions. Falling back to global scan...")
            # Same scan routine as above, so the SSL fallbacks and the error
            # reporting also apply here.
            all_stations = _scan_kml_sources(_KML_SOURCES, ssl_verify, allow_insecure_ssl, allow_http_fallback)
            print(f"\nTotal stations found after global scan: {len(all_stations)}")
        if not all_stations:
            raise ValueError("No weather stations found")

        # Imported where it is needed so that a failure is reported through
        # the same error path as everything else in this function.
        from ...geoprocessor.utils import haversine_distance

        stations_with_distances = [
            (station, haversine_distance(longitude, latitude, station['longitude'], station['latitude']))
            for station in all_stations
        ]
        nearest_station, distance = min(stations_with_distances, key=lambda pair: pair[1])
        if max_distance is not None and distance > max_distance:
            print(f"\nNo stations found within {max_distance} km. Closest station is {distance:.1f} km away.")
            print("Using closest available station.")

        print(f"\nDownloading EPW file for {nearest_station['name']}...")
        archive_bytes = _try_download_station_zip(
            nearest_station['url'], timeout_s=30, ssl_verify=ssl_verify,
            allow_insecure_ssl=allow_insecure_ssl, allow_http_fallback=allow_http_fallback,
        )
        if archive_bytes is None:
            raise ValueError(f"Failed to download EPW archive from station URL and fallbacks: {nearest_station['url']}")

        file_stem = _station_file_stem(nearest_station['name'])
        if extract_zip:
            saved_path = _extract_epw(archive_bytes, output_dir, file_stem)
            saved_label = "EPW"
        else:
            # Previously the archive was deleted and the call failed with
            # "Failed to extract EPW file"; keep the archive instead.
            saved_path = Path(output_dir) / f"{file_stem}.zip"
            saved_path.write_bytes(archive_bytes)
            saved_label = "Archive"

        metadata_file = saved_path.with_suffix('.json')
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(nearest_station, f, indent=2)

        print(f"\nDownloaded EPW file for {nearest_station['name']}")
        print(f"Distance: {distance:.2f} km")
        print(f"Station coordinates: {nearest_station['longitude']}, {nearest_station['latitude']}")
        if nearest_station.get('wmo'):
            print(f"WMO: {nearest_station['wmo']}")
        if nearest_station.get('climate_zone'):
            print(f"Climate zone: {nearest_station['climate_zone']}")
        if nearest_station.get('period'):
            print(f"Data period: {nearest_station['period']}")
        print("Files saved:")
        print(f"- {saved_label}: {saved_path}")
        print(f"- Metadata: {metadata_file}")

        df = None
        headers = None
        if load_data:
            if extract_zip:
                print("\nLoading EPW data...")
                df, headers = process_epw(saved_path)
                print(f"Loaded {len(df)} hourly records")
            else:
                print("\nSkipping data load: the archive was not extracted (extract_zip=False).")
        return str(saved_path), df, headers
    except Exception as e:
        # Deliberate best-effort boundary: report and signal failure by value.
        print(f"Error processing data: {e}")
        return None, None, None