根据你提供的坐标点列表(定义一个区域范围),筛选出与该区域相交的要素,并合并共享至少两个顶点的要素,最后只输出该区域范围内的几何数据。
这个脚本同样依赖 shapely 库来处理几何体的相交判断、裁剪和合并。
安装 Shapely (如果尚未安装)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ shapelyPython 脚本
import json
from collections import defaultdict
from shapely.geometry import shape, Polygon, mapping
from shapely.ops import unary_union
import sys
def process_geojson_within_bounds_and_merge(geojson_data, bounds_coords):
"""
筛选、合并并裁剪 GeoJSON 要素。
Args:
geojson_data (dict): 解析后的 GeoJSON 数据字典。
bounds_coords (list): 定义区域范围的坐标列表 [[x1,y1], [x2,y2], ...]。
Returns:
list: 包含处理后要素的列表。
"""
features = geojson_data["features"]
# 假设只处理 Polygon 类型
polygon_features = [f for f in features if f.get("geometry", {}).get("type") == "Polygon"]
if not polygon_features:
print("警告:GeoJSON 中没有找到 Polygon 类型的要素。")
return []
# 1. 创建裁剪边界 (Bounding Box) 和 Shapely Polygon
lons = [coord[0] for coord in bounds_coords]
lats = [coord[1] for coord in bounds_coords]
min_lon, max_lon = min(lons), max(lons)
min_lat, max_lat = min(lats), max(lats)
# 创建一个代表裁剪区域的 Shapely Polygon (用于精确裁剪)
bounds_polygon = Polygon(bounds_coords)
# 一个包围盒 (用于快速初步筛选)
bounds_bbox = (min_lon, min_lat, max_lon, max_lat)
# 2. 筛选与边界相交的要素
intersecting_features = []
for feature in polygon_features:
geom_json = feature["geometry"]
shapely_geom = shape(geom_json)
# 快速 bbox 筛选
if not shapely_geom.intersects(bounds_polygon):
continue
# 精确相交判断
if shapely_geom.intersects(bounds_polygon):
intersecting_features.append(feature)
if not intersecting_features:
print("在指定区域内未找到相交的要素。")
return []
print(f"找到 {len(intersecting_features)} 个与区域相交的要素。")
# 3. 构建邻接图 (在相交要素内部查找共享顶点)
graph = defaultdict(set)
vertex_to_ids = defaultdict(set)
feature_map = {f["properties"]["OBJECTID"]: f for f in intersecting_features}
for feature in intersecting_features:
obj_id = feature["properties"]["OBJECTID"]
geometry = feature["geometry"]
coordinates = geometry["coordinates"]
ring_coords = coordinates[0] # 外环
for coord in ring_coords:
vertex_key = tuple(coord)
vertex_to_ids[vertex_key].add(obj_id)
for vertex, obj_ids in vertex_to_ids.items():
if len(obj_ids) > 1:
obj_ids_list = list(obj_ids)
for m in range(len(obj_ids_list)):
for n in range(m + 1, len(obj_ids_list)):
id1, id2 = obj_ids_list[m], obj_ids_list[n]
if id1 != id2:
graph[id1].add(id2)
graph[id2].add(id1)
# 4. 找到连接组件
visited = set()
components = []
for feature in intersecting_features:
obj_id = feature["properties"]["OBJECTID"]
if obj_id in visited:
continue
component_ids = set()
queue = [obj_id]
while queue:
current_id = queue.pop(0)
if current_id in component_ids:
continue
component_ids.add(current_id)
visited.add(current_id)
for neighbor_id in graph.get(current_id, []):
if neighbor_id not in component_ids:
queue.append(neighbor_id)
if component_ids:
components.append(list(component_ids))
# 5. 处理每个组件:合并、裁剪、创建新要素
processed_features = []
for component_id_list in components:
if len(component_id_list) == 1:
# 单一要素,直接裁剪
obj_id = component_id_list[0]
original_geom_json = feature_map[obj_id]["geometry"]
original_shapely_geom = shape(original_geom_json)
# 裁剪
clipped_geom = original_shapely_geom.intersection(bounds_polygon)
if not clipped_geom.is_empty:
# 创建新要素
new_feature = {
"type": "Feature",
"properties": feature_map[obj_id]["properties"].copy(),
"geometry": mapping(clipped_geom)
}
processed_features.append(new_feature)
else:
# 多要素组件,合并后再裁剪
polygons_to_merge = []
representative_feature = None
for obj_id in component_id_list:
if not representative_feature:
representative_feature = feature_map[obj_id]
geom_json = feature_map[obj_id]["geometry"]
shapely_geom = shape(geom_json)
polygons_to_merge.append(shapely_geom)
if not polygons_to_merge:
continue
# 合并
merged_geom = unary_union(polygons_to_merge)
# 裁剪
clipped_geom = merged_geom.intersection(bounds_polygon)
if not clipped_geom.is_empty:
# 创建新要素
new_feature = {
"type": "Feature",
# 使用第一个要素的属性作为代表,或合并属性
"properties": representative_feature["properties"].copy()
}
# 正确的字典赋值方式
new_feature["properties"]["OBJECTID"] = f"merged_{','.join(map(str, sorted(component_id_list)))}"
new_feature["properties"]["BH"] = f"merged_{','.join(map(str, sorted(component_id_list)))}"
new_feature["geometry"] = mapping(clipped_geom)
processed_features.append(new_feature)
return processed_features
def write_processed_geojson(processed_features, output_file_path, original_crs=None):
"""
将处理后的要素列表写入新的 GeoJSON 文件。
"""
if not processed_features:
print("没有处理数据可写入。")
return
new_geojson = {
"type": "FeatureCollection",
"crs": original_crs,
"features": processed_features
}
try:
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(new_geojson, f, ensure_ascii=False, indent=2)
print(f"成功将处理后的 GeoJSON 数据写入到 {output_file_path}")
except IOError as e:
print(f"错误:写入文件 {output_file_path} 时发生错误: {e}")
# --- 主程序 ---
if __name__ == "__main__":
# 1. 指定输入文件路径
input_file = "input_data.json" # 请修改为你的输入文件路径
# 2. 指定输出文件路径
output_file = "clipped_merged_output.json" # 请修改为你希望的输出文件路径
# 3. 定义裁剪区域的坐标
bounds_coordinates = [
[108.61508953170733, 21.712980643849264],
[108.61368133018868, 21.711355635230348],
[108.61014895654131, 21.709234261538427],
[108.61131040924721, 21.707121058827184],
[108.6239464067533, 21.70642589647258],
[108.62929392858842, 21.706291009488353],
[108.62574528508623, 21.71175077714281],
[108.62569914775031, 21.711754859447737],
[108.62542012672162, 21.711720159853087],
[108.61524918324398, 21.712928653961626],
[108.61508953170733, 21.712980643849264] # 闭合环
]
# 4. 读取 GeoJSON 文件
try:
with open(input_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except FileNotFoundError:
print(f"错误:找不到文件 {input_file}")
sys.exit()
except json.JSONDecodeError:
print(f"错误:文件 {input_file} 不是有效的 JSON 格式")
sys.exit()
# 5. 执行筛选、合并和裁剪
processed_features = process_geojson_within_bounds_and_merge(data, bounds_coordinates)
# 6. 写入结果
if processed_features:
print(f"处理完成,共生成 {len(processed_features)} 个要素。")
write_processed_geojson(processed_features, output_file, data.get("crs"))
else:
print("没有要素被处理或处理失败。")
### 如何使用
1. 安装 Shapely:如果尚未安装,请使用上面提供的镜像源命令安装。
2. 保存脚本:将代码保存为 clip_and_merge.py。
3. 准备 GeoJSON 文件:将你的数据保存为 input_data.json。
4. 修改路径:修改 input_file 和 output_file 变量为你实际的文件路径。
5. 运行脚本python clip_and_merge.py。
### 脚本解释
1. 定义区域:脚本使用你提供的 bounds_coordinates 创建一个 shapely.Polygon 作为裁剪边界。
2. 筛选:遍历输入的要素,使用 shapely 的 intersects 方法判断要素是否与裁剪区域相交。
3. 合并:在筛选出的要素内部,构建邻接图,找到共享顶点的连接组件,然后使用 unary_union 合并组件内的几何体。
4. 裁剪:对合并后(或未合并的单一)几何体,使用 intersection(bounds_polygon) 进行精确裁剪。
5. 输出:将裁剪后的几何体和(可能更新的)属性写入新的 GeoJSON 文件。
这个脚本会输出一个只包含指定区域范围内数据的新 GeoJSON 文件,且区域内连接的多边形已被合并。
评论