
Fix clickhouse and optimize compatibility when using SQL link #1496

Merged

jieguangzhou
Collaborator

@jieguangzhou jieguangzhou commented Dec 7, 2023

#1471
#1490

Description

Related Issues

Checklist

  • Is this code covered by new or existing unit tests or integration tests?
  • Did you run make test successfully?
  • Do new classes, functions, methods and parameters all have docstrings?
  • Were existing docstrings updated, if necessary?
  • Was external documentation updated, if necessary?

Additional Notes or Comments

@codecov-commenter

codecov-commenter commented Dec 7, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (34830a7) 80.33% compared to head (253dd9c) 80.32%.
Report is 1299 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1496      +/-   ##
==========================================
- Coverage   80.33%   80.32%   -0.02%     
==========================================
  Files          95      107      +12     
  Lines        6602     7467     +865     
==========================================
+ Hits         5304     5998     +694     
- Misses       1298     1469     +171     


@jieguangzhou jieguangzhou force-pushed the impv/use-string-instead-of-json branch from 45c9431 to b564dd7 Compare December 7, 2023 04:14
@jieguangzhou jieguangzhou linked an issue Dec 7, 2023 that may be closed by this pull request
@fnikolai
Collaborator

fnikolai commented Dec 7, 2023

Don't forget to update the CHANGELOG about the change.

@jieguangzhou
Collaborator Author

jieguangzhou commented Dec 7, 2023

One query fails on ClickHouse but works fine with SQLite and MySQL.

Test case: test_query.py::test_execute_like_queries_sqldb

self = <superduperdb.backends.ibis.query.IbisQueryLinker[
    documents.select('id', 'x').join(_outputs_linear_a_0.r...d)).filter(_outputs_linear_a_0.relabel({'output': "'_outputs.x.linear_a.0'"}).key == 'x')}
] object at 0x284683a00>
db = <superduperdb.base.datalayer.Datalayer object at 0x2843ac490>

    def execute(self, db):
        native_query, _ = self.compile(db)
        try:
            result = native_query.execute()
        except Exception as exc:
>           raise IbisBackendError(
                f'{native_query} Wrong query or not supported yet :: {exc}'
            )
E           superduperdb.backends.ibis.query.IbisBackendError: r0 := DatabaseTable: documents
E             id    string
E             x     string
E             y     int32
E             z     string
E             _fold string
E           
E           r1 := DatabaseTable: _outputs_linear_a_0
E             output_id int32
E             input_id  string
E             query_id  string
E             output    string
E             key       string
E             _fold     string
E           
E           r2 := Selection[r0]
E             selections:
E               id: r0.id
E               x:  r0.x
E           
E           r3 := Selection[r1]
E             selections:
E               output_id:             r1.output_id
E               input_id:              r1.input_id
E               query_id:              r1.query_id
E               _outputs.x.linear_a.0: r1.output
E               key:                   r1.key
E               _fold:                 r1._fold
E           
E           r4 := InnerJoin[r2, r3] r3.input_id == r2.id
E           
E           Selection[r4]
E             predicates:
E               r3.key == 'x' Wrong query or not supported yet :: :HTTPDriver for http://localhost:8123 returned response code 404)
E            Code: 47. DB::Exception: Missing columns: 't3.key' while processing query: 'SELECT id, x, output_id, input_id, query_id, `_outputs.x.linear_a.0`, key, _fold FROM (SELECT * FROM (SELECT t0.id, t0.x FROM documents AS t0) AS t2 INNER JOIN (SELECT t1.output_id, t1.input_id, t1.query_id, t1.output AS `_outputs.x.linear_a.0`, t1.key, t1._fold FROM _outputs_linear_a_0 AS t1) AS t3 ON t3.input_id = t2.id HAVING t3.key = 'x') AS t4 WHERE t3.key = 'x'', required columns: 'id' 'query_id' 'x' 'input_id' 'output_id' 't3.key' '_outputs.x.linear_a.0' 'key' '_fold' 'id' 'query_id' 'x' 'input_id' 'output_id' 't3.key' '_outputs.x.linear_a.0' 'key' '_fold'. (UNKNOWN_IDENTIFIER) (version 23.11.1.2681 (official build))

../../../../superduperdb/backends/ibis/query.py:524: IbisBackendError

@jieguangzhou
Collaborator Author

jieguangzhou commented Dec 7, 2023

All update operations raise the following error on ClickHouse, but work with SQLite and MySQL.

Test case: test_datalayer.py::test_insert_sql_db

test/unittest/base/test_datalayer.py:535 (test_insert_sql_db[db0])
db = <superduperdb.base.datalayer.Datalayer object at 0x286ef8b20>

    @pytest.mark.parametrize("db", [DBConfig.sqldb_empty], indirect=True)
    def test_insert_sql_db(db):
        add_fake_model(db)
        table = db.load('table', 'documents')
>       inserted_ids, _ = db.insert(
            table.insert([Document({'id': str(i), 'x': i}) for i in range(5)])
        )

test_datalayer.py:540: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../superduperdb/base/datalayer.py:439: in insert
    return inserted_ids, self.refresh_after_update_or_insert(
../../../superduperdb/base/datalayer.py:493: in refresh_after_update_or_insert
    task_workflow.run_jobs()
../../../superduperdb/jobs/task_workflow.py:54: in run_jobs
    job(
../../../superduperdb/jobs/job.py:140: in __call__
    self.submit(dependencies=dependencies)
../../../superduperdb/jobs/job.py:117: in submit
    self.future = self.db.compute.submit(
../../../superduperdb/backends/local/compute.py:33: in submit
    future = function(*args, **kwargs)
../../../superduperdb/jobs/tasks.py:102: in callable_job
    db.metadata.update_job(job_id, 'status', 'running')
../../../superduperdb/backends/sqlalchemy/metadata.py:371: in update_job
    session.query(Job).filter(Job.identifier == job_id).update({key: value})
../../../env/lib/python3.9/site-packages/sqlalchemy/orm/query.py:3271: in update
    result: CursorResult[Any] = self.session.execute(
../../../env/lib/python3.9/site-packages/sqlalchemy/orm/session.py:2308: in execute
    return self._execute_internal(
../../../env/lib/python3.9/site-packages/sqlalchemy/orm/session.py:2190: in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
../../../env/lib/python3.9/site-packages/sqlalchemy/orm/bulk_persistence.py:1617: in orm_execute_statement
    return super().orm_execute_statement(
../../../env/lib/python3.9/site-packages/sqlalchemy/orm/context.py:296: in orm_execute_statement
    return cls.orm_setup_cursor_result(
../../../env/lib/python3.9/site-packages/sqlalchemy/orm/bulk_persistence.py:786: in orm_setup_cursor_result
    return cls._return_orm_returning(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cls = <class 'sqlalchemy.orm.bulk_persistence.BulkORMUpdate'>
session = <sqlalchemy.orm.session.Session object at 0x2872bbca0>
statement = <sqlalchemy.sql.annotation.AnnotatedUpdate object at 0x2872a7520>
params = {}
execution_options = immutabledict({'synchronize_session': 'auto', '_sa_orm_update_options': default_update_options(_dml_strategy='orm', _e...ate.<locals>.evaluate at 0x2872a3a60>, _subject_mapper=<Mapper at 0x17bd85520; Job>, _synchronize_session='evaluate')})
bind_arguments = {'clause': <sqlalchemy.sql.dml.Update object at 0x2872a7dc0>, 'mapper': <Mapper at 0x17bd85520; Job>}
result = <sqlalchemy.engine.cursor.CursorResult object at 0x28738c0a0>

    @classmethod
    def _return_orm_returning(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        result,
    ):
        execution_context = result.context
        compile_state = execution_context.compiled.compile_state
    
        if (
>           compile_state.from_statement_ctx
            and not compile_state.from_statement_ctx.compile_options._is_star
        ):
E       AttributeError: 'NoneType' object has no attribute 'from_statement_ctx'

../../../env/lib/python3.9/site-packages/sqlalchemy/orm/bulk_persistence.py:583: AttributeError

https://clickhouse-sqlalchemy.readthedocs.io/en/latest/features.html#update-and-delete

@jieguangzhou jieguangzhou force-pushed the impv/use-string-instead-of-json branch 7 times, most recently from fca2339 to 4acb5b8 Compare December 11, 2023 13:52
Comment on lines 1 to 19
import pandas as pd


def _default_insert_processor(table_name, datas):
    """Default insert processor for SQL dialects."""
    return table_name, datas


def _clickhouse_insert_processor(table_name, datas):
    """Insert processor for ClickHouse."""
    return f'`{table_name}`', pd.DataFrame(datas)


def get_insert_processor(dialect):
    """Get the insert processor for the given dialect."""
    funcs = {
        'clickhouse': _clickhouse_insert_processor,
    }
    return funcs.get(dialect, _default_insert_processor)
Collaborator Author

This leaves room for future compatibility with other databases.

Collaborator

Nice

Collaborator

nice

Comment on lines +1 to +4
def get_output_table_name(model_identifier, version):
    """Get the output table name for the given model."""
    # use `_` to connect the model_identifier and version
    return f'_outputs_{model_identifier}_{version}'
Collaborator Author

Some databases do not support "/" in table names.

Collaborator

good spot

Collaborator

Can we do __ ("_" * 2)? This may be needed for some kind of name analysis.

Collaborator Author

@jieguangzhou jieguangzhou Dec 11, 2023

Yes, we can, but it doesn't seem to add much.

We only need to identify the model text between the first _ and the last _, unless we switch to a more general symbol to be compatible with more data sets.

A more elegant approach would be to provide, in the future, a per-database method for processing table names, since we support many databases and each may have different restrictions.

WDYT?
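
The point about recovering the model text from the table name can be sketched as follows. `parse_output_table_name` is a hypothetical helper and is not part of this PR; it assumes the version is the run of digits after the final underscore, so a model identifier that itself contains underscores still round-trips.

```python
def get_output_table_name(model_identifier, version):
    """Build the output table name, as in the PR."""
    return f'_outputs_{model_identifier}_{version}'


def parse_output_table_name(table_name):
    """Hypothetical inverse: return (model_identifier, version)."""
    prefix = '_outputs_'
    if not table_name.startswith(prefix):
        raise ValueError(f'not an output table: {table_name!r}')
    # Split on the LAST underscore so identifiers containing "_" survive.
    model_identifier, sep, version = table_name[len(prefix):].rpartition('_')
    if not sep or not model_identifier or not version.isdecimal():
        raise ValueError(f'malformed output table name: {table_name!r}')
    return model_identifier, int(version)
```

A double underscore would only be needed if the version could itself contain underscores, which this sketch assumes is not the case.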

Collaborator

@blythed @jieguangzhou
Should we have a helper function in the databackend to create table names?

So for MongoDB:

def build_table_name(key, model):
    return f'{model}/{key}/{model}'

For ibis:

def build_table_name(key, model):
    return ...  # a name compatible with the backend

Collaborator Author

@jieguangzhou jieguangzhou Dec 12, 2023

I think we will need a db helper to adapt to the different databases in the future.
For now, though, that would mean changing a lot of the query code: some query methods build table_name, but they do not receive the db parameter. WDYT?

Collaborator

Ok
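
A minimal sketch of the per-backend naming helper discussed in this thread. The class and method names (`BaseNameBuilder`, `MongoNameBuilder`, `IbisNameBuilder`, `output_table_name`) are hypothetical and do not come from the PR. The "/" separator for MongoDB is an assumption that mirrors the old `_outputs/{model}/{version}` format shown in this PR.

```python
class BaseNameBuilder:
    """Hypothetical base class: one naming rule per databackend."""

    separator = '_'

    def output_table_name(self, model_identifier, version):
        # Join the fixed prefix, the model identifier and the version.
        parts = ['_outputs', str(model_identifier), str(version)]
        return self.separator.join(parts)


class MongoNameBuilder(BaseNameBuilder):
    # Assumption: "/" is acceptable in this backend's collection names.
    separator = '/'


class IbisNameBuilder(BaseNameBuilder):
    # SQL backends may reject "/", so fall back to "_".
    separator = '_'


name = IbisNameBuilder().output_table_name('linear_a', 0)
```

With this shape, callers ask the databackend for a name instead of formatting it by hand, which is the part that requires passing `db` into the query methods.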

Comment on lines +91 to +95
def get_db_config(dialect):
    if dialect == 'clickhouse':
        return create_clickhouse_config()
    else:
        return DefaultConfig
Collaborator Author

If other databases need different table creation behavior for the metastore in the future, it can be added here.

Collaborator

great


self._lock = threading.Lock()

def _init_tables(self):
Collaborator Author

Creating tables this way has better compatibility than using class objects.

Collaborator Author

One of the ClickHouse drivers does not support updates on tables built with the class-object method.

Collaborator

Please can you explain this MetaData in more detail?

Collaborator Author

MetaData stores and manages table definitions, and using it is common practice when creating tables this way.

https://docs.sqlalchemy.org/en/20/tutorial/metadata.html

Collaborator

self.metadata is assigned here so that tables can be created dynamically within _init_tables

Collaborator Author

Done, moved the metadata into _init_tables.

Comment on lines +143 to +150
# Connect to metadata store.
# ------------------------------
# 1. try to connect to the metadata store specified in the configuration.
# 2. if that fails, try to connect to the data backend engine.
# 3. if that fails, try to connect to the data backend uri.
Collaborator Author

When the ibis and SQLAlchemy engines are incompatible, the metadata store is built directly from the URL.

@jieguangzhou jieguangzhou marked this pull request as ready for review December 11, 2023 14:01
@jieguangzhou jieguangzhou linked an issue Dec 11, 2023 that may be closed by this pull request
@jieguangzhou jieguangzhou changed the title Use string type instead of Json type in MetadataStore Fix clickhouse and optimize compatibility when using SQL link Dec 11, 2023


def _clickhouse_insert_processor(table_name, datas):
    """Insert processor for ClickHouse."""
    return f'`{table_name}`', pd.DataFrame(datas)
Collaborator

Does this do something? f'`{table_name}`'

Collaborator Author

Some of the table names we define cannot be used for inserts as-is, for example a table name containing -:

_outputs_a-b-c_0

With the backticks, the table name in the generated SQL statement becomes "INSERT `_outputs_a-b-c_0` xxxxx".

Collaborator

Ok
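
To illustrate why the quoting matters: in most SQL dialects an unquoted name containing "-" is read as a subtraction expression rather than as one identifier. The sketch below uses hypothetical helpers (`quote_identifier`, `build_insert_prefix`) that are not part of the PR. It rejects names that already contain a backtick instead of guessing at an escaping rule.

```python
def quote_identifier(name):
    """Wrap a table name in backticks so characters such as "-" are kept literal."""
    if '`' in name:
        # Escaping rules differ between databases, so refuse rather than guess.
        raise ValueError(f'backtick not allowed in identifier: {name!r}')
    return f'`{name}`'


def build_insert_prefix(table_name):
    """Hypothetical helper: the leading part of an INSERT statement."""
    return f'INSERT INTO {quote_identifier(table_name)}'


prefix = build_insert_prefix('_outputs_a-b-c_0')
```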

  for k in self.renamings.values():
      if (
-         re.match(f'^_outputs/{model}/[0-9]+$', tab.identifier)
+         re.match(f'^_outputs_{model}_[0-9]+$', tab.identifier)
Collaborator

This should use the function above right?

Collaborator Author

Done
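
One way to reuse the naming function when matching, sketched under assumptions: `output_table_pattern` is a hypothetical helper, not the code that was merged. It builds the regular expression from `get_output_table_name` and escapes the model identifier, since identifiers may contain characters that are special in a regex.

```python
import re


def get_output_table_name(model_identifier, version):
    """Build the output table name, as in the PR."""
    return f'_outputs_{model_identifier}_{version}'


def output_table_pattern(model_identifier):
    """Hypothetical helper: regex matching any version of a model's output table."""
    # Escape the identifier so characters such as "." or "+" match literally.
    body = get_output_table_name(re.escape(model_identifier), '[0-9]+')
    return re.compile(f'^{body}$')


matches = bool(output_table_pattern('linear_a').match('_outputs_linear_a_0'))
```

Keeping the format in one function means a later change of separator only has to be made in one place.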

- @pytest.mark.parametrize(
-     "db", [DBConfig.mongodb_empty, DBConfig.sqldb_empty], indirect=True
- )
+ @pytest.mark.parametrize("db", [DBConfig.sqldb_empty], indirect=True)
Collaborator

Intended?

Collaborator Author

@jieguangzhou jieguangzhou Dec 11, 2023

That was an oversight; fixed.

Collaborator

@kartik4949 kartik4949 left a comment

Great PR, just a small change needed!

if not self.in_memory:
    self.conn.insert(table_name, raw_documents)
else:
    self.conn.create_table(table_name, pandas.DataFrame(raw_documents))

@staticmethod
def convert_data_format(data):
Collaborator

@jieguangzhou
So we convert bytes data to base64 strings for all ibis backends?

Collaborator Author

Yes. We could also make different databases behave differently, but base64 is more versatile.
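
As a rough illustration of the bytes-to-base64 conversion discussed here (the body of `convert_data_format` is not shown in this thread, so the function names below are hypothetical):

```python
import base64


def encode_bytes_fields(record):
    """Return a copy of `record` with bytes values replaced by base64 text."""
    return {
        key: base64.b64encode(value).decode('ascii')
        if isinstance(value, bytes)
        else value
        for key, value in record.items()
    }


def decode_field(text):
    """Recover the original bytes from a base64 string."""
    return base64.b64decode(text)


row = encode_bytes_fields({'id': '1', 'x': b'\x00\xff'})
```

The encoded value is plain ASCII text, so it fits any string column, at the cost of roughly a third more storage than the raw bytes.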

@@ -55,7 +83,7 @@ def create_model_table_or_collection(self, model: t.Union[Model, APIModel]):
              'key': dtype('string'),
          }
          return Table(
-             identifier=f'_outputs/{model.identifier}/{model.version}',
+             identifier=get_output_table_name(model.identifier, model.version),
Collaborator

Databackend should have

def get_table_name(..., output=True/False):
    ...

@jieguangzhou



@@ -301,7 +298,7 @@ def model_update( # type: ignore[override]
          for r in table_records:
              if isinstance(r['output'], dict) and '_content' in r['output']:
                  r['output'] = r['output']['_content']['bytes']
-         db.databackend.insert(f'_outputs/{model}/{version}', table_records)
+         db.databackend.insert(get_output_table_name(model, version), table_records)
Collaborator

same

db.databackend.get_table_name(model, version, output=True)





@@ -121,7 +138,7 @@ def drop(self, force: bool = False):
                  default=False,
              ):
                  logging.warn('Aborting...')
-         Base.metadata.drop_all(self.conn)
+         self.metadata.drop_all(self.conn)
Collaborator

@jieguangzhou since we now assign a metadata object other than Base's, I'm not sure whether this could cause strange bugs. WDYT?

Collaborator Author

Now changed to:

        self.query_id_table.drop(self.conn)
        self.job_table.drop(self.conn)
        self.parent_child_association_table.drop(self.conn)
        self.component_table.drop(self.conn)
        self.meta_table.drop(self.conn)

)
parents = [a.parent_id for a in assocations]
res = session.execute(stmt)
Collaborator

Just a nitpick: add a self.execute(stmt) helper that returns the parsed result, and make this a one-liner, since we use it two or three times below.

Collaborator Author

Yes, Done

-     Component.version == version,
- ).update({key: value})
+ stmt = (
+     self.component_table.update()
Collaborator

?

Collaborator Author

The updated value was defined after the statement. I have now changed it to:

            stmt = (
                update(self.component_table)
                .where(
                    self.component_table.c.type_id == type_id,
                    self.component_table.c.identifier == identifier,
                    self.component_table.c.version == version,
                )
                .values({key: value})
            )

@@ -40,7 +40,7 @@ def pre_create(self, db) -> None:
      @property
      def raw(self):
          return {
-             k: (v.identifier if not isinstance(v, Encoder) else 'Bytes')
+             k: (v.identifier if not isinstance(v, Encoder) else 'String')
Collaborator

OK, so we are moving from bytes to string for Encoder types.

Collaborator Author

Yes

Collaborator

Is that efficient? Should this not be case-by-case?

Collaborator Author

@jieguangzhou jieguangzhou Dec 13, 2023

Is that efficient?

It will be a little less efficient than storing bytes directly.

Should this not be case-by-case?

Hey @kartik4949 @blythed, I added a DBHelper to handle this.
By default the data is not converted; it is converted to String only where a database defines that (such as ClickHouse).

https://github.com/SuperDuperDB/superduperdb/pull/1496/commits/2d918faa7a9c02d8fe0866e60d0ee192d6deaffb
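
A sketch of what a case-by-case helper could look like. This is not the `DBHelper` from the linked commit: the attribute and method names below are hypothetical, and the rule that only ClickHouse stores Encoder data as String is an assumption taken from the comment above.

```python
class DBHelper:
    """Hypothetical per-dialect helper; not the class added in the PR."""

    # Assumption: only these dialects need bytes stored as text.
    STRING_DIALECTS = frozenset({'clickhouse'})

    def __init__(self, dialect):
        self.dialect = dialect

    @property
    def bytes_as_string(self):
        """True when Encoder output must be converted to text before insert."""
        return self.dialect in self.STRING_DIALECTS

    def encoder_column_type(self):
        """Schema type name used for Encoder fields: 'String' or 'Bytes'."""
        return 'String' if self.bytes_as_string else 'Bytes'


column_type = DBHelper('clickhouse').encoder_column_type()
```

Backends that handle binary columns natively keep the cheaper bytes path, and only the listed dialects pay the conversion cost.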

@kartik4949 kartik4949 force-pushed the impv/use-string-instead-of-json branch from 2d918fa to 253dd9c Compare December 13, 2023 07:17
@jieguangzhou jieguangzhou merged commit 592e8b5 into superduper-io:main Dec 13, 2023
2 checks passed
Development

Successfully merging this pull request may close these issues.

Use string instead of JSON for dictionaries in SQLAlchemy
connection error: How to connect ClickHouse
5 participants