Skip to content

Commit

Permalink
Merge pull request mozilla#8617 from mukulmishra18/network-streaming
Browse files Browse the repository at this point in the history
Adds Streams API support for networking task of PDF.js project.
  • Loading branch information
yurydelendik committed Jul 27, 2017
2 parents 37ac8f8 + 1091067 commit 343b4dc
Show file tree
Hide file tree
Showing 9 changed files with 481 additions and 238 deletions.
257 changes: 74 additions & 183 deletions src/core/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,218 +198,120 @@ IPDFStreamRangeReader.prototype = {

/** @implements {IPDFStream} */
var PDFWorkerStream = (function PDFWorkerStreamClosure() {
function PDFWorkerStream(params, msgHandler) {
this._queuedChunks = [];
var initialData = params.initialData;
if (initialData && initialData.length > 0) {
this._queuedChunks.push(initialData);
}
function PDFWorkerStream(msgHandler) {
this._msgHandler = msgHandler;

this._isRangeSupported = !(params.disableRange);
this._isStreamingSupported = !(params.disableStream);
this._contentLength = params.length;

this._contentLength = null;
this._fullRequestReader = null;
this._rangeReaders = [];

msgHandler.on('OnDataRange', this._onReceiveData.bind(this));
msgHandler.on('OnDataProgress', this._onProgress.bind(this));
this._rangeRequestReaders = [];
}
PDFWorkerStream.prototype = {
_onReceiveData: function PDFWorkerStream_onReceiveData(args) {
if (args.begin === undefined) {
if (this._fullRequestReader) {
this._fullRequestReader._enqueue(args.chunk);
} else {
this._queuedChunks.push(args.chunk);
}
} else {
var found = this._rangeReaders.some(function (rangeReader) {
if (rangeReader._begin !== args.begin) {
return false;
}
rangeReader._enqueue(args.chunk);
return true;
});
assert(found);
}
},

_onProgress: function PDFWorkerStream_onProgress(evt) {
if (this._rangeReaders.length > 0) {
// Reporting to first range reader.
var firstReader = this._rangeReaders[0];
if (firstReader.onProgress) {
firstReader.onProgress({ loaded: evt.loaded, });
}
}
},

_removeRangeReader: function PDFWorkerStream_removeRangeReader(reader) {
var i = this._rangeReaders.indexOf(reader);
if (i >= 0) {
this._rangeReaders.splice(i, 1);
}
},

getFullReader: function PDFWorkerStream_getFullReader() {
getFullReader() {
assert(!this._fullRequestReader);
var queuedChunks = this._queuedChunks;
this._queuedChunks = null;
return new PDFWorkerStreamReader(this, queuedChunks);
this._fullRequestReader = new PDFWorkerStreamReader(this._msgHandler);
return this._fullRequestReader;
},

getRangeReader: function PDFWorkerStream_getRangeReader(begin, end) {
var reader = new PDFWorkerStreamRangeReader(this, begin, end);
this._msgHandler.send('RequestDataRange', { begin, end, });
this._rangeReaders.push(reader);
getRangeReader(begin, end) {
let reader = new PDFWorkerStreamRangeReader(begin, end, this._msgHandler);
this._rangeRequestReaders.push(reader);
return reader;
},

cancelAllRequests: function PDFWorkerStream_cancelAllRequests(reason) {
cancelAllRequests(reason) {
if (this._fullRequestReader) {
this._fullRequestReader.cancel(reason);
}
var readers = this._rangeReaders.slice(0);
readers.forEach(function (rangeReader) {
rangeReader.cancel(reason);
let readers = this._rangeRequestReaders.slice(0);
readers.forEach(function (reader) {
reader.cancel(reason);
});
},
};

/** @implements {IPDFStreamReader} */
function PDFWorkerStreamReader(stream, queuedChunks) {
this._stream = stream;
this._done = false;
this._queuedChunks = queuedChunks || [];
this._requests = [];
this._headersReady = Promise.resolve();
stream._fullRequestReader = this;

this.onProgress = null; // not used
function PDFWorkerStreamReader(msgHandler) {
this._msgHandler = msgHandler;

this._contentLength = null;
this._isRangeSupported = false;
this._isStreamingSupported = false;

let readableStream = this._msgHandler.sendWithStream('GetReader');

this._reader = readableStream.getReader();

this._headersReady = this._msgHandler.sendWithPromise('ReaderHeadersReady').
then((data) => {
this._isStreamingSupported = data.isStreamingSupported;
this._isRangeSupported = data.isRangeSupported;
this._contentLength = data.contentLength;
});
}
PDFWorkerStreamReader.prototype = {
_enqueue: function PDFWorkerStreamReader_enqueue(chunk) {
if (this._done) {
return; // ignore new data
}
if (this._requests.length > 0) {
var requestCapability = this._requests.shift();
requestCapability.resolve({ value: chunk, done: false, });
return;
}
this._queuedChunks.push(chunk);
},

get headersReady() {
return this._headersReady;
},

get isRangeSupported() {
return this._stream._isRangeSupported;
get contentLength() {
return this._contentLength;
},

get isStreamingSupported() {
return this._stream._isStreamingSupported;
return this._isStreamingSupported;
},

get contentLength() {
return this._stream._contentLength;
get isRangeSupported() {
return this._isRangeSupported;
},

read: function PDFWorkerStreamReader_read() {
if (this._queuedChunks.length > 0) {
var chunk = this._queuedChunks.shift();
return Promise.resolve({ value: chunk, done: false, });
}
if (this._done) {
return Promise.resolve({ value: undefined, done: true, });
}
var requestCapability = createPromiseCapability();
this._requests.push(requestCapability);
return requestCapability.promise;
read() {
return this._reader.read().then(function({ value, done, }) {
if (done) {
return { value: undefined, done: true, };
}
// `value` is wrapped into Uint8Array, we need to
// unwrap it to ArrayBuffer for further processing.
return { value: value.buffer, done: false, };
});
},

cancel: function PDFWorkerStreamReader_cancel(reason) {
this._done = true;
this._requests.forEach(function (requestCapability) {
requestCapability.resolve({ value: undefined, done: true, });
});
this._requests = [];
cancel(reason) {
this._reader.cancel(reason);
},
};

/** @implements {IPDFStreamRangeReader} */
function PDFWorkerStreamRangeReader(stream, begin, end) {
this._stream = stream;
this._begin = begin;
this._end = end;
this._queuedChunk = null;
this._requests = [];
this._done = false;

function PDFWorkerStreamRangeReader(begin, end, msgHandler) {
this._msgHandler = msgHandler;
this.onProgress = null;

let readableStream = this._msgHandler.sendWithStream('GetRangeReader',
{ begin, end, });

this._reader = readableStream.getReader();
}
PDFWorkerStreamRangeReader.prototype = {
_enqueue: function PDFWorkerStreamRangeReader_enqueue(chunk) {
if (this._done) {
return; // ignore new data
}
if (this._requests.length === 0) {
this._queuedChunk = chunk;
} else {
var requestsCapability = this._requests.shift();
requestsCapability.resolve({ value: chunk, done: false, });
this._requests.forEach(function (requestCapability) {
requestCapability.resolve({ value: undefined, done: true, });
});
this._requests = [];
}
this._done = true;
this._stream._removeRangeReader(this);
},

get isStreamingSupported() {
return false;
},

read: function PDFWorkerStreamRangeReader_read() {
if (this._queuedChunk) {
return Promise.resolve({ value: this._queuedChunk, done: false, });
}
if (this._done) {
return Promise.resolve({ value: undefined, done: true, });
}
var requestCapability = createPromiseCapability();
this._requests.push(requestCapability);
return requestCapability.promise;
read() {
return this._reader.read().then(function({ value, done, }) {
if (done) {
return { value: undefined, done: true, };
}
return { value: value.buffer, done: false, };
});
},

cancel: function PDFWorkerStreamRangeReader_cancel(reason) {
this._done = true;
this._requests.forEach(function (requestCapability) {
requestCapability.resolve({ value: undefined, done: true, });
});
this._requests = [];
this._stream._removeRangeReader(this);
cancel(reason) {
this._reader.cancel(reason);
},
};

return PDFWorkerStream;
})();

/** @type IPDFStream */
var PDFNetworkStream;

/**
* Sets PDFNetworkStream class to be used as alternative PDF data transport.
* @param {IPDFStream} cls - the PDF data transport.
*/
function setPDFNetworkStreamClass(cls) {
PDFNetworkStream = cls;
}

var WorkerMessageHandler = {
setup(handler, port) {
var testMessageProcessed = false;
Expand Down Expand Up @@ -536,35 +438,16 @@ var WorkerMessageHandler = {
return pdfManagerCapability.promise;
}

var pdfStream;
var pdfStream, cachedChunks = [];
try {
if (source.chunkedViewerLoading) {
pdfStream = new PDFWorkerStream(source, handler);
} else {
if (!PDFNetworkStream) {
throw new Error('./network module is not loaded');
}
pdfStream = new PDFNetworkStream(data);
}
pdfStream = new PDFWorkerStream(handler);
} catch (ex) {
pdfManagerCapability.reject(ex);
return pdfManagerCapability.promise;
}

var fullRequest = pdfStream.getFullReader();
fullRequest.headersReady.then(function () {
if (!fullRequest.isStreamingSupported ||
!fullRequest.isRangeSupported) {
// If stream or range are disabled, it's our only way to report
// loading progress.
fullRequest.onProgress = function (evt) {
handler.send('DocProgress', {
loaded: evt.loaded,
total: evt.total,
});
};
}

if (!fullRequest.isRangeSupported) {
return;
}
Expand All @@ -580,14 +463,23 @@ var WorkerMessageHandler = {
disableAutoFetch,
rangeChunkSize: source.rangeChunkSize,
}, evaluatorOptions, docBaseUrl);
// There may be a chance that `pdfManager` is not initialized
// for first few runs of `readchunk` block of code. Be sure
// to send all cached chunks, if any, to chunked_stream via
// pdf_manager.
for (let i = 0; i < cachedChunks.length; i++) {
pdfManager.sendProgressiveData(cachedChunks[i]);
}

cachedChunks = [];
pdfManagerCapability.resolve(pdfManager);
cancelXHRs = null;
}).catch(function (reason) {
pdfManagerCapability.reject(reason);
cancelXHRs = null;
});

var cachedChunks = [], loaded = 0;
var loaded = 0;
var flushChunks = function () {
var pdfFile = arraysToBytes(cachedChunks);
if (source.length && pdfFile.length !== source.length) {
Expand Down Expand Up @@ -969,7 +861,6 @@ if (typeof window === 'undefined' && !isNodeJS() &&
}

export {
setPDFNetworkStreamClass,
WorkerTask,
WorkerMessageHandler,
};
Loading

0 comments on commit 343b4dc

Please sign in to comment.