From 58b0a1a722740576a59fffc6477d75519a410427 Mon Sep 17 00:00:00 2001 From: Leo Ghignone Date: Mon, 15 Jul 2024 11:23:41 +1000 Subject: [PATCH 1/6] Pyarrow parquet provider --- pygeoapi/plugin.py | 1 + pygeoapi/provider/parquet.py | 460 +++++++++++++++++++++++++++++++ requirements-provider.txt | 2 + tests/data/random.parquet | Bin 0 -> 9805 bytes tests/data/random_nogeom.parquet | Bin 0 -> 6631 bytes tests/test_parquet_provider.py | 211 ++++++++++++++ 6 files changed, 674 insertions(+) create mode 100644 pygeoapi/provider/parquet.py create mode 100644 tests/data/random.parquet create mode 100644 tests/data/random_nogeom.parquet create mode 100644 tests/test_parquet_provider.py diff --git a/pygeoapi/plugin.py b/pygeoapi/plugin.py index 8e922f2a9..dd327836f 100644 --- a/pygeoapi/plugin.py +++ b/pygeoapi/plugin.py @@ -55,6 +55,7 @@ 'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider', # noqa: E501 'OracleDB': 'pygeoapi.provider.oracle.OracleProvider', 'OGR': 'pygeoapi.provider.ogr.OGRProvider', + 'parquet': 'pygeoapi.provider.parquet.parquet.ParquetProvider', 'PostgreSQL': 'pygeoapi.provider.postgresql.PostgreSQLProvider', 'rasterio': 'pygeoapi.provider.rasterio_.RasterioProvider', 'SensorThings': 'pygeoapi.provider.sensorthings.SensorThingsProvider', diff --git a/pygeoapi/provider/parquet.py b/pygeoapi/provider/parquet.py new file mode 100644 index 000000000..962434050 --- /dev/null +++ b/pygeoapi/provider/parquet.py @@ -0,0 +1,460 @@ +# ================================================================= +# +# Authors: Leo Ghignone +# +# Copyright (c) 2024 Leo Ghignone +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
+#
+# =================================================================

+import json
+import logging
+import fsspec
+
+import geopandas as gpd
+import pyarrow
+import pyarrow.compute as pc
+import pyarrow.dataset
+import s3fs
+from shapely.geometry import box
+
+from pygeoapi.provider.base import (
+    BaseProvider,
+    ProviderConnectionError,
+    ProviderGenericError,
+    ProviderItemNotFoundError,
+    ProviderQueryError,
+)
+from itertools import chain
+from dateutil.parser import isoparse
+
+LOGGER = logging.getLogger(__name__)
+
+
+def arrow_to_pandas_type(arrow_type):
+    pd_type = arrow_type.to_pandas_dtype()
+    try:
+        # Needed for specific types such as dtype('<M8[ns]')

[... lost in extraction: the remainder of arrow_to_pandas_type, the
ParquetProvider class definition (__init__, _read_parquet, get_fields)
and the signature of query(); patches 2/6, 4/6 and 6/6 below show
fragments of this code ...]

+                    msg = 'Dataset does not support bbox filtering'
+                    raise ProviderQueryError(msg)
+
+                # Convert bbox to data CRS
+                bbox = (
+                    gpd.GeoSeries([box(*bbox)], crs='WGS84')
+                    .to_crs(self.crs)
+                    .total_bounds
+                )
+                minx, miny, maxx, maxy = [float(b) for b in bbox]
+
+                filter = (
+                    (pc.field(self.minx) > pc.scalar(minx))
+                    & (pc.field(self.miny) > pc.scalar(miny))
+                    & (pc.field(self.maxx) < pc.scalar(maxx))
+                    & (pc.field(self.maxy) < pc.scalar(maxy))
+                )
+
+            if datetime_ is not None:
+                if self.time_field is None:
+                    msg = (
+                        'Dataset does not have a time field, '
+                        'querying by datetime is not supported.'
+                    )
+                    raise ProviderQueryError(msg)
+                timefield = pc.field(self.time_field)
+                if '/' in datetime_:
+                    begin, end = datetime_.split('/')
+                    if begin != '..':
+                        begin = isoparse(begin)
+                        filter = filter & (timefield >= begin)
+                    if end != '..':
+                        end = isoparse(end)
+                        filter = filter & (timefield <= end)
+                else:
+                    target_time = isoparse(datetime_)
+                    filter = filter & (timefield == target_time)
+
+            if properties:
+                LOGGER.debug('processing properties')
+                for name, value in properties:
+                    field = self.ds.schema.field(name)
+                    pd_type = arrow_to_pandas_type(field.type)
+                    expr = pc.field(name) == pc.scalar(pd_type(value))
+
+                    filter = filter & expr
+
+            if len(select_properties) == 0:
+                select_properties = self.ds.schema.names
+            else:  # Load id and geometry together with any specified columns
+                if self.has_geometry and 'geometry' not in select_properties:
+                    select_properties.append('geometry')
+                if self.id_field not in select_properties:
+                    select_properties.insert(0, self.id_field)
+
+            if skip_geometry:
+                select_properties.remove('geometry')
+
+            # Make response based on resulttype specified
+            if resulttype == 'hits':
+                LOGGER.debug('hits only specified')
+                result = self._response_feature_hits(filter)
+            elif resulttype == 'results':
+                LOGGER.debug('results specified')
+                result = self._response_feature_collection(
+                    filter, offset, limit, columns=select_properties
+                )
+            else:
+                LOGGER.error('Invalid resulttype: %s' % resulttype)
+
+        except RuntimeError as err:
+            LOGGER.error(err)
+            raise ProviderQueryError(err)
+        except ProviderConnectionError as err:
+            LOGGER.error(err)
+            raise ProviderConnectionError(err)
+        except Exception as err:
+            LOGGER.error(err)
+            raise ProviderGenericError(err)
+
+        return result
+
+    def get(self, identifier, **kwargs):
+        """
+        Get Feature by id
+
+        :param identifier: feature id
+
+        :returns: feature collection
+        """
+        result = None
+        try:
+            LOGGER.debug(f'Fetching identifier {identifier}')
+            id_type = arrow_to_pandas_type(self.ds.schema.field(self.id_field).type)
+            batches = self._read_parquet(
+                filter=(pc.field(self.id_field) == pc.scalar(id_type(identifier)))
+            )
+
+            for batch in batches:
+                if batch.num_rows > 0:
+                    assert (
+                        batch.num_rows == 1
+                    ), f'Multiple items found with ID {identifier}'
+                    row = batch.to_pandas()
+                    break
+            else:
+                raise ProviderItemNotFoundError(f'ID {identifier} not found')
+
+            if self.has_geometry:
+                geom = gpd.GeoSeries.from_wkb(row['geometry'], crs=self.crs).to_crs(
+                    'WGS84'
+                )
+            else:
+                geom = [None]
+            gdf = gpd.GeoDataFrame(row, geometry=geom)
+            LOGGER.debug('results computed')
+
+            # Grab
the feature from the geopandas geo_interface
+            result = gdf.__geo_interface__['features'][0]
+
+        except RuntimeError as err:
+            LOGGER.error(err)
+            raise ProviderQueryError(err)
+        except ProviderConnectionError as err:
+            LOGGER.error(err)
+            raise ProviderConnectionError(err)
+        except ProviderItemNotFoundError as err:
+            LOGGER.error(err)
+            raise ProviderItemNotFoundError(err)
+        except Exception as err:
+            LOGGER.error(err)
+            raise ProviderGenericError(err)
+
+        return result
+
+    def __repr__(self):
+        return f'<ParquetProvider> {self.data}'
+
+    def _response_feature_collection(self, filter, offset, limit, columns=None):
+        """
+        Assembles output from query as
+        GeoJSON FeatureCollection structure.
+
+        :returns: GeoJSON FeatureCollection
+        """
+
+        LOGGER.debug(f'offset:{offset}, limit:{limit}')
+
+        try:
+            batches, scanner = self._read_parquet(
+                filter=filter, columns=columns, return_scanner=True
+            )
+
+            # Discard batches until offset is reached
+            counted = 0
+            for batch in batches:
+                if counted + batch.num_rows > offset:
+                    # Slice current batch to start from the requested row
+                    batch = batch.slice(offset=offset - counted)
+                    # Build a new generator yielding the current batch
+                    # and all following ones
+
+                    batches = chain([batch], batches)
+                    break
+                else:
+                    counted += batch.num_rows
+
+            # batches is a generator, it will now be either fully spent
+            # or set to the new generator starting from offset
+
+            # Get the next `limit+1` rows
+            # The extra row is used to check if a "next" link is needed
+            # (when numberMatched > offset + limit)
+            batches_list = []
+            read = 0
+
+            for batch in batches:
+                read += batch.num_rows
+                if read > limit:
+                    batches_list.append(batch.slice(0, limit + 1))
+                    break
+                else:
+                    batches_list.append(batch)
+
+            # Passing schema from scanner in case no rows are returned
+            table = pyarrow.Table.from_batches(
+                batches_list, schema=scanner.projected_schema
+            )
+
+            rp = table.to_pandas()
+
+            number_matched = offset + len(rp)
+
+            # Remove the extra row
+            if len(rp) > limit:
+                rp = rp.iloc[:-1]
+
+            if 'geometry' not in rp.columns:
+                # We need a null geometry column to create a GeoDataFrame
+                rp['geometry'] = None
+                geom = gpd.GeoSeries.from_wkb(rp['geometry'])
+            else:
+                geom = gpd.GeoSeries.from_wkb(rp['geometry'], crs=self.crs).to_crs(
+                    'WGS84'
+                )
+
+            gdf = gpd.GeoDataFrame(rp, geometry=geom)
+            LOGGER.debug('results computed')
+            result = gdf.__geo_interface__
+
+            # Add numberMatched to generate "next" link
+            result['numberMatched'] = number_matched
+
+            return result
+
+        except RuntimeError as error:
+            LOGGER.error(error)
+            raise error
+
+    def _response_feature_hits(self, filter):
+        """
+        Assembles GeoJSON hits from row count
+
+        :returns: GeoJSON FeatureCollection
+        """
+
+        try:
+            scanner = pyarrow.dataset.Scanner.from_dataset(self.ds, filter=filter)
+            return {
+                'type': 'FeatureCollection',
+                'numberMatched': scanner.count_rows(),
+                'features': [],
+            }
+        except Exception as error:
+            LOGGER.error(error)
+            raise error
\ No newline at end of file
diff --git a/requirements-provider.txt b/requirements-provider.txt
index b42210f72..3f5bdb7ac 100644
--- a/requirements-provider.txt
+++ b/requirements-provider.txt
@@ -6,11 +6,13 @@ elasticsearch-dsl
 fiona
 GDAL<=3.8.4
 geoalchemy2
+geopandas
 netCDF4
 numpy
 oracledb
 pandas
 psycopg2
+pyarrow
 pygeofilter[backend-sqlalchemy]
 pygeoif
 pygeometa
diff --git a/tests/data/random.parquet b/tests/data/random.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..f8168f471a07164f73841ff79477b0493551aa33
GIT binary patch
literal 9805
[~9805 bytes of base85-encoded binary data omitted]
literal 0
HcmV?d00001

diff --git a/tests/data/random_nogeom.parquet b/tests/data/random_nogeom.parquet
new file mode 100644
GIT binary patch
literal 6631

[~6631 bytes of base85-encoded binary data omitted]
literal 0
HcmV?d00001

diff --git a/tests/test_parquet_provider.py b/tests/test_parquet_provider.py
new file mode 100644
index 000000000..eaa28a848
--- /dev/null
+++ b/tests/test_parquet_provider.py
@@ -0,0 +1,211 @@
+# =================================================================
+#
+# Authors: Leo Ghignone
+#
+# Copyright (c) 2024 Leo Ghignone
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation
+# files (the "Software"), to deal
in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import pytest + +from pygeoapi.provider.base import ProviderItemNotFoundError +from pygeoapi.provider.parquet import ParquetProvider + +from .util import get_test_file_path + +path = get_test_file_path( + 'data/random.parquet') + +path_nogeom = get_test_file_path( + 'data/random_nogeom.parquet') + + +@pytest.fixture() +def config_parquet(): + return { + 'name': 'Parquet', + 'type': 'feature', + 'data': { + 'source_type': 'Parquet', + 'source': path, + }, + 'id_field': 'id', + 'time_field': 'time', + 'x_field': 'lon', + 'y_field': 'lat', + } + + +@pytest.fixture() +def config_parquet_nogeom_notime(): + return { + 'name': 'ParquetNoGeomNoTime', + 'type': 'feature', + 'data': { + 'source_type': 'Parquet', + 'source': path_nogeom, + }, + 'id_field': 'id' + } + + +def test_get_fields(config_parquet): + """Testing field types""" + + p = ParquetProvider(config_parquet) + results = p.get_fields() + assert results['lat']['type'] == 'number' + assert results['lon']['format'] == 'double' + assert results['time']['format'] == 'date-time' + + +def test_get(config_parquet): + """Testing query for a specific object""" + + p = ParquetProvider(config_parquet) + result = p.get('42') + assert result['id'] == '42' + assert result['properties']['lon'] == 4.947447 + + +def test_get_not_existing_feature_raise_exception( + config_parquet +): + """Testing query for a not existing object""" + p = ParquetProvider(config_parquet) + with pytest.raises(ProviderItemNotFoundError): + p.get(-1) + + +def test_query_hits(config_parquet): + """Testing query on entire collection for hits""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query(resulttype='hits') + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 0 + hits = feature_collection.get('numberMatched') + assert hits is not None + assert hits == 100 + + +def test_query_bbox_hits(config_parquet): + """Testing query for a valid JSON object with geometry""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query( + bbox=[100, -50, 150, 0], + resulttype='hits') + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 0 + hits = feature_collection.get('numberMatched') + assert hits is not None + assert hits == 6 + + +def test_query_with_limit(config_parquet): + """Testing query for a valid JSON object with geometry""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query(limit=2, 
resulttype='results')
+    assert feature_collection.get('type') == 'FeatureCollection'
+    features = feature_collection.get('features')
+    assert len(features) == 2
+    hits = feature_collection.get('numberMatched')
+    assert hits > 2
+    feature = features[0]
+    properties = feature.get('properties')
+    assert properties is not None
+    geometry = feature.get('geometry')
+    assert geometry is not None
+
+
+def test_query_with_offset(config_parquet):
+    """Testing query for a valid JSON object with geometry"""
+
+    p = ParquetProvider(config_parquet)
+    feature_collection = p.query(offset=20, limit=10, resulttype='results')
+    assert feature_collection.get('type') == 'FeatureCollection'
+    features = feature_collection.get('features')
+    assert len(features) == 10
+    hits = feature_collection.get('numberMatched')
+    assert hits > 30
+    feature = features[0]
+    properties = feature.get('properties')
+    assert properties is not None
+    assert feature['id'] == '21'
+    assert properties['lat'] == 66.264988
+    geometry = feature.get('geometry')
+    assert geometry is not None
+
+
+def test_query_with_property(config_parquet):
+    """Testing query for a valid JSON object with property filter"""
+
+    p = ParquetProvider(config_parquet)
+    feature_collection = p.query(
+        resulttype='results',
+        properties=[('lon', -12.855022)])
+    assert feature_collection.get('type') == 'FeatureCollection'
+    features = feature_collection.get('features')
+    assert len(features) == 1
+    for feature in features:
+        assert feature['properties']['lon'] == -12.855022
+
+
+def test_query_with_skip_geometry(config_parquet):
+    """Testing query for a valid JSON object with skipped geometry"""
+
+    p = ParquetProvider(config_parquet)
+    feature_collection = p.query(skip_geometry=True)
+    for feature in feature_collection['features']:
+        assert feature.get('geometry') is None
+
+
+def test_query_with_datetime(config_parquet):
+    """Testing query for a valid JSON object with time"""
+
+    p = ParquetProvider(config_parquet)
+    feature_collection = p.query(
+        datetime_='2022-05-01T00:00:00Z/2022-05-31T23:59:59Z')
+    assert feature_collection.get('type') == 'FeatureCollection'
+    features = feature_collection.get('features')
+    assert len(features) == 7
+    for feature in feature_collection['features']:
+        time = feature['properties'][config_parquet['time_field']]
+        assert time.year == 2022
+        assert time.month == 5
+
+
+def test_query_nogeom(config_parquet_nogeom_notime):
+    """Testing query for a valid JSON object without geometry"""
+
+    p = ParquetProvider(config_parquet_nogeom_notime)
+    feature_collection = p.query(resulttype='results')
+    assert feature_collection.get('type') == 'FeatureCollection'
+    assert len(feature_collection.get('features')) > 0
+    for feature in feature_collection['features']:
+        assert feature.get('geometry') is None

From a12051ca491cb1c8968af16cb3165db7cef31344 Mon Sep 17 00:00:00 2001
From: Leo Ghignone
Date: Mon, 22 Jul 2024 15:21:29 +1000
Subject: [PATCH 2/6] Defer crs management to pygeoapi

---
 pygeoapi/provider/parquet.py | 21 +++++++--------------
 1 file changed, 7 insertions(+), 14 deletions(-)

diff --git a/pygeoapi/provider/parquet.py b/pygeoapi/provider/parquet.py
index 962434050..6e573f067 100644
--- a/pygeoapi/provider/parquet.py
+++ b/pygeoapi/provider/parquet.py
@@ -48,6 +48,8 @@
 from itertools import chain
 from dateutil.parser import isoparse
 
+from pygeoapi.util import crs_transform
+
 LOGGER = logging.getLogger(__name__)
 
 
@@ -183,6 +185,7 @@ def get_fields(self):
 
         return fields
 
+    @crs_transform
     def query(
         self,
         offset=0,
@@ -225,13 +228,6 @@
def query( msg = 'Dataset does not support bbox filtering' raise ProviderQueryError(msg) - # Convert bbox to data CRS - bbox = ( - gpd.GeoSeries([box(*bbox)], crs='WGS84') - .to_crs(self.crs) - .total_bounds - ) - minx, miny, maxx, maxy = [float(b) for b in bbox] filter = ( (pc.field(self.minx) > pc.scalar(minx)) @@ -304,6 +300,7 @@ def query( return result + @crs_transform def get(self, identifier, **kwargs): """ Get Feature by id @@ -331,9 +328,7 @@ def get(self, identifier, **kwargs): raise ProviderItemNotFoundError(f'ID {identifier} not found') if self.has_geometry: - geom = gpd.GeoSeries.from_wkb(row['geometry'], crs=self.crs).to_crs( - 'WGS84' - ) + geom = gpd.GeoSeries.from_wkb(row['geometry'], crs=self.crs) else: geom = [None] gdf = gpd.GeoDataFrame(row, geometry=geom) @@ -424,9 +419,7 @@ def _response_feature_collection(self, filter, offset, limit, columns=None): rp['geometry'] = None geom = gpd.GeoSeries.from_wkb(rp['geometry']) else: - geom = gpd.GeoSeries.from_wkb(rp['geometry'], crs=self.crs).to_crs( - 'WGS84' - ) + geom = gpd.GeoSeries.from_wkb(rp['geometry'], crs=self.crs) gdf = gpd.GeoDataFrame(rp, geometry=geom) LOGGER.debug('results computed') @@ -457,4 +450,4 @@ def _response_feature_hits(self, filter): } except Exception as error: LOGGER.error(error) - raise error \ No newline at end of file + raise error From b2fc685dc22cb650dcdb856378847629729962b5 Mon Sep 17 00:00:00 2001 From: Leo Ghignone Date: Mon, 22 Jul 2024 15:40:08 +1000 Subject: [PATCH 3/6] Add parquet provider docs --- .../data-publishing/ogcapi-features.rst | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/docs/source/data-publishing/ogcapi-features.rst b/docs/source/data-publishing/ogcapi-features.rst index a8e2c9a55..a375baefa 100644 --- a/docs/source/data-publishing/ogcapi-features.rst +++ b/docs/source/data-publishing/ogcapi-features.rst @@ -27,6 +27,7 @@ parameters. `MongoDB`_,✅/❌,results,✅,✅,✅,✅,❌,❌,✅ `OGR`_,✅/❌,results/hits,✅,❌,❌,✅,❌,❌,✅ `Oracle`_,✅/✅,results/hits,✅,❌,✅,✅,❌,❌,✅ + `Parquet`_,✅/✅,results/hits,✅,✅,❌,✅,❌,❌,✅ `PostgreSQL`_,✅/✅,results/hits,✅,✅,✅,✅,✅,❌,✅ `SQLiteGPKG`_,✅/❌,results/hits,✅,❌,❌,✅,❌,❌,✅ `SensorThings API`_,✅/✅,results/hits,✅,✅,✅,✅,❌,❌,✅ @@ -410,6 +411,40 @@ useful e.g. for authorization at row level or manipulation of the explain plan w An example an more information about that feature you can find in the test class in tests/test_oracle_provider.py. +.. _Parquet: + +Parquet +^^^^^^^ + +.. note:: + Requires Python package pyarrow + +To publish a GeoParquet file (with a geometry column) the `geopandas` package is also required. + +.. note:: + Reading data directly from a public s3 bucket is also supported. + +.. code-block:: yaml + + providers: + - type: feature + name: Parquet + data: + source: ./tests/data/parquet/random.parquet + id_field: id + time_field: time + x_field: + - minlon + - maxlon + y_field: + - minlat + - maxlat + +For GeoParquet data, the `x_field` and `y_field` must be specified in the provider definition, +and they must be arrays of two column names that contain the x and y coordinates of the +bounding box of each geometry. If the geometries in the data are all points, the `x_field` and `y_field` +can be strings instead of arrays and refer to a single column each. + .. 
_PostgreSQL:
 
 PostgreSQL

From 3a3b52b480bb4a2e308eee2423183ed47e328686 Mon Sep 17 00:00:00 2001
From: Leo Ghignone
Date: Mon, 22 Jul 2024 15:59:47 +1000
Subject: [PATCH 4/6] Fix flake8 errors

---
 pygeoapi/provider/parquet.py | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/pygeoapi/provider/parquet.py b/pygeoapi/provider/parquet.py
index 6e573f067..b5843df81 100644
--- a/pygeoapi/provider/parquet.py
+++ b/pygeoapi/provider/parquet.py
@@ -29,14 +29,12 @@
 
 import json
 import logging
-import fsspec
 
 import geopandas as gpd
 import pyarrow
 import pyarrow.compute as pc
 import pyarrow.dataset
 import s3fs
-from shapely.geometry import box
 
 from pygeoapi.provider.base import (
     BaseProvider,
@@ -88,7 +86,8 @@ def __init__(self, provider_def):
         # Source url is required
         self.source = self.data.get('source')
         if not self.source:
-            msg = "Need explicit 'source' attr " "in data field of provider config"
+            msg = "Need explicit 'source' attr " \
+                  "in data field of provider config"
             LOGGER.error(msg)
             raise Exception(msg)
 
@@ -126,8 +125,9 @@ def __init__(self, provider_def):
             # Get the CRS of the data
             geo_metadata = json.loads(self.ds.schema.metadata[b'geo'])
             geom_column = geo_metadata['primary_column']
-            # if the CRS is not set, default to EPSG:4326 as per geoparquet spec
-            self.crs = geo_metadata['columns'][geom_column]['crs'] or 'EPSG:4326'
+            # if the CRS is not set default to EPSG:4326, per geoparquet spec
+            self.crs = geo_metadata['columns'][geom_column]['crs'] \
+                or 'EPSG:4326'
 
     def _read_parquet(self, return_scanner=False, **kwargs):
         """
@@ -151,7 +151,8 @@ def get_fields(self):
 
         fields = dict()
 
-        for field_name, field_type in zip(self.ds.schema.names, self.ds.schema.types):
+        for field_name, field_type in zip(self.ds.schema.names,
+                                         self.ds.schema.types):
             # Geometry is managed as a special case by pygeoapi
             if field_name == 'geometry':
                 continue
@@ -312,9 +313,12 @@ def get(self, identifier, **kwargs):
         result = None
         try:
             LOGGER.debug(f'Fetching identifier {identifier}')
-            id_type = arrow_to_pandas_type(self.ds.schema.field(self.id_field).type)
+            id_type = arrow_to_pandas_type(
+                self.ds.schema.field(self.id_field).type)
             batches = self._read_parquet(
-                filter=(pc.field(self.id_field) == pc.scalar(id_type(identifier)))
+                filter=(
+                    pc.field(self.id_field) == pc.scalar(id_type(identifier))
+                )
             )
 
             for batch in batches:
@@ -355,7 +359,8 @@ def get(self, identifier, **kwargs):
     def __repr__(self):
         return f'<ParquetProvider> {self.data}'
 
-    def _response_feature_collection(self, filter, offset, limit, columns=None):
+    def _response_feature_collection(self, filter, offset, limit,
+                                    columns=None):
         """
         Assembles output from query as
         GeoJSON FeatureCollection structure.
@@ -442,7 +447,8 @@ def _response_feature_hits(self, filter): """ try: - scanner = pyarrow.dataset.Scanner.from_dataset(self.ds, filter=filter) + scanner = pyarrow.dataset.Scanner.from_dataset(self.ds, + filter=filter) return { 'type': 'FeatureCollection', 'numberMatched': scanner.count_rows(), From df1dac38c219b0eb54a7fa580e8970a8016584b8 Mon Sep 17 00:00:00 2001 From: Leo Ghignone Date: Mon, 19 Aug 2024 10:41:42 +1000 Subject: [PATCH 5/6] Remove extra .parquet --- pygeoapi/plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pygeoapi/plugin.py b/pygeoapi/plugin.py index dd327836f..439c06929 100644 --- a/pygeoapi/plugin.py +++ b/pygeoapi/plugin.py @@ -55,7 +55,7 @@ 'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider', # noqa: E501 'OracleDB': 'pygeoapi.provider.oracle.OracleProvider', 'OGR': 'pygeoapi.provider.ogr.OGRProvider', - 'parquet': 'pygeoapi.provider.parquet.parquet.ParquetProvider', + 'parquet': 'pygeoapi.provider.parquet.ParquetProvider', 'PostgreSQL': 'pygeoapi.provider.postgresql.PostgreSQLProvider', 'rasterio': 'pygeoapi.provider.rasterio_.RasterioProvider', 'SensorThings': 'pygeoapi.provider.sensorthings.SensorThingsProvider', From 9c7641f2794afe2141e36915f9bb944000508d97 Mon Sep 17 00:00:00 2001 From: Leo Ghignone Date: Mon, 19 Aug 2024 10:59:46 +1000 Subject: [PATCH 6/6] Address reviews --- .../data-publishing/ogcapi-features.rst | 2 +- pygeoapi/plugin.py | 2 +- pygeoapi/provider/parquet.py | 23 +++++++++---------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/docs/source/data-publishing/ogcapi-features.rst b/docs/source/data-publishing/ogcapi-features.rst index a375baefa..cefa61ff4 100644 --- a/docs/source/data-publishing/ogcapi-features.rst +++ b/docs/source/data-publishing/ogcapi-features.rst @@ -419,7 +419,7 @@ Parquet .. note:: Requires Python package pyarrow -To publish a GeoParquet file (with a geometry column) the `geopandas` package is also required. +To publish a GeoParquet file (with a geometry column) the geopandas package is also required. .. note:: Reading data directly from a public s3 bucket is also supported. 
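
For orientation (this example is not part of the patch series): the provider
definition described in the docs above is the same dictionary that
tests/test_parquet_provider.py builds by hand, so the provider can be driven
directly from Python. A minimal sketch, assuming the patched pygeoapi is
importable and using the repository's test data; the s3 URL in the comment is
a hypothetical placeholder:

.. code-block:: python

   # Minimal sketch: exercises ParquetProvider the same way the unit
   # tests do. The s3 URL below is a hypothetical placeholder for the
   # public-bucket case mentioned in the docs.
   from pygeoapi.provider.parquet import ParquetProvider

   provider_def = {
       'name': 'Parquet',
       'type': 'feature',
       'data': {
           'source_type': 'Parquet',
           # or e.g. 's3://example-bucket/random.parquet' (hypothetical)
           'source': './tests/data/random.parquet',
       },
       'id_field': 'id',
       'time_field': 'time',
       'x_field': 'lon',  # single columns suffice for point data;
       'y_field': 'lat',  # use two-element lists for bbox columns
   }

   p = ParquetProvider(provider_def)

   # resulttype='hits' returns only numberMatched, with no features
   print(p.query(resulttype='hits')['numberMatched'])

   # resulttype='results' returns a GeoJSON FeatureCollection
   fc = p.query(bbox=[100, -50, 150, 0], limit=2, resulttype='results')
   for feature in fc['features']:
       print(feature['id'], feature['properties']['time'])
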
diff --git a/pygeoapi/plugin.py b/pygeoapi/plugin.py
index 439c06929..73d0374be 100644
--- a/pygeoapi/plugin.py
+++ b/pygeoapi/plugin.py
@@ -55,7 +55,7 @@
     'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider',  # noqa: E501
     'OracleDB': 'pygeoapi.provider.oracle.OracleProvider',
     'OGR': 'pygeoapi.provider.ogr.OGRProvider',
-    'parquet': 'pygeoapi.provider.parquet.ParquetProvider',
+    'Parquet': 'pygeoapi.provider.parquet.ParquetProvider',
     'PostgreSQL': 'pygeoapi.provider.postgresql.PostgreSQLProvider',
     'rasterio': 'pygeoapi.provider.rasterio_.RasterioProvider',
     'SensorThings': 'pygeoapi.provider.sensorthings.SensorThingsProvider',
diff --git a/pygeoapi/provider/parquet.py b/pygeoapi/provider/parquet.py
index b5843df81..70ec81e80 100644
--- a/pygeoapi/provider/parquet.py
+++ b/pygeoapi/provider/parquet.py
@@ -27,9 +27,11 @@
 #
 # =================================================================
 
+from itertools import chain
 import json
 import logging
 
+from dateutil.parser import isoparse
 import geopandas as gpd
 import pyarrow
 import pyarrow.compute as pc
@@ -43,9 +45,6 @@
     ProviderItemNotFoundError,
     ProviderQueryError,
 )
-from itertools import chain
-from dateutil.parser import isoparse
-
 from pygeoapi.util import crs_transform
 
 LOGGER = logging.getLogger(__name__)
@@ -105,7 +104,7 @@ def __init__(self, provider_def):
         self.fields = self.get_fields()  # Must be set to visualise queryables
 
         # Column names for bounding box data.
-        if self.x_field is None or self.y_field is None:
+        if None in [self.x_field, self.y_field]:
             self.has_geometry = False
         else:
             self.has_geometry = True
@@ -126,8 +125,8 @@ def __init__(self, provider_def):
             geo_metadata = json.loads(self.ds.schema.metadata[b'geo'])
             geom_column = geo_metadata['primary_column']
             # if the CRS is not set default to EPSG:4326, per geoparquet spec
-            self.crs = geo_metadata['columns'][geom_column]['crs'] \
-                or 'EPSG:4326'
+            self.crs = (geo_metadata['columns'][geom_column]['crs']
+                        or 'EPSG:4326')
 
     def _read_parquet(self, return_scanner=False, **kwargs):
         """
@@ -152,7 +151,7 @@ def get_fields(self):
         fields = dict()
 
         for field_name, field_type in zip(self.ds.schema.names,
-                                         self.ds.schema.types):
+                                          self.ds.schema.types):
             # Geometry is managed as a special case by pygeoapi
             if field_name == 'geometry':
                 continue
@@ -160,7 +159,7 @@ def get_fields(self):
             field_type = str(field_type)
             converted_type = None
             converted_format = None
-            if field_type.startswith('int') or field_type.startswith('uint'):
+            if field_type.startswith(('int', 'uint')):
                 converted_type = 'integer'
                 converted_format = field_type
             elif field_type == 'double' or field_type.startswith('float'):
@@ -287,7 +286,7 @@ def query(
                     filter, offset, limit, columns=select_properties
                 )
             else:
-                LOGGER.error('Invalid resulttype: %s' % resulttype)
+                LOGGER.error(f'Invalid resulttype: {resulttype}')
 
         except RuntimeError as err:
             LOGGER.error(err)
@@ -308,7 +307,7 @@ def get(self, identifier, **kwargs):
 
         :param identifier: feature id
 
-        :returns: feature collection
+        :returns: a single feature
         """
         result = None
         try:
@@ -360,7 +359,7 @@ def __repr__(self):
         return f'<ParquetProvider> {self.data}'
 
     def _response_feature_collection(self, filter, offset, limit,
-                                    columns=None):
+                                     columns=None):
         """
         Assembles output from query as
         GeoJSON FeatureCollection structure.
@@ -447,7 +446,7 @@ def _response_feature_hits(self, filter):
         """
 
         try:
-            scanner = pyarrow.dataset.Scanner.from_dataset(self.ds, 
+            scanner = pyarrow.dataset.Scanner.from_dataset(self.ds,
                                                            filter=filter)
             return {
                 'type': 'FeatureCollection',