From 109106794d226a2dd8c764681b411ffeb34efc38 Mon Sep 17 00:00:00 2001 From: Mukul Mishra Date: Fri, 30 Jun 2017 23:29:52 +0530 Subject: [PATCH] Adds Streams API support for networking task of PDF.js project. network.js file moved to main thread and `PDFNetworkStream` implemented at worker thread, that is used to ask for data whenever worker needs. --- src/core/worker.js | 257 +++++++++---------------------- src/display/api.js | 153 +++++++++++++----- src/{core => display}/network.js | 2 +- src/display/transport_stream.js | 243 +++++++++++++++++++++++++++++ src/pdf.js | 5 + src/pdf.worker.js | 5 - src/shared/util.js | 49 ++++-- src/worker_loader.js | 3 +- test/unit/network_spec.js | 2 +- 9 files changed, 481 insertions(+), 238 deletions(-) rename src/{core => display}/network.js (99%) create mode 100644 src/display/transport_stream.js diff --git a/src/core/worker.js b/src/core/worker.js index 8be7d816d3b58..4156e48137daa 100644 --- a/src/core/worker.js +++ b/src/core/worker.js @@ -198,218 +198,120 @@ IPDFStreamRangeReader.prototype = { /** @implements {IPDFStream} */ var PDFWorkerStream = (function PDFWorkerStreamClosure() { - function PDFWorkerStream(params, msgHandler) { - this._queuedChunks = []; - var initialData = params.initialData; - if (initialData && initialData.length > 0) { - this._queuedChunks.push(initialData); - } + function PDFWorkerStream(msgHandler) { this._msgHandler = msgHandler; - - this._isRangeSupported = !(params.disableRange); - this._isStreamingSupported = !(params.disableStream); - this._contentLength = params.length; - + this._contentLength = null; this._fullRequestReader = null; - this._rangeReaders = []; - - msgHandler.on('OnDataRange', this._onReceiveData.bind(this)); - msgHandler.on('OnDataProgress', this._onProgress.bind(this)); + this._rangeRequestReaders = []; } PDFWorkerStream.prototype = { - _onReceiveData: function PDFWorkerStream_onReceiveData(args) { - if (args.begin === undefined) { - if (this._fullRequestReader) { - this._fullRequestReader._enqueue(args.chunk); - } else { - this._queuedChunks.push(args.chunk); - } - } else { - var found = this._rangeReaders.some(function (rangeReader) { - if (rangeReader._begin !== args.begin) { - return false; - } - rangeReader._enqueue(args.chunk); - return true; - }); - assert(found); - } - }, - - _onProgress: function PDFWorkerStream_onProgress(evt) { - if (this._rangeReaders.length > 0) { - // Reporting to first range reader. - var firstReader = this._rangeReaders[0]; - if (firstReader.onProgress) { - firstReader.onProgress({ loaded: evt.loaded, }); - } - } - }, - - _removeRangeReader: function PDFWorkerStream_removeRangeReader(reader) { - var i = this._rangeReaders.indexOf(reader); - if (i >= 0) { - this._rangeReaders.splice(i, 1); - } - }, - - getFullReader: function PDFWorkerStream_getFullReader() { + getFullReader() { assert(!this._fullRequestReader); - var queuedChunks = this._queuedChunks; - this._queuedChunks = null; - return new PDFWorkerStreamReader(this, queuedChunks); + this._fullRequestReader = new PDFWorkerStreamReader(this._msgHandler); + return this._fullRequestReader; }, - getRangeReader: function PDFWorkerStream_getRangeReader(begin, end) { - var reader = new PDFWorkerStreamRangeReader(this, begin, end); - this._msgHandler.send('RequestDataRange', { begin, end, }); - this._rangeReaders.push(reader); + getRangeReader(begin, end) { + let reader = new PDFWorkerStreamRangeReader(begin, end, this._msgHandler); + this._rangeRequestReaders.push(reader); return reader; }, - cancelAllRequests: function PDFWorkerStream_cancelAllRequests(reason) { + cancelAllRequests(reason) { if (this._fullRequestReader) { this._fullRequestReader.cancel(reason); } - var readers = this._rangeReaders.slice(0); - readers.forEach(function (rangeReader) { - rangeReader.cancel(reason); + let readers = this._rangeRequestReaders.slice(0); + readers.forEach(function (reader) { + reader.cancel(reason); }); }, }; /** @implements {IPDFStreamReader} */ - function PDFWorkerStreamReader(stream, queuedChunks) { - this._stream = stream; - this._done = false; - this._queuedChunks = queuedChunks || []; - this._requests = []; - this._headersReady = Promise.resolve(); - stream._fullRequestReader = this; - - this.onProgress = null; // not used + function PDFWorkerStreamReader(msgHandler) { + this._msgHandler = msgHandler; + + this._contentLength = null; + this._isRangeSupported = false; + this._isStreamingSupported = false; + + let readableStream = this._msgHandler.sendWithStream('GetReader'); + + this._reader = readableStream.getReader(); + + this._headersReady = this._msgHandler.sendWithPromise('ReaderHeadersReady'). + then((data) => { + this._isStreamingSupported = data.isStreamingSupported; + this._isRangeSupported = data.isRangeSupported; + this._contentLength = data.contentLength; + }); } PDFWorkerStreamReader.prototype = { - _enqueue: function PDFWorkerStreamReader_enqueue(chunk) { - if (this._done) { - return; // ignore new data - } - if (this._requests.length > 0) { - var requestCapability = this._requests.shift(); - requestCapability.resolve({ value: chunk, done: false, }); - return; - } - this._queuedChunks.push(chunk); - }, - get headersReady() { return this._headersReady; }, - get isRangeSupported() { - return this._stream._isRangeSupported; + get contentLength() { + return this._contentLength; }, get isStreamingSupported() { - return this._stream._isStreamingSupported; + return this._isStreamingSupported; }, - get contentLength() { - return this._stream._contentLength; + get isRangeSupported() { + return this._isRangeSupported; }, - read: function PDFWorkerStreamReader_read() { - if (this._queuedChunks.length > 0) { - var chunk = this._queuedChunks.shift(); - return Promise.resolve({ value: chunk, done: false, }); - } - if (this._done) { - return Promise.resolve({ value: undefined, done: true, }); - } - var requestCapability = createPromiseCapability(); - this._requests.push(requestCapability); - return requestCapability.promise; + read() { + return this._reader.read().then(function({ value, done, }) { + if (done) { + return { value: undefined, done: true, }; + } + // `value` is wrapped into Uint8Array, we need to + // unwrap it to ArrayBuffer for further processing. + return { value: value.buffer, done: false, }; + }); }, - cancel: function PDFWorkerStreamReader_cancel(reason) { - this._done = true; - this._requests.forEach(function (requestCapability) { - requestCapability.resolve({ value: undefined, done: true, }); - }); - this._requests = []; + cancel(reason) { + this._reader.cancel(reason); }, }; /** @implements {IPDFStreamRangeReader} */ - function PDFWorkerStreamRangeReader(stream, begin, end) { - this._stream = stream; - this._begin = begin; - this._end = end; - this._queuedChunk = null; - this._requests = []; - this._done = false; - + function PDFWorkerStreamRangeReader(begin, end, msgHandler) { + this._msgHandler = msgHandler; this.onProgress = null; + + let readableStream = this._msgHandler.sendWithStream('GetRangeReader', + { begin, end, }); + + this._reader = readableStream.getReader(); } PDFWorkerStreamRangeReader.prototype = { - _enqueue: function PDFWorkerStreamRangeReader_enqueue(chunk) { - if (this._done) { - return; // ignore new data - } - if (this._requests.length === 0) { - this._queuedChunk = chunk; - } else { - var requestsCapability = this._requests.shift(); - requestsCapability.resolve({ value: chunk, done: false, }); - this._requests.forEach(function (requestCapability) { - requestCapability.resolve({ value: undefined, done: true, }); - }); - this._requests = []; - } - this._done = true; - this._stream._removeRangeReader(this); - }, - get isStreamingSupported() { return false; }, - read: function PDFWorkerStreamRangeReader_read() { - if (this._queuedChunk) { - return Promise.resolve({ value: this._queuedChunk, done: false, }); - } - if (this._done) { - return Promise.resolve({ value: undefined, done: true, }); - } - var requestCapability = createPromiseCapability(); - this._requests.push(requestCapability); - return requestCapability.promise; + read() { + return this._reader.read().then(function({ value, done, }) { + if (done) { + return { value: undefined, done: true, }; + } + return { value: value.buffer, done: false, }; + }); }, - cancel: function PDFWorkerStreamRangeReader_cancel(reason) { - this._done = true; - this._requests.forEach(function (requestCapability) { - requestCapability.resolve({ value: undefined, done: true, }); - }); - this._requests = []; - this._stream._removeRangeReader(this); + cancel(reason) { + this._reader.cancel(reason); }, }; return PDFWorkerStream; })(); -/** @type IPDFStream */ -var PDFNetworkStream; - -/** - * Sets PDFNetworkStream class to be used as alternative PDF data transport. - * @param {IPDFStream} cls - the PDF data transport. - */ -function setPDFNetworkStreamClass(cls) { - PDFNetworkStream = cls; -} - var WorkerMessageHandler = { setup(handler, port) { var testMessageProcessed = false; @@ -536,16 +438,9 @@ var WorkerMessageHandler = { return pdfManagerCapability.promise; } - var pdfStream; + var pdfStream, cachedChunks = []; try { - if (source.chunkedViewerLoading) { - pdfStream = new PDFWorkerStream(source, handler); - } else { - if (!PDFNetworkStream) { - throw new Error('./network module is not loaded'); - } - pdfStream = new PDFNetworkStream(data); - } + pdfStream = new PDFWorkerStream(handler); } catch (ex) { pdfManagerCapability.reject(ex); return pdfManagerCapability.promise; @@ -553,18 +448,6 @@ var WorkerMessageHandler = { var fullRequest = pdfStream.getFullReader(); fullRequest.headersReady.then(function () { - if (!fullRequest.isStreamingSupported || - !fullRequest.isRangeSupported) { - // If stream or range are disabled, it's our only way to report - // loading progress. - fullRequest.onProgress = function (evt) { - handler.send('DocProgress', { - loaded: evt.loaded, - total: evt.total, - }); - }; - } - if (!fullRequest.isRangeSupported) { return; } @@ -580,6 +463,15 @@ var WorkerMessageHandler = { disableAutoFetch, rangeChunkSize: source.rangeChunkSize, }, evaluatorOptions, docBaseUrl); + // There may be a chance that `pdfManager` is not initialized + // for first few runs of `readchunk` block of code. Be sure + // to send all cached chunks, if any, to chunked_stream via + // pdf_manager. + for (let i = 0; i < cachedChunks.length; i++) { + pdfManager.sendProgressiveData(cachedChunks[i]); + } + + cachedChunks = []; pdfManagerCapability.resolve(pdfManager); cancelXHRs = null; }).catch(function (reason) { @@ -587,7 +479,7 @@ var WorkerMessageHandler = { cancelXHRs = null; }); - var cachedChunks = [], loaded = 0; + var loaded = 0; var flushChunks = function () { var pdfFile = arraysToBytes(cachedChunks); if (source.length && pdfFile.length !== source.length) { @@ -969,7 +861,6 @@ if (typeof window === 'undefined' && !isNodeJS() && } export { - setPDFNetworkStreamClass, WorkerTask, WorkerMessageHandler, }; diff --git a/src/display/api.js b/src/display/api.js index b2c86e36141ab..c62348747e711 100644 --- a/src/display/api.js +++ b/src/display/api.js @@ -15,7 +15,7 @@ /* globals requirejs, __non_webpack_require__ */ import { - createPromiseCapability, deprecated, getVerbosityLevel, globalScope, + assert, createPromiseCapability, deprecated, getVerbosityLevel, globalScope, info, InvalidPDFException, isArray, isArrayBuffer, isInt, isSameOrigin, loadJpegStream, MessageHandler, MissingPDFException, NativeImageDecoding, PageViewport, PasswordException, StatTimer, stringToBytes, @@ -28,6 +28,7 @@ import { import { FontFaceObject, FontLoader } from './font_loader'; import { CanvasGraphics } from './canvas'; import { Metadata } from './metadata'; +import { PDFDataTransportStream } from './transport_stream'; var DEFAULT_RANGE_CHUNK_SIZE = 65536; // 2^16 = 65536 @@ -80,6 +81,17 @@ if (typeof PDFJSDev !== 'undefined' && }) : null; } +/** @type IPDFStream */ +var PDFNetworkStream; + +/** + * Sets PDFNetworkStream class to be used as alternative PDF data transport. + * @param {IPDFStream} cls - the PDF data transport. + */ +function setPDFNetworkStreamClass(cls) { + PDFNetworkStream = cls; +} + /** * Document initialization / loading parameters object. * @@ -281,8 +293,20 @@ function getDocument(src, pdfDataRangeTransport, if (task.destroyed) { throw new Error('Loading aborted'); } + + let networkStream; + if (rangeTransport) { + networkStream = new PDFDataTransportStream(params, rangeTransport); + } else if (!params.data) { + networkStream = new PDFNetworkStream({ + source: params, + disableRange: getDefaultSetting('disableRange'), + }); + } + var messageHandler = new MessageHandler(docId, workerId, worker.port); - var transport = new WorkerTransport(messageHandler, task, rangeTransport, + messageHandler.postMessageTransfers = worker.postMessageTransfers; + var transport = new WorkerTransport(messageHandler, task, networkStream, CMapReaderFactory); task._transport = transport; messageHandler.send('Ready', null); @@ -317,7 +341,6 @@ function _fetchDocument(worker, source, pdfDataRangeTransport, docId) { return worker.messageHandler.sendWithPromise('GetDocRequest', { docId, source, - disableRange: getDefaultSetting('disableRange'), maxImageSize: getDefaultSetting('maxImageSize'), disableFontFace: getDefaultSetting('disableFontFace'), disableCreateObjectURL: getDefaultSetting('disableCreateObjectURL'), @@ -1238,9 +1261,7 @@ var PDFWorker = (function PDFWorkerClosure() { // pdf.worker.js file is needed. if (typeof PDFJSDev === 'undefined' || !PDFJSDev.test('PRODUCTION')) { if (typeof SystemJS === 'object') { - Promise.all([SystemJS.import('pdfjs/core/network'), - SystemJS.import('pdfjs/core/worker')]).then((modules) => { - var worker = modules[1]; + SystemJS.import('pdfjs/core/worker').then((worker) => { WorkerMessageHandler = worker.WorkerMessageHandler; fakeWorkerFilesLoadedCapability.resolve(WorkerMessageHandler); }); @@ -1254,7 +1275,6 @@ var PDFWorker = (function PDFWorkerClosure() { } } else if (PDFJSDev.test('SINGLE_FILE')) { var pdfjsCoreWorker = require('../core/worker.js'); - require('../core/network.js'); WorkerMessageHandler = pdfjsCoreWorker.WorkerMessageHandler; fakeWorkerFilesLoadedCapability.resolve(WorkerMessageHandler); } else { @@ -1285,6 +1305,7 @@ var PDFWorker = (function PDFWorkerClosure() { this.name = name; this.destroyed = false; + this.postMessageTransfers = true; this._readyCapability = createPromiseCapability(); this._port = null; @@ -1381,6 +1402,7 @@ var PDFWorker = (function PDFWorkerClosure() { this._port = worker; this._webWorker = worker; if (!data.supportTransfers) { + this.postMessageTransfers = false; isPostMessageTransfersDisabled = true; } this._readyCapability.resolve(); @@ -1513,11 +1535,10 @@ var PDFWorker = (function PDFWorkerClosure() { * @ignore */ var WorkerTransport = (function WorkerTransportClosure() { - function WorkerTransport(messageHandler, loadingTask, pdfDataRangeTransport, + function WorkerTransport(messageHandler, loadingTask, networkStream, CMapReaderFactory) { this.messageHandler = messageHandler; this.loadingTask = loadingTask; - this.pdfDataRangeTransport = pdfDataRangeTransport; this.commonObjs = new PDFObjects(); this.fontLoader = new FontLoader(loadingTask.docId); this.CMapReaderFactory = new CMapReaderFactory({ @@ -1529,6 +1550,10 @@ var WorkerTransport = (function WorkerTransportClosure() { this.destroyCapability = null; this._passwordCapability = null; + this._networkStream = networkStream; + this._fullReader = null; + this._lastProgress = null; + this.pageCache = []; this.pagePromises = []; this.downloadInfoCapability = createPromiseCapability(); @@ -1564,10 +1589,10 @@ var WorkerTransport = (function WorkerTransportClosure() { waitOn.push(terminated); Promise.all(waitOn).then(() => { this.fontLoader.clear(); - if (this.pdfDataRangeTransport) { - this.pdfDataRangeTransport.abort(); - this.pdfDataRangeTransport = null; + if (this._networkStream) { + this._networkStream.cancelAllRequests(); } + if (this.messageHandler) { this.messageHandler.destroy(); this.messageHandler = null; @@ -1581,32 +1606,92 @@ var WorkerTransport = (function WorkerTransportClosure() { var messageHandler = this.messageHandler; var loadingTask = this.loadingTask; - var pdfDataRangeTransport = this.pdfDataRangeTransport; - if (pdfDataRangeTransport) { - pdfDataRangeTransport.addRangeListener(function(begin, chunk) { - messageHandler.send('OnDataRange', { - begin, - chunk, + messageHandler.on('GetReader', function(data, sink) { + assert(this._networkStream); + this._fullReader = this._networkStream.getFullReader(); + this._fullReader.onProgress = (evt) => { + this._lastProgress = { + loaded: evt.loaded, + total: evt.total, + }; + }; + sink.onPull = () => { + this._fullReader.read().then(function({ value, done, }) { + if (done) { + sink.close(); + return; + } + assert(isArrayBuffer(value)); + // Enqueue data chunk into sink, and transfer it + // to other side as `Transferable` object. + sink.enqueue(new Uint8Array(value), 1, [value]); + }).catch((reason) => { + sink.error(reason); }); - }); + }; + + sink.onCancel = (reason) => { + this._fullReader.cancel(reason); + }; + }, this); + + messageHandler.on('ReaderHeadersReady', function(data) { + let headersCapability = createPromiseCapability(); + let fullReader = this._fullReader; + fullReader.headersReady.then(() => { + // If stream or range are disabled, it's our only way to report + // loading progress. + if (!fullReader.isStreamingSupported || + !fullReader.isRangeSupported) { + if (this._lastProgress) { + let loadingTask = this.loadingTask; + if (loadingTask.onProgress) { + loadingTask.onProgress(this._lastProgress); + } + } + fullReader.onProgress = (evt) => { + let loadingTask = this.loadingTask; + if (loadingTask.onProgress) { + loadingTask.onProgress({ + loaded: evt.loaded, + total: evt.total, + }); + } + }; + } - pdfDataRangeTransport.addProgressListener(function(loaded) { - messageHandler.send('OnDataProgress', { - loaded, + headersCapability.resolve({ + isStreamingSupported: fullReader.isStreamingSupported, + isRangeSupported: fullReader.isRangeSupported, + contentLength: fullReader.contentLength, }); - }); + }, headersCapability.reject); + + return headersCapability.promise; + }, this); - pdfDataRangeTransport.addProgressiveReadListener(function(chunk) { - messageHandler.send('OnDataRange', { - chunk, + messageHandler.on('GetRangeReader', function(data, sink) { + assert(this._networkStream); + let _rangeReader = + this._networkStream.getRangeReader(data.begin, data.end); + + sink.onPull = () => { + _rangeReader.read().then(function({ value, done, }) { + if (done) { + sink.close(); + return; + } + assert(isArrayBuffer(value)); + sink.enqueue(new Uint8Array(value), 1, [value]); + }).catch((reason) => { + sink.error(reason); }); - }); + }; - messageHandler.on('RequestDataRange', - function transportDataRange(data) { - pdfDataRangeTransport.requestDataRange(data.begin, data.end); - }, this); - } + sink.onCancel = (reason) => { + _rangeReader.cancel(reason); + }; + }, this); messageHandler.on('GetDoc', function transportDoc(data) { var pdfInfo = data.pdfInfo; @@ -1668,9 +1753,6 @@ var WorkerTransport = (function WorkerTransportClosure() { }, this); messageHandler.on('PDFManagerReady', function transportPage(data) { - if (this.pdfDataRangeTransport) { - this.pdfDataRangeTransport.transportReady(); - } }, this); messageHandler.on('StartRenderPage', function transportRender(data) { @@ -2335,6 +2417,7 @@ export { PDFWorker, PDFDocumentProxy, PDFPageProxy, + setPDFNetworkStreamClass, _UnsupportedManager, version, build, diff --git a/src/core/network.js b/src/display/network.js similarity index 99% rename from src/core/network.js rename to src/display/network.js index fd78a3ec7ac74..66e77ef164724 100644 --- a/src/core/network.js +++ b/src/display/network.js @@ -17,7 +17,7 @@ import { assert, createPromiseCapability, globalScope, isInt, MissingPDFException, UnexpectedResponseException } from '../shared/util'; -import { setPDFNetworkStreamClass } from './worker'; +import { setPDFNetworkStreamClass } from './api'; if (typeof PDFJSDev !== 'undefined' && PDFJSDev.test('FIREFOX || MOZCENTRAL')) { throw new Error('Module "./network" shall not ' + diff --git a/src/display/transport_stream.js b/src/display/transport_stream.js new file mode 100644 index 0000000000000..a8992058909d0 --- /dev/null +++ b/src/display/transport_stream.js @@ -0,0 +1,243 @@ +/* Copyright 2012 Mozilla Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { assert, createPromiseCapability } from '../shared/util'; + + /** @implements {IPDFStream} */ +var PDFDataTransportStream = (function PDFDataTransportStreamClosure() { + function PDFDataTransportStream(params, pdfDataRangeTransport) { + assert(pdfDataRangeTransport); + + this._queuedChunks = []; + var initialData = params.initialData; + if (initialData && initialData.length > 0) { + let buffer = new Uint8Array(initialData).buffer; + this._queuedChunks.push(buffer); + } + + this._pdfDataRangeTransport = pdfDataRangeTransport; + this._isRangeSupported = !(params.disableRange); + this._isStreamingSupported = !(params.disableStream); + this._contentLength = params.length; + + this._fullRequestReader = null; + this._rangeReaders = []; + + this._pdfDataRangeTransport.addRangeListener((begin, chunk) => { + this._onReceiveData({ begin, chunk, }); + }); + + this._pdfDataRangeTransport.addProgressListener((loaded) => { + this._onProgress({ loaded, }); + }); + + this._pdfDataRangeTransport.addProgressiveReadListener((chunk) => { + this._onReceiveData({ chunk, }); + }); + + this._pdfDataRangeTransport.transportReady(); + } + PDFDataTransportStream.prototype = { + _onReceiveData: function PDFDataTransportStream_onReceiveData(args) { + let buffer = new Uint8Array(args.chunk).buffer; + if (args.begin === undefined) { + if (this._fullRequestReader) { + this._fullRequestReader._enqueue(buffer); + } else { + this._queuedChunks.push(buffer); + } + } else { + var found = this._rangeReaders.some(function (rangeReader) { + if (rangeReader._begin !== args.begin) { + return false; + } + rangeReader._enqueue(buffer); + return true; + }); + assert(found); + } + }, + + _onProgress: function PDFDataTransportStream_onDataProgress(evt) { + if (this._rangeReaders.length > 0) { + // Reporting to first range reader. + var firstReader = this._rangeReaders[0]; + if (firstReader.onProgress) { + firstReader.onProgress({ loaded: evt.loaded, }); + } + } + }, + + _removeRangeReader: + function PDFDataTransportStream_removeRangeReader(reader) { + var i = this._rangeReaders.indexOf(reader); + if (i >= 0) { + this._rangeReaders.splice(i, 1); + } + }, + + getFullReader: function PDFDataTransportStream_getFullReader() { + assert(!this._fullRequestReader); + var queuedChunks = this._queuedChunks; + this._queuedChunks = null; + return new PDFDataTransportStreamReader(this, queuedChunks); + }, + + getRangeReader: function PDFDataTransportStream_getRangeReader(begin, end) { + var reader = new PDFDataTransportStreamRangeReader(this, begin, end); + this._pdfDataRangeTransport.requestDataRange(begin, end); + this._rangeReaders.push(reader); + return reader; + }, + + cancelAllRequests: + function PDFDataTransportStream_cancelAllRequests(reason) { + if (this._fullRequestReader) { + this._fullRequestReader.cancel(reason); + } + var readers = this._rangeReaders.slice(0); + readers.forEach(function (rangeReader) { + rangeReader.cancel(reason); + }); + this._pdfDataRangeTransport.abort(); + }, + }; + + /** @implements {IPDFStreamReader} */ + function PDFDataTransportStreamReader(stream, queuedChunks) { + this._stream = stream; + this._done = false; + this._queuedChunks = queuedChunks || []; + this._requests = []; + this._headersReady = Promise.resolve(); + stream._fullRequestReader = this; + + this.onProgress = null; // not used + } + PDFDataTransportStreamReader.prototype = { + _enqueue: function PDFDataTransportStreamReader_enqueue(chunk) { + if (this._done) { + return; // ignore new data + } + if (this._requests.length > 0) { + var requestCapability = this._requests.shift(); + requestCapability.resolve({ value: chunk, done: false, }); + return; + } + this._queuedChunks.push(chunk); + }, + + get headersReady() { + return this._headersReady; + }, + + get isRangeSupported() { + return this._stream._isRangeSupported; + }, + + get isStreamingSupported() { + return this._stream._isStreamingSupported; + }, + + get contentLength() { + return this._stream._contentLength; + }, + + read: function PDFDataTransportStreamReader_read() { + if (this._queuedChunks.length > 0) { + var chunk = this._queuedChunks.shift(); + return Promise.resolve({ value: chunk, done: false, }); + } + if (this._done) { + return Promise.resolve({ value: undefined, done: true, }); + } + var requestCapability = createPromiseCapability(); + this._requests.push(requestCapability); + return requestCapability.promise; + }, + + cancel: function PDFDataTransportStreamReader_cancel(reason) { + this._done = true; + this._requests.forEach(function (requestCapability) { + requestCapability.resolve({ value: undefined, done: true, }); + }); + this._requests = []; + }, + }; + + /** @implements {IPDFStreamRangeReader} */ + function PDFDataTransportStreamRangeReader(stream, begin, end) { + this._stream = stream; + this._begin = begin; + this._end = end; + this._queuedChunk = null; + this._requests = []; + this._done = false; + + this.onProgress = null; + } + PDFDataTransportStreamRangeReader.prototype = { + _enqueue: function PDFDataTransportStreamRangeReader_enqueue(chunk) { + if (this._done) { + return; // ignore new data + } + if (this._requests.length === 0) { + this._queuedChunk = chunk; + } else { + var requestsCapability = this._requests.shift(); + requestsCapability.resolve({ value: chunk, done: false, }); + this._requests.forEach(function (requestCapability) { + requestCapability.resolve({ value: undefined, done: true, }); + }); + this._requests = []; + } + this._done = true; + this._stream._removeRangeReader(this); + }, + + get isStreamingSupported() { + return false; + }, + + read: function PDFDataTransportStreamRangeReader_read() { + if (this._queuedChunk) { + let chunk = this._queuedChunk; + this._queuedChunk = null; + return Promise.resolve({ value: chunk, done: false, }); + } + if (this._done) { + return Promise.resolve({ value: undefined, done: true, }); + } + var requestCapability = createPromiseCapability(); + this._requests.push(requestCapability); + return requestCapability.promise; + }, + + cancel: function PDFDataTransportStreamRangeReader_cancel(reason) { + this._done = true; + this._requests.forEach(function (requestCapability) { + requestCapability.resolve({ value: undefined, done: true, }); + }); + this._requests = []; + this._stream._removeRangeReader(this); + }, + }; + + return PDFDataTransportStream; +})(); + +export { + PDFDataTransportStream, +}; diff --git a/src/pdf.js b/src/pdf.js index 0682da1992792..2a451c2722b18 100644 --- a/src/pdf.js +++ b/src/pdf.js @@ -29,6 +29,11 @@ var pdfjsDisplayAnnotationLayer = require('./display/annotation_layer.js'); var pdfjsDisplayDOMUtils = require('./display/dom_utils.js'); var pdfjsDisplaySVG = require('./display/svg.js'); +if (typeof PDFJSDev === 'undefined' || + !PDFJSDev.test('FIREFOX || MOZCENTRAL')) { + require('./display/network.js'); +} + exports.PDFJS = pdfjsDisplayGlobal.PDFJS; exports.build = pdfjsDisplayAPI.build; exports.version = pdfjsDisplayAPI.version; diff --git a/src/pdf.worker.js b/src/pdf.worker.js index 93cfa6c9ffb37..7306f17d2c459 100644 --- a/src/pdf.worker.js +++ b/src/pdf.worker.js @@ -21,9 +21,4 @@ var pdfjsBuild = PDFJSDev.eval('BUNDLE_BUILD'); var pdfjsCoreWorker = require('./core/worker.js'); -if (typeof PDFJSDev === 'undefined' || - !PDFJSDev.test('FIREFOX || MOZCENTRAL')) { - require('./core/network.js'); -} - exports.WorkerMessageHandler = pdfjsCoreWorker.WorkerMessageHandler; diff --git a/src/shared/util.js b/src/shared/util.js index a93216101d1f9..343567058dcab 100644 --- a/src/shared/util.js +++ b/src/shared/util.js @@ -1222,6 +1222,20 @@ function resolveCall(fn, args, thisArg = null) { }); } +function wrapReason(reason) { + if (typeof reason !== 'object') { + return reason; + } + switch (reason.name) { + case 'MissingPDFException': + return new MissingPDFException(reason.message); + case 'UnexpectedResponseException': + return new UnexpectedResponseException(reason.message, reason.status); + default: + return new UnknownErrorException(reason.message, reason.details); + } +} + function resolveOrReject(capability, success, reason) { if (success) { capability.resolve(); @@ -1431,13 +1445,14 @@ MessageHandler.prototype = { let targetName = data.sourceName; let capability = createPromiseCapability(); - let sendStreamRequest = ({ stream, chunk, success, reason, }) => { - this.comObj.postMessage({ sourceName, targetName, stream, streamId, - chunk, success, reason, }); + let sendStreamRequest = ({ stream, chunk, transfers, + success, reason, }) => { + this.postMessage({ sourceName, targetName, stream, streamId, + chunk, success, reason, }, transfers); }; let streamSink = { - enqueue(chunk, size = 1) { + enqueue(chunk, size = 1, transfers) { if (this.isCancelled) { return; } @@ -1450,7 +1465,7 @@ MessageHandler.prototype = { this.sinkCapability = createPromiseCapability(); this.ready = this.sinkCapability.promise; } - sendStreamRequest({ stream: 'enqueue', chunk, }); + sendStreamRequest({ stream: 'enqueue', chunk, transfers, }); }, close() { @@ -1462,6 +1477,10 @@ MessageHandler.prototype = { }, error(reason) { + if (this.isCancelled) { + return; + } + this.isCancelled = true; sendStreamRequest({ stream: 'error', reason, }); }, @@ -1510,11 +1529,11 @@ MessageHandler.prototype = { switch (data.stream) { case 'start_complete': resolveOrReject(this.streamControllers[data.streamId].startCall, - data.success, data.reason); + data.success, wrapReason(data.reason)); break; case 'pull_complete': resolveOrReject(this.streamControllers[data.streamId].pullCall, - data.success, data.reason); + data.success, wrapReason(data.reason)); break; case 'pull': // Ignore any pull after close is called. @@ -1539,11 +1558,15 @@ MessageHandler.prototype = { }); break; case 'enqueue': + assert(this.streamControllers[data.streamId], + 'enqueue should have stream controller'); if (!this.streamControllers[data.streamId].isClosed) { this.streamControllers[data.streamId].controller.enqueue(data.chunk); } break; case 'close': + assert(this.streamControllers[data.streamId], + 'close should have stream controller'); if (this.streamControllers[data.streamId].isClosed) { break; } @@ -1552,12 +1575,15 @@ MessageHandler.prototype = { deleteStreamController(); break; case 'error': - this.streamControllers[data.streamId].controller.error(data.reason); + assert(this.streamControllers[data.streamId], + 'error should have stream controller'); + this.streamControllers[data.streamId].controller. + error(wrapReason(data.reason)); deleteStreamController(); break; case 'cancel_complete': resolveOrReject(this.streamControllers[data.streamId].cancelCall, - data.success, data.reason); + data.success, wrapReason(data.reason)); deleteStreamController(); break; case 'cancel': @@ -1565,13 +1591,14 @@ MessageHandler.prototype = { break; } resolveCall(this.streamSinks[data.streamId].onCancel, - [data.reason]).then(() => { + [wrapReason(data.reason)]).then(() => { sendStreamResponse({ stream: 'cancel_complete', success: true, }); }, (reason) => { sendStreamResponse({ stream: 'cancel_complete', success: false, reason, }); }); - this.streamSinks[data.streamId].sinkCapability.reject(data.reason); + this.streamSinks[data.streamId].sinkCapability. + reject(wrapReason(data.reason)); this.streamSinks[data.streamId].isCancelled = true; delete this.streamSinks[data.streamId]; break; diff --git a/src/worker_loader.js b/src/worker_loader.js index 6745465a753f4..604075714abfa 100644 --- a/src/worker_loader.js +++ b/src/worker_loader.js @@ -28,7 +28,6 @@ importScripts('./shared/compatibility.js'); importScripts('../node_modules/systemjs/dist/system.js'); importScripts('../systemjs.config.js'); -Promise.all([SystemJS.import('pdfjs/core/network'), - SystemJS.import('pdfjs/core/worker')]).then(function () { +SystemJS.import('pdfjs/core/worker').then(function () { // Worker is loaded at this point. }); diff --git a/test/unit/network_spec.js b/test/unit/network_spec.js index f51f8927992c4..50b87f885a8a3 100644 --- a/test/unit/network_spec.js +++ b/test/unit/network_spec.js @@ -13,7 +13,7 @@ * limitations under the License. */ -import { PDFNetworkStream } from '../../src/core/network'; +import { PDFNetworkStream } from '../../src/display/network'; describe('network', function() { var pdf1 = new URL('../pdfs/tracemonkey.pdf', window.location).href;