https://github.com/apache/spark
Revision 02f32ee358cc0a398aa7321bc5613cb92b306f6f authored by wecharyu on 08 December 2022, 08:12:30 UTC, committed by Jungtaek Lim on 08 December 2022, 08:12:45 UTC
### What changes were proposed in this pull request?

Add a filter for empty offsets in `latestOffset()` of the Kafka source, so that the offset remains unchanged if Kafka returns no topic partitions during a fetch.
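The idea of the filter can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Spark's actual `latestOffset()` code: `TP` stands in for Kafka's `TopicPartition`, a plain `Map[TP, Long]` stands in for the offsets held by `KafkaSourceOffset`, and `latestOffset` is an illustrative name. The point it shows is that an empty fetch result keeps the previous offset instead of replacing it.

```scala
// Hypothetical, simplified model of the empty-offset filter; it does not use
// Spark's or Kafka's real classes.
object EmptyOffsetFilterSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition (assumption for illustration).
  final case class TP(topic: String, partition: Int)

  // Stand-in for a partition -> offset map (assumption for illustration).
  type PartitionOffsets = Map[TP, Long]

  /** Returns the end offset for the next batch.
    * If the fetch returned no partitions (e.g. while the cluster is reassigning
    * partitions), keep the start offset instead of committing an empty map.
    */
  def latestOffset(start: PartitionOffsets, fetched: PartitionOffsets): PartitionOffsets =
    if (fetched.isEmpty) start else fetched
}
```

With `start = Map(TP("t", 0) -> 10L)`, an empty fetch returns `start` unchanged, while a normal fetch such as `Map(TP("t", 0) -> 15L)` is returned as the new end offset.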

### Why are the changes needed?

`KafkaOffsetReader` may fetch an empty set of partitions in some extreme cases, for example when partitions are fetched while the Kafka cluster is reassigning them. This produces an empty `PartitionOffsetMap` (even though the topic partitions themselves are unchanged), which is then stored in `committedOffsets` after `runBatch()`.

In the next batch, partitions are fetched normally and the actual offsets are returned. However, when the data for this batch is fetched in `KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()`, every partition in `endOffsets` is treated as a new partition because `startOffsets` is empty. These "new partitions" are then read from their earliest offsets, which causes data duplication.
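The duplication can be reproduced with a simplified, hypothetical model of the range-resolution step. It is not the code of `getOffsetRangesFromResolvedOffsets()`; `TP`, `OffsetRange`, and `offsetRanges` are illustrative names. It only captures the rule described above: a partition missing from the start offsets is treated as new and read from its earliest offset.

```scala
// Hypothetical model of offset-range resolution; names are illustrative only.
object OffsetRangeSketch {
  // Stand-in for Kafka's TopicPartition (assumption for illustration).
  final case class TP(topic: String, partition: Int)
  final case class OffsetRange(tp: TP, fromOffset: Long, untilOffset: Long)

  /** Builds one range per partition in `end`, sorted by topic and partition.
    * A partition absent from `start` is considered new and starts at its
    * earliest offset; `earliest` is assumed to cover every such partition.
    */
  def offsetRanges(
      start: Map[TP, Long],
      end: Map[TP, Long],
      earliest: Map[TP, Long]): Seq[OffsetRange] =
    end.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) }
      .map { case (tp, until) =>
        // An empty `start` makes every partition look new.
        val from = start.getOrElse(tp, earliest(tp))
        OffsetRange(tp, from, until)
      }
}
```

Suppose offsets 0 to 100 of `TP("t", 0)` were already processed and the latest offset is now 120. With an empty start map (the bug), the range is `0 until 120`, so offsets 0 to 100 are read again; with the start offset preserved as `100`, the range is `100 until 120`.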

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test.

Closes #38898 from wecharyu/SPARK-41375.

Authored-by: wecharyu <yuwq1996@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 043475a87844f11c252fb0ebab469148ae6985d7)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 5a91b21
[SPARK-41375][SS] Avoid empty latest KafkaSourceOffset