欢迎光临顺德区华金智网
详情描述
python脚本实现下载高德离线底图瓦片并使用

1. 基础下载脚本

import requests
import os
import threading
from queue import Queue
import time

class GaodeTileDownloader:
    def __init__(self, save_dir="tiles", max_workers=5):
        """
        初始化高德地图瓦片下载器

        参数:
            save_dir: 瓦片保存目录
            max_workers: 最大线程数
        """
        self.save_dir = save_dir
        self.max_workers = max_workers

        # 创建保存目录
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

    def download_tile(self, x, y, z, style='webrd'):
        """
        下载单个瓦片

        参数:
            x, y: 瓦片坐标
            z: 缩放级别
            style: 地图样式
                - 'webrd': 道路图(默认)
                - 'webimg': 卫星图
                - 'webst': 道路+标注
        """
        # 高德地图瓦片URL模板
        url_templates = {
            'webrd': f'https://webrd0{sid}.is.autonavi.com/appmaptile?x={{x}}&y={{y}}&z={{z}}&lang=zh_cn&size=1&scale=1&style=8',
            'webimg': f'https://webst0{sid}.is.autonavi.com/appmaptile?style=6&x={{x}}&y={{y}}&z={{z}}',
            'webst': f'https://webst0{sid}.is.autonavi.com/appmaptile?x={{x}}&y={{y}}&z={{z}}&lang=zh_cn&size=1&scale=1&style=7'
        }

        # 随机选择服务器 (0-4)
        import random
        sid = random.randint(1, 4)

        url = url_templates[style].format(x=x, y=y, z=z)

        # 构建保存路径
        tile_dir = os.path.join(self.save_dir, str(z), str(x))
        if not os.path.exists(tile_dir):
            os.makedirs(tile_dir)

        file_path = os.path.join(tile_dir, f"{y}.png")

        # 如果文件已存在,跳过下载
        if os.path.exists(file_path):
            print(f"瓦片已存在: z={z}, x={x}, y={y}")
            return True

        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

            response = requests.get(url, headers=headers, timeout=30)

            if response.status_code == 200:
                with open(file_path, 'wb') as f:
                    f.write(response.content)
                print(f"下载成功: z={z}, x={x}, y={y}")
                return True
            else:
                print(f"下载失败: z={z}, x={x}, y={y}, 状态码: {response.status_code}")
                return False

        except Exception as e:
            print(f"下载错误: z={z}, x={x}, y={y}, 错误: {str(e)}")
            return False

    def download_range(self, x_range, y_range, z, style='webrd'):
        """
        下载指定范围的瓦片

        参数:
            x_range: x坐标范围 (start, end)
            y_range: y坐标范围 (start, end)
            z: 缩放级别
            style: 地图样式
        """
        print(f"开始下载: z={z}, x范围: {x_range}, y范围: {y_range}")

        # 创建队列
        task_queue = Queue()

        # 添加任务到队列
        for x in range(x_range[0], x_range[1] + 1):
            for y in range(y_range[0], y_range[1] + 1):
                task_queue.put((x, y))

        # 工作线程函数
        def worker():
            while not task_queue.empty():
                try:
                    x, y = task_queue.get_nowait()
                    self.download_tile(x, y, z, style)
                    time.sleep(0.1)  # 防止请求过快
                    task_queue.task_done()
                except:
                    break

        # 创建并启动工作线程
        threads = []
        for _ in range(min(self.max_workers, task_queue.qsize())):
            thread = threading.Thread(target=worker)
            thread.start()
            threads.append(thread)

        # 等待所有任务完成
        task_queue.join()

        for thread in threads:
            thread.join()

        print("下载完成!")

    def latlon_to_tile(self, lat, lon, zoom):
        """
        经纬度转换为瓦片坐标

        参数:
            lat: 纬度
            lon: 经度
            zoom: 缩放级别
        """
        import math

        n = 2.0 ** zoom
        x = int((lon + 180.0) / 360.0 * n)
        lat_rad = math.radians(lat)
        y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)

        return x, y

    def download_by_bounds(self, bounds, zoom_range, style='webrd'):
        """
        根据地理边界下载瓦片

        参数:
            bounds: 边界框 (min_lon, min_lat, max_lon, max_lat)
            zoom_range: 缩放级别范围 (min_zoom, max_zoom)
            style: 地图样式
        """
        min_lon, min_lat, max_lon, max_lat = bounds

        for z in range(zoom_range[0], zoom_range[1] + 1):
            # 计算边界对应的瓦片坐标
            x_min, y_max = self.latlon_to_tile(max_lat, min_lon, z)
            x_max, y_min = self.latlon_to_tile(min_lat, max_lon, z)

            print(f"Zoom {z}: x范围 [{x_min}, {x_max}], y范围 [{y_min}, {y_max}]")

            self.download_range((x_min, x_max), (y_min, y_max), z, style)


# 使用示例
if __name__ == "__main__":
    # 创建下载器实例
    downloader = GaodeTileDownloader(save_dir="gaode_tiles", max_workers=10)

    # 方法1: 下载指定坐标范围的瓦片
    print("方法1: 下载指定坐标范围")
    downloader.download_range(
        x_range=(1346, 1348),  # x坐标范围
        y_range=(624, 626),    # y坐标范围
        z=12,                  # 缩放级别
        style='webrd'          # 地图样式
    )

    # 方法2: 下载地理边界内的瓦片
    print("\n方法2: 下载地理边界内的瓦片")
    # 北京市范围示例 (经度, 纬度)
    beijing_bounds = (116.2, 39.8, 116.6, 40.0)
    downloader.download_by_bounds(
        bounds=beijing_bounds,
        zoom_range=(10, 12),  # 下载10-12级瓦片
        style='webrd'
    )

2. 高级版 - 支持进度显示和断点续传

import requests
import os
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import hashlib

class AdvancedGaodeDownloader:
    def __init__(self, base_dir="gaode_tiles", max_workers=8):
        self.base_dir = base_dir
        self.max_workers = max_workers
        self.config_file = os.path.join(base_dir, "download_config.json")

        # 地图样式配置
        self.styles = {
            'normal': {  # 标准地图
                'url': 'https://webrd0{server}.is.autonavi.com/appmaptile?x={x}&y={y}&z={z}&lang=zh_cn&size=1&scale=1&style=8',
                'servers': [1, 2, 3, 4]
            },
            'satellite': {  # 卫星地图
                'url': 'https://webst0{server}.is.autonavi.com/appmaptile?style=6&x={x}&y={y}&z={z}',
                'servers': [1, 2, 3, 4]
            },
            'terrain': {  # 地形图
                'url': 'https://webst0{server}.is.autonavi.com/appmaptile?x={x}&y={y}&z={z}&lang=zh_cn&size=1&scale=1&style=7',
                'servers': [1, 2, 3, 4]
            }
        }

        # 创建目录
        os.makedirs(base_dir, exist_ok=True)

        # 加载下载记录
        self.download_records = self.load_records()

    def load_records(self):
        """加载下载记录"""
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except:
                return {}
        return {}

    def save_records(self):
        """保存下载记录"""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.download_records, f, ensure_ascii=False, indent=2)

    def generate_tile_key(self, x, y, z, style):
        """生成瓦片唯一标识"""
        return f"{style}_{z}_{x}_{y}"

    def is_downloaded(self, x, y, z, style):
        """检查瓦片是否已下载"""
        key = self.generate_tile_key(x, y, z, style)
        tile_path = os.path.join(self.base_dir, style, str(z), str(x), f"{y}.png")

        if key in self.download_records:
            # 检查文件是否存在且完整
            if os.path.exists(tile_path):
                file_size = os.path.getsize(tile_path)
                if file_size > 1024:  # 文件大小大于1KB认为有效
                    return True

        return False

    def mark_downloaded(self, x, y, z, style, success=True):
        """标记瓦片下载状态"""
        key = self.generate_tile_key(x, y, z, style)
        self.download_records[key] = {
            'time': time.time(),
            'success': success,
            'style': style,
            'z': z,
            'x': x,
            'y': y
        }

        # 定期保存记录
        if len(self.download_records) % 100 == 0:
            self.save_records()

    def get_tile_url(self, x, y, z, style='normal'):
        """获取瓦片URL"""
        import random
        if style not in self.styles:
            style = 'normal'

        config = self.styles[style]
        server = random.choice(config['servers'])
        url = config['url'].format(server=server, x=x, y=y, z=z)
        return url

    def download_single_tile(self, args):
        """下载单个瓦片"""
        x, y, z, style = args

        if self.is_downloaded(x, y, z, style):
            return (x, y, z, style, True, "已存在")

        url = self.get_tile_url(x, y, z, style)

        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Referer': 'https://www.amap.com/'
            }

            response = requests.get(url, headers=headers, timeout=15)

            if response.status_code == 200:
                # 保存瓦片
                tile_dir = os.path.join(self.base_dir, style, str(z), str(x))
                os.makedirs(tile_dir, exist_ok=True)

                tile_path = os.path.join(tile_dir, f"{y}.png")
                with open(tile_path, 'wb') as f:
                    f.write(response.content)

                self.mark_downloaded(x, y, z, style, True)
                return (x, y, z, style, True, "成功")
            else:
                self.mark_downloaded(x, y, z, style, False)
                return (x, y, z, style, False, f"HTTP {response.status_code}")

        except Exception as e:
            self.mark_downloaded(x, y, z, style, False)
            return (x, y, z, style, False, str(e))

    def calculate_tile_range(self, center_lat, center_lon, zoom, radius_km=5):
        """计算以某点为中心,指定半径内的瓦片范围"""
        import math

        # 地球半径(千米)
        R = 6371

        # 计算纬度差(度)
        delta_lat = (radius_km / R) * (180 / math.pi)

        # 计算经度差(考虑纬度影响)
        delta_lon = (radius_km / (R * math.cos(math.radians(center_lat)))) * (180 / math.pi)

        # 计算边界
        bounds = (
            center_lon - delta_lon,  # min_lon
            center_lat - delta_lat,  # min_lat
            center_lon + delta_lon,  # max_lon
            center_lat + delta_lat   # max_lat
        )

        return self.bounds_to_tile_range(bounds, zoom)

    def bounds_to_tile_range(self, bounds, zoom):
        """地理边界转换为瓦片范围"""
        min_lon, min_lat, max_lon, max_lat = bounds

        n = 2 ** zoom
        x_min = int((min_lon + 180) / 360 * n)
        x_max = int((max_lon + 180) / 360 * n)

        lat1_rad = math.radians(max_lat)
        y_min = int((1 - math.asinh(math.tan(lat1_rad)) / math.pi) / 2 * n)

        lat2_rad = math.radians(min_lat)
        y_max = int((1 - math.asinh(math.tan(lat2_rad)) / math.pi) / 2 * n)

        # 确保范围正确
        x_min, x_max = min(x_min, x_max), max(x_min, x_max)
        y_min, y_max = min(y_min, y_max), max(y_min, y_max)

        return (x_min, x_max), (y_min, y_max)

    def download_area(self, center_lat, center_lon, zoom_range, radius_km=5, style='normal'):
        """
        下载指定区域的地图

        参数:
            center_lat, center_lon: 中心点经纬度
            zoom_range: 缩放级别范围 (min, max)
            radius_km: 半径(千米)
            style: 地图样式
        """
        print(f"开始下载区域: 中心点({center_lat}, {center_lon}), 半径{radius_km}km")

        for z in range(zoom_range[0], zoom_range[1] + 1):
            print(f"\n下载缩放级别: {z}")

            # 计算瓦片范围
            (x_min, x_max), (y_min, y_max) = self.calculate_tile_range(
                center_lat, center_lon, z, radius_km
            )

            total_tiles = (x_max - x_min + 1) * (y_max - y_min + 1)
            print(f"瓦片数量: {total_tiles}")

            # 准备下载任务
            tasks = []
            for x in range(x_min, x_max + 1):
                for y in range(y_min, y_max + 1):
                    tasks.append((x, y, z, style))

            # 使用线程池下载
            success_count = 0
            fail_count = 0

            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = {executor.submit(self.download_single_tile, task): task for task in tasks}

                # 使用进度条
                with tqdm(total=len(tasks), desc=f"下载级别 {z}") as pbar:
                    for future in as_completed(futures):
                        try:
                            result = future.result(timeout=20)
                            x, y, z, style, success, message = result

                            if success:
                                success_count += 1
                            else:
                                fail_count += 1

                            pbar.update(1)
                            pbar.set_postfix(成功=success_count, 失败=fail_count)

                        except Exception as e:
                            fail_count += 1
                            pbar.update(1)

            print(f"级别 {z} 完成: 成功 {success_count}, 失败 {fail_count}")
            time.sleep(1)  # 避免请求过快

        # 保存最终记录
        self.save_records()
        print("\n所有级别下载完成!")


# 使用示例
if __name__ == "__main__":
    # 创建高级下载器
    downloader = AdvancedGaodeDownloader(base_dir="map_tiles", max_workers=12)

    # 示例:下载天安门附近的地图
    # 天安门坐标: 39.9087, 116.3975
    downloader.download_area(
        center_lat=39.9087,
        center_lon=116.3975,
        zoom_range=(10, 15),  # 下载10-15级瓦片
        radius_km=2,          # 半径2公里
        style='normal'        # 标准地图
    )

    # 也可以下载卫星图
    # downloader.download_area(
    #     center_lat=31.2304,    # 上海
    #     center_lon=121.4737,
    #     zoom_range=(12, 14),
    #     radius_km=3,
    #     style='satellite'
    # )

3. 瓦片查看器(可选)

import pygame
import os
import math

class TileViewer:
    def __init__(self, tile_dir, tile_size=256):
        """
        瓦片查看器

        参数:
            tile_dir: 瓦片目录
            tile_size: 瓦片大小(默认256)
        """
        self.tile_dir = tile_dir
        self.tile_size = tile_size
        self.zoom = 10
        self.center_x = 0
        self.center_y = 0
        self.screen = None

        # 计算屏幕显示范围
        self.tiles_x = 0
        self.tiles_y = 0

    def init_display(self, width=800, height=600):
        """初始化显示"""
        pygame.init()
        self.screen = pygame.display.set_mode((width, height))
        pygame.display.set_caption("高德地图瓦片查看器")

        # 计算能显示的瓦片数量
        self.tiles_x = math.ceil(width / self.tile_size) + 1
        self.tiles_y = math.ceil(height / self.tile_size) + 1

    def load_tile(self, x, y, z):
        """加载瓦片图片"""
        tile_path = os.path.join(self.tile_dir, str(z), str(x), f"{y}.png")

        if os.path.exists(tile_path):
            try:
                return pygame.image.load(tile_path)
            except:
                return None
        return None

    def draw_tiles(self):
        """绘制瓦片"""
        if not self.screen:
            return

        # 清屏
        self.screen.fill((200, 200, 200))

        # 计算起始瓦片坐标
        start_x = self.center_x - self.tiles_x // 2
        start_y = self.center_y - self.tiles_y // 2

        # 绘制瓦片
        for i in range(self.tiles_x):
            for j in range(self.tiles_y):
                tile_x = start_x + i
                tile_y = start_y + j

                tile_img = self.load_tile(tile_x, tile_y, self.zoom)

                if tile_img:
                    # 计算屏幕位置
                    screen_x = (i * self.tile_size) - (
                        (self.center_x - start_x) % 1 * self.tile_size
                    )
                    screen_y = (j * self.tile_size) - (
                        (self.center_y - start_y) % 1 * self.tile_size
                    )

                    self.screen.blit(tile_img, (screen_x, screen_y))

        pygame.display.flip()

    def run(self):
        """运行查看器"""
        if not self.screen:
            self.init_display()

        running = True
        clock = pygame.time.Clock()

        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                elif event.type == pygame.KEYDOWN:
                    if event.key == pygame.K_UP:
                        self.center_y -= 1
                    elif event.key == pygame.K_DOWN:
                        self.center_y += 1
                    elif event.key == pygame.K_LEFT:
                        self.center_x -= 1
                    elif event.key == pygame.K_RIGHT:
                        self.center_x += 1
                    elif event.key == pygame.K_PLUS or event.key == pygame.K_EQUALS:
                        self.zoom = min(19, self.zoom + 1)
                    elif event.key == pygame.K_MINUS:
                        self.zoom = max(0, self.zoom - 1)

            self.draw_tiles()
            clock.tick(30)

        pygame.quit()

# 使用查看器
if __name__ == "__main__":
    viewer = TileViewer(tile_dir="gaode_tiles")
    viewer.run()

使用说明

安装依赖

pip install requests tqdm pygame

主要功能

基础下载:支持多线程下载指定范围的瓦片 地理坐标转换:自动将经纬度转换为瓦片坐标 多种地图样式:支持道路图、卫星图、地形图 断点续传:记录下载进度,避免重复下载 进度显示:显示下载进度和统计信息 错误处理:自动重试和错误记录

注意事项

遵守使用条款:请遵守高德地图的使用条款,不要用于商业用途 控制下载速度:避免请求过快,建议添加延时 存储空间:瓦片数据量可能很大,注意磁盘空间 版权问题:下载的地图数据有版权限制,请合理使用

扩展建议

添加代理支持 实现自动重试机制 支持其他地图服务商 添加瓦片合并功能(将瓦片拼接成大图)

这个脚本应该能满足你的基本需求。如果需要更多功能或遇到问题,请告诉我!