Skip to content

Commit

Permalink
Merging multiple MMA model files.
Browse files Browse the repository at this point in the history
  • Loading branch information
pacesm committed Dec 8, 2022
1 parent ad76f7d commit ff5e81e
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 34 deletions.
103 changes: 69 additions & 34 deletions eoxmagmod/eoxmagmod/magnetic_model/loader_mma.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
# pylint: disable=invalid-name

from spacepy import pycdf
from numpy import concatenate, array_equal
from .model import (
SphericalHarmomicGeomagneticModel,
DipoleSphericalHarmomicGeomagneticModel,
Expand All @@ -46,131 +47,165 @@
MMA2C_NGP_LONGITUDE = 287.78 - 360.0 # deg.


def load_model_swarm_mma_2c_internal(path, lat_ngp=MMA2C_NGP_LATITUDE,
def load_model_swarm_mma_2c_internal(*paths, lat_ngp=MMA2C_NGP_LATITUDE,
lon_ngp=MMA2C_NGP_LONGITUDE):
""" Load internal (secondary field) model from a Swarm MMA_SHA_2C product.
"""
return DipoleSphericalHarmomicGeomagneticModel(
load_coeff_swarm_mma_2c_internal(path),
load_coeff_swarm_mma_2c_internal(*paths),
north_pole=(lat_ngp, lon_ngp),
)


def load_model_swarm_mma_2c_external(path, lat_ngp=MMA2C_NGP_LATITUDE,
def load_model_swarm_mma_2c_external(*paths, lat_ngp=MMA2C_NGP_LATITUDE,
lon_ngp=MMA2C_NGP_LONGITUDE):
""" Load external (primary field) model from a Swarm MMA_SHA_2C product.
"""
return DipoleSphericalHarmomicGeomagneticModel(
load_coeff_swarm_mma_2c_external(path),
load_coeff_swarm_mma_2c_external(*paths),
north_pole=(lat_ngp, lon_ngp),
)


def load_model_swarm_mma_2f_geo_internal(path):
def load_model_swarm_mma_2f_geo_internal(*paths):
""" Load geographic frame internal (secondary field) model from a Swarm
MMA_SHA_2F product.
"""
return SphericalHarmomicGeomagneticModel(
load_coeff_swarm_mma_2f_geo_internal(path)
load_coeff_swarm_mma_2f_geo_internal(*paths)
)


def load_model_swarm_mma_2f_geo_external(path):
def load_model_swarm_mma_2f_geo_external(*paths):
""" Load geographic frame internal (primary field) model from a Swarm
MMA_SHA_2F product.
"""
return SphericalHarmomicGeomagneticModel(
load_coeff_swarm_mma_2f_geo_external(path)
load_coeff_swarm_mma_2f_geo_external(*paths)
)


def load_model_swarm_mma_2f_sm_internal(path):
def load_model_swarm_mma_2f_sm_internal(*paths):
""" Load solar magnetic frame internal (secondary field) model from a Swarm
MMA_SHA_2F product.
"""
# FIXME: solar-magnetic frame model
return SphericalHarmomicGeomagneticModel(
load_coeff_swarm_mma_2f_sm_internal(path),
load_coeff_swarm_mma_2f_sm_internal(*paths),
)


def load_model_swarm_mma_2f_sm_external(path):
def load_model_swarm_mma_2f_sm_external(*paths):
""" Load solar magnetic frame internal (primary field) model from a Swarm
MMA_SHA_2F product.
"""
# FIXME: solar-magnetic frame model
return SphericalHarmomicGeomagneticModel(
load_coeff_swarm_mma_2f_sm_external(path)
load_coeff_swarm_mma_2f_sm_external(*paths)
)


def load_coeff_swarm_mma_2c_internal(path):
""" Load internal model coefficients from a Swarm MMA_SHA_2C product file.
def load_coeff_swarm_mma_2c_internal(*paths):
""" Load internal model coefficients from one or more Swarm MMA_SHA_2C product files.
Note that the models are loaded and merged in the order of the arguments.
"""
return _load_coeff_mma_multi_set(
path, read_swarm_mma_2c_internal, "gh", is_internal=True
paths, read_swarm_mma_2c_internal, "gh", is_internal=True
)


def load_coeff_swarm_mma_2c_external(path):
""" Load external model coefficients from a Swarm MMA_SHA_2C product file.
def load_coeff_swarm_mma_2c_external(*paths):
""" Load external model coefficients from one or more Swarm MMA_SHA_2C product files.
Note that the models are loaded and merged in the order of the arguments.
"""
return _load_coeff_mma_multi_set(
path, read_swarm_mma_2c_external, "qs", is_internal=False
paths, read_swarm_mma_2c_external, "qs", is_internal=False
)


def load_coeff_swarm_mma_2f_geo_internal(path):
def load_coeff_swarm_mma_2f_geo_internal(*paths):
""" Load internal geographic frame model coefficients from a Swarm
MMA_SHA_2F product file.
"""
return _load_coeff_mma_single_set(
path, read_swarm_mma_2f_geo_internal, "gh", is_internal=True
paths, read_swarm_mma_2f_geo_internal, "gh", is_internal=True
)


def load_coeff_swarm_mma_2f_geo_external(path):
def load_coeff_swarm_mma_2f_geo_external(*paths):
""" Load external geographic frame model coefficients from a Swarm
MMA_SHA_2F product file.
"""
return _load_coeff_mma_single_set(
path, read_swarm_mma_2f_geo_external, "qs", is_internal=False
paths, read_swarm_mma_2f_geo_external, "qs", is_internal=False
)


def load_coeff_swarm_mma_2f_sm_internal(path):
def load_coeff_swarm_mma_2f_sm_internal(*paths):
""" Load internal solar magnetic frame model coefficients from a Swarm
MMA_SHA_2F product file.
"""
return _load_coeff_mma_single_set(
path, read_swarm_mma_2f_sm_internal, "gh", is_internal=True
paths, read_swarm_mma_2f_sm_internal, "gh", is_internal=True
)


def load_coeff_swarm_mma_2f_sm_external(path):
def load_coeff_swarm_mma_2f_sm_external(*paths):
""" Load external solar magnetic frame model coefficients from a Swarm
MMA_SHA_2F product file.
"""
return _load_coeff_mma_single_set(
path, read_swarm_mma_2f_sm_external, "qs", is_internal=False
paths, read_swarm_mma_2f_sm_external, "qs", is_internal=False
)


def _load_coeff_mma_multi_set(path, cdf_reader, variable, is_internal):
with pycdf.CDF(path) as cdf:
data = cdf_reader(cdf)

def _load_coeff_mma_multi_set(paths, cdf_reader, variable, is_internal):
data = [_read_data(path, cdf_reader) for path in paths]
data = [
_merge_coefficients(*items, variable=variable)
for items in zip(*data)
]
return CombinedSHCoefficients(*[
SparseSHCoefficientsTimeDependent(
item["nm"], item[variable], item["t"], is_internal=is_internal
) for item in data
])


def _load_coeff_mma_single_set(path, cdf_reader, variable, is_internal):
with pycdf.CDF(path) as cdf:
data = cdf_reader(cdf)

def _load_coeff_mma_single_set(paths, cdf_reader, variable, is_internal):
data = [_read_data(path, cdf_reader) for path in paths]
data = _merge_coefficients(*data, variable=variable)
return SparseSHCoefficientsTimeDependent(
data["nm"], data[variable], data["t"], is_internal=is_internal
)


def _read_data(path, cdf_reader):
with pycdf.CDF(path) as cdf:
return cdf_reader(cdf)


def _merge_coefficients(*sets, variable):
head, *tail = sets

# sanity check
previous_item = head
for item in tail:
if (
previous_item["degree_min"] != item["degree_min"] or
previous_item["degree_max"] != item["degree_max"]
):
raise ValueError("Degree range mismatch!")
if not array_equal(previous_item["nm"], item["nm"]):
raise ValueError("Incompatible sets of coefficients!")
if previous_item["t"][-1] > item["t"][0]:
raise ValueError("Unordered sets of coefficients!")
previous_item = item

return {
"degree_min": head["degree_min"],
"degree_max": head["degree_max"],
"nm": head["nm"],
"t": concatenate([item["t"] for item in sets], axis=0),
variable: concatenate([item[variable] for item in sets], axis=1),
}
128 changes: 128 additions & 0 deletions eoxmagmod/eoxmagmod/magnetic_model/tests/mma_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#-------------------------------------------------------------------------------
#
# Test of the MMA coefficients merge.
#
# Author: Martin Paces <[email protected]>
#
#-------------------------------------------------------------------------------
# Copyright (C) 2022 EOX IT Services GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#-------------------------------------------------------------------------------
# pylint: disable=missing-docstring,attribute-defined-outside-init

from unittest import TestCase, main
from numpy import asarray, concatenate
from numpy.random import random
from numpy.testing import assert_equal
from eoxmagmod.magnetic_model.loader_mma import _merge_coefficients


class MMAMergeTest(TestCase):
variable = "data"
set1 = {
"degree_min": 1,
"degree_max": 2,
"nm": asarray([(1, 0), (1, 1), (2, -1)]),
"t": asarray([0.0, 0.5, 1.0, 1.5, 2.0]),
"data": random((3, 5)),
}
set2 = {
"degree_min": 1,
"degree_max": 2,
"nm": asarray([(1, 0), (1, 1), (2, -1)]),
"t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]),
"data": random((3, 5)),
}
set12_merged = {
**set1,
**{
"t": concatenate((set1["t"], set2["t"])),
"data": concatenate((set1["data"], set2["data"]), axis=1),
}
}
set2_min_degree_mismatch = {
"degree_min": 2,
"degree_max": 2,
"nm": asarray([(1, 0), (1, 1), (2, -1)]),
"t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]),
"data": random((3, 5)),
}
set2_max_degree_mismatch = {
"degree_min": 1,
"degree_max": 1,
"nm": asarray([(1, 0), (1, 1), (2, -1)]),
"t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]),
"data": random((3, 5)),
}
set2_nm_mismatch = {
"degree_min": 1,
"degree_max": 2,
"nm": asarray([(1, 0), (1, 1), (1, -1)]),
"t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]),
"data": random((3, 5)),
}

@staticmethod
def _test_merge(tested, reference):
assert_equal(tested["degree_min"], reference["degree_min"])
assert_equal(tested["degree_max"], reference["degree_max"])
assert_equal(tested["nm"], reference["nm"])
assert_equal(tested["t"], reference["t"])
assert_equal(tested["data"], reference["data"])

def test_merge_single(self):
self._test_merge(
_merge_coefficients(self.set1, variable=self.variable),
self.set1,
)

def test_merge_multiple(self):
self._test_merge(
_merge_coefficients(self.set1, self.set2, variable=self.variable),
self.set12_merged
)

def test_min_degree_mismatch(self):
with self.assertRaises(ValueError):
_merge_coefficients(
self.set1, self.set2_min_degree_mismatch,
variable=self.variable
)

def test_max_degree_mismatch(self):
with self.assertRaises(ValueError):
_merge_coefficients(
self.set1, self.set2_max_degree_mismatch,
variable=self.variable
)

def test_mn_degree_mismatch(self):
with self.assertRaises(ValueError):
_merge_coefficients(
self.set1, self.set2_nm_mismatch, variable=self.variable
)

def test_unordered_time(self):
with self.assertRaises(ValueError):
_merge_coefficients(self.set2, self.set1, variable=self.variable)


if __name__ == "__main__":
main()

0 comments on commit ff5e81e

Please sign in to comment.