This repo shows how to generate time series of news reporting for earthquake events via GDELT. The GDELT Project is a real-time network diagram and database of global human society for open research. GDELT monitors the world's news media from nearly every corner of every country in print, broadcast, and web formats, in over 100 languages, every moment of every day.
Earthquake event data is collected from The International Disaster Database (EM-DAT). To keep the data size small, this notebook focuses on a sample containing 3 earthquake events (the columns the notebook relies on are sketched after the directory tree below).
# Data structure:
# .
# ├── bigquery.json                 // your own Google BigQuery API key
# ├── data
# │   ├── countryinfo2.csv          // country information
# │   ├── earthquake_sample.csv     // sample data containing 3 earthquake events
# │   ├── earthquake_gdelt.csv      // sample GDELT BigQuery data containing 3 earthquake events
# │   └── sourcesbycountry2018.csv  // news sources by country
# ├── EQNews_TS_generation.ipynb    // Jupyter notebook
# └── README.md
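The notebook relies on four columns of earthquake_sample.csv: Event_id, Location_fips (comma-separated FIPS codes of the affected countries), UTC, and UTC_round. As a rough sketch of that shape (the real EM-DAT export likely contains additional fields, and the values below are made-up placeholders):
import pandas as pd
# Minimal sketch of the earthquake_sample.csv shape, with made-up placeholder values;
# only the four columns referenced later in the notebook are shown.
sketch = pd.DataFrame({
    'Event_id': ['EQ_demo'],
    'Location_fips': ['JA'],                              # comma-separated FIPS codes
    'UTC': [pd.Timestamp('2024-01-01 07:10:00')],         # event origin time (UTC)
    'UTC_round': [pd.Timestamp('2024-01-01 07:15:00')],   # origin time rounded to the 15-min grid
})
print(sketch)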
import os
import pandas as pd
import numpy as np
from datetime import timedelta
# read earthquake event sample data
df_event = pd.read_csv('data/earthquake_sample.csv')
df_event
# read country information
country_info = pd.read_csv('data/countryinfo2.csv')
# construct an ISO3-to-FIPS mapping
iso3_to_fips = pd.Series(country_info.fips.values, index=country_info.iso3).to_dict()
# unique FIPS codes of the earthquake-affected countries (Location_fips is comma-separated)
print(df_event.Location_fips.str.split(',').explode().unique().tolist())
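The printed list is what gets hard-coded into the IN (...) filters of the BigQuery query below; for the sample events that should be 'JA', 'TW' and 'MO'. A small helper like the following (illustrative only; fips_in_clause is not used elsewhere in the notebook) can build the quoted list so the query does not have to be edited by hand:
# Build the quoted FIPS list used in the query's IN (...) clauses, e.g. "'JA', 'TW', 'MO'"
fips_codes = df_event.Location_fips.str.split(',').explode().unique().tolist()
fips_in_clause = ', '.join(f"'{code.strip()}'" for code in fips_codes)
print(fips_in_clause)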
!pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'
!pip install --upgrade google-cloud-storage
from google.cloud import bigquery
from google.cloud import storage
# Setting Google application credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "bigquery.json"
# Initialize BigQuery client
bq_client = bigquery.Client()
# Query the GDELT GKG for earthquake-related records that mention the affected countries
query = """
SELECT
  gkg.GKGRECORDID,
  gkg.DATE,
  gkg.SourceCommonName,
  ARRAY_TO_STRING(
    ARRAY(
      SELECT DISTINCT UPPER(TRIM(SPLIT(location, '#')[OFFSET(2)]))  -- Extract and uppercase the FIPS code
      FROM UNNEST(SPLIT(gkg.V2Locations, ';')) AS location
      WHERE UPPER(TRIM(SPLIT(location, '#')[OFFSET(2)])) IN ('JA', 'TW', 'MO')
    ), ','
  ) AS V2Locations_FIPS
FROM
  `gdelt-bq.gdeltv2.gkg_partitioned` gkg
WHERE
  _PARTITIONTIME BETWEEN TIMESTAMP("2023-09-07") AND TIMESTAMP("2024-08-17")
  AND gkg.DATE >= 20230907000000
  AND REGEXP_CONTAINS(gkg.V2Themes, r'(?i)EARTHQUAKE.*EARTHQUAKE')
  AND ARRAY_LENGTH(
    ARRAY(
      SELECT DISTINCT UPPER(TRIM(SPLIT(location, '#')[OFFSET(2)]))  -- Ensure FIPS codes match case-insensitively
      FROM UNNEST(SPLIT(gkg.V2Locations, ';')) AS location
      WHERE UPPER(TRIM(SPLIT(location, '#')[OFFSET(2)])) IN ('JA', 'TW', 'MO')
    )
  ) > 0;
"""
# Run the query once
# Uncomment to run
# query_job = bq_client.query(query)
# results = query_job.to_dataframe()
# results.to_csv('data/earthquake_gdelt.csv', index=False, sep=',')
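A note on the SPLIT(location, '#')[OFFSET(2)] expression: GKG V2Locations entries are ';'-delimited, and within each entry the fields are '#'-delimited with the FIPS country code in the third position (index 2), which is what the query extracts. The same parsing logic can be checked quickly in Python on a made-up V2Locations-style string (not a real GKG record):
# Made-up V2Locations-style string for illustration only:
# ';' separates locations, '#' separates fields, index 2 holds the FIPS country code
v2locations = "1#Japan#JA##36#138#JA;4#Taipei, Taiwan#TW#TW03#25.04#121.53#-1"
fips_codes = {
    block.split('#')[2].strip().upper()
    for block in v2locations.split(';')
    if len(block.split('#')) > 2
}
print(fips_codes)  # expected: {'JA', 'TW'}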
In this step, we match each news source to its country using sourcesbycountry2018. The sourcesbycountry2018 table lists all known news sources in the GDELT Global Knowledge Graph along with their country of focus. The table is generated under the assumption that the country of focus of a news source is the country in which it is headquartered.
df_2018 = pd.read_csv('data/sourcesbycountry2018.csv', sep='\t')
results = pd.read_csv('data/earthquake_gdelt.csv')
# Map SourceCommonName to the source country's FIPS code via the Domain column of df_2018
results['sources_FIPS'] = results['SourceCommonName'].map(df_2018.set_index('Domain')['FIPS'])
results_count = len(results)
# Filter out rows where sources_FIPS is null (no matching domain)
results = results[results['sources_FIPS'].notnull()]
# Drop SourceCommonName, keeping only the mapped FIPS code
results.drop(columns=['SourceCommonName'], inplace=True)
# print step 2 statistics
dropped = results_count - len(results)
print(f'News records dropped with invalid or null FIPS: {dropped} ({dropped / results_count * 100:.2f}% of total)')
results.to_csv('data/earthquake_gdelt_addsourcefips.csv', index=False, sep=',')
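As an optional sanity check (not part of the original pipeline), it can help to see which countries dominate the matched sources:
# Optional: how many records each reporting country contributes after the join
print(results['sources_FIPS'].value_counts().head(10))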
In this step, we generate the time series of news reporting for each earthquake event via GDELT and also measure how quickly each event is reported. The native temporal resolution of the time series is 15 minutes (GDELT's update interval), and the time series can be generated at any integer multiple of 15 minutes, such as 30 minutes, 1 hour, or 1 day.
df_gdelts = pd.read_csv('data/earthquake_gdelt_addsourcefips.csv')
df_event = pd.read_csv('data/earthquake_sample.csv')
df_event['UTC'] = pd.to_datetime(df_event['UTC'])
df_event['UTC_round'] = pd.to_datetime(df_event['UTC_round'])
df_gdelts['DATE'] = pd.to_datetime(df_gdelts['DATE'], format='%Y%m%d%H%M%S')
df_gdelts.head()
# count how many records have a NaN V2Locations_FIPS
df_gdelts['V2Locations_FIPS'].isnull().sum()
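generate_ts_df below calls .split(',') on V2Locations_FIPS, so any NaN value would raise an AttributeError. If the count above is non-zero, a defensive filter (my addition, not in the original flow) keeps the function safe:
# Defensive filter (optional): drop records without extracted FIPS codes,
# since generate_ts_df splits V2Locations_FIPS by ',' and would fail on NaN
df_gdelts = df_gdelts[df_gdelts['V2Locations_FIPS'].notnull()].copy()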
df_event.head()
def generate_ts_df(df_event, df_gdelts, start_shift='1D', end_shift='21D', ts_interval='1D', quick_report=['1h', '3h']):
    # Initialize an empty list to store results
    results = []
    # Create timedelta objects for shifts and intervals
    ts_interval_timedelta = pd.to_timedelta(ts_interval)
    start_shift_timedelta = pd.to_timedelta(start_shift)
    end_shift_timedelta = pd.to_timedelta(end_shift)
    # Iterate over each event in df_event
    for _, event in df_event.iterrows():
        event_id = event['Event_id']
        event_fips = set(event['Location_fips'].split(','))
        # Filter df_gdelts for the full time series generation (including first report time)
        ts_gdelts = df_gdelts[
            (df_gdelts['DATE'] >= event['UTC_round'] - start_shift_timedelta) &
            (df_gdelts['DATE'] <= event['UTC_round'] + end_shift_timedelta)
        ]
        # Further filter based on relevant FIPS codes and ensure FIPS codes are valid
        ts_gdelts = ts_gdelts[
            ts_gdelts['V2Locations_FIPS'].apply(lambda x: bool(event_fips.intersection(set(x.split(',')))))
        ].copy()
        # Skip to the next event if there are no reports in the current window
        if ts_gdelts.empty:
            continue
        # Calculate the first report time for each reporting country;
        # the first report time is after event['UTC_round']
        first_report_times = ts_gdelts[ts_gdelts['DATE'] > event['UTC_round']].groupby('sources_FIPS')['DATE'].min().reset_index()
        first_report_times.columns = ['report_country_FIP', 'first_report_time']
        # Initialize a dictionary to store quick report counts for different intervals
        quick_report_counts_dict = {}
        # Process each quick_report interval
        for quick_interval in quick_report:
            quick_report_timedelta = pd.to_timedelta(quick_interval)
            # Filter ts_gdelts based on the quick report date range
            quick_report_gdelts = ts_gdelts[
                (ts_gdelts['DATE'] >= event['UTC_round']) &
                (ts_gdelts['DATE'] < event['UTC_round'] + quick_report_timedelta)
            ]
            # Calculate the quick report count for this event and interval
            quick_report_counts = quick_report_gdelts.groupby('sources_FIPS')['DATE'].count().reset_index()
            quick_report_counts.columns = ['report_country_FIP', f'report_{quick_interval}']
            # Store the quick report counts for the current interval
            quick_report_counts_dict[quick_interval] = quick_report_counts
        # Group by the reporting country for time series
        grouped_gdelts = ts_gdelts.groupby('sources_FIPS')
        for report_country, group in grouped_gdelts:
            # Find the first report time for this report_country
            first_report_time = first_report_times[first_report_times['report_country_FIP'] == report_country]
            if not first_report_time.empty:
                first_report_time = first_report_time['first_report_time'].values[0]
            else:
                continue  # Skip this country if there's no valid first report time
            ts_start = event['UTC_round'] - start_shift_timedelta
            ts_end = event['UTC_round'] + end_shift_timedelta
            # Create a time series range
            time_range = pd.date_range(start=ts_start, end=ts_end, freq=ts_interval_timedelta)
            # Efficiently calculate reports within each time interval: bins are [start, end)
            ts_array = group['DATE'].groupby(pd.cut(group['DATE'], time_range, right=False, include_lowest=True), observed=False).size().tolist()
            # Initialize quick report counts for this report_country
            quick_report_results = {}
            for quick_interval in quick_report:
                quick_report_count = quick_report_counts_dict[quick_interval]
                quick_report_count_for_country = quick_report_count[quick_report_count['report_country_FIP'] == report_country]
                if not quick_report_count_for_country.empty:
                    quick_report_results[f'report_{quick_interval}'] = quick_report_count_for_country[f'report_{quick_interval}'].values[0]
                else:
                    quick_report_results[f'report_{quick_interval}'] = 0
            # Add results for this country and event to the final result list
            result = {
                'Event_id': event_id,
                'Location_fips': event['Location_fips'],
                'UTC': event['UTC'],
                'UTC_round': event['UTC_round'],
                'report_country_FIP': report_country,
                'first_report_time': first_report_time,
                'TS_start': ts_start,
                'TS_end': ts_end,
                'TS_interval': ts_interval,
                'TS_array': ts_array
            }
            # Add quick report results to the result dictionary
            result.update(quick_report_results)
            results.append(result)
        print(f"Event {event_id} processed.")
    # Convert results list to DataFrame
    final_df = pd.DataFrame(results)
    return final_df
result_df = generate_ts_df(df_event, df_gdelts)
result_df.to_csv('data/earthquake_ts_1D_21D_1D.csv', index=False, sep=',')
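The run above uses the defaults: a window from 1 day before to 21 days after each event at a 1-day interval, which is what the earthquake_ts_1D_21D_1D.csv name encodes. Because the arguments are pandas offset strings, other resolutions (any multiple of GDELT's 15-minute update interval) can be generated the same way; the output file name below is just an illustrative choice:
# Example: hourly resolution over a shorter window, with an extra 6-hour quick-report count
result_df_1h = generate_ts_df(
    df_event, df_gdelts,
    start_shift='1D', end_shift='3D',
    ts_interval='1h',
    quick_report=['1h', '3h', '6h'],
)
result_df_1h.to_csv('data/earthquake_ts_1D_3D_1h.csv', index=False, sep=',')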