[SPARK-50642][CONNECT][SS]Fix the state schema for FlatMapGroupsWithState in spark connect when there is no initial state #49260

Open
wants to merge 5 commits into
base: master


@huanliwang-db huanliwang-db commented Dec 21, 2024

In Spark Connect, when there is no initial state, the state schema is currently derived from the input:

  • The planner creates initialDs from the original input: https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L679-L689
  • It then derives the state expression encoder from this initialDs, which is incorrect because the input type is not necessarily the state type: https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L729

Our unit tests do not cover this case because the stateFunc in the test never updates the state: https://github.com/apache/spark/blob/master/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala#L55-L59

After changing the stateFunc to the following, so that it does update the state:

val stateFunc =
  (key: String, values: Iterator[ClickEvent], state: GroupState[ClickState]) => {
    if (state.exists) throw new IllegalArgumentException("state.exists should be false")
    val newState = ClickState(key, values.size)
    state.update(newState)
    Iterator(newState)
  }

the test fails with:

Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 122 in stage 2.0 failed 1 times, most recent failure: Lost task 122.0 in stage 2.0 (TID 12) (192.168.68.84 executor driver): java.lang.ClassCastException: class org.apache.spark.sql.streaming.ClickState cannot be cast to class org.apache.spark.sql.streaming.ClickEvent (org.apache.spark.sql.streaming.ClickState and org.apache.spark.sql.streaming.ClickEvent are in unnamed module of loader 'app')
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.CreateNamedStruct_0$(Unknown Source)
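
The cast failure above can be illustrated without Spark. The following is a minimal sketch, not Spark's encoder API: `RowSerializer`, `EncoderMismatchDemo`, and the field lists of `ClickEvent` and `ClickState` are hypothetical stand-ins chosen for illustration. It models a serializer built for the input type being reused for the state type; because generics are erased on the JVM, the mismatch only surfaces when a state value is actually serialized.

```scala
import scala.util.Try

// Hypothetical stand-ins for the test's case classes; the field lists are assumptions.
final case class ClickEvent(id: String, timestamp: Long)
final case class ClickState(id: String, count: Int)

// Toy model of a typed serializer. This is NOT Spark's encoder API.
trait RowSerializer[T] {
  def serialize(value: T): Seq[Any]
}

object EncoderMismatchDemo {
  // Serializer built for the input type.
  val eventSerializer: RowSerializer[ClickEvent] = new RowSerializer[ClickEvent] {
    def serialize(value: ClickEvent): Seq[Any] = Seq(value.id, value.timestamp)
  }

  // Models the bug: the input serializer is reused as the state serializer.
  // The cast itself succeeds at runtime because the type parameter is erased.
  val wronglyDerivedStateSerializer: RowSerializer[ClickState] =
    eventSerializer.asInstanceOf[RowSerializer[ClickState]]

  // Serializer built for the state type, which is what the state store needs.
  val stateSerializer: RowSerializer[ClickState] = new RowSerializer[ClickState] {
    def serialize(value: ClickState): Seq[Any] = Seq(value.id, value.count)
  }

  def main(args: Array[String]): Unit = {
    val state = ClickState("a", 2)
    // Fails only here, when a ClickState reaches code compiled for ClickEvent.
    println(Try(wronglyDerivedStateSerializer.serialize(state)))
    // Succeeds: the serializer matches the state type.
    println(Try(stateSerializer.serialize(state)))
  }
}
```

This also suggests why a stateFunc that never calls `state.update` would not hit the problem: in this toy model, nothing goes wrong until a state value is actually passed to the mismatched serializer.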

What changes were proposed in this pull request?

  • Introduce a new state_schema proto field.
  • Pass the state agnostic encoder to the serialized UDF.
  • Pass the state schema to the query proto for Spark Connect.
  • Rebuild the state expression encoder from the state agnostic encoder and the state schema.
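
The idea behind these changes can be sketched in a few lines. This is a simplified model, not the actual proto messages or planner code: `Field`, `Schema`, `GroupMapRequest`, and `StateSchemaResolution` are hypothetical names, and rejecting a request that lacks a state schema is a choice made for this sketch only, not something the PR description states.

```scala
// All names below are hypothetical stand-ins, not the actual proto or planner types.
final case class Field(name: String, dataType: String)
final case class Schema(fields: Seq[Field])

// Models a request that carries the state schema explicitly,
// in the spirit of the new state_schema proto field.
final case class GroupMapRequest(inputSchema: Schema, stateSchema: Option[Schema])

object StateSchemaResolution {
  // Models the old behavior: the state schema is taken from the input.
  def legacyStateSchema(request: GroupMapRequest): Schema = request.inputSchema

  // Models the new behavior: the state schema comes from the request itself.
  // Throwing when it is absent is an assumption of this sketch.
  def stateSchema(request: GroupMapRequest): Schema =
    request.stateSchema.getOrElse(
      throw new IllegalArgumentException("state schema is missing from the request"))

  def main(args: Array[String]): Unit = {
    val input = Schema(Seq(Field("id", "string"), Field("timestamp", "timestamp")))
    val state = Schema(Seq(Field("id", "string"), Field("count", "int")))
    val request = GroupMapRequest(input, Some(state))

    println(legacyStateSchema(request)) // the input's schema, wrong for the state
    println(stateSchema(request))       // the explicitly supplied state schema
  }
}
```

Carrying the schema in the request means the server no longer has to guess the state type from whatever dataset it happens to have on hand.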

Why are the changes needed?

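They fix flatMapGroupsWithState on Spark Connect, which currently fails with a ClassCastException when the state function updates the state and there is no initial state.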

Does this PR introduce any user-facing change?

yes

How was this patch tested?

Modified the existing unit tests so that the stateFunc updates the state.

Was this patch authored or co-authored using generative AI tooling?

no

@huanliwang-db huanliwang-db changed the title from "Fix the state schema for FlatMapGroupsWithState in spark connect when…" to "[SPARK-50642][CONNECT][SS]Fix the state schema for FlatMapGroupsWithState in spark connect when there is no initial state" Dec 21, 2024
@huanliwang-db huanliwang-db force-pushed the huanliwang-db/fmgws-client branch from 6ede9b0 to d670291 Compare December 23, 2024 19:28