diff --git a/eoxmagmod/eoxmagmod/magnetic_model/loader_mma.py b/eoxmagmod/eoxmagmod/magnetic_model/loader_mma.py index 4bcc4be..ea7ef40 100644 --- a/eoxmagmod/eoxmagmod/magnetic_model/loader_mma.py +++ b/eoxmagmod/eoxmagmod/magnetic_model/loader_mma.py @@ -28,6 +28,7 @@ # pylint: disable=invalid-name from spacepy import pycdf +from numpy import concatenate, array_equal from .model import ( SphericalHarmomicGeomagneticModel, DipoleSphericalHarmomicGeomagneticModel, @@ -46,120 +47,124 @@ MMA2C_NGP_LONGITUDE = 287.78 - 360.0 # deg. -def load_model_swarm_mma_2c_internal(path, lat_ngp=MMA2C_NGP_LATITUDE, +def load_model_swarm_mma_2c_internal(*paths, lat_ngp=MMA2C_NGP_LATITUDE, lon_ngp=MMA2C_NGP_LONGITUDE): """ Load internal (secondary field) model from a Swarm MMA_SHA_2C product. """ return DipoleSphericalHarmomicGeomagneticModel( - load_coeff_swarm_mma_2c_internal(path), + load_coeff_swarm_mma_2c_internal(*paths), north_pole=(lat_ngp, lon_ngp), ) -def load_model_swarm_mma_2c_external(path, lat_ngp=MMA2C_NGP_LATITUDE, +def load_model_swarm_mma_2c_external(*paths, lat_ngp=MMA2C_NGP_LATITUDE, lon_ngp=MMA2C_NGP_LONGITUDE): """ Load external (primary field) model from a Swarm MMA_SHA_2C product. """ return DipoleSphericalHarmomicGeomagneticModel( - load_coeff_swarm_mma_2c_external(path), + load_coeff_swarm_mma_2c_external(*paths), north_pole=(lat_ngp, lon_ngp), ) -def load_model_swarm_mma_2f_geo_internal(path): +def load_model_swarm_mma_2f_geo_internal(*paths): """ Load geographic frame internal (secondary field) model from a Swarm MMA_SHA_2F product. """ return SphericalHarmomicGeomagneticModel( - load_coeff_swarm_mma_2f_geo_internal(path) + load_coeff_swarm_mma_2f_geo_internal(*paths) ) -def load_model_swarm_mma_2f_geo_external(path): +def load_model_swarm_mma_2f_geo_external(*paths): """ Load geographic frame internal (primary field) model from a Swarm MMA_SHA_2F product. """ return SphericalHarmomicGeomagneticModel( - load_coeff_swarm_mma_2f_geo_external(path) + load_coeff_swarm_mma_2f_geo_external(*paths) ) -def load_model_swarm_mma_2f_sm_internal(path): +def load_model_swarm_mma_2f_sm_internal(*paths): """ Load solar magnetic frame internal (secondary field) model from a Swarm MMA_SHA_2F product. """ # FIXME: solar-magnetic frame model return SphericalHarmomicGeomagneticModel( - load_coeff_swarm_mma_2f_sm_internal(path), + load_coeff_swarm_mma_2f_sm_internal(*paths), ) -def load_model_swarm_mma_2f_sm_external(path): +def load_model_swarm_mma_2f_sm_external(*paths): """ Load solar magnetic frame internal (primary field) model from a Swarm MMA_SHA_2F product. """ # FIXME: solar-magnetic frame model return SphericalHarmomicGeomagneticModel( - load_coeff_swarm_mma_2f_sm_external(path) + load_coeff_swarm_mma_2f_sm_external(*paths) ) -def load_coeff_swarm_mma_2c_internal(path): - """ Load internal model coefficients from a Swarm MMA_SHA_2C product file. +def load_coeff_swarm_mma_2c_internal(*paths): + """ Load internal model coefficients from one or more Swarm MMA_SHA_2C product files. + Note that the models are loaded and merged in the order of the arguments. """ return _load_coeff_mma_multi_set( - path, read_swarm_mma_2c_internal, "gh", is_internal=True + paths, read_swarm_mma_2c_internal, "gh", is_internal=True ) -def load_coeff_swarm_mma_2c_external(path): - """ Load external model coefficients from a Swarm MMA_SHA_2C product file. +def load_coeff_swarm_mma_2c_external(*paths): + """ Load external model coefficients from one or more Swarm MMA_SHA_2C product files. + Note that the models are loaded and merged in the order of the arguments. """ return _load_coeff_mma_multi_set( - path, read_swarm_mma_2c_external, "qs", is_internal=False + paths, read_swarm_mma_2c_external, "qs", is_internal=False ) -def load_coeff_swarm_mma_2f_geo_internal(path): +def load_coeff_swarm_mma_2f_geo_internal(*paths): """ Load internal geographic frame model coefficients from a Swarm MMA_SHA_2F product file. """ return _load_coeff_mma_single_set( - path, read_swarm_mma_2f_geo_internal, "gh", is_internal=True + paths, read_swarm_mma_2f_geo_internal, "gh", is_internal=True ) -def load_coeff_swarm_mma_2f_geo_external(path): +def load_coeff_swarm_mma_2f_geo_external(*paths): """ Load external geographic frame model coefficients from a Swarm MMA_SHA_2F product file. """ return _load_coeff_mma_single_set( - path, read_swarm_mma_2f_geo_external, "qs", is_internal=False + paths, read_swarm_mma_2f_geo_external, "qs", is_internal=False ) -def load_coeff_swarm_mma_2f_sm_internal(path): +def load_coeff_swarm_mma_2f_sm_internal(*paths): """ Load internal solar magnetic frame model coefficients from a Swarm MMA_SHA_2F product file. """ return _load_coeff_mma_single_set( - path, read_swarm_mma_2f_sm_internal, "gh", is_internal=True + paths, read_swarm_mma_2f_sm_internal, "gh", is_internal=True ) -def load_coeff_swarm_mma_2f_sm_external(path): +def load_coeff_swarm_mma_2f_sm_external(*paths): """ Load external solar magnetic frame model coefficients from a Swarm MMA_SHA_2F product file. """ return _load_coeff_mma_single_set( - path, read_swarm_mma_2f_sm_external, "qs", is_internal=False + paths, read_swarm_mma_2f_sm_external, "qs", is_internal=False ) -def _load_coeff_mma_multi_set(path, cdf_reader, variable, is_internal): - with pycdf.CDF(path) as cdf: - data = cdf_reader(cdf) - +def _load_coeff_mma_multi_set(paths, cdf_reader, variable, is_internal): + data = [_read_data(path, cdf_reader) for path in paths] + data = [ + _merge_coefficients(*items, variable=variable) + for items in zip(*data) + ] return CombinedSHCoefficients(*[ SparseSHCoefficientsTimeDependent( item["nm"], item[variable], item["t"], is_internal=is_internal @@ -167,10 +172,40 @@ def _load_coeff_mma_multi_set(path, cdf_reader, variable, is_internal): ]) -def _load_coeff_mma_single_set(path, cdf_reader, variable, is_internal): - with pycdf.CDF(path) as cdf: - data = cdf_reader(cdf) - +def _load_coeff_mma_single_set(paths, cdf_reader, variable, is_internal): + data = [_read_data(path, cdf_reader) for path in paths] + data = _merge_coefficients(*data, variable=variable) return SparseSHCoefficientsTimeDependent( data["nm"], data[variable], data["t"], is_internal=is_internal ) + + +def _read_data(path, cdf_reader): + with pycdf.CDF(path) as cdf: + return cdf_reader(cdf) + + +def _merge_coefficients(*sets, variable): + head, *tail = sets + + # sanity check + previous_item = head + for item in tail: + if ( + previous_item["degree_min"] != item["degree_min"] or + previous_item["degree_max"] != item["degree_max"] + ): + raise ValueError("Degree range mismatch!") + if not array_equal(previous_item["nm"], item["nm"]): + raise ValueError("Incompatible sets of coefficients!") + if previous_item["t"][-1] > item["t"][0]: + raise ValueError("Unordered sets of coefficients!") + previous_item = item + + return { + "degree_min": head["degree_min"], + "degree_max": head["degree_max"], + "nm": head["nm"], + "t": concatenate([item["t"] for item in sets], axis=0), + variable: concatenate([item[variable] for item in sets], axis=1), + } diff --git a/eoxmagmod/eoxmagmod/magnetic_model/tests/mma_merge.py b/eoxmagmod/eoxmagmod/magnetic_model/tests/mma_merge.py new file mode 100644 index 0000000..8ceb283 --- /dev/null +++ b/eoxmagmod/eoxmagmod/magnetic_model/tests/mma_merge.py @@ -0,0 +1,128 @@ +#------------------------------------------------------------------------------- +# +# Test of the MMA coefficients merge. +# +# Author: Martin Paces +# +#------------------------------------------------------------------------------- +# Copyright (C) 2022 EOX IT Services GmbH +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies of this Software or works derived from this Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +#------------------------------------------------------------------------------- +# pylint: disable=missing-docstring,attribute-defined-outside-init + +from unittest import TestCase, main +from numpy import asarray, concatenate +from numpy.random import random +from numpy.testing import assert_equal +from eoxmagmod.magnetic_model.loader_mma import _merge_coefficients + + +class MMAMergeTest(TestCase): + variable = "data" + set1 = { + "degree_min": 1, + "degree_max": 2, + "nm": asarray([(1, 0), (1, 1), (2, -1)]), + "t": asarray([0.0, 0.5, 1.0, 1.5, 2.0]), + "data": random((3, 5)), + } + set2 = { + "degree_min": 1, + "degree_max": 2, + "nm": asarray([(1, 0), (1, 1), (2, -1)]), + "t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]), + "data": random((3, 5)), + } + set12_merged = { + **set1, + **{ + "t": concatenate((set1["t"], set2["t"])), + "data": concatenate((set1["data"], set2["data"]), axis=1), + } + } + set2_min_degree_mismatch = { + "degree_min": 2, + "degree_max": 2, + "nm": asarray([(1, 0), (1, 1), (2, -1)]), + "t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]), + "data": random((3, 5)), + } + set2_max_degree_mismatch = { + "degree_min": 1, + "degree_max": 1, + "nm": asarray([(1, 0), (1, 1), (2, -1)]), + "t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]), + "data": random((3, 5)), + } + set2_nm_mismatch = { + "degree_min": 1, + "degree_max": 2, + "nm": asarray([(1, 0), (1, 1), (1, -1)]), + "t": asarray([2.0, 2.5, 3.0, 3.5, 4.0]), + "data": random((3, 5)), + } + + @staticmethod + def _test_merge(tested, reference): + assert_equal(tested["degree_min"], reference["degree_min"]) + assert_equal(tested["degree_max"], reference["degree_max"]) + assert_equal(tested["nm"], reference["nm"]) + assert_equal(tested["t"], reference["t"]) + assert_equal(tested["data"], reference["data"]) + + def test_merge_single(self): + self._test_merge( + _merge_coefficients(self.set1, variable=self.variable), + self.set1, + ) + + def test_merge_multiple(self): + self._test_merge( + _merge_coefficients(self.set1, self.set2, variable=self.variable), + self.set12_merged + ) + + def test_min_degree_mismatch(self): + with self.assertRaises(ValueError): + _merge_coefficients( + self.set1, self.set2_min_degree_mismatch, + variable=self.variable + ) + + def test_max_degree_mismatch(self): + with self.assertRaises(ValueError): + _merge_coefficients( + self.set1, self.set2_max_degree_mismatch, + variable=self.variable + ) + + def test_mn_degree_mismatch(self): + with self.assertRaises(ValueError): + _merge_coefficients( + self.set1, self.set2_nm_mismatch, variable=self.variable + ) + + def test_unordered_time(self): + with self.assertRaises(ValueError): + _merge_coefficients(self.set2, self.set1, variable=self.variable) + + +if __name__ == "__main__": + main()