Skip to content

Commit

Permalink
Fix linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
kartik4949 committed Sep 26, 2024
1 parent 8561558 commit c3f60bd
Show file tree
Hide file tree
Showing 27 changed files with 129 additions and 47 deletions.
8 changes: 5 additions & 3 deletions plugins/ibis/superduper_ibis/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def documents(self):
return super().documents

def _get_tables(self):
out = {self.table: self.db.tables[self.table]}
breakpoint()
out = {self.table: self.db.cache[self.table.uuid]}

for part in self.parts:
if isinstance(part, str):
Expand All @@ -137,7 +138,7 @@ def _get_schema(self):

table_renamings = self.renamings({})
if len(tables) == 1 and not table_renamings:
return self.db.tables[self.table].schema
return self.db.cache[self.table].schema
for identifier, c in tables.items():
renamings = table_renamings.get(identifier, {})

Expand Down Expand Up @@ -201,7 +202,8 @@ def _execute(self, parent, method="encode"):

assert isinstance(output, pandas.DataFrame)
output = output.to_dict(orient="records")
component_table = self.db.tables[self.table]
breakpoint()
component_table = self.db.cache[self.table]
return SuperDuperCursor(
raw_cursor=output,
db=self.db,
Expand Down
9 changes: 5 additions & 4 deletions plugins/mongodb/superduper_mongodb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,18 +658,19 @@ def _get_schema(self):
predict_ids = sum([p[1] for p in outputs_parts], ())

try:
table = self.db.tables[self.table]
table = self.db.cluster.cache[self.table]
if not predict_ids:
return table.schema
fields = table.schema.fields
except FileNotFoundError:
except (FileNotFoundError, KeyError):
fields = {}

for predict_id in predict_ids:
key = f'{CFG.output_prefix}{predict_id}'
predict_id = predict_id.split('__')[-1]
try:
output_table = self.db.tables[key]
except FileNotFoundError:
output_table = self.db.cluster.cache[predict_id]
except (FileNotFoundError, KeyError):
logging.warn(
f'No schema found for table {key}. Using default projection'
)
Expand Down
1 change: 0 additions & 1 deletion superduper/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# ruff: noqa: E402
import os

from .base import config, config_settings, configs, logger
from .base.superduper import superduper
Expand Down
4 changes: 0 additions & 4 deletions superduper/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
__all__ = (
'apply',
'info',
'local_cluster',
'vector_search',
'cdc',
'ray_serve',
)


Expand Down
7 changes: 5 additions & 2 deletions superduper/backends/base/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ def __init__(self):

@abstractmethod
def drop(self):
pass
"""Drop components registered in backend."""

@abstractmethod
def list_uuids(self):
pass
"""List uuids of all components registered."""

@abstractmethod
def list_components(self):
"""List all registered components."""
pass

@abstractmethod
Expand All @@ -49,6 +50,7 @@ def initialize(self):
pass

def put(self, component: 'Component', **kwargs):
"""Register component under the backend."""
# This is to make sure that we only have 1 version
# of each component implemented at any given time
# TODO: get identifier in string component argument.
Expand All @@ -67,6 +69,7 @@ def put(self, component: 'Component', **kwargs):

@property
def db(self) -> 'Datalayer':
"""Datalayer instance property."""
return self._db

@db.setter
Expand Down
7 changes: 6 additions & 1 deletion superduper/backends/base/cdc.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import typing as t
from abc import abstractmethod

from superduper.backends.base.backends import BaseBackend

if t.TYPE_CHECKING:
from superduper.base.datalayer import Datalayer


class CDCBackend(BaseBackend):
"""Base backend for CDC."""

@abstractmethod
def handle_event(self, event_type, table, ids):
pass
"""Abstract method to handle events."""

@property
def db(self) -> 'Datalayer':
"""Datalayer instance property."""
return self._db

@db.setter
Expand Down
11 changes: 7 additions & 4 deletions superduper/backends/base/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,41 @@ class Cluster(ABC):
cdc: CDCBackend
crontab: CrontabBackend

def __post_init__(self):
self._db = None

def drop(self, force: bool = False):
"""Drop services registered under the cluster."""
self.compute.drop()
self.queue.drop()
self.vector_search.drop()
self.cdc.drop()
self.crontab.drop()

def disconnect(self):
pass
"""Disconnect from the cluster."""

def __post_init__(self):
"""Dataclass post init."""
self._db = None

@classmethod
@abstractmethod
def build(cls, CFG, **kwargs):
"""Abstract build method."""
pass

@property
def db(self):
"""Datalayer instance property."""
return self._db

@db.setter
def db(self, value):
"""Set Datalayer instance."""
self._db = value

def initialize(
self,
):
"""Initialize all services registered in cluster."""
assert self.db
self.compute.db = self.db
self.cache.db = self.db
Expand Down
6 changes: 6 additions & 0 deletions superduper/backends/base/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from superduper.backends.base.backends import BaseBackend
from superduper.base.event import Job

if t.TYPE_CHECKING:
from superduper.base.datalayer import Datalayer


class ComputeBackend(BaseBackend):
"""
Expand All @@ -23,6 +26,7 @@ def type(self) -> str:

@abstractmethod
def release_futures(self, context: str):
"""Abstract method for releasing futures."""
pass

@property
Expand Down Expand Up @@ -93,9 +97,11 @@ def create_handler(self, *args, **kwargs):

@property
def db(self) -> 'Datalayer':
"""Get Datalayer instance."""
return self._db

@db.setter
def db(self, value: 'Datalayer'):
"""Datalayer setter."""
self._db = value
self.initialize()
8 changes: 3 additions & 5 deletions superduper/backends/base/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def drop(self, force: bool = False):

@abstractmethod
def set_component_status(self, uuid: str, status: 'Status'):
pass
"""Set component status abstractmethod."""

# ------------------ JOBS ------------------

Expand Down Expand Up @@ -260,9 +260,10 @@ def show_components(self, type_id: t.Optional[str] = None):

@abstractmethod
def show_cdc_tables(self):
pass
"""Show cdc tables."""

def show_cdcs(self, table):
"""Show components registered for cdc."""
results = self._show_cdcs(table)
lookup = {}
results = list(results)
Expand All @@ -276,9 +277,6 @@ def show_cdcs(self, table):
def _show_cdcs(self, table):
pass

def show_cdc_tables(self):
...

@abstractmethod
def _show_components(self, type_id: t.Optional[str] = None):
"""
Expand Down
1 change: 1 addition & 0 deletions superduper/backends/base/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class Query(_BaseQuery):

@property
def tables(self):
"""Get tables defined in the query."""
out = []
for part in self.parts:
if part[0] == 'outputs':
Expand Down
4 changes: 3 additions & 1 deletion superduper/backends/base/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ def publish(self, events: t.List[Event]):

@property
def db(self) -> 'Datalayer':
"""Get Datalayer instance."""
return self._db

@db.setter
def db(self, value: 'Datalayer'):
"""Datalayer instance Setter."""
self._db = value
self.initialize()

Expand Down Expand Up @@ -135,7 +137,7 @@ def consume_streaming_events(events, table, db):
@dc.dataclass
class Future:
"""
Future output
Future output.
:param job_id: job identifier
"""
Expand Down
7 changes: 6 additions & 1 deletion superduper/backends/base/vector_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from superduper.backends.base.backends import BaseBackend

if t.TYPE_CHECKING:
from superduper.base.datalayer import Datalayer
from superduper.components.vector_index import VectorIndex


Expand All @@ -23,17 +24,21 @@ def __getitem__(self, identifier):
pass

def add(self, identifier, vectors):
"""Add vectors to vector index."""
self.get(identifier).add(vectors)

def delete(self, identifier, ids):
"""Delete vectors in vector index by ids."""
self.get(identifier).delete(ids)

@property
def db(self) -> 'Datalayer':
"""Get Datalayer instance."""
return self._db

@db.setter
def db(self, value: 'Datalayer'):
"""Database setter."""
self._db = value
self.initialize()

Expand Down Expand Up @@ -126,7 +131,7 @@ def from_component(cls, index: 'VectorIndex'):

@abstractmethod
def initialize(self, db):
pass
"""Initialize abstract method."""

@abstractmethod
def __len__(self):
Expand Down
4 changes: 2 additions & 2 deletions superduper/backends/local/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .artifacts import FileSystemArtifactStore as ArtifactStore
from .cluster import LocalCluster as Cluster
from .compute import LocalComputeBackend as ComputeBackend
from .vector_search import InMemoryVectorSearcher as VectorSearcher
from .vector_search import LocalVectorSearchBackend as VectorSearcher

__all__ = ["ArtifactStore", "ComputeBackend", "Cluster"]
__all__ = ["ArtifactStore", "ComputeBackend", "Cluster", "VectorSearcher"]
10 changes: 7 additions & 3 deletions superduper/backends/local/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def __init__(self, init_cache: bool = True):
self._db = None

def list_components(self):
"""List components."""
return list(self._cache.keys())

def list_uuids(self):
"""List registered UUIDs of components."""
return list(self._cache_uuid.keys())

def __getitem__(self, *item):
return self._cache[item]

def get_by_id(self, *item):
"""Get item with id."""
return self._cache[*item]

def _put(self, component: Component):
Expand All @@ -37,6 +37,7 @@ def __delitem__(self, name: str):
del self._cache[name]

def initialize(self):
"""Initialize cache with existing components."""
for type_id, identifier in self.db.show():
r = self.db.show(type_id=type_id, identifier=identifier, version=-1)
if r.get('cache', False):
Expand All @@ -45,14 +46,17 @@ def initialize(self):
self.db.cluster.compute.put(component)

def drop(self):
"""Drop all components in cache."""
self._cache = {}

@property
def db(self):
"""Datalayer instance."""
return self._db

@db.setter
def db(self, value):
"""Set Datalayer instance."""
self._db = value
self.init()

Expand Down
5 changes: 5 additions & 0 deletions superduper/backends/local/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ def __init__(self):
self._trigger_uuid_mapping = {}

def handle_event(self, table, ids, event_type):
"""Handle events."""
return self.db.on_event(table=table, ids=ids, event_type=event_type)

def list_components(self):
"""List components."""
return sorted(list(self.triggers))

def list_uuids(self):
"""List all UUIDs registered."""
return list(self._trigger_uuid_mapping.values())

def _put(self, item):
Expand All @@ -27,12 +30,14 @@ def __delitem__(self, item):
self.triggers.remove(item)

def initialize(self):
"""Initialize cdc backend to recover existing components."""
for type_id, identifier in self.db.show():
r = self.db.show(type_id=type_id, identifier=identifier, version=-1)
if r['trigger']:
self.put(self.db.load(type_id=type_id, identifier=identifier))
# TODO consider re-initializing CDC jobs because of potential failures

def drop(self):
"""Drop all components from cdc."""
self.triggers = set()
self._trigger_uuid_mapping = {}
Loading

0 comments on commit c3f60bd

Please sign in to comment.