summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/streams/resources/rs-utils.js
diff options
context:
space:
mode:
Diffstat (limited to 'testing/web-platform/tests/streams/resources/rs-utils.js')
-rw-r--r--testing/web-platform/tests/streams/resources/rs-utils.js185
1 files changed, 185 insertions, 0 deletions
diff --git a/testing/web-platform/tests/streams/resources/rs-utils.js b/testing/web-platform/tests/streams/resources/rs-utils.js
new file mode 100644
index 000000000..0f3222e23
--- /dev/null
+++ b/testing/web-platform/tests/streams/resources/rs-utils.js
@@ -0,0 +1,185 @@
+'use strict';
+(function () {
+
+ class RandomPushSource {
+ constructor(toPush) {
+ this.pushed = 0;
+ this.toPush = toPush;
+ this.started = false;
+ this.paused = false;
+ this.closed = false;
+
+ this._intervalHandle = null;
+ }
+
+ readStart() {
+ if (this.closed) {
+ return;
+ }
+
+ if (!this.started) {
+ this._intervalHandle = setInterval(writeChunk, 2);
+ this.started = true;
+ }
+
+ if (this.paused) {
+ this._intervalHandle = setInterval(writeChunk, 2);
+ this.paused = false;
+ }
+
+ const source = this;
+ function writeChunk() {
+ if (source.paused) {
+ return;
+ }
+
+ source.pushed++;
+
+ if (source.toPush > 0 && source.pushed > source.toPush) {
+ if (source._intervalHandle) {
+ clearInterval(source._intervalHandle);
+ source._intervalHandle = undefined;
+ }
+ source.closed = true;
+ source.onend();
+ } else {
+ source.ondata(randomChunk(128));
+ }
+ }
+ }
+
+ readStop() {
+ if (this.paused) {
+ return;
+ }
+
+ if (this.started) {
+ this.paused = true;
+ clearInterval(this._intervalHandle);
+ this._intervalHandle = undefined;
+ } else {
+ throw new Error('Can\'t pause reading an unstarted source.');
+ }
+ }
+ }
+
+ function randomChunk(size) {
+ let chunk = '';
+
+ for (let i = 0; i < size; ++i) {
+ // Add a random character from the basic printable ASCII set.
+ chunk += String.fromCharCode(Math.round(Math.random() * 84) + 32);
+ }
+
+ return chunk;
+ }
+
+ function readableStreamToArray(readable, reader) {
+ if (reader === undefined) {
+ reader = readable.getReader();
+ }
+
+ const chunks = [];
+
+ return pump();
+
+ function pump() {
+ return reader.read().then(result => {
+ if (result.done) {
+ return chunks;
+ }
+
+ chunks.push(result.value);
+ return pump();
+ });
+ }
+ }
+
+ class SequentialPullSource {
+ constructor(limit, options) {
+ const async = options && options.async;
+
+ this.current = 0;
+ this.limit = limit;
+ this.opened = false;
+ this.closed = false;
+
+ this._exec = f => f();
+ if (async) {
+ this._exec = f => setTimeout(f, 0);
+ }
+ }
+
+ open(cb) {
+ this._exec(() => {
+ this.opened = true;
+ cb();
+ });
+ }
+
+ read(cb) {
+ this._exec(() => {
+ if (++this.current <= this.limit) {
+ cb(null, false, this.current);
+ } else {
+ cb(null, true, null);
+ }
+ });
+ }
+
+ close(cb) {
+ this._exec(() => {
+ this.closed = true;
+ cb();
+ });
+ }
+ }
+
+ function sequentialReadableStream(limit, options) {
+ const sequentialSource = new SequentialPullSource(limit, options);
+
+ const stream = new ReadableStream({
+ start() {
+ return new Promise((resolve, reject) => {
+ sequentialSource.open(err => {
+ if (err) {
+ reject(err);
+ }
+ resolve();
+ });
+ });
+ },
+
+ pull(c) {
+ return new Promise((resolve, reject) => {
+ sequentialSource.read((err, done, chunk) => {
+ if (err) {
+ reject(err);
+ } else if (done) {
+ sequentialSource.close(err2 => {
+ if (err2) {
+ reject(err2);
+ }
+ c.close();
+ resolve();
+ });
+ } else {
+ c.enqueue(chunk);
+ resolve();
+ }
+ });
+ });
+ }
+ });
+
+ stream.source = sequentialSource;
+
+ return stream;
+ }
+
+
+ self.RandomPushSource = RandomPushSource;
+ self.readableStreamToArray = readableStreamToArray;
+ self.sequentialReadableStream = sequentialReadableStream;
+
+}());