Revision c380da1357b20f55c8e80a515fc024e1b3b380cc authored by Raghu Angadi on 18 August 2023, 17:39:42 UTC, committed by Gengliang Wang on 18 August 2023, 17:39:42 UTC
[This is a 3.5x port of #42460 in master. It resolves a couple of conflicts.]

This terminates the Python worker created for `foreachBatch` when the streaming query terminates. All of the tracking is done inside the Connect server (in `StreamingForeachBatchHelper`). How this works:

* (A) The helper class returns a cleaner (an `AutoCloseable`) to the Connect server when the foreachBatch function is set up (this happens before the query starts).
* (B) If the query fails to start, the server invokes the cleaner directly.
* (C) If the query starts, the server registers the cleaner with `streamingRunnerCleanerCache` in the `SessionHolder`.
* (D) The cache keeps a mapping from query to cleaner.
* (E) The cache registers a streaming listener (only once per session), which invokes the cleaner when a query terminates.
* (F) There is also a final cleanup when the `SessionHolder` expires.
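The lifecycle above can be sketched in Java, assuming a per-session map from query id to `AutoCloseable` cleaner. `CleanerCacheSketch`, `startWithCleaner`, `onQueryTerminated`, and `cleanUpAll` are hypothetical names chosen for illustration, not the actual Spark Connect API; the real server code is written in Scala and is driven by a streaming query listener rather than direct calls.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of the per-session cleaner cache; not the real Spark Connect classes.
final class CleanerCacheSketch {
    // (D) Mapping from streaming query id to the cleaner for its Python worker.
    private final Map<String, AutoCloseable> cleaners = new ConcurrentHashMap<>();

    // (A)/(B)/(C): start a query; if start-up fails, close the cleaner directly.
    static String startWithCleaner(CleanerCacheSketch cache, AutoCloseable cleaner,
                                   Supplier<String> startQuery) {
        final String queryId;
        try {
            queryId = startQuery.get();
        } catch (RuntimeException e) {
            closeQuietly(cleaner); // (B) query failed to start
            throw e;
        }
        cache.register(queryId, cleaner); // (C) query started
        return queryId;
    }

    // (C) Register the cleaner once the query is running.
    void register(String queryId, AutoCloseable cleaner) {
        cleaners.put(queryId, cleaner);
    }

    // (E) Invoked when a query terminates (in the real server, from a listener).
    void onQueryTerminated(String queryId) {
        // remove() first, so each cleaner runs at most once.
        AutoCloseable cleaner = cleaners.remove(queryId);
        if (cleaner != null) {
            closeQuietly(cleaner);
        }
    }

    // (F) Invoked when the session expires: clean up everything still registered.
    void cleanUpAll() {
        // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
        for (String queryId : cleaners.keySet()) {
            onQueryTerminated(queryId);
        }
    }

    int size() {
        return cleaners.size();
    }

    private static void closeQuietly(AutoCloseable cleaner) {
        try {
            cleaner.close();
        } catch (Exception e) {
            // A failing cleaner must not stop the remaining cleanups.
            System.err.println("Cleaner failed: " + e);
        }
    }
}
```

Removing the entry before calling `close()` means a cleaner runs at most once, even if query termination and session expiry race each other.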

This ensures that the Python process created for a streaming query is terminated when the query terminates.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Unit tests added for `CleanerCache`.
- Existing unit tests for foreachBatch.
- Manual tests to verify that the Python process is terminated in different cases.
- The unit tests don't actually verify that the process is terminated. A follow-up PR will add that verification.

Closes #42555 from rangadi/pr-terminate-3.5x.

Authored-by: Raghu Angadi <raghu.angadi@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent f7dd0a9
.gitattributes
*.bat text eol=crlf
*.cmd text eol=crlf
*.java text eol=lf
*.scala text eol=lf
*.xml text eol=lf
*.py text eol=lf
*.R text eol=lf