Raw File
flow-control.js
'use strict';

if (self.importScripts) {
  self.importScripts('/resources/testharness.js');
  self.importScripts('../resources/test-utils.js');
  self.importScripts('../resources/rs-utils.js');
  self.importScripts('../resources/recording-streams.js');
}

const error1 = new Error('error1!');
error1.name = 'error1';

promise_test(t => {

  const rs = recordingReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    }
  });

  const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));

  const pipePromise = rs.pipeTo(ws, { preventCancel: true });

  // Wait and make sure it doesn't do any reading.
  return flushAsyncEvents().then(() => {
    ws.controller.error(error1);
  })
  .then(() => promise_rejects(t, error1, pipePromise, 'pipeTo must reject with the same error'))
  .then(() => {
    assert_array_equals(rs.eventsWithoutPulls, []);
    assert_array_equals(ws.events, []);
  })
  .then(() => readableStreamToArray(rs))
  .then(chunksNotPreviouslyRead => {
    assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);
  });

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');

promise_test(() => {

  const rs = recordingReadableStream({
    start(controller) {
      controller.enqueue('b');
      controller.close();
    }
  });

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  });

  const writer = ws.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const pipePromise = rs.pipeTo(ws);

  return flushAsyncEvents().then(() => resolveWritePromise())
    .then(() => Promise.all([firstWritePromise, pipePromise]))
    .then(() => {
      assert_array_equals(rs.eventsWithoutPulls, []);
      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
    });

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');

promise_test(() => {

  const rs = recordingReadableStream();

  const startPromise = Promise.resolve();
  let resolveWritePromise;
  const ws = recordingWritableStream({
    start() {
      return startPromise;
    },
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  });

  const writer = ws.getWriter();
  writer.write('a');

  return startPromise.then(() => {
    assert_array_equals(ws.events, ['write', 'a']);
    assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
    writer.releaseLock();

    const pipePromise = rs.pipeTo(ws);

    rs.controller.enqueue('b');
    resolveWritePromise();
    rs.controller.close();

    return pipePromise.then(() => {
      assert_array_equals(rs.eventsWithoutPulls, []);
      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
    });
  });

}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
   'stream becomes non-empty and the writable stream starts desiring chunks');

promise_test(() => {
  const unreadChunks = ['b', 'c', 'd'];

  const rs = recordingReadableStream({
    pull(controller) {
      controller.enqueue(unreadChunks.shift());
      if (unreadChunks.length === 0) {
        controller.close();
      }
    }
  }, new CountQueuingStrategy({ highWaterMark: 0 }));

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  }, new CountQueuingStrategy({ highWaterMark: 3 }));

  const writer = ws.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const pipePromise = rs.pipeTo(ws);

  return flushAsyncEvents().then(() => {
    assert_array_equals(ws.events, ['write', 'a']);
    assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');
  }).then(() => resolveWritePromise())
    .then(() => Promise.all([firstWritePromise, pipePromise]))
    .then(() => {
      assert_array_equals(rs.events, ['pull', 'pull', 'pull']);
      assert_array_equals(ws.events, ['write', 'a', 'write', 'b','write', 'c','write', 'd', 'close']);
    });

}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');

class StepTracker {
  constructor() {
    this.waiters = [];
    this.wakers = [];
  }

  // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
  // promise is resolved.
  waitThenAdvance(n) {
    if (this.waiters[n] === undefined) {
      this.waiters[n] = new Promise(resolve => {
        this.wakers[n] = resolve;
      });
      this.waiters[n]
          .then(() => flushAsyncEvents())
          .then(() => {
            if (this.wakers[n + 1] !== undefined) {
              this.wakers[n + 1]();
            }
          });
    }
    if (n == 0) {
      this.wakers[0]();
    }
    return this.waiters[n];
  }
}

promise_test(() => {
  const steps = new StepTracker();
  const desiredSizes = [];
  const rs = recordingReadableStream({
    start(controller) {
      steps.waitThenAdvance(1).then(() => enqueue('a'));
      steps.waitThenAdvance(3).then(() => enqueue('b'));
      steps.waitThenAdvance(5).then(() => enqueue('c'));
      steps.waitThenAdvance(7).then(() => enqueue('d'));
      steps.waitThenAdvance(11).then(() => controller.close());

      function enqueue(chunk) {
        controller.enqueue(chunk);
        desiredSizes.push(controller.desiredSize);
      }
    }
  });

  const chunksFinishedWriting = [];
  const writableStartPromise = Promise.resolve();
  let writeCalled = false;
  const ws = recordingWritableStream({
    start() {
      return writableStartPromise;
    },
    write(chunk) {
      const waitForStep = writeCalled ? 12 : 9;
      writeCalled = true;
      return steps.waitThenAdvance(waitForStep).then(() => {
        chunksFinishedWriting.push(chunk);
      });
    }
  });

  return writableStartPromise.then(() => {
    const pipePromise = rs.pipeTo(ws);
    steps.waitThenAdvance(0);

    return Promise.all([
      steps.waitThenAdvance(2).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');

        // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
        // promise, leaving the queue empty.
        assert_array_equals(desiredSizes, [1],
                            'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
        assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
      }),

      steps.waitThenAdvance(4).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');

        // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
        // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
        // had size 1 (thus desiredSize of 0).
        assert_array_equals(desiredSizes, [1, 0],
                            'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
        assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
      }),

      steps.waitThenAdvance(6).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');

        // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
        // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
        // -1.
        assert_array_equals(desiredSizes, [1, 0, -1],
                            'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
        assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
      }),

      steps.waitThenAdvance(8).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');

        // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
        // and 'd'.
        assert_array_equals(desiredSizes, [1, 0, -1, -2],
                            'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
        assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
      }),

      steps.waitThenAdvance(10).then(() => {
        assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
                            'at step 10, two chunks must have been written');

        assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
      }),

      pipePromise.then(() => {
        assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
        assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');

        assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
                            'all chunks were written (and the WritableStream closed)');
      })
    ]);
  });
}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');

done();
back to top