Exporting Data with GeoPandas in Databricks

I recently had a project requirement to export the contents of a Delta table in Databricks to several formats, including shapefile and file geodatabase, with the output placed in object storage mounted to DBFS.

I set up the logic in a notebook, intending to use geopandas, which provides an easy wrapper around OGR for exporting dataframe contents. In this case, the table had X and Y columns that needed to be exported as point geometries.
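As a rough illustration of that pattern (a minimal sketch with made-up coordinates and column names, not the project data), building point geometries from coordinate columns looks something like this:

    import geopandas as gpd
    import pandas as pd

    # Hypothetical dataframe with longitude (x) and latitude (y) columns
    df = pd.DataFrame({"x": [-77.04, -76.61], "y": [38.91, 39.29], "name": ["DC", "Baltimore"]})

    # Build point geometries from the coordinate columns and tag the CRS
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df["x"], df["y"]), crs="EPSG:4326")

    # geopandas hands the write off to an OGR driver, e.g. a shapefile
    gdf.to_file("/tmp/points.shp")

The full function at the end of the post does the same thing with an explicit list of shapely Point objects; either approach works.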

I had been doing a lot of work in DBFS and have generally found it to be a decent proxy for a file system, but I quickly ran into issues as geopandas tried to do binary writes. Many of the exports simply failed. DBFS is a distributed file system that provides an abstraction over cloud-based object storage. Through its dbutils library, Databricks provides a consistent interface over cloud-specific APIs.
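As an example, a handful of dbutils.fs calls cover most of what I needed here; the mount path below is just a placeholder:

    # Illustrative dbutils.fs calls in a Databricks notebook; the paths are placeholders
    display(dbutils.fs.mounts())                        # show configured object storage mounts
    display(dbutils.fs.ls("dbfs:/mnt/object/storage"))  # list the contents of a mounted location
    dbutils.fs.cp("file:/tmp/example.zip", "dbfs:/mnt/object/storage/example.zip")  # local -> mount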

Because it abstracts object storage, DBFS can suffer from many of the same limitations as object storage with regard to low-latency writes and append operations. These limitations were tripping up the exports using geopandas. Digging a little deeper, I realized that I could access the Linux file system of my compute instance and do the writes there. After the export completed successfully, I zipped it up and copied the output to the DBFS mount.

The full code for the function is at the end of the post, but this is the part that solved my problem:

    # Create a temporary local path for the file geodatabase
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    local_temp_path = f"/tmp/gdb_{timestamp}"
    
    # Create the directory if it does not exist
    os.makedirs(local_temp_path, exist_ok=True)

    # Create File Geodatabase
    gdb_filename = "exported_data.gdb"
    gdf.to_file(local_temp_path + '/' + gdb_filename, driver="OpenFileGDB")

Note that I am writing to /tmp on the local file system. This is treated as ephemeral storage that will be reset when your compute cluster restarts. As a result, anything written here should be immediately transferred elsewhere, which is what I do later in the function. I also have a separate function (not shown here) that cleans up the /tmp location as a matter of good hygiene.
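That cleanup function isn't shown here, but the idea is simple enough that a sketch may be useful (the helper name and arguments are hypothetical, not the actual code from the project):

    import os
    import shutil

    def cleanup_tmp(paths):
        """Remove temporary export artifacts from the cluster's local /tmp (hypothetical helper)."""
        for path in paths:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)   # e.g. the temporary .gdb directory
            elif os.path.isfile(path):
                os.remove(path)                           # e.g. the temporary ZIP archive

    # Example: cleanup_tmp([local_temp_path, zip_filepath])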

Databricks is a serviceable platform for data scientists. I am not entirely sold on it as an application platform, but it does have a good deal of flexibility. In this case, the ability to access the cluster’s local file system, along with the ability of dbutils to copy the file into mounted object storage, helped me get past the hurdle I was encountering.

Here is the full function with an example call at the end.

%pip install geopandas shapely geojsonio

import geopandas as gpd
from shapely.geometry import Point
import pandas as pd
from datetime import datetime
import os
import shutil
import zipfile

def export_delta_to_filegdb(spark, delta_table_name, output_path):
    """
    Export a Delta table with X (longitude), Y (latitude), and survey_date columns to a File Geodatabase.
    
    Parameters:
    spark: SparkSession object
    delta_table_name: str, name of the Delta table
    output_path: str, DBFS path where the File Geodatabase will be saved
    
    Returns:
    None
    """
    # Read the Delta table into a Spark DataFrame
    df_spark = spark.table(delta_table_name)
    
    # Convert to Pandas DataFrame
    df_pandas = df_spark.toPandas()
    
    print(f"Total records to process: {len(df_pandas)}")
    
    # Create geometry column from the X and Y coordinates while they are still numeric
    geometry = [Point(xy) for xy in zip(df_pandas['x'], df_pandas['y'])]
    
    # Convert x, y, z attribute columns to string format if they exist
    for col in ['x', 'y', 'z']:
        if col in df_pandas.columns:
            df_pandas[col] = df_pandas[col].astype(str)
    
    # Use all columns in the DataFrame
    attribute_data = df_pandas.copy()
    
    # Create GeoDataFrame with all attribute data
    gdf = gpd.GeoDataFrame(
        attribute_data,
        geometry=geometry,
        crs="EPSG:4326"
    )
    
    # Create a temporary local path for the file geodatabase
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    local_temp_path = f"/tmp/gdb_{timestamp}"
    
    # Create the directory if it does not exist
    os.makedirs(local_temp_path, exist_ok=True)

    # Create File Geodatabase
    gdb_filename = "exported_data.gdb"
    gdf.to_file(local_temp_path + '/' + gdb_filename, driver="OpenFileGDB")
    
    print(f"Exported File Geodatabase to: {local_temp_path}/{gdb_filename}")

    # Create a ZIP archive of the file geodatabase directory
    zip_filename = f"gdb_{timestamp}.zip"
    zip_filepath = f"/tmp/{zip_filename}"

    with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(local_temp_path):
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, os.path.relpath(file_path, local_temp_path))
    
    print(f"Created ZIP archive: {zip_filepath}")

    # Copy ZIP file to DBFS (convert the /dbfs/ FUSE path to a dbfs:/ URI for dbutils)
    dbfs_zip_path = f"{output_path}/{zip_filename}".replace('/dbfs/', 'dbfs:/')
    dbutils.fs.cp(f"file:{zip_filepath}", dbfs_zip_path)
    print(f"Copied ZIP file to: {output_path}/{zip_filename}")
 
if __name__ == "__main__":
    # Make sure you have an active SparkSession
    delta_table_name = "database.table"
    output_path = "/dbfs/mnt/object/storage"

    export_delta_to_filegdb(spark, delta_table_name, output_path)
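
If you want to sanity-check that the copy landed in the mount, a quick listing with dbutils works (the path here matches the example call above):

    # Verify the exported ZIP is present in the mounted output location
    display(dbutils.fs.ls("dbfs:/mnt/object/storage"))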

Header image: Sabina Bajracharya, CC BY-SA 4.0 https://creativecommons.org/licenses/by-sa/4.0, via Wikimedia Commons