diff --git a/src/python/esgcet/esgmkpubrec.py b/src/python/esgcet/esgmkpubrec.py index 86dd7093..a508e34e 100644 --- a/src/python/esgcet/esgmkpubrec.py +++ b/src/python/esgcet/esgmkpubrec.py @@ -89,10 +89,12 @@ def run(): publog.exception("Missing required Exiting.") exit(1) - scanarg = a.scan_file + #scanarg = a.scan_file - if scanarg: + #if scanarg: + if a.scan_file: format_handler = ESGPubAutocHandler + scanarg = json.load(open(a.scan_file, 'r')) else: format_handler = ESGPubXArrayHandler scanarg = format_handler.xarray_load(map_json_data) @@ -146,7 +148,7 @@ def run(): exit(1) try: - globus = json.loads(config['globus_uuid']) + globus = config['globus_uuid'] except: # globus undefined globus = "none" diff --git a/src/python/esgcet/esgstacpub.py b/src/python/esgcet/esgstacpub.py new file mode 100644 index 00000000..11921683 --- /dev/null +++ b/src/python/esgcet/esgstacpub.py @@ -0,0 +1,280 @@ +import sys, json, os, re +from esgcet.stac_client import TransactionClient +import argparse +from pathlib import Path +from esgcet.settings import STAC_API +import esgcet.logger as logger +import esgcet.args as pub_args + +log = logger.ESGPubLogger() +publog = log.return_logger('esgstacpub') + + +item_properties = { + "CMIP6": [ + "version", + "access", + #"activity_drs", + "activity_id", + "cf_standard_name", + "citation_url", + #"data_node", + "data_spec_version", + #"dataset_id_template_", + #"datetime_start", + #"datetime_stop", + #"directory_format_template_", + "experiment_id", + "experiment_title", + "frequency", + "further_info_url", + #"north_degrees", + #"west_degrees", + #"south_degrees", + #"east_degrees", + #"geo", + #"geo_units", + "grid", + "grid_label", + #"height_bottom", + #"height_top", + #"height_units", + #"index_node", + #"instance_id", + "institution_id", + #"latest", + #"master_id", + #"member_id", + #"metadata_format", + "mip_era", + "model_cohort", + "nominal_resolution", + #"number_of_aggregations", + #"number_of_files", + "pid", + "product", + "project", + "realm", + #"replica", + #"size", + "source_id", + "source_type", + "sub_experiment_id", + "table_id", + "title", + #"type", + #"url", + "variable", + "variable_id", + "variable_long_name", + "variable_units", + "variant_label", + #"xlink", + #"_version_", + "retracted", + #"_timestamp", + #"score", + ] +} + + +def convert2stac(json_data): + dataset_doc = {} + for doc in json_data: + if doc.get("type") == "Dataset": + dataset_doc = doc + break + + collection = dataset_doc.get("project") + item_id = dataset_doc.get("instance_id") + west_degrees = dataset_doc.get("west_degrees") + south_degrees = dataset_doc.get("south_degrees") + east_degrees = dataset_doc.get("east_degrees") + north_degrees = dataset_doc.get("north_degrees") + + properties = { + "datetime": None, + "start_datetime": dataset_doc.get("datetime_start"), + "end_datetime": dataset_doc.get("datetime_end"), + } + property_keys = item_properties.get(collection) + for k in property_keys: + properties[k] = dataset_doc.get(k) + + item = { + "type": "Feature", + "stac_version": "1.0.0", + "extensions": ["https://stac-extensions.github.io/alternate-assets/v1.2.0/schema.json"], + "id": item_id, + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [west_degrees, south_degrees], + [east_degrees, south_degrees], + [east_degrees, north_degrees], + [west_degrees, north_degrees], + [west_degrees, south_degrees] + ] + ] + }, + "bbox": [ + west_degrees, south_degrees, east_degrees, north_degrees + ], + "collection": collection, + "links": [ + { + "rel": "self", + "type": "application/json", + "href": f"{STAC_API}/collections/{collection}/items/{item_id}" + }, + { + "rel": "parent", + "type": "application/json", + "href": f"{STAC_API}/collections/{collection}" + }, + { + "rel": "collection", + "type": "application/json", + "href": f"{STAC_API}/collections/{collection}" + }, + { + "rel": "root", + "type": "application/json", + "href": f"{STAC_API}/collections" + } + ], + "properties": properties, + } + + assets = {} + + if "Globus" in dataset_doc.get("access"): + for doc in json_data: + if doc.get("type") == "File": + urls = doc.get("url") + for url in urls: + if url.startswith("globus:"): + m = re.search(r"^globus:([^/]*)(.*/)[^/]*$", url) + href = f"https://app.globus.org/file-manager?origin_id={m[1]}&origin_path={m[2]}" + assets = { + "globus": { + "href": href, + "description": "Globus Web App Link", + "type": "text/html", + "roles": ["data"], + "alternate:name": dataset_doc.get("data_node"), + } + } + break + + if "HTTPServer" in dataset_doc.get("access"): + counter = 0 + for doc in json_data: + if doc.get("type") == "File": + urls = doc.get("url") + for url in urls: + if url.endswith("application/netcdf|HTTPServer"): + url_split = url.split("|") + href = url_split[0] + assets[f"data{counter:04}"] = { + "href": href, + "description": "HTTPServer Link", + "type": "application/netcdf", + "roles": ["data"], + "alternate:name": dataset_doc.get("data_node"), + } + counter += 1 + break + + item["assets"] = assets + + return item + + +def get_args(): + parser = argparse.ArgumentParser(description="Publish data sets to ESGF STAC Transaction API.") + + home = str(Path.home()) + def_config = home + "/.esg/esg.yaml" + parser.add_argument("--stac-api", dest="stac_api", default=None, help="Specify STAC Transaction API.") + parser.add_argument("--pub-rec", dest="json_data", default=None, + help="JSON file output from esgpidcitepub or esgmkpubrec.") + parser.add_argument("--config", "-cfg", dest="cfg", default=def_config, help="Path to yaml config file.") + parser.add_argument("--silent", dest="silent", action="store_true", help="Enable silent mode.") + parser.add_argument("--verbose", dest="verbose", action="store_true", help="Enable verbose mode.") + pub = parser.parse_args() + + return pub + + +def run(): + a = get_args() + + ini_file = a.cfg + if not os.path.exists(ini_file): + publog.error("Config file not found. " + ini_file + " does not exist.") + exit(1) + if os.path.isdir(ini_file): + publog.error("Config file path is a directory. Please use a complete file path.") + exit(1) + args = pub_args.PublisherArgs() + config = args.load_config(ini_file) + + if not a.json_data: + publog.error("Input data argument missing. Please provide either records in .json form for esgf2 publishing") + exit(1) + + if not a.silent: + try: + s = config['silent'] + if 'true' in s or 'yes' in s: + silent = True + else: + silent = False + except: + silent = False + else: + silent = True + + if not a.verbose: + try: + v = config['verbose'] + if 'true' in v or 'yes' in v: + verbose = True + else: + verbose = False + except: + verbose = False + else: + verbose = True + + + rc = True + tc = TransactionClient(a.stac_api, silent=silent, verbose=verbose) + + if a.json_data: + try: + new_json_data = json.load(open(a.json_data)) + except: + publog.exception("Could not open json file. Exiting.") + exit(1) + try: + stac_item = convert2stac(new_json_data) + #publog.warn(json.dumps(stac_item, indent=4)) + rc = rc and tc.publish(stac_item) + except Exception as ex: + publog.exception("Failed to publish to STAC Transaction API") + exit(1) + if not rc: + exit(1) + + +def main(): + run() + + +if __name__ == '__main__': + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) + main() + diff --git a/src/python/esgcet/mk_dataset_autoc.py b/src/python/esgcet/mk_dataset_autoc.py index 04a76bc5..1b814e45 100644 --- a/src/python/esgcet/mk_dataset_autoc.py +++ b/src/python/esgcet/mk_dataset_autoc.py @@ -25,6 +25,7 @@ def unpack_values(self, invals): yield x['values'] def get_attrs_dict(self, scanobj): + self.publog.warn(type(scanobj)) return scanobj['dataset'] def get_variables(self, scanobj): diff --git a/src/python/esgcet/settings.py b/src/python/esgcet/settings.py index c424806a..db589df8 100644 --- a/src/python/esgcet/settings.py +++ b/src/python/esgcet/settings.py @@ -78,7 +78,7 @@ # Eg replace /thredds/fileServer with the prefix for NginX # Note these are netCDF specific and will need to change if other formats are considered URL_Templates = ["https://{}/thredds/fileServer/{}/{}|application/netcdf|HTTPServer", -"https://{}/thredds/dodsC/{}/{}|application/opendap-html|OPENDAP", +#"https://{}/thredds/dodsC/{}/{}|application/opendap-html|OPENDAP", "globus:{}/{}/{}|Globus|Globus"] DATASET_GLOBUS_URL_TEMPLATE = "https://app.globus.org/file_manager?origin_id={}&origin_path=/{}" @@ -115,6 +115,22 @@ GLOBUS_UUID = "415a6320-e49c-11e5-9798-22000b9da45e" +STAC_CLIENT = { + "client_id": "ec5f07c0-7ed8-4f2b-94f2-ddb6f8fc91a3", + "redirect_uri": "https://auth.globus.org/v2/web/auth-code", +} + +TOKEN_STORAGE_FILE = "~/.esgf2-publisher.json" + +STAC_TRANSACTION_API = { + "client_id": "6fa3b827-5484-42b9-84db-f00c7a183a6a", + "access_control_policy": "https://esgf2.s3.amazonaws.com/access_control_policy.json", + "scope_string": "https://auth.globus.org/scopes/6fa3b827-5484-42b9-84db-f00c7a183a6a/ingest", + "base_url": "https://stac-transaction-api.esgf-west.org", +} + +STAC_API = "https://api.stac.esgf-west.org" + VARIABLE_LIMIT = 75 VARIABLE_EXCLUDES = [ "lat_bounds", "lon_bounds", "time_bounds"] diff --git a/src/python/esgcet/stac_client.py b/src/python/esgcet/stac_client.py new file mode 100644 index 00000000..d882bd66 --- /dev/null +++ b/src/python/esgcet/stac_client.py @@ -0,0 +1,96 @@ +import os +import argparse +import json +from globus_sdk import NativeAppAuthClient, RefreshTokenAuthorizer, BaseClient, GroupsClient +from globus_sdk.scopes import GroupsScopes +from globus_sdk.tokenstorage import SimpleJSONFileAdapter +from esgcet.settings import STAC_CLIENT, TOKEN_STORAGE_FILE, STAC_TRANSACTION_API +from esgcet import __version__ +import esgcet.logger as logger + + +log = logger.ESGPubLogger() + + +class TransactionClient: + def __init__(self, stac_api=None, verbose=False, silent=False): + if stac_api: + self.stac_api = stac_api + else: + self.stac_api = STAC_TRANSACTION_API.get("base_url") + self.verbose = verbose + self.silent = silent + self.publog = log.return_logger('STAC Client', silent, verbose) + self.scopes = [ + GroupsScopes.view_my_groups_and_memberships, + STAC_TRANSACTION_API.get("scope_string") + ] + self.auth_client = NativeAppAuthClient( + client_id=STAC_CLIENT.get("client_id"), + app_name="ESGF2 STAC Transaction API" + ) + self._create_clients() + + def _do_login_flow(self): + self.auth_client.oauth2_start_flow( + requested_scopes=self.scopes, + refresh_tokens=True + ) + authorize_url = self.auth_client.oauth2_get_authorize_url() + print("Please go to this URL and login: {0}".format(authorize_url)) + auth_code = input("Please enter the code here: ").strip() + return self.auth_client.oauth2_exchange_code_for_tokens(auth_code) + + def _create_clients(self): + filename = os.path.expanduser(TOKEN_STORAGE_FILE) + token_storage = SimpleJSONFileAdapter(filename) + if not token_storage.file_exists(): + response = self._do_login_flow() + token_storage.store(response) + self.groups_tokens = response.by_resource_server[GroupsClient.resource_server] + self.transaction_tokens = response.by_resource_server[STAC_TRANSACTION_API.get("client_id")] + else: + self.groups_tokens = token_storage.get_token_data(GroupsClient.resource_server) + self.transaction_tokens = token_storage.get_token_data(STAC_TRANSACTION_API.get("client_id")) + + groups_authorizer = RefreshTokenAuthorizer( + self.groups_tokens["refresh_token"], + self.auth_client, + access_token=self.groups_tokens["access_token"], + expires_at=self.groups_tokens["expires_at_seconds"], + on_refresh=token_storage.on_refresh, + ) + self.groups_client = GroupsClient( + authorizer=groups_authorizer + ) + + transaction_authorizer = RefreshTokenAuthorizer( + self.transaction_tokens["refresh_token"], + self.auth_client, + access_token=self.transaction_tokens["access_token"], + expires_at=self.transaction_tokens["expires_at_seconds"], + on_refresh=token_storage.on_refresh, + ) + self.transaction_client = BaseClient( + base_url=self.stac_api, + authorizer=transaction_authorizer + ) + + def get_my_groups(self): + groups = self.groups_client.get_my_groups() + return groups + + def publish(self, entry): + collection = entry.get('collection') + headers = { + "User-Agent": f"esgf_publisher/{__version__}", + } + resp = self.transaction_client.post(f"/collections/{collection}/items", headers=headers, data=entry) + if resp.http_status == 201: + self.publog.info(resp.http_status) + self.publog.info("Published") + elif resp.http_status == 202: + self.publog.info(resp.http_status) + self.publog.info("Queued for publication") + else: + self.publog.error(f"Failed to publish: Error {resp.http_status}") diff --git a/src/python/setup.py b/src/python/setup.py index 4be41fe2..7339d5eb 100755 --- a/src/python/setup.py +++ b/src/python/setup.py @@ -14,7 +14,8 @@ "xarray", "netcdf4", "dask", - "pyyaml" + "pyyaml", + "globus-sdk" ] @@ -40,5 +41,6 @@ 'esgupdate=esgcet.esgupdate:main', 'esgmapconv=esgcet.esgmapconv:main', 'esgmigrate=esgcet.migratecmd:main', - 'esgunpublish=esgcet.esgunpublish:main']} + 'esgunpublish=esgcet.esgunpublish:main', + 'esgstacpub=esgcet.esgstacpub:main']} )