Introduction

In this post, we walk through processing geotagged tweets with Apache Sedona. Apache Sedona is a library for geospatial data processing on Apache Spark that supports spatial operations such as range queries. Here we combine a spatial range query with a time-based filter.

Step 1: Simulate Fake Tweet Data

We begin by simulating 1000 fake tweets. Each tweet contains a creation timestamp, text, and a geolocation (longitude, latitude) within a defined bounding box. We use the Faker library to generate random tweet text, and the standard datetime and random modules to generate timestamps and coordinates. The generated data is written to a gzip-compressed .txt.gz file with one JSON object per line.

!pip install faker

import random
from datetime import datetime, timedelta
from faker import Faker
import json
import gzip

# Initialize Faker for generating fake text
fake = Faker()

# Define the bounding box (Envelope)
lat_min, lat_max = 20.01, 60.01
lon_min, lon_max = -100.01, -70.01

# Time window
time_start = datetime.strptime('2024-06-24T09:10:00', '%Y-%m-%dT%H:%M:%S')
time_end = datetime.strptime('2024-10-28T09:04:00', '%Y-%m-%dT%H:%M:%S')

# Function to generate random coordinates within the bounding box (Envelope)
def generate_coordinates():
    lat = random.uniform(lat_min, lat_max)
    lon = random.uniform(lon_min, lon_max)
    return {"coordinates": [lon, lat], "type": "Point"}

# Function to generate random timestamps within the time range
def generate_timestamp():
    delta = time_end - time_start
    random_seconds = random.randint(0, int(delta.total_seconds()))
    return time_start + timedelta(seconds=random_seconds)

# Function to generate fake tweet data
def generate_fake_tweet():
    # Use one ID for both fields, since id_str is the string form of id.
    # Staying within the signed 64-bit range lets Spark infer the column as a long.
    tweet_id = random.randint(10**18, 2**63 - 1)
    return {
        "created_at": generate_timestamp().strftime('%a %b %d %H:%M:%S +0000 %Y'),
        "id": tweet_id,
        "id_str": str(tweet_id),
        "text": fake.text(max_nb_chars=140),
        "display_text_range": [0, 140],
        "coordinates": generate_coordinates()
    }

# Generate fake tweet data (1000 tweets)
tweets = [generate_fake_tweet() for _ in range(1000)]

# Save the tweets to a .txt.gz file
with gzip.open('/content/tweets.txt.gz', 'wt', encoding='utf-8') as f:
    for tweet in tweets:
        f.write(json.dumps(tweet) + "\n")
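
Before moving on, it can help to confirm that the file contains what we expect. The following is a minimal sketch using only the standard library. The helper name check_tweet_file is our own (hypothetical), and it assumes one JSON object per line with GeoJSON-style [lon, lat] coordinates, as written above.

```python
import gzip
import json

def check_tweet_file(path, lat_range, lon_range):
    """Return (total, out_of_bounds) for a gzipped JSON-lines tweet file."""
    total = 0
    out_of_bounds = 0
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines
            tweet = json.loads(line)
            # GeoJSON order: longitude first, then latitude
            lon, lat = tweet["coordinates"]["coordinates"]
            total += 1
            inside = (lat_range[0] <= lat <= lat_range[1]
                      and lon_range[0] <= lon <= lon_range[1])
            if not inside:
                out_of_bounds += 1
    return total, out_of_bounds
```

For the file generated above, calling check_tweet_file('/content/tweets.txt.gz', (20.01, 60.01), (-100.01, -70.01)) should report 1000 tweets with none out of bounds.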

Step 2: Install and Configure Apache Sedona

Next, we install Java, Apache Spark, and Apache Sedona, a spatial extension for Spark designed for large-scale geospatial data. The commands below are written for a notebook environment such as Google Colab, which is why they use the ! shell prefix and /content paths.

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
os.environ["PYTHONPATH"] = "/content/spark-3.5.3-bin-hadoop3/python"
!pip install findspark
import findspark
findspark.init()
!pip install apache-sedona[spark]

from sedona.spark import *
config = SedonaContext.builder(). \
    config('spark.jars.packages',
           # The artifact name should match the Spark minor version installed above (3.5)
           'org.apache.sedona:sedona-spark-3.5_2.12:1.6.1,'
           'org.datasyslab:geotools-wrapper:1.6.1-28.2'). \
    config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all'). \
    getOrCreate()
sedona = SedonaContext.create(config)

Step 3: Read and Process the Tweet Data with Apache Sedona

Once we have set up the environment, we read the generated tweets from the .txt.gz file and load them into a DataFrame. Apache Sedona provides efficient handling for geospatial data, and we can apply spatial operations directly within Spark.

!gunzip -c "/content/tweets.txt.gz" | head -n 5

# Spark's JSON reader expects one JSON object per line and decompresses .gz files
# based on the file extension, so the CSV-style delimiter and header options are not needed.
test_df = sedona.read.json("/content/tweets.txt.gz")

test_df.show(5, truncate=False)
test_df.count()
test_df.printSchema()

We now extract relevant columns from the DataFrame, including the timestamp and the geolocation (longitude and latitude).

from pyspark.sql.functions import col, to_timestamp

test_df.select(
    col("created_at"),
    col("coordinates.coordinates").getItem(0).alias("longitude"),
    col("coordinates.coordinates").getItem(1).alias("latitude")
).where(col("coordinates").isNotNull()).show(5)
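
The index choice above follows the GeoJSON convention, which stores a point as [longitude, latitude]. As a plain-Python illustration of the same selection (the helper extract_lon_lat is hypothetical and not part of Spark or Sedona):

```python
def extract_lon_lat(tweet):
    """Return (longitude, latitude) from a tweet dict, or None if it has no point."""
    geo = tweet.get("coordinates")
    # Similar in spirit to the isNotNull() filter: skip tweets without a location.
    # The extra type check is our own addition for this sketch.
    if not geo or geo.get("type") != "Point":
        return None
    lon, lat = geo["coordinates"]  # GeoJSON order: x (lon) first, then y (lat)
    return lon, lat

sample = {"coordinates": {"type": "Point", "coordinates": [-85.5, 35.2]}}
print(extract_lon_lat(sample))  # (-85.5, 35.2)
```

Keeping longitude first matters later, because the point loader reads x and then y from the first two columns.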

Next, we convert the created_at column to a proper timestamp format for easy manipulation.

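# Spark 3's default datetime parser does not accept the day-of-week pattern "EEE"
# when parsing, so we switch to the legacy parser for this Twitter-style format.
sedona.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
parsed_df = test_df.withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))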
parsed_df.show(5)

result_df = parsed_df.select(
    col("coordinates.coordinates").getItem(0).alias("longitude"),
    col("coordinates.coordinates").getItem(1).alias("latitude"),
    col("created_at")
).where(col("coordinates").isNotNull())
result_df.show(3)
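
To see what the Spark pattern is matching, here is a rough plain-Python equivalent of the same parse. The correspondence we assume is EEE to %a, MMM to %b, and Z to %z. The helper parse_created_at is hypothetical and only illustrates the string layout produced in Step 1.

```python
from datetime import datetime

# strptime counterpart of the Spark pattern "EEE MMM dd HH:mm:ss Z yyyy"
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"

def parse_created_at(value):
    """Parse a Twitter-style created_at string into a timezone-aware datetime."""
    return datetime.strptime(value, TWITTER_FORMAT)

parsed = parse_created_at("Mon Jun 24 09:10:00 +0000 2024")
print(parsed.isoformat())  # 2024-06-24T09:10:00+00:00
```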

Step 4: Spatial Range Query with Sedona

We now filter tweets by location using Apache Sedona’s RangeQuery. The query operates on a spatial RDD, so we first write the DataFrame to CSV and load it back as a PointRDD, keeping the timestamp as user data attached to each point.

from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter

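# Write longitude, latitude, created_at as headerless CSV; overwrite so the cell can be re-run
result_df.write.mode("overwrite").csv("output1", header=False)
# Coordinates start at column offset 0; True keeps the remaining columns as userData
data = PointRDD(sedona.sparkContext, 'output1', 0, FileDataSplitter.CSV, True)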
all_records = data.rawSpatialRDD.collect()
all_records[0:5]

Next, we define an envelope (bounding box) and run the range query, which keeps only the points that fall inside it.

from sedona.core.geom.envelope import Envelope
from sedona.core.spatialOperator import RangeQuery

range_query_window = Envelope(-90.01, -80.01, 30.01, 40.01)  # Envelope(min_x, max_x, min_y, max_y): longitudes first, then latitudes
consider_boundary_intersection = False  # Only return geometries fully covered by the window
using_index = False  # Disabling spatial indexing
query_result = RangeQuery.SpatialRangeQuery(data, range_query_window, consider_boundary_intersection, using_index)
query_result.map(lambda x: (x.geom, x.userData)).collect()[0:10]
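
Conceptually, a range query over points is a containment test against the window. The sketch below shows that idea in plain Python. It is not Sedona's implementation: the Box class and range_query function are hypothetical names, and the inclusive boundary handling is our own choice for the illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Box:
    """Axis-aligned window; field order follows Envelope(min_x, max_x, min_y, max_y)."""
    min_x: float
    max_x: float
    min_y: float
    max_y: float

    def contains(self, x, y):
        # Inclusive on all four edges (an assumption made for this sketch)
        return self.min_x <= x <= self.max_x and self.min_y <= y <= self.max_y

def range_query(points, window):
    """Keep the (x, y) points that fall inside the window."""
    return [(x, y) for x, y in points if window.contains(x, y)]

window = Box(-90.01, -80.01, 30.01, 40.01)
points = [(-85.0, 35.0), (-95.0, 35.0), (-85.0, 45.0)]
print(range_query(points, window))  # [(-85.0, 35.0)]
```

Only the first point survives: the second lies west of the window and the third lies north of it.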

Step 5: Filtering Tweets by Time Range

Finally, we can filter the query results by a specific time range, which allows us to process tweets that were posted within a defined period.

from datetime import datetime

def convert_to_datetime(timestamp_str):
    return datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S.%fZ')

time_start = datetime.strptime('2024-10-01T09:04:00', '%Y-%m-%dT%H:%M:%S')
time_end = datetime.strptime('2024-10-24T09:10:00', '%Y-%m-%dT%H:%M:%S')

filtered_result = query_result.filter(
    lambda row: time_start <= convert_to_datetime(row.userData) <= time_end
)

filtered_result.map(lambda x: (x.geom, x.userData)).collect()[0:10]
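
The filter above depends on the exact string stored in userData. The sketch below assumes the CSV writer produced timestamps such as 2024-10-05T12:00:00.000Z, which is what we would expect when the Spark session time zone is UTC. If your output looks different, the format string needs to change accordingly. The helper in_time_window is hypothetical.

```python
from datetime import datetime

# Assumed layout of the timestamp strings carried as userData
CSV_TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

def in_time_window(timestamp_str, start, end):
    """Return True if the timestamp lies within [start, end], bounds included."""
    ts = datetime.strptime(timestamp_str, CSV_TS_FORMAT)
    return start <= ts <= end

start = datetime(2024, 10, 1, 9, 4)
end = datetime(2024, 10, 24, 9, 10)
samples = [
    "2024-10-05T12:00:00.000Z",  # inside the window
    "2024-09-30T23:59:59.000Z",  # before the window
    "2024-10-24T09:10:00.000Z",  # exactly on the upper bound
]
kept = [s for s in samples if in_time_window(s, start, end)]
print(kept)
```

Because both comparisons are inclusive, the sample that sits exactly on the upper bound is kept along with the first one.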

We can convert the filtered results into a GeoDataFrame using geopandas for further analysis and visualization.

import geopandas as gpd
gpd.GeoDataFrame(
    filtered_result.map(lambda x: [x.geom, x.userData]).collect(),
    columns=["geom", "user_data"],
    geometry="geom"
)

Conclusion

In this tutorial, we simulated fake tweet data with geospatial information, stored it in a compressed file, and processed it with Apache Sedona in a Spark environment. We applied a spatial range query over a bounding box and then a time-range filter, a combination that is common when analyzing large geospatial datasets such as geotagged social media data.