From eb54a9ff402c0fa02cd81597eebdf34cb1dc4781 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Mon, 12 Aug 2024 20:34:14 -0700 Subject: [PATCH] increased default insert timeout (#639) * increased default insert timeout * lint --- .../ragstack_knowledge_store/concurrency.py | 11 ++++++++++- .../ragstack_knowledge_store/graph_store.py | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/libs/knowledge-store/ragstack_knowledge_store/concurrency.py b/libs/knowledge-store/ragstack_knowledge_store/concurrency.py index 838c39a36..7a2e57b37 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/concurrency.py +++ b/libs/knowledge-store/ragstack_knowledge_store/concurrency.py @@ -67,6 +67,7 @@ def execute( query: PreparedStatement, parameters: tuple[Any, ...] | None = None, callback: _Callback | None = None, + timeout: float | None = None, ) -> None: """Execute a query concurrently. @@ -77,6 +78,7 @@ def execute( query: The query to execute. parameters: Parameter tuple for the query. Defaults to `None`. callback: Callback to apply to the results. Defaults to `None`. + timeout: Timeout to use (if not the session default). """ # TODO: We could have some form of throttling, where we track the number # of pending calls and queue things if it exceed some threshold. @@ -86,7 +88,14 @@ def execute( if self._error is not None: return - future: ResponseFuture = self._session.execute_async(query, parameters) + execute_kwargs = {} + if timeout is not None: + execute_kwargs["timeout"] = timeout + future: ResponseFuture = self._session.execute_async( + query, + parameters, + **execute_kwargs, + ) future.add_callbacks( self._handle_result, self._handle_error, diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index 67b18509f..02110a62e 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -157,7 +157,9 @@ def __init__( keyspace: str | None = None, setup_mode: SetupMode = SetupMode.SYNC, metadata_indexing: MetadataIndexingType = "all", + insert_timeout: float = 30.0, ): + self._insert_timeout = insert_timeout if targets_table: logger.warning( "The 'targets_table' parameter is deprecated " @@ -318,6 +320,7 @@ def add_nodes( metadata_blob, metadata_s, ), + timeout=self._insert_timeout, ) return node_ids