https://github.com/apache/spark
Revision 5a50ae37f4c41099c174459603966ee25f21ac75 authored by Tathagata Das on 16 January 2019, 17:42:14 UTC, committed by Shixiong Zhu on 16 January 2019, 17:45:12 UTC
## What changes were proposed in this pull request? When a streaming query has multiple file streams, and there is a batch where one of the file streams dont have data in that batch, then if the query has to restart from that, it will throw the following error. ``` java.lang.IllegalStateException: batch 1 doesn't exist at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300) at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120) at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205) ``` Existing `HDFSMetadata.verifyBatchIds` threw error whenever the `batchIds` list was empty. In the context of `FileStreamSource.getBatch` (where verify is called) and `FileStreamSourceLog` (subclass of `HDFSMetadata`), this is usually okay because, in a streaming query with one file stream, the `batchIds` can never be empty: - A batch is planned only when the `FileStreamSourceLog` has seen new offset (that is, there are new data files). - So `FileStreamSource.getBatch` will be called on X to Y where X will always be > Y. This calls internally`HDFSMetadata.verifyBatchIds (X+1, Y)` with X+1-Y ids. For example.,`FileStreamSource.getBatch(4, 5)` will call `verify(batchIds = Seq(5), start = 5, end = 5)`. However, the invariant of X > Y is not true when there are two file stream sources, as a batch may be planned even when only one of the file streams has data. So one of the file stream may not have data, which can call `FileStreamSource.getBatch(X, X)` -> `verify(batchIds = Seq.empty, start = X+1, end = X)` -> failure. Note that `FileStreamSource.getBatch(X, X)` gets called **only when restarting a query in a batch where a file source did not have data**. This is because in normal planning of batches, `MicroBatchExecution` avoids calling `FileStreamSource.getBatch(X, X)` when offset X has not changed. However, when restarting a stream at such a batch, `MicroBatchExecution.populateStartOffsets()` calls `FileStreamSource.getBatch(X, X)` (DataSource V1 hack to initialize the source with last known offsets) thus hitting this issue. The minimum solution here is to skip verification when `FileStreamSource.getBatch(X, X)`. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #23557 from tdas/SPARK-26629. Authored-by: Tathagata Das <tathagata.das1565@gmail.com> Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
1 parent 8319ba7
Tip revision: 5a50ae37f4c41099c174459603966ee25f21ac75 authored by Tathagata Das on 16 January 2019, 17:42:14 UTC
[SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream
[SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream
Tip revision: 5a50ae3
File | Mode | Size |
---|---|---|
.github | ||
R | ||
assembly | ||
bin | ||
build | ||
common | ||
conf | ||
core | ||
data | ||
dev | ||
docs | ||
examples | ||
external | ||
graphx | ||
hadoop-cloud | ||
launcher | ||
licenses | ||
mllib | ||
mllib-local | ||
project | ||
python | ||
repl | ||
resource-managers | ||
sbin | ||
sql | ||
streaming | ||
tools | ||
.gitattributes | -rw-r--r-- | 40 bytes |
.gitignore | -rw-r--r-- | 1.3 KB |
.travis.yml | -rw-r--r-- | 1.7 KB |
CONTRIBUTING.md | -rw-r--r-- | 995 bytes |
LICENSE | -rw-r--r-- | 17.6 KB |
NOTICE | -rw-r--r-- | 25.7 KB |
README.md | -rw-r--r-- | 3.7 KB |
appveyor.yml | -rw-r--r-- | 2.3 KB |
pom.xml | -rw-r--r-- | 99.3 KB |
scalastyle-config.xml | -rw-r--r-- | 17.2 KB |
Computing file changes ...