Skip to content

Commit

Permalink
code claenup for release
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffersonDing committed Sep 5, 2024
1 parent 529b7db commit 10c7a38
Show file tree
Hide file tree
Showing 13 changed files with 901 additions and 910 deletions.
6 changes: 3 additions & 3 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ name = "pypi"
[packages]
fastapi = "*"
pip = "*"
install = "*"
uvicorn = {extras = ["standard"], version = "*"}
pydantic = "*"
pydantic-settings = "*"
Expand All @@ -20,8 +19,9 @@ jwcrypto = "*"
pytest-asyncio = "*"
pytest = "*"
pre-commit = "*"
click = "==8.0.4"
click = "*"
psycopg2 = "*"
fastapi-utilities = "*"

[dev-packages]
flake8 = "==6.1.0"
Expand All @@ -32,7 +32,7 @@ black = "==19.10b0"
coverage = "*"

[requires]
python_version = "3.11"
python_version = "3.12"

[scripts]
docker= "docker-compose up -d"
Expand Down
1,491 changes: 777 additions & 714 deletions Pipfile.lock

Large diffs are not rendered by default.

37 changes: 0 additions & 37 deletions scripts/create_table.py

This file was deleted.

74 changes: 0 additions & 74 deletions scripts/flush_db.py

This file was deleted.

27 changes: 0 additions & 27 deletions scripts/migrate.py

This file was deleted.

7 changes: 0 additions & 7 deletions scripts/settings/config.py

This file was deleted.

20 changes: 19 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
from typing import Any
from enum import Enum

from jwcrypto.jwk import JWKSet
from pydantic import PostgresDsn, RedisDsn
from pydantic_settings import BaseSettings

from src.constants import Environment

class Environment(str, Enum):
DEVELOPMENT = "DEVELOPMENT"
TESTING = "TESTING"
PRODUCTION = "PRODUCTION"

@property
def is_debug(self):
return self in (self.DEVELOPMENT, self.TESTING)

@property
def is_testing(self):
return self == self.TESTING

@property
def is_deployed(self) -> bool:
return self in (self.PRODUCTION)


class Config(BaseSettings):
DATABASE_URL: PostgresDsn
REDIS_URL: RedisDsn
REDIS_BATCH_SIZE: int = 1000

JWKS_CACHE: JWKSet | None = None
JWKS_URL: str = "https://platform.pennlabs.org/identity/jwks/"
Expand Down
19 changes: 0 additions & 19 deletions src/constants.py

This file was deleted.

58 changes: 53 additions & 5 deletions src/database.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
import asyncio
import json
from datetime import datetime

from sqlalchemy import Column, DateTime, Identity, Integer, MetaData, String, Table
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import (
Column,
DateTime,
Identity,
Integer,
MetaData,
String,
Table,
insert,
)
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from src.config import settings
from src.redis import redis_client
from src.config import settings


DATABASE_URL = settings.DATABASE_URL
DATABASE_URL = str(settings.DATABASE_URL) # Ensure DATABASE_URL is a string

engine = create_async_engine(DATABASE_URL)

Expand All @@ -30,4 +43,39 @@ async def create_tables():
await conn.run_sync(metadata.create_all)


asyncio.run(create_tables())
# Create an async session
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)


async def batch_insert(events):
async with async_session() as session:
async with session.begin():
await session.execute(insert(event), events)
await session.commit()


async def flush():
items = redis_client.scan_iter(count=settings.REDIS_BATCH_SIZE)
events = list()
async for key in items:
try:
data_bytes = await redis_client.get(key)
data = data_bytes.decode("utf-8").replace("'", '"')
json_string = json.dumps(data)
data = json.loads(json.loads(json_string))
except ValueError as e:
print(e)
print("flush_db: invalid key")
continue
events.append(
{
"product": data.get("product"),
"pennkey": data["pennkey"],
"datapoint": data["datapoint"],
"value": data["value"],
"timestamp": datetime.fromtimestamp(data["timestamp"]),
}
)

await batch_insert(events)
await redis_client.flushall()
27 changes: 24 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi_utilities import repeat_at

from src.config import settings

from src.auth import verify_jwt
from src.models import AnalyticsTxn
from src.redis import set_redis_from_tx
from src.redis import set_redis_from_tx, redis_count
from src.database import flush


app = FastAPI()
app = FastAPI(
title="Labs Analytics API",
version="1.0.0",
description="Unified Asynchronous API Engine for Penn Labs",
)


@app.post("/analytics/")
Expand All @@ -17,4 +25,17 @@ async def store_data(request: Request, token: dict = Depends(verify_jwt)):
raise HTTPException(status_code=400, detail=str(e))

await set_redis_from_tx(txn)
return {"message": "Data stored successfully!"}
return {"message": "success"}


@app.on_event("startup")
@repeat_at(cron="0 0 * * *")
async def flush_db():
count = await redis_count()
print(f"{count} items found in redis")
while count > 0:
await flush()
count -= settings.REDIS_BATCH_SIZE
count = max(count, 0)
print(f"{count} items left in redis")
print("Redis flushed")
38 changes: 20 additions & 18 deletions src/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,6 @@
from pydantic import BaseModel, ConfigDict


class CustomModel(BaseModel):
model_config = ConfigDict(populate_by_name=True,)

def serializable_dict(self, **kwargs):
default_dict = self.model_dump()
return jsonable_encoder(default_dict)

def json(self, **kwargs):
# Override the json method to customize JSON serialization if needed
return self.model_dump_json()

def __str__(self):
return str(self.json())

def hash_as_key(self):
return hashlib.md5(str(self).encode()).hexdigest()[0:16]


class Product(Enum):
OTHER = 0
MOBILE_IOS = 1
Expand All @@ -43,6 +25,26 @@ def __str__(self):
return self.name


class CustomModel(BaseModel):
model_config = ConfigDict(
populate_by_name=True,
)

def serializable_dict(self, **kwargs):
default_dict = self.model_dump()
return jsonable_encoder(default_dict)

def json(self, **kwargs):
# Override the json method to customize JSON serialization if needed
return self.model_dump_json()

def __str__(self):
return str(self.json())

def hash_as_key(self):
return hashlib.md5(str(self).encode()).hexdigest()[0:16]


class RedisEvent(CustomModel):
key: bytes | str
value: bytes | str
Expand Down
Loading

0 comments on commit 10c7a38

Please sign in to comment.