diff --git a/pygeoapi/provider/sensorthings.py b/pygeoapi/provider/sensorthings.py index b888a60f7..defd48c43 100644 --- a/pygeoapi/provider/sensorthings.py +++ b/pygeoapi/provider/sensorthings.py @@ -30,14 +30,14 @@ # ================================================================= from json.decoder import JSONDecodeError -import os import logging from requests import Session +from pygeoapi.config import get_config from pygeoapi.provider.base import ( BaseProvider, ProviderQueryError, ProviderConnectionError) from pygeoapi.util import ( - yaml_load, url_join, get_provider_default, crs_transform, get_base_url) + url_join, get_provider_default, crs_transform, get_base_url) LOGGER = logging.getLogger(__name__) @@ -51,10 +51,10 @@ _EXPAND = { 'Things': 'Locations,Datastreams', 'Observations': 'Datastream,FeatureOfInterest', + 'ObservedProperties': 'Datastreams/Thing/Locations', 'Datastreams': """ Sensor ,ObservedProperty - ,Thing ,Thing/Locations ,Observations( $select=@iot.id; @@ -71,6 +71,7 @@ class SensorThingsProvider(BaseProvider): """SensorThings API (STA) Provider""" + expand = EXPAND def __init__(self, provider_def): """ @@ -82,68 +83,11 @@ def __init__(self, provider_def): :returns: pygeoapi.provider.sensorthings.SensorThingsProvider """ LOGGER.debug('Setting SensorThings API (STA) provider') - + self.linked_entity = {} super().__init__(provider_def) - self.data.rstrip('/') - try: - self.entity = provider_def['entity'] - self._url = url_join(self.data, self.entity) - except KeyError: - LOGGER.debug('Attempting to parse Entity from provider data') - if not self._get_entity(self.data): - raise RuntimeError('Entity type required') - self.entity = self._get_entity(self.data) - self._url = self.data - self.data = self._url.rstrip(f'/{self.entity}') - LOGGER.debug(f'STA endpoint: {self.data}, Entity: {self.entity}') - - # Default id - if self.id_field: - LOGGER.debug(f'Using id field: {self.id_field}') - else: - LOGGER.debug('Using default @iot.id for id field') - self.id_field = '@iot.id' - - # Create intra-links - self.links = {} - self.intralink = provider_def.get('intralink', False) - if self.intralink and provider_def.get('rel_link'): - # For pytest - self.rel_link = provider_def['rel_link'] - - elif self.intralink: - # Read from pygeoapi config - with open(os.getenv('PYGEOAPI_CONFIG'), encoding='utf8') as fh: - CONFIG = yaml_load(fh) - self.rel_link = get_base_url(CONFIG) - for (name, rs) in CONFIG['resources'].items(): - pvs = rs.get('providers') - - if pvs is None: - LOGGER.debug(f'Skipping collection: {name}') - continue - - p = get_provider_default(pvs) - e = p.get('entity') or self._get_entity(p['data']) - if any([ - not pvs, # No providers in resource - not p.get('intralink'), # No configuration for intralinks - not e, # No STA entity found - self.data not in p.get('data') # No common STA endpoint - ]): - continue - - if p.get('uri_field'): - LOGGER.debug(f'Linking {e} with field: {p["uri_field"]}') - else: - LOGGER.debug(f'Linking {e} with collection: {name}') - - self.links[e] = { - 'cnm': name, # OAPI collection name, - 'cid': p.get('id_field', '@iot.id'), # OAPI id_field - 'uri': p.get('uri_field') # STA uri_field - } + self._generate_mappings(provider_def) + LOGGER.debug(f'STA endpoint: {self.data}, Entity: {self.entity}') # Start session self.http = Session() @@ -277,17 +221,19 @@ def _load(self, offset=0, limit=10, resulttype='results', return fc - def _make_feature(self, entity, select_properties=[], skip_geometry=False): + def _make_feature(self, feature, select_properties=[], skip_geometry=False, + entity=None): """ Private function: Create feature from entity - :param entity: `dict` of STA entity + :param feature: `dict` of STA entity :param select_properties: list of property names :param skip_geometry: bool of whether to skip geometry (default False) + :param entity: SensorThings entity name :returns: dict of GeoJSON Feature """ - _ = entity.pop(self.id_field) + _ = feature.pop(self.id_field) id = f"'{_}'" if isinstance(_, str) else str(_) f = { 'type': 'Feature', 'id': id, 'properties': {}, 'geometry': None @@ -295,28 +241,35 @@ def _make_feature(self, entity, select_properties=[], skip_geometry=False): # Make geometry if not skip_geometry: - f['geometry'] = self._geometry(entity) + f['geometry'] = self._geometry(feature, entity) # Fill properties block try: f['properties'] = self._expand_properties( - entity, select_properties) + feature, select_properties, entity) except KeyError as err: LOGGER.error(err) raise ProviderQueryError(err) return f - def _get_response(self, url, params={}): + def _get_response(self, url, params={}, entity=None, expand=None): """ Private function: Get STA response :param url: request url :param params: query parameters + :param entity: SensorThings entity name + :param expand: SensorThings expand query + :returns: STA response """ - params.update({'$expand': EXPAND[self.entity]}) + if expand: + params.update({'$expand': expand}) + else: + entity_ = entity or self.entity + params.update({'$expand': self.expand[entity_]}) r = self.http.get(url, params=params) @@ -332,13 +285,15 @@ def _get_response(self, url, params={}): return response - def _make_filter(self, properties, bbox=[], datetime_=None): + def _make_filter(self, properties, bbox=[], datetime_=None, + entity=None): """ Private function: Make STA filter from query properties :param properties: list of tuples (name, value) :param bbox: bounding box [minx,miny,maxx,maxy] :param datetime_: temporal (datestamp or extent) + :param entity: SensorThings entity name :returns: STA $filter string of properties """ @@ -350,16 +305,8 @@ def _make_filter(self, properties, bbox=[], datetime_=None): ret.append(f'{name} eq {value}') if bbox: - minx, miny, maxx, maxy = bbox - bbox_ = f'POLYGON (({minx} {miny}, {maxx} {miny}, \ - {maxx} {maxy}, {minx} {maxy}, {minx} {miny}))' - if self.entity == 'Things': - loc = 'Locations/location' - elif self.entity == 'Datastreams': - loc = 'Thing/Locations/location' - elif self.entity == 'Observations': - loc = 'FeatureOfInterest/feature' - ret.append(f"st_within({loc}, geography'{bbox_}')") + entity_ = entity or self.entity + ret.append(self._make_bbox(bbox, entity_)) if datetime_ is not None: if self.time_field is None: @@ -378,6 +325,20 @@ def _make_filter(self, properties, bbox=[], datetime_=None): return ' and '.join(ret) + @staticmethod + def _make_bbox(bbox, entity): + minx, miny, maxx, maxy = bbox + bbox_ = f'POLYGON(({minx} {miny},{maxx} {miny},{maxx} {maxy},{minx} {maxy},{minx} {miny}))' # noqa + if entity == 'Things': + loc = 'Locations/location' + elif entity == 'Datastreams': + loc = 'Thing/Locations/location' + elif entity == 'Observations': + loc = 'FeatureOfInterest/feature' + elif entity == 'ObservedProperties': + loc = 'Datastreams/observedArea' + return f"st_within({loc},geography'{bbox_}')" + def _make_orderby(self, sortby): """ Private function: Make STA filter from query properties @@ -398,79 +359,85 @@ def _make_orderby(self, sortby): return ','.join(ret) - def _geometry(self, entity): + def _geometry(self, feature, entity=None): """ Private function: Retrieve STA geometry - :param entity: SensorThings entity + :param feature: SensorThings entity + :param entity: SensorThings entity name :returns: GeoJSON Geometry for feature """ + entity_ = entity or self.entity try: - if self.entity == 'Things': - return entity['Locations'][0]['location'] + if entity_ == 'Things': + return feature['Locations'][0]['location'] - elif self.entity == 'Observations': - return entity['FeatureOfInterest'].pop('feature') + elif entity_ == 'Observations': + return feature['FeatureOfInterest'].pop('feature') - elif self.entity == 'Datastreams': + elif entity_ == 'Datastreams': try: - return entity['Observations'][0]['FeatureOfInterest'].pop('feature') # noqa + return feature['Observations'][0]['FeatureOfInterest'].pop('feature') # noqa except (KeyError, IndexError): - return entity['Thing'].pop('Locations')[0]['location'] + return feature['Thing'].pop('Locations')[0]['location'] + + elif entity_ == 'ObservedProperties': + return feature['Datastreams'][0]['Thing']['Locations'][0]['location'] # noqa except (KeyError, IndexError): LOGGER.warning('No geometry found') return None - def _expand_properties(self, entity, keys=(), uri=''): + def _expand_properties(self, feature, keys=(), uri='', + entity=None): """ Private function: Parse STA entity into feature - :param entity: SensorThings entity + :param feature: `dict` of SensorThings entity :param keys: keys used in properties block :param uri: uri of STA entity + :param entity: SensorThings entity name :returns: dict of SensorThings feature properties """ - LOGGER.debug('Adding extra properties') - # Properties filter & display keys = (() if not self.properties and not keys else set(self.properties) | set(keys)) - if self.entity == 'Things': - self._expand_location(entity) - elif 'Thing' in entity.keys(): - self._expand_location(entity['Thing']) + entity = entity or self.entity + if entity == 'Things': + self._expand_location(feature) + elif 'Thing' in feature.keys(): + self._expand_location(feature['Thing']) # Retain URI if present - if entity.get('properties') and self.uri_field: - uri = entity['properties'] + if feature.get('properties') and self.uri_field: + uri = feature['properties'] # Create intra links - LOGGER.debug('Creating intralinks') - for k, v in entity.items(): - if k in self.links: - entity[k] = [self._get_uri(_v, **self.links[k]) for _v in v] + for k, v in feature.items(): + if k in self.linked_entity: + feature[k] = [self._get_uri(_v, **self.linked_entity[k]) + for _v in v] LOGGER.debug(f'Created link for {k}') - elif f'{k}s' in self.links: - entity[k] = self._get_uri(v, **self.links[f'{k}s']) + elif f'{k}s' in self.linked_entity: + feature[k] = \ + self._get_uri(v, **self.linked_entity[f'{k}s']) LOGGER.debug(f'Created link for {k}') # Make properties block - LOGGER.debug('Making properties block') - if entity.get('properties'): - entity.update(entity.pop('properties')) + if feature.get('properties'): + feature.update(feature.pop('properties')) if keys: - ret = {k: entity.pop(k) for k in keys} - entity = ret + ret = {k: feature.pop(k) for k in keys} + feature = ret if self.uri_field is not None and uri != '': - entity[self.uri_field] = uri + feature[self.uri_field] = uri - return entity + return feature @staticmethod def _expand_location(entity): @@ -522,5 +489,68 @@ def _get_entity(uri): else: return '' + def _generate_mappings(self, provider_def: dict): + """ + Generate mappings for the STA entity and set up intra-links. + + This function sets up the necessary mappings and configurations for + the STA entity based on the provided provider definition. + + :param provider_def: `dict` of provider definition containing + configuration details for the STA entity. + """ + self.data.rstrip('/') + try: + self.entity = provider_def['entity'] + self._url = url_join(self.data, self.entity) + except KeyError: + LOGGER.debug('Attempting to parse Entity from provider data') + if not self._get_entity(self.data): + raise RuntimeError('Entity type required') + self.entity = self._get_entity(self.data) + self._url = self.data + self.data = self._url.rstrip(f'/{self.entity}') + + # Default id + if self.id_field: + LOGGER.debug(f'Using id field: {self.id_field}') + else: + LOGGER.debug('Using default @iot.id for id field') + self.id_field = '@iot.id' + + # Create intra-links + self.intralink = provider_def.get('intralink', False) + if self.intralink and provider_def.get('rel_link'): + # For pytest + self.rel_link = provider_def['rel_link'] + + elif self.intralink: + # Read from pygeoapi config + CONFIG = get_config() + self.rel_link = get_base_url(CONFIG) + + for name, rs in CONFIG['resources'].items(): + pvs = rs.get('providers') + p = get_provider_default(pvs) + e = p.get('entity') or self._get_entity(p['data']) + if any([ + not pvs, # No providers in resource + not p.get('intralink'), # No configuration for intralinks + not e, # No STA entity found + self.data not in p.get('data') # No common STA endpoint + ]): + continue + + if p.get('uri_field'): + LOGGER.debug(f'Linking {e} with field: {p["uri_field"]}') + else: + LOGGER.debug(f'Linking {e} with collection: {name}') + + self.linked_entity[e] = { + 'cnm': name, # OAPI collection name, + 'cid': p.get('id_field', '@iot.id'), # OAPI id_field + 'uri': p.get('uri_field') # STA uri_field + } + def __repr__(self): return f' {self.data}, {self.entity}'