diff --git a/libs/knowledge-store/ragstack_knowledge_store/concurrency.py b/libs/knowledge-store/ragstack_knowledge_store/concurrency.py index 838c39a36..7a2e57b37 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/concurrency.py +++ b/libs/knowledge-store/ragstack_knowledge_store/concurrency.py @@ -67,6 +67,7 @@ def execute( query: PreparedStatement, parameters: tuple[Any, ...] | None = None, callback: _Callback | None = None, + timeout: float | None = None, ) -> None: """Execute a query concurrently. @@ -77,6 +78,7 @@ def execute( query: The query to execute. parameters: Parameter tuple for the query. Defaults to `None`. callback: Callback to apply to the results. Defaults to `None`. + timeout: Timeout to use (if not the session default). """ # TODO: We could have some form of throttling, where we track the number # of pending calls and queue things if it exceed some threshold. @@ -86,7 +88,14 @@ def execute( if self._error is not None: return - future: ResponseFuture = self._session.execute_async(query, parameters) + execute_kwargs = {} + if timeout is not None: + execute_kwargs["timeout"] = timeout + future: ResponseFuture = self._session.execute_async( + query, + parameters, + **execute_kwargs, + ) future.add_callbacks( self._handle_result, self._handle_error, diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index 67b18509f..02110a62e 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -157,7 +157,9 @@ def __init__( keyspace: str | None = None, setup_mode: SetupMode = SetupMode.SYNC, metadata_indexing: MetadataIndexingType = "all", + insert_timeout: float = 30.0, ): + self._insert_timeout = insert_timeout if targets_table: logger.warning( "The 'targets_table' parameter is deprecated " @@ -318,6 +320,7 @@ def add_nodes( metadata_blob, metadata_s, ), + timeout=self._insert_timeout, ) return node_ids