#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Full license can be found in License.md
# Full author list can be found in .zenodo.json file
# DOI:10.5281/zenodo.3986138
#
# DISTRIBUTION STATEMENT A: Approved for public release. Distribution is
# unlimited.
# ----------------------------------------------------------------------------
"""Provides routines that support GFZ space weather instruments."""
import datetime as dt
import json
import numpy as np
import os
import pandas as pds
import requests
import pysat
from pysatSpaceWeather.instruments.methods import general
# ----------------------------------------------------------------------------
# Define the module variables
# Acknowledgement text attached to Instrument metadata for GFZ-provided data
ackn = ''.join(['CC BY 4.0, This index is produced by the Geomagnetic ',
                'Observatory Niemegk, GFZ German Research Centre for ',
                'Geosciences. Please cite the references in the ',
                "'references' attribute"])

# References for the geomagnetic indices (Kp, ap, Ap, Cp, C9).  Note the
# spaces restored at the chunk boundaries ("index Ks" and "Weather, doi:"),
# which were previously run together in the joined strings.
geoind_refs = '\n'.join([''.join(["Bartels, J. (1949) - The standardized ",
                                  "index Ks and the planetary index Kp, ",
                                  "IATME Bulletin 12b, 97."]),
                         ''.join(["Matzka, J., Bronkalla, O., Tornow, K., ",
                                  "Elger, K. and Stolle, C., 2021. ",
                                  "Geomagnetic Kp index. V. 1.0. GFZ Data ",
                                  "Services, doi:10.5880/Kp.0001"]),
                         ''.join(["Matzka, J., Stolle, C., Yamazaki, Y., ",
                                  "Bronkalla, O. and Morschhauser, A., 2021. ",
                                  "The geomagnetic Kp index and derived ",
                                  "indices of geomagnetic activity. Space ",
                                  "Weather, doi:10.1029/2020SW002641"])])

# References for the Hpo family of indices (Hp30/Hp60, ap30/ap60)
hpo_refs = '\n'.join([''.join(["Yamazaki, Y., Matzka, J., Stolle, C., ",
                               "Kervalishvili, G., Rauberg, J., Bronkalla, O.,",
                               " Morschhauser, A., Bruinsma, S., Shprits, ",
                               "Y.Y., Jackson, D.R., 2022. Geomagnetic ",
                               "Activity Index Hpo. Geophys. Res. Lett., 49, ",
                               "e2022GL098860, doi:10.1029/2022GL098860"]),
                      ''.join(["Matzka, J., Bronkalla, O., Kervalishvili, G.,",
                               " Rauberg, J. and Yamazaki, Y., 2022. ",
                               "Geomagnetic Hpo index. V. 2.0. GFZ Data ",
                               "Services, doi:10.5880/Hpo.0002"])])
# ----------------------------------------------------------------------------
# Define the module functions
def json_downloads(date_array, data_path, local_file_prefix, local_date_fmt,
                   gfz_data_name, freq, update_files=False, is_def=False,
                   mock_download_dir=None):
    """Download data from GFZ into CSV files at a specified cadence.

    Parameters
    ----------
    date_array : array-like or pandas.DatetimeIndex
        Array-like or index of datetimes to be downloaded.
    data_path : str
        Path to data directory.
    local_file_prefix : str
        Prefix for local files, e.g., 'tag_' or 'tag_monthly_'
    local_date_fmt : str
        String format for the local filename, e.g., '%Y-%m-%d' or '%Y-%m'
    gfz_data_name : str
        Name of the data index on the GFZ server, expects one of: 'Kp', 'ap',
        'Ap', 'Cp', 'C9', 'Hp30', 'Hp60', 'ap30', 'ap60', 'SN', 'Fobs', 'Fadj'
        where SN is the international sunspot number and Fxxx is the observed
        and adjusted F10.7.
    freq : pds.DateOffset or dt.timedelta
        Offset to add to the start date to ensure all data is downloaded
        (inclusive)
    update_files : bool
        Re-download data for files that already exist if True (default=False)
    is_def : bool
        If true, selects only the definitive data, otherwise also includes
        nowcast data (default=False)
    mock_download_dir : str or NoneType
        Local directory with downloaded files or None. If not None, will
        process any files with the correct name and date (following the local
        file prefix and date format) as if they were downloaded (default=None)

    Raises
    ------
    IOError
        If there is a gateway timeout when downloading data or if an unknown
        mock download directory is supplied.

    """
    # If a mock download directory was supplied, test to see it exists
    if mock_download_dir is not None:
        if not os.path.isdir(mock_download_dir):
            raise IOError('file location is not a directory: {:}'.format(
                mock_download_dir))

    # Set the local variables
    base_url = "https://kp.gfz-potsdam.de/app/json/"
    time_fmt = "%Y-%m-%dT%H:%M:%SZ"
    last_file = ''

    # Cycle through all the dates
    for dl_date in date_array:
        # Build the local filename
        local_fname = ''.join([local_file_prefix,
                               dl_date.strftime(local_date_fmt), '.txt'])
        local_file = os.path.join(data_path, local_fname)

        # Download if the file is missing or, when `update_files` is set, if
        # this file was not already written during this call (several dates
        # may map to the same monthly file)
        if not os.path.isfile(local_file) or (update_files
                                              and local_file != last_file):
            if mock_download_dir is None:
                # Get the URL for the desired data
                stop = dl_date + freq
                query_url = "{:s}?start={:s}&end={:s}&index={:s}".format(
                    base_url, dl_date.strftime(time_fmt),
                    stop.strftime(time_fmt), gfz_data_name)

                if is_def:
                    # Add the definitive flag
                    query_url = '{:s}&status=def'.format(query_url)

                # The data is returned as a JSON file
                req = requests.get(query_url)

                # Process the JSON file
                if req.text.find('Gateway Timeout') >= 0:
                    raise IOError(''.join(['Gateway timeout when requesting ',
                                           'file using command: ', query_url]))

                raw_txt = req.text if req.ok else None
            else:
                # Get the text from the downloaded file
                query_url = os.path.join(mock_download_dir, local_fname)
                if os.path.isfile(query_url):
                    with open(query_url, 'r') as fpin:
                        raw_txt = fpin.read()
                else:
                    raw_txt = None

            if raw_txt is not None:
                raw_dict = json.loads(raw_txt)
                data = pds.DataFrame.from_dict({gfz_data_name:
                                                raw_dict[gfz_data_name]})
                if data.empty:
                    pysat.logger.warning("no data for {:}".format(dl_date))
                else:
                    # Convert the datetime strings to datetime objects
                    time_list = [dt.datetime.strptime(time_str, time_fmt)
                                 for time_str in raw_dict['datetime']]

                    # Add the time index
                    data.index = time_list

                    # Create a local CSV file
                    data.to_csv(local_file, header=True)

                    # Record the file just written; previously `last_file`
                    # was never updated, so the `update_files` guard above
                    # was dead and the same file could be re-downloaded for
                    # every date in `date_array`
                    last_file = local_file
            else:
                pysat.logger.info("".join(["Data not downloaded for ",
                                           dl_date.strftime("%d %b %Y"),
                                           ", date may be out of range ",
                                           "for the database or data may ",
                                           "have been saved to an unexpected",
                                           " filename: {:}".format(query_url)]))
    return
def kp_ap_cp_download(platform, name, date_array, tag, inst_id, data_path,
                      mock_download_dir=None):
    """Download Kp, ap, and Cp data from GFZ.

    Parameters
    ----------
    platform : str
        Instrument platform.
    name : str
        Instrument name.
    date_array : array-like or pandas.DatetimeIndex
        Array-like or index of datetimes to be downloaded.
    tag : str
        String specifying the database, expects 'def' (definitive) or 'now'
        (nowcast)
    inst_id : str
        Specifies the instrument identification, not used.
    data_path : str
        Path to data directory.
    mock_download_dir : str or NoneType
        Local directory with downloaded files or None. If not None, will
        process any files with the correct name and date (following the local
        file prefix and date format) as if they were downloaded (default=None)

    Raises
    ------
    ValueError
        If an unknown instrument module is supplied.
    IOError
        If an unknown mock download directory is supplied.

    Note
    ----
    Note that the download path for the complementary Instrument will use
    the standard pysat data paths

    """
    # If a mock download directory was supplied, test to see it exists
    if mock_download_dir is not None:
        if not os.path.isdir(mock_download_dir):
            raise IOError('file location is not a directory: {:}'.format(
                mock_download_dir))

    # Set the page for the definitive or nowcast Kp
    burl = ''.join(['https://datapub.gfz-potsdam.de/download/10.5880.Kp.0001',
                    '/Kp_', 'nowcast' if tag == 'now' else 'definitive', '/'])
    data_cols = ['Bartels_solar_rotation_num', 'day_within_Bartels_rotation',
                 'Kp', 'daily_Kp_sum', 'ap', 'daily_Ap', 'Cp', 'C9']
    hours = np.arange(0, 24, 3)

    # The WDC format stores the thirds of a Kp value as a single digit
    kp_translate = {'0': 0.0, '3': 1.0 / 3.0, '7': 2.0 / 3.0}
    dnames = list()

    # Indices into `data_cols` selecting the columns each Instrument uses
    inst_cols = {'sw_kp': [0, 1, 2, 3], 'sw_cp': [0, 1, 6, 7],
                 'sw_ap': [0, 1, 4, 5]}

    # Construct the Instrument module name from the platform and name
    inst_mod_name = '_'.join([platform, name])
    if inst_mod_name not in inst_cols.keys():
        raise ValueError('Unknown Instrument module {:}, expected {:}'.format(
            inst_mod_name, inst_cols.keys()))

    data_paths = {inst_mod: data_path if inst_mod == inst_mod_name else
                  general.get_instrument_data_path(inst_mod, tag=tag,
                                                   inst_id=inst_id)
                  for inst_mod in inst_cols.keys()}

    # Check that the directories exist.  Use a distinct loop variable so the
    # `data_path` parameter is not shadowed/rebound.
    for dl_path in data_paths.values():
        pysat.utils.files.check_and_make_path(dl_path)

    # Cycle through all the times
    for dl_date in date_array:
        fname = 'Kp_{:s}{:04d}.wdc'.format(tag, dl_date.year)
        if fname not in dnames:
            pysat.logger.info(' '.join(('Downloading file for',
                                        dl_date.strftime('%Y'))))
            if mock_download_dir is None:
                furl = ''.join([burl, fname])
                req = requests.get(furl)
                raw_txt = req.text if req.ok else None
            else:
                furl = os.path.join(mock_download_dir, fname)
                if os.path.isfile(furl):
                    with open(furl, "r") as fpin:
                        raw_txt = fpin.read()
                else:
                    raw_txt = None

            if raw_txt is not None:
                # Split the file text into lines
                lines = raw_txt.split('\n')[:-1]

                # Remove the header
                while lines[0].find('#') == 0:
                    lines.pop(0)

                # Process the data lines
                ddict = {dkey: list() for dkey in data_cols}
                times = list()
                for line in lines:
                    # Two-digit year, month, and day at fixed columns
                    ldate = dt.datetime.strptime(' '.join([
                        "{:02d}".format(int(dd)) for dd in
                        [line[:2], line[2:4], line[4:6]]]), "%y %m %d")
                    bsr_num = int(line[6:10])
                    bsr_day = int(line[10:12])

                    # Daily Kp sum: the ones digits and the thirds digit are
                    # stored separately; a blank ones field means zero.  The
                    # previous test compared the two-character slice against
                    # a single space and so could never match, raising
                    # ValueError on blank fields instead.
                    if line[28:30].strip() == '':
                        kp_ones = 0.0
                    else:
                        kp_ones = float(line[28:30])
                    sum_kp = kp_ones + kp_translate[line[30]]

                    daily_ap = int(line[55:58])
                    cp = float(line[58:61])
                    c9 = int(line[61])

                    for i, hour in enumerate(hours):
                        # Set the time for this hour and day
                        times.append(ldate + dt.timedelta(hours=int(hour)))

                        # Set the daily values for this hour
                        ddict['Bartels_solar_rotation_num'].append(bsr_num)
                        ddict['day_within_Bartels_rotation'].append(bsr_day)
                        ddict['daily_Kp_sum'].append(sum_kp)
                        ddict['daily_Ap'].append(daily_ap)
                        ddict['Cp'].append(cp)
                        ddict['C9'].append(c9)

                        # Get the hourly-specific values
                        ikp = i * 2
                        kp_ones = line[12 + ikp]
                        if kp_ones == ' ':
                            kp_ones = 0.0
                        ddict['Kp'].append(np.float64(kp_ones)
                                           + kp_translate[line[13 + ikp]])

                        iap = i * 3
                        ddict['ap'].append(np.int64(line[31 + iap:34 + iap]))

                # Put data into nicer DataFrames, one per Instrument module
                for inst_mod in inst_cols.keys():
                    sel_cols = np.array(data_cols)[inst_cols[inst_mod]]
                    sel_dict = {col: ddict[col] for col in sel_cols}
                    data = pds.DataFrame(sel_dict, index=times,
                                         columns=sel_cols)

                    # Write out as a CSV file
                    sfname = fname.replace(
                        'Kp', inst_mod.split('_')[-1].capitalize())
                    saved_fname = os.path.join(data_paths[inst_mod],
                                               sfname).replace('.wdc', '.txt')
                    data.to_csv(saved_fname, header=True)

                # Record the filename so we don't download it twice
                dnames.append(fname)
            else:
                pysat.logger.info("".join(["Unable to download data for ",
                                           dl_date.strftime("%d %b %Y"),
                                           ", date may be out of range for ",
                                           "the database or data may have been",
                                           " saved to an unexpected filename: ",
                                           furl]))
    return
def kp_ap_cp_list_files(name, tag, inst_id, data_path, format_str=None):
    """List local files for Kp, ap, or Cp data obtained from GFZ.

    Parameters
    ----------
    name : str
        Instrument name.
    tag : str
        String specifying the database, expects 'def' (definitive) or 'now'
        (nowcast)
    inst_id : str
        Specifies the instrument identification, not used.
    data_path : str
        Path to data directory.
    format_str : str or NoneType
        User specified file format. If None is specified, the default
        formats associated with the supplied tags are used. (default=None)

    Returns
    -------
    files : pysat._files.Files
        A class containing the verified available files

    """
    # Build the default yearly-file format when none was supplied
    if format_str is None:
        format_str = '{:s}_{:s}'.format(name.capitalize(),
                                        tag) + '{year:04d}.txt'

    # Files are stored by year, so a daily date is appended to each yearly
    # filename for every month and day of month.  The load routine uses the
    # appended date to select the appropriate data.
    files = pysat.Files.from_os(data_path=data_path, format_str=format_str)

    if not files.empty:
        # Extend the series through the end of the final year, expand to a
        # daily frequency, and attach each day to its filename
        year_end = files.index[-1] + pds.DateOffset(years=1, days=-1)
        files.loc[year_end] = files.iloc[-1]
        files = files.asfreq('D', 'pad')
        files = files + '_' + files.index.strftime('%Y-%m-%d')

    return files
def load_def_now(fnames):
    """Load GFZ yearly definitive or nowcast index data.

    Parameters
    ----------
    fnames : pandas.Series
        Series of filenames

    Returns
    -------
    data : pandas.DataFrame
        Object containing satellite data

    """
    # The GFZ index data are stored in yearly or monthly files, separated by
    # index on download, but must be returned daily.  The daily date is
    # appended to each filename; parse it off, load each unique file once,
    # and down-select to the requested days.
    unique_fnames = dict()
    for filename in fnames:
        fname = filename[0:-11]
        fdate = dt.datetime.strptime(filename[-10:], '%Y-%m-%d')
        if fname not in unique_fnames.keys():
            unique_fnames[fname] = [fdate]
        else:
            unique_fnames[fname].append(fdate)

    # Load the desired filenames
    all_data = []
    for fname in unique_fnames.keys():
        # Load all the data in the file and down-select to the desired days
        fdate = min(unique_fnames[fname])
        temp = pds.read_csv(fname, index_col=0, parse_dates=True)

        if temp.empty:
            # Use `warning` for consistency with the rest of the module;
            # `Logger.warn` is a deprecated alias
            pysat.logger.warning('Empty file: {:}'.format(fname))
            continue

        # Select from the earliest requested day through the end (23:59:59)
        # of the latest requested day, then add to the data list
        all_data.append(pds.DataFrame(temp[fdate:max(unique_fnames[fname])
                                           + dt.timedelta(seconds=86399)]))

    # Combine data together
    if len(all_data) > 0:
        data = pds.concat(all_data, axis=0, sort=True)
    else:
        data = pds.DataFrame()

    return data