Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UNPP data source #21

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
DATABASE_URL: postgis://postgres:@db:5432/etools_datamart
DATABASE_URL_ETOOLS: postgis://postgres:@db-etools:5432/etools
DATABASE_URL_PRP: postgis://postgres:@db-prp:5432/prp
DATABASE_URL_UNPP: postgis://postgres:@db-unpp:5432/unpp
AUTOCREATE_USERS: "admin,123"
BASE_IMAGE: "compose"
CACHE_URL: "redis://redis:6379/1"
Expand Down Expand Up @@ -61,7 +62,7 @@ services:
volumes:
- "$PWD/build/db:/var/lib/postgresql/data"

# Rely on etools and prp database instances running locally
# Rely on etools, prp, and unpp database instances running locally
db-etools:
image: mdillon/postgis:9.6
container_name: datamart_etools
Expand All @@ -84,6 +85,17 @@ services:
volumes:
- "$PWD/build/prp:/var/lib/postgresql/data"

db-unpp:
image: mdillon/postgis:9.6
shm_size: '1gb'
container_name: datamart_unpp
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD:
POSTGRES_DB: unpp
volumes:
- "$PWD/build/unpp:/var/lib/postgresql/data"

redis:
image: redis:alpine
container_name: datamart_redis
1 change: 0 additions & 1 deletion docker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,3 @@ shell:

docker-remove:
docker-remove.sh ${IMAGE_NAME} -o ${ORGANIZATION}

1 change: 1 addition & 0 deletions src/etools_datamart/api/endpoints/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from .prp import * # noqa
from .system import * # noqa
from .unicef import * # noqa
from .unpp import * # noqa
1 change: 1 addition & 0 deletions src/etools_datamart/api/endpoints/datamart/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_geonameid(self, obj):
class LocationRamSerializer(serializers.ModelSerializer):
geonameid = serializers.SerializerMethodField()
admin_level = serializers.SerializerMethodField()

class Meta:
model = models.Location
fields = ('id', 'source_id', 'name', 'latitude', 'longitude', 'parent',
Expand Down
1 change: 1 addition & 0 deletions src/etools_datamart/api/endpoints/unpp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .application import ApplicationViewSet
25 changes: 25 additions & 0 deletions src/etools_datamart/api/endpoints/unpp/application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from unicef_rest_framework.ds import DynamicSerializerFilter
from unicef_rest_framework.ordering import OrderingFilter

from etools_datamart.api.endpoints.common import DataMartViewSet
from etools_datamart.api.endpoints.datamart.serializers import DataMartSerializer
from etools_datamart.api.filtering import DatamartQueryStringFilterBackend
from etools_datamart.apps.mart.unpp import models


class ApplicationSerializer(DataMartSerializer):
    """Serializer exposing every field of the UNPP ``Application`` mart model."""

    class Meta(DataMartSerializer.Meta):
        model = models.Application
        # Reset any ``exclude`` inherited from DataMartSerializer.Meta so the
        # ``fields = '__all__'`` declaration below is the single source of truth
        # (DRF rejects a serializer that defines both).
        exclude = None
        fields = '__all__'


class ApplicationViewSet(DataMartViewSet):
    """API endpoint serving UNPP application records from the datamart.

    Plugs the ``Application`` mart model into the standard DataMart viewset
    machinery: querystring filtering, result ordering and dynamic serializer
    (field-selection) support.
    """

    serializer_class = ApplicationSerializer
    queryset = models.Application.objects.all()
    # Serializer used for the 'std' fieldset; no alternate fieldsets defined.
    serializers_fieldsets = {'std': ApplicationSerializer, }
    # NOTE: backend order matters — filtering runs before ordering and
    # dynamic-serializer resolution.
    filter_backends = [
        DatamartQueryStringFilterBackend,
        OrderingFilter,
        DynamicSerializerFilter,
    ]
6 changes: 5 additions & 1 deletion src/etools_datamart/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ class ReadOnlyRouter(APIReadOnlyRouter):

router.register(r'system/monitor', endpoints.MonitorViewSet)

from etools_datamart.apps.sources.source_prp import api_urls # noqa isort:skip
router.register(r'unpp/application', endpoints.ApplicationViewSet)

import etools_datamart.apps.sources.source_prp.api_urls # noqa isort:skip
from etools_datamart.apps.sources.source_prp.backward_api_urls import backward_compatible_router # noqa isort:skip

import etools_datamart.apps.sources.unpp.api_urls # noqa isort:skip

from .endpoints.rapidpro import _urls_ # noqa isort:skip

urlpatterns = [
Expand Down
119 changes: 117 additions & 2 deletions src/etools_datamart/apps/etl/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.cache import caches
from django.db import transaction
from django.utils import timezone
from django.utils.functional import cached_property

Expand All @@ -15,9 +16,14 @@
from sentry_sdk import capture_exception
from strategy_field.utils import fqn, get_attr

from etools_datamart.apps.etl.exceptions import (LoaderException, MaxRecordsException,
RequiredIsMissing, RequiredIsRunning,)
from etools_datamart.apps.etl.exceptions import (
LoaderException,
MaxRecordsException,
RequiredIsMissing,
RequiredIsRunning,
)
from etools_datamart.celery import app
from etools_datamart.sentry import process_exception

loadeables = set()
locks = caches['lock']
Expand Down Expand Up @@ -457,3 +463,112 @@ def load(self, *, verbosity=0, stdout=None, ignore_dependencies=False, max_recor

def consistency_check(self):
pass


class CommonLoader(BaseLoader):
    """Generic ETL loader shared by source-specific datamarts (PRP, UNPP, ...).

    Automatically maps every concrete field of the mart model 1:1 from the
    source record (minus bookkeeping fields), then iterates the configured
    source queryset and upserts each record into the mart, honouring the
    locking, dependency and delta-load machinery provided by ``BaseLoader``.
    """

    # Mart-side bookkeeping fields that are never copied from the source.
    IGNORED_FIELDS = ['source_id', 'id', 'last_modify_date']

    def get_queryset(self):
        """Return the source queryset.

        Prefers an explicit ``config.queryset`` callable; otherwise falls
        back to ``config.source.objects.all()``. Raises ``ValueError`` when
        neither is configured.
        """
        if self.config.queryset:
            ret = self.config.queryset()
        elif self.config.source:
            ret = self.config.source.objects.all()
        else:  # pragma: no cover
            raise ValueError(
                "Option must define 'queryset' or 'source' attribute"
            )
        return ret

    def filter_queryset(self, qs):
        """Apply static config filters and, when applicable, a delta window.

        Delta loading (``last_modify_field >= last_run``) only kicks in when
        a delta run was requested AND the mart table is not empty — an empty
        mart always triggers a full load.
        """
        use_delta = self.context['only_delta'] and not self.context['is_empty']
        if self.config.filters:
            qs = qs.filter(**self.config.filters)
        if use_delta and (self.config.last_modify_field and self.last_run):
            logger.debug(f"Loader {self}: use deltas")
            qs = qs.filter(
                **{f"{self.config.last_modify_field}__gte": self.last_run}
            )
        return qs

    def load(
        self,
        *,
        verbosity=0,
        stdout=None,
        ignore_dependencies=False,
        max_records=None,
        only_delta=True,
        run_type=RUN_UNKNOWN,
        **kwargs,
    ):
        """Run one full ETL cycle for this loader and return an ``EtlResult``.

        Acquires the loader lock, verifies dependent loaders (unless
        ``ignore_dependencies``), builds the field mapping, then processes
        every source record inside a database savepoint which is rolled back
        on unexpected errors. ``RequiredIsRunning``/``RequiredIsMissing`` are
        re-raised with ``retry=True`` so callers may reschedule.
        """
        self.on_start(run_type)
        self.results = EtlResult()
        logger.debug(f"Running loader {self}")
        lock = self.lock()
        truncate = self.config.truncate
        try:
            if lock:  # pragma: no branch
                if not ignore_dependencies:
                    # Fail fast if any prerequisite loader is busy or stale.
                    for requirement in self.config.depends:
                        if requirement.loader.is_running():
                            raise RequiredIsRunning(requirement)
                        if requirement.loader.need_refresh(self):
                            raise RequiredIsMissing(requirement)
                        else:
                            logger.info(f"Loader {requirement} is uptodate")
                # Default mapping: every concrete mart field copied verbatim
                # from the source record; explicit config.mapping overrides win.
                self.mapping = {}
                mart_fields = self.model._meta.concrete_fields
                for field in mart_fields:
                    if field.name not in self.IGNORED_FIELDS:
                        self.mapping[field.name] = field.name
                if self.config.mapping:  # pragma: no branch
                    self.mapping.update(self.config.mapping)
                self.update_context(today=timezone.now(),
                                    max_records=max_records,
                                    verbosity=verbosity,
                                    records=0,
                                    only_delta=only_delta,
                                    is_empty=not self.model.objects.exists(),
                                    stdout=stdout)
                sid = transaction.savepoint()
                try:
                    self.results.context = self.context
                    # 'seen' is loader bookkeeping, not source data to compare.
                    self.fields_to_compare = [
                        f for f in self.mapping.keys() if f not in ["seen"]
                    ]
                    if truncate:
                        self.model.objects.truncate()
                    qs = self.filter_queryset(self.get_queryset())
                    for record in qs.all():
                        filters = self.config.key(self, record)
                        values = self.get_values(record)
                        op = self.process_record(filters, values)
                        self.increment_counter(op)

                    if stdout and verbosity > 0:
                        stdout.write("\n")
                except MaxRecordsException:
                    # Record cap reached: stop quietly, keep what was loaded.
                    pass
                except Exception:
                    # Undo partial writes before propagating the failure.
                    transaction.savepoint_rollback(sid)
                    raise
            else:
                logger.info(f"Unable to get lock for {self}")

        except (RequiredIsMissing, RequiredIsRunning) as e:
            # Dependency not ready: record the outcome and signal a retry.
            self.on_end(error=e, retry=True)
            raise
        except BaseException as e:
            self.on_end(e)
            process_exception(e)
            raise
        else:
            self.on_end(None)
        finally:
            if lock:  # pragma: no branch
                try:
                    lock.release()
                except LockError as e:  # pragma: no cover
                    logger.warning(e)

        return self.results
129 changes: 18 additions & 111 deletions src/etools_datamart/apps/mart/prp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,124 +48,31 @@
from ast import literal_eval

from django.contrib.postgres.fields import JSONField
from django.db import models, transaction
from django.db import models
from django.db.models import Q
from django.utils import timezone

from redis.exceptions import LockError
from strategy_field.utils import get_attr

from etools_datamart.apps.etl.exceptions import MaxRecordsException, RequiredIsMissing, RequiredIsRunning
from etools_datamart.apps.etl.loader import BaseLoader, EtlResult, logger, RUN_UNKNOWN
from etools_datamart.apps.sources.source_prp.models import (CoreCountry, CoreGatewaytype, IndicatorDisaggregationvalue,
IndicatorIndicatorlocationdata, IndicatorIndicatorreport,
IndicatorReportable, IndicatorReportablelocationgoal,
UnicefLowerleveloutput, UnicefPdresultlink,
UnicefProgrammedocument, UnicefProgrammedocumentSections,
UnicefProgressreport,)
from etools_datamart.sentry import process_exception
from etools_datamart.apps.etl.loader import CommonLoader
from etools_datamart.apps.sources.source_prp.models import (
CoreCountry,
CoreGatewaytype,
IndicatorDisaggregationvalue,
IndicatorIndicatorlocationdata,
IndicatorIndicatorreport,
IndicatorReportable,
IndicatorReportablelocationgoal,
UnicefLowerleveloutput,
UnicefPdresultlink,
UnicefProgrammedocument,
UnicefProgrammedocumentSections,
UnicefProgressreport,
)

from .base import PrpDataMartModel


class PrpBaseLoader(BaseLoader):
    """PRP-specific ETL loader (superseded in this PR by ``CommonLoader``).

    Maps every concrete mart field 1:1 from the PRP source record (minus
    bookkeeping fields) and upserts records inside a database savepoint,
    relying on ``BaseLoader`` for locking, dependency and delta handling.
    """

    def get_queryset(self):
        """Return the source queryset from ``config.queryset`` or ``config.source``."""
        if self.config.queryset:
            ret = self.config.queryset()
        elif self.config.source:
            ret = self.config.source.objects.all()
        else:  # pragma: no cover
            raise ValueError("Option must define 'queryset' or 'source' attribute")

        return ret

    def filter_queryset(self, qs):
        """Apply static config filters, plus a delta window on non-empty marts."""
        use_delta = self.context['only_delta'] and not self.context['is_empty']
        if self.config.filters:
            qs = qs.filter(**self.config.filters)
        if use_delta and (self.config.last_modify_field and self.last_run):
            logger.debug(f"Loader {self}: use deltas")
            qs = qs.filter(**{f"{self.config.last_modify_field}__gte": self.last_run})
        return qs

    def load(self, *, verbosity=0, stdout=None,
             ignore_dependencies=False, max_records=None,
             only_delta=True, run_type=RUN_UNKNOWN, **kwargs):
        """Run one full ETL cycle and return an ``EtlResult``.

        Acquires the loader lock, checks dependent loaders (unless
        ``ignore_dependencies``), builds the field mapping, then processes
        each source record inside a savepoint rolled back on failure.
        """
        self.on_start(run_type)
        self.results = EtlResult()
        logger.debug(f"Running loader {self}")
        lock = self.lock()
        truncate = self.config.truncate
        try:
            if lock:  # pragma: no branch
                if not ignore_dependencies:
                    # Fail fast if any prerequisite loader is busy or stale.
                    for requirement in self.config.depends:
                        if requirement.loader.is_running():
                            raise RequiredIsRunning(requirement)
                        if requirement.loader.need_refresh(self):
                            raise RequiredIsMissing(requirement)
                        else:
                            logger.info(f"Loader {requirement} is uptodate")
                # Default 1:1 mapping of mart fields; config.mapping overrides.
                self.mapping = {}
                mart_fields = self.model._meta.concrete_fields
                for field in mart_fields:
                    if field.name not in ['source_id', 'id', 'last_modify_date']:
                        self.mapping[field.name] = field.name
                if self.config.mapping:  # pragma: no branch
                    self.mapping.update(self.config.mapping)
                self.update_context(today=timezone.now(),
                                    max_records=max_records,
                                    verbosity=verbosity,
                                    records=0,
                                    only_delta=only_delta,
                                    is_empty=not self.model.objects.exists(),
                                    stdout=stdout)
                sid = transaction.savepoint()
                try:
                    self.results.context = self.context
                    self.fields_to_compare = [f for f in self.mapping.keys() if f not in ["seen"]]
                    if truncate:
                        self.model.objects.truncate()
                    qs = self.filter_queryset(self.get_queryset())
                    for record in qs.all():
                        filters = self.config.key(self, record)
                        values = self.get_values(record)
                        op = self.process_record(filters, values)
                        self.increment_counter(op)

                    if stdout and verbosity > 0:
                        stdout.write("\n")
                    # deleted = self.model.objects.exclude(seen=today).delete()[0]
                    # self.results.deleted = deleted
                except MaxRecordsException:
                    # Record cap reached: stop quietly, keep what was loaded.
                    pass
                except Exception:
                    # Undo partial writes before propagating the failure.
                    transaction.savepoint_rollback(sid)
                    raise
            else:
                logger.info(f"Unable to get lock for {self}")

        except (RequiredIsMissing, RequiredIsRunning) as e:
            # Dependency not ready: record the outcome and signal a retry.
            self.on_end(error=e, retry=True)
            raise
        except BaseException as e:
            self.on_end(e)
            process_exception(e)
            raise
        else:
            self.on_end(None)
        finally:
            if lock:  # pragma: no branch
                try:
                    lock.release()
                except LockError as e:  # pragma: no cover
                    logger.warning(e)

        return self.results


class IndicatorByLocationLoader(PrpBaseLoader):
class IndicatorByLocationLoader(CommonLoader):
    def get_location_levelname(self, record, values, field_name):
        """Placeholder field getter: always yields None (no level name resolved).

        NOTE(review): presumably meant to look up the gateway/admin-level name
        for the record's location — confirm whether this is intentionally blank.
        """
        pass

Expand Down Expand Up @@ -222,7 +129,7 @@ class Options:
}


class DataReportLoader(PrpBaseLoader):
class DataReportLoader(CommonLoader):

def get_queryset(self):
# all_progress_reports = UnicefProgressreport.objects.all()
Expand Down
Empty file.
Loading