From e7ef50a7cae155b5bace91abec626591242e7075 Mon Sep 17 00:00:00 2001 From: Ahmed Ammar Date: Fri, 27 Oct 2017 20:59:03 +0200 Subject: [PATCH 1/3] Add Capabilities::zsyncSupportedVersion function. --- src/libsync/capabilities.cpp | 5 +++++ src/libsync/capabilities.h | 1 + 2 files changed, 6 insertions(+) diff --git a/src/libsync/capabilities.cpp b/src/libsync/capabilities.cpp index 08c1e45ff79..0bbf8b7def3 100644 --- a/src/libsync/capabilities.cpp +++ b/src/libsync/capabilities.cpp @@ -159,4 +159,9 @@ bool Capabilities::uploadConflictFiles() const return _capabilities["uploadConflictFiles"].toBool(); } + +QString Capabilities::zsyncSupportedVersion() const +{ + return _capabilities["dav"].toMap()["zsync"].toString(); +} } diff --git a/src/libsync/capabilities.h b/src/libsync/capabilities.h index 63a59d6c007..c6f84bf7ac2 100644 --- a/src/libsync/capabilities.h +++ b/src/libsync/capabilities.h @@ -43,6 +43,7 @@ class OWNCLOUDSYNC_EXPORT Capabilities bool sharePublicLinkMultiple() const; bool shareResharing() const; bool chunkingNg() const; + QString zsyncSupportedVersion() const; /// disable parallel upload in chunking bool chunkingParallelUploadDisabled() const; From 95861212f8977f0e4fd6990b49e25c9805d5ecac Mon Sep 17 00:00:00 2001 From: Ahmed Ammar Date: Fri, 27 Oct 2017 21:02:20 +0200 Subject: [PATCH 2/3] Add support to update ProgressInfo item size. --- src/libsync/owncloudpropagator.cpp | 5 +++++ src/libsync/owncloudpropagator.h | 2 ++ src/libsync/progressdispatcher.cpp | 18 +++++++++++++++++- src/libsync/progressdispatcher.h | 13 +++++++++++++ src/libsync/syncengine.cpp | 7 +++++++ src/libsync/syncengine.h | 1 + 6 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index d960b4a2d17..cce3cc13982 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -674,6 +674,11 @@ void OwncloudPropagator::scheduleNextJobImpl() } } +void OwncloudPropagator::reportFileTotal(const SyncFileItem &item, quint64 newSize) +{ + emit updateFileTotal(item, newSize); +} + void OwncloudPropagator::reportProgress(const SyncFileItem &item, quint64 bytes) { emit progress(item, bytes); diff --git a/src/libsync/owncloudpropagator.h b/src/libsync/owncloudpropagator.h index 7cd00531d4b..e56a47b0d53 100644 --- a/src/libsync/owncloudpropagator.h +++ b/src/libsync/owncloudpropagator.h @@ -457,6 +457,7 @@ class OwncloudPropagator : public QObject void scheduleNextJob(); void reportProgress(const SyncFileItem &, quint64 bytes); + void reportFileTotal(const SyncFileItem &item, quint64 newSize); void abort() { @@ -515,6 +516,7 @@ private slots: void newItem(const SyncFileItemPtr &); void itemCompleted(const SyncFileItemPtr &); void progress(const SyncFileItem &, quint64 bytes); + void updateFileTotal(const SyncFileItem &, quint64 newSize); void finished(bool success); /** Emitted when propagation has problems with a locked file. */ diff --git a/src/libsync/progressdispatcher.cpp b/src/libsync/progressdispatcher.cpp index bfdc0521bcc..9b5edd360ab 100644 --- a/src/libsync/progressdispatcher.cpp +++ b/src/libsync/progressdispatcher.cpp @@ -195,6 +195,22 @@ void ProgressInfo::adjustTotalsForFile(const SyncFileItem &item) } } +void ProgressInfo::updateTotalsForFile(const SyncFileItem &item, quint64 newSize) +{ + if (!shouldCountProgress(item)) { + return; + } + + if (!_currentItems.contains(item._file)) { + _sizeProgress._total += newSize - item._size; + } else { + _sizeProgress._total += newSize - _currentItems[item._file]._progress._total; + } + + setProgressItem(item, 0); + _currentItems[item._file]._progress._total = newSize; +} + quint64 ProgressInfo::totalFiles() const { return _fileProgress._total; @@ -229,7 +245,7 @@ void ProgressInfo::setProgressComplete(const SyncFileItem &item) _currentItems.remove(item._file); _fileProgress.setCompleted(_fileProgress._completed + item._affectedItems); if (ProgressInfo::isSizeDependent(item)) { - _totalSizeOfCompletedJobs += item._size; + _totalSizeOfCompletedJobs += _currentItems[item._file]._progress._total; } recomputeCompletedSize(); _lastCompletedItem = item; diff --git a/src/libsync/progressdispatcher.h b/src/libsync/progressdispatcher.h index b51be9810aa..539b24e3145 100644 --- a/src/libsync/progressdispatcher.h +++ b/src/libsync/progressdispatcher.h @@ -91,6 +91,19 @@ class OWNCLOUDSYNC_EXPORT ProgressInfo : public QObject */ void adjustTotalsForFile(const SyncFileItem &item); + /** + * Update totals for item. + * adjustTotalsForFile is called during the treewalk phase to collect + * the initial total size and file count. + * updateTotalsForFile is called at most once per item during propagation + * to adjust them when new information has become available. + * + * Example: With delta-sync, the actual size of the download will only + * be known during propagation - this function adjusts the total size + * to account for it. + */ + void updateTotalsForFile(const SyncFileItem &item, quint64 newSize); + quint64 totalFiles() const; quint64 completedFiles() const; diff --git a/src/libsync/syncengine.cpp b/src/libsync/syncengine.cpp index 31338f4f68c..4a1ca472182 100644 --- a/src/libsync/syncengine.cpp +++ b/src/libsync/syncengine.cpp @@ -1092,6 +1092,8 @@ void SyncEngine::slotDiscoveryJobFinished(int discoveryResult) this, &SyncEngine::slotItemCompleted); connect(_propagator.data(), &OwncloudPropagator::progress, this, &SyncEngine::slotProgress); + connect(_propagator.data(), &OwncloudPropagator::updateFileTotal, + this, &SyncEngine::updateFileTotal); connect(_propagator.data(), &OwncloudPropagator::finished, this, &SyncEngine::slotFinished, Qt::QueuedConnection); connect(_propagator.data(), &OwncloudPropagator::seenLockedFile, this, &SyncEngine::seenLockedFile); connect(_propagator.data(), &OwncloudPropagator::touchedFile, this, &SyncEngine::slotAddTouchedFile); @@ -1212,6 +1214,11 @@ void SyncEngine::slotProgress(const SyncFileItem &item, quint64 current) emit transmissionProgress(*_progressInfo); } +void SyncEngine::updateFileTotal(const SyncFileItem &item, quint64 newSize) +{ + _progressInfo->updateTotalsForFile(item, newSize); + emit transmissionProgress(*_progressInfo); +} /* Given a path on the remote, give the path as it is when the rename is done */ QString SyncEngine::adjustRenamedPath(const QString &original) diff --git a/src/libsync/syncengine.h b/src/libsync/syncengine.h index 5ae335b4248..d07075c8f58 100644 --- a/src/libsync/syncengine.h +++ b/src/libsync/syncengine.h @@ -173,6 +173,7 @@ private slots: void slotItemCompleted(const SyncFileItemPtr &item); void slotFinished(bool success); void slotProgress(const SyncFileItem &item, quint64 curent); + void updateFileTotal(const SyncFileItem &item, quint64 newSize); void slotDiscoveryJobFinished(int updateResult); void slotCleanPollsJobAborted(const QString &error); From 12aeb890c9a6ad329e3612ebb96d843d70e413c9 Mon Sep 17 00:00:00 2001 From: Ahmed Ammar Date: Wed, 15 Nov 2017 16:16:19 +0200 Subject: [PATCH 3/3] Implementation of delta-sync support on client-side. This commit adds client-side support for delta-sync, this adds a new 3rdparty submodule `gh:ahmedammar/zsync`. This zsync tree is a modified version of upstream, adding some needed support for the upload path and other requirements. If the server does not announce the required zsync capability then a full upload/download is fallen back to. Delta synchronization can be enabled/disabled using command line, config, or gui options. On both upload and download paths, a check is made for the existance of a zsync metadata file on the server for a given path. This is provided by a dav property called `zsync`, found during discovery phase. If it doesn't exist the code reverts back to a complete upload or download, i.e. previous implementations. In the case of upload, a new zsync metadata file will be uploaded as part of the chunked upload and future synchronizations will be delta-sync capable. Chunked uploads no longer use sequential file names for each chunk id, instead, they are named as the byte offset into the remote file, this is a minimally intrusive modification to allow fo delta-sync and legacy code paths to run seamlessly. A new http header OC-Total-File-Length is sent, which informs the server of the final expected size of the file not just the total transmitted bytes as reported by OC-Total-Length. The seeding and generation of the zsync metadata file is done in a separate thread since this is a cpu intensive task, ensuring main thread is not blocked. This commit closes owncloud/client#179. --- .gitmodules | 3 + src/3rdparty/zsync | 1 + src/cmd/cmd.cpp | 14 + src/common/remotepermissions.cpp | 2 +- src/common/remotepermissions.h | 3 +- src/common/syncjournaldb.cpp | 11 +- src/gui/folder.cpp | 3 + src/gui/generalsettings.cpp | 6 + src/gui/generalsettings.ui | 51 +++- src/libsync/CMakeLists.txt | 33 +++ src/libsync/bandwidthmanager.cpp | 12 +- src/libsync/bandwidthmanager.h | 8 +- src/libsync/configfile.cpp | 25 ++ src/libsync/configfile.h | 6 + src/libsync/discoveryphase.cpp | 5 +- src/libsync/propagatecommonzsync.cpp | 363 +++++++++++++++++++++++ src/libsync/propagatecommonzsync.h | 121 ++++++++ src/libsync/propagatedownload.cpp | 118 ++++++-- src/libsync/propagatedownload.h | 222 +++++++++----- src/libsync/propagatedownloadzsync.cpp | 323 ++++++++++++++++++++ src/libsync/propagateupload.h | 31 +- src/libsync/propagateuploadng.cpp | 395 ++++++++++++++++++++----- src/libsync/syncengine.cpp | 1 + src/libsync/syncengine.h | 1 + src/libsync/syncoptions.h | 6 + test/CMakeLists.txt | 2 + test/syncenginetestutils.h | 233 ++++++++++++++- test/testzsync.cpp | 156 ++++++++++ 28 files changed, 1948 insertions(+), 207 deletions(-) create mode 160000 src/3rdparty/zsync create mode 100644 src/libsync/propagatecommonzsync.cpp create mode 100644 src/libsync/propagatecommonzsync.h create mode 100644 src/libsync/propagatedownloadzsync.cpp create mode 100644 test/testzsync.cpp diff --git a/.gitmodules b/.gitmodules index d25acc3cc12..19d0e8349fa 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "src/3rdparty/libcrashreporter-qt"] path = src/3rdparty/libcrashreporter-qt url = git://github.com/dschmidt/libcrashreporter-qt.git +[submodule "src/3rdparty/zsync"] + path = src/3rdparty/zsync + url = https://github.com/ahmedammar/zsync diff --git a/src/3rdparty/zsync b/src/3rdparty/zsync new file mode 160000 index 00000000000..3271b605049 --- /dev/null +++ b/src/3rdparty/zsync @@ -0,0 +1 @@ +Subproject commit 3271b60504916c614448acf00ab3437a897aa1df diff --git a/src/cmd/cmd.cpp b/src/cmd/cmd.cpp index e962dd4e07b..c7491ecb898 100644 --- a/src/cmd/cmd.cpp +++ b/src/cmd/cmd.cpp @@ -77,6 +77,8 @@ struct CmdOptions int restartTimes; int downlimit; int uplimit; + bool deltasync; + quint64 deltasyncminfilesize; }; // we can't use csync_set_userdata because the SyncEngine sets it already. @@ -188,6 +190,8 @@ void help() std::cout << " --max-sync-retries [n] Retries maximum n times (default to 3)" << std::endl; std::cout << " --uplimit [n] Limit the upload speed of files to n KB/s" << std::endl; std::cout << " --downlimit [n] Limit the download speed of files to n KB/s" << std::endl; + std::cout << " --deltasync, -ds Enable delta sync (disabled by default)" << std::endl; + std::cout << " --deltasyncmin [n] Set delta sync minimum file size to n MB (10 MiB default)" << std::endl; std::cout << " -h Sync hidden files,do not ignore them" << std::endl; std::cout << " --version, -v Display version and exit" << std::endl; std::cout << " --logdebug More verbose logging" << std::endl; @@ -268,6 +272,10 @@ void parseOptions(const QStringList &app_args, CmdOptions *options) options->uplimit = it.next().toInt() * 1000; } else if (option == "--downlimit" && !it.peekNext().startsWith("-")) { options->downlimit = it.next().toInt() * 1000; + } else if (option == "-ds" || option == "--deltasync") { + options->deltasync = true; + } else if (option == "--deltasyncmin" && !it.peekNext().startsWith("-")) { + options->deltasyncminfilesize = it.next().toLongLong() * 1024 * 1024; } else if (option == "--logdebug") { Logger::instance()->setLogFile("-"); Logger::instance()->setLogDebug(true); @@ -327,6 +335,8 @@ int main(int argc, char **argv) options.restartTimes = 3; options.uplimit = 0; options.downlimit = 0; + options.deltasync = false; + options.deltasyncminfilesize = 10 * 1024 * 1024; ClientProxy clientProxy; parseOptions(app.arguments(), &options); @@ -508,7 +518,11 @@ int main(int argc, char **argv) selectiveSyncFixup(&db, selectiveSyncList); } + SyncOptions opt; + opt._deltaSyncEnabled = options.deltasync; + opt._deltaSyncMinFileSize = options.deltasyncminfilesize; SyncEngine engine(account, options.source_dir, folder, &db); + engine.setSyncOptions(opt); engine.setIgnoreHiddenFiles(options.ignoreHiddenFiles); engine.setNetworkLimits(options.uplimit, options.downlimit); QObject::connect(&engine, &SyncEngine::finished, diff --git a/src/common/remotepermissions.cpp b/src/common/remotepermissions.cpp index ce39460b512..30ac59cff16 100644 --- a/src/common/remotepermissions.cpp +++ b/src/common/remotepermissions.cpp @@ -21,7 +21,7 @@ namespace OCC { -static const char letters[] = " WDNVCKRSMm"; +static const char letters[] = " WDNVCKRSMmz"; template diff --git a/src/common/remotepermissions.h b/src/common/remotepermissions.h index 2b34dcbf028..3e118a4e288 100644 --- a/src/common/remotepermissions.h +++ b/src/common/remotepermissions.h @@ -52,10 +52,11 @@ class OCSYNC_EXPORT RemotePermissions IsShared = 8, // S IsMounted = 9, // M IsMountedSub = 10, // m (internal: set if the parent dir has IsMounted) + HasZSyncMetadata = 11, // z (internal: set if remote file has zsync metadata property set) // Note: when adding support for more permissions, we need to invalid the cache in the database. // (by setting forceRemoteDiscovery in SyncJournalDb::checkConnect) - PermissionsCount = IsMountedSub + PermissionsCount = HasZSyncMetadata }; RemotePermissions() = default; explicit RemotePermissions(const char *); diff --git a/src/common/syncjournaldb.cpp b/src/common/syncjournaldb.cpp index a40149d3acd..5105b30fa08 100644 --- a/src/common/syncjournaldb.cpp +++ b/src/common/syncjournaldb.cpp @@ -494,11 +494,12 @@ bool SyncJournalDb::checkConnect() forceRemoteDiscovery = true; } - // There was a bug in versions <2.3.0 that could lead to stale - // local files and a remote discovery will fix them. - // See #5190 #5242. - if (major == 2 && minor < 3) { - qCInfo(lcDb) << "upgrade form client < 2.3.0 detected! forcing remote discovery"; + // - There was a bug in versions <2.3.0 that could lead to stale + // local files and a remote discovery will fix them. + // See #5190 #5242. + // - New remote HasZSyncMetadata permission added, invalidate cache + if (major == 2 && minor < 5) { + qCInfo(lcDb) << "upgrade from client < 2.5.0 detected! forcing remote discovery"; forceRemoteDiscovery = true; } diff --git a/src/gui/folder.cpp b/src/gui/folder.cpp index 986af370d55..51d277048fc 100644 --- a/src/gui/folder.cpp +++ b/src/gui/folder.cpp @@ -706,6 +706,9 @@ void Folder::setSyncOptions() opt._targetChunkUploadDuration = cfgFile.targetChunkUploadDuration(); } + opt._deltaSyncEnabled = cfgFile.deltaSyncEnabled(); + opt._deltaSyncMinFileSize = cfgFile.deltaSyncMinFileSize(); + _engine->setSyncOptions(opt); } diff --git a/src/gui/generalsettings.cpp b/src/gui/generalsettings.cpp index 69baaa84be4..8f08b1f73fb 100644 --- a/src/gui/generalsettings.cpp +++ b/src/gui/generalsettings.cpp @@ -69,6 +69,8 @@ GeneralSettings::GeneralSettings(QWidget *parent) connect(_ui->newFolderLimitCheckBox, &QAbstractButton::toggled, this, &GeneralSettings::saveMiscSettings); connect(_ui->newFolderLimitSpinBox, static_cast(&QSpinBox::valueChanged), this, &GeneralSettings::saveMiscSettings); connect(_ui->newExternalStorage, &QAbstractButton::toggled, this, &GeneralSettings::saveMiscSettings); + connect(_ui->deltaSyncCheckBox, &QAbstractButton::toggled, this, &GeneralSettings::saveMiscSettings); + connect(_ui->deltaSyncSpinBox, static_cast(&QSpinBox::valueChanged), this, &GeneralSettings::saveMiscSettings); #ifndef WITH_CRASHREPORTER _ui->crashreporterCheckBox->setVisible(false); @@ -122,6 +124,8 @@ void GeneralSettings::loadMiscSettings() _ui->newFolderLimitSpinBox->setValue(newFolderLimit.second); _ui->newExternalStorage->setChecked(cfgFile.confirmExternalStorage()); _ui->monoIconsCheckBox->setChecked(cfgFile.monoIcons()); + _ui->deltaSyncCheckBox->setChecked(cfgFile.deltaSyncEnabled()); + _ui->deltaSyncSpinBox->setValue(cfgFile.deltaSyncMinFileSize() / (1024 * 1024)); } void GeneralSettings::slotUpdateInfo() @@ -157,6 +161,8 @@ void GeneralSettings::saveMiscSettings() cfgFile.setNewBigFolderSizeLimit(_ui->newFolderLimitCheckBox->isChecked(), _ui->newFolderLimitSpinBox->value()); cfgFile.setConfirmExternalStorage(_ui->newExternalStorage->isChecked()); + cfgFile.setDeltaSyncEnabled(_ui->deltaSyncCheckBox->isChecked()); + cfgFile.setDeltaSyncMinFileSize(_ui->deltaSyncSpinBox->value() * 1024 * 1024); } void GeneralSettings::slotToggleLaunchOnStartup(bool enable) diff --git a/src/gui/generalsettings.ui b/src/gui/generalsettings.ui index aacce23c2a3..3424992eb8b 100644 --- a/src/gui/generalsettings.ui +++ b/src/gui/generalsettings.ui @@ -48,6 +48,53 @@ + + + Experimental + + + + + + + + Enable Delta-Synchronization for files larger than + + + + + + + 999999 + + + + + + + MB + + + + + + + Qt::Horizontal + + + + 40 + 20 + + + + + + + + + + About @@ -69,7 +116,7 @@ - + Updates @@ -120,7 +167,7 @@ - + Qt::Vertical diff --git a/src/libsync/CMakeLists.txt b/src/libsync/CMakeLists.txt index 93f3e4169e9..dd7f29fac07 100644 --- a/src/libsync/CMakeLists.txt +++ b/src/libsync/CMakeLists.txt @@ -43,7 +43,9 @@ set(libsync_SRCS owncloudtheme.cpp progressdispatcher.cpp propagatorjobs.cpp + propagatecommonzsync.cpp propagatedownload.cpp + propagatedownloadzsync.cpp propagateupload.cpp propagateuploadv1.cpp propagateuploadng.cpp @@ -67,6 +69,37 @@ else() set (libsync_SRCS ${libsync_SRCS} creds/httpcredentials.cpp) endif() +## begin zsync + +include_directories( ${CMAKE_SOURCE_DIR}/src/3rdparty/zsync/c ) + +set( libsync_SRCS + ${libsync_SRCS} + ../3rdparty/zsync/c/librcksum/hash.c + ../3rdparty/zsync/c/librcksum/md4.c + ../3rdparty/zsync/c/librcksum/range.c + ../3rdparty/zsync/c/librcksum/rsum.c + ../3rdparty/zsync/c/librcksum/state.c + ../3rdparty/zsync/c/libzsync/sha1.c + ../3rdparty/zsync/c/libzsync/zsync.c + ../3rdparty/zsync/c/progress.c +) + +if ( WIN32 ) + # ntohs + list(APPEND OS_SPECIFIC_LINK_LIBRARIES + ws2_32 + ) + # ensure size_t is 64 bits + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_FILE_OFFSET_BITS=64") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_FILE_OFFSET_BITS=64") +endif() + +set_source_files_properties( ../3rdparty/zsync/c/libzsync/zsync.c + PROPERTIES COMPILE_FLAGS -DVERSION=\\"0.6.3\\" ) + +## end zsync + # These headers are installed for libowncloudsync to be used by 3rd party apps set(owncloudsync_HEADERS account.h diff --git a/src/libsync/bandwidthmanager.cpp b/src/libsync/bandwidthmanager.cpp index c1ed6adcd17..6c9796343c5 100644 --- a/src/libsync/bandwidthmanager.cpp +++ b/src/libsync/bandwidthmanager.cpp @@ -120,7 +120,7 @@ void BandwidthManager::unregisterUploadDevice(QObject *o) } } -void BandwidthManager::registerDownloadJob(GETFileJob *j) +void BandwidthManager::registerDownloadJob(GETJob *j) { _downloadJobList.append(j); QObject::connect(j, &QObject::destroyed, this, &BandwidthManager::unregisterDownloadJob); @@ -139,7 +139,7 @@ void BandwidthManager::registerDownloadJob(GETFileJob *j) void BandwidthManager::unregisterDownloadJob(QObject *o) { - GETFileJob *j = reinterpret_cast(o); // note, we might already be in the ~QObject + GETJob *j = reinterpret_cast(o); // note, we might already be in the ~QObject _downloadJobList.removeAll(j); if (_relativeLimitCurrentMeasuredJob == j) { _relativeLimitCurrentMeasuredJob = 0; @@ -289,7 +289,7 @@ void BandwidthManager::relativeDownloadMeasuringTimerExpired() quota -= 20 * 1024; } qint64 quotaPerJob = quota / jobCount + 1.0; - Q_FOREACH (GETFileJob *gfj, _downloadJobList) { + Q_FOREACH (GETJob *gfj, _downloadJobList) { gfj->setBandwidthLimited(true); gfj->setChoked(false); gfj->giveBandwidthQuota(quotaPerJob); @@ -323,7 +323,7 @@ void BandwidthManager::relativeDownloadDelayTimerExpired() _relativeLimitCurrentMeasuredJob->setChoked(false); // choke all other download jobs - Q_FOREACH (GETFileJob *gfj, _downloadJobList) { + Q_FOREACH (GETJob *gfj, _downloadJobList) { if (gfj != _relativeLimitCurrentMeasuredJob) { gfj->setBandwidthLimited(true); gfj->setChoked(true); @@ -358,7 +358,7 @@ void BandwidthManager::switchingTimerExpired() if (newDownloadLimit != _currentDownloadLimit) { qCInfo(lcBandwidthManager) << "Download Bandwidth limit changed" << _currentDownloadLimit << newDownloadLimit; _currentDownloadLimit = newDownloadLimit; - Q_FOREACH (GETFileJob *j, _downloadJobList) { + Q_FOREACH (GETJob *j, _downloadJobList) { if (usingAbsoluteDownloadLimit()) { j->setBandwidthLimited(true); j->setChoked(false); @@ -386,7 +386,7 @@ void BandwidthManager::absoluteLimitTimerExpired() if (usingAbsoluteDownloadLimit() && _downloadJobList.count() > 0) { qint64 quotaPerJob = _currentDownloadLimit / qMax(1, _downloadJobList.count()); qCDebug(lcBandwidthManager) << quotaPerJob << _downloadJobList.count() << _currentDownloadLimit; - Q_FOREACH (GETFileJob *j, _downloadJobList) { + Q_FOREACH (GETJob *j, _downloadJobList) { j->giveBandwidthQuota(quotaPerJob); qCDebug(lcBandwidthManager) << "Gave " << quotaPerJob / 1024.0 << " kB to" << j; } diff --git a/src/libsync/bandwidthmanager.h b/src/libsync/bandwidthmanager.h index 691e11162d0..33a3cdecebb 100644 --- a/src/libsync/bandwidthmanager.h +++ b/src/libsync/bandwidthmanager.h @@ -23,7 +23,7 @@ namespace OCC { class UploadDevice; -class GETFileJob; +class GETJob; class OwncloudPropagator; /** @@ -47,7 +47,7 @@ public slots: void registerUploadDevice(UploadDevice *); void unregisterUploadDevice(QObject *); - void registerDownloadJob(GETFileJob *); + void registerDownloadJob(GETJob *); void unregisterDownloadJob(QObject *); void absoluteLimitTimerExpired(); @@ -86,14 +86,14 @@ public slots: qint64 _relativeUploadLimitProgressAtMeasuringRestart; qint64 _currentUploadLimit; - QLinkedList _downloadJobList; + QLinkedList _downloadJobList; QTimer _relativeDownloadMeasuringTimer; // for relative bw limiting, we need to wait this amount before measuring again QTimer _relativeDownloadDelayTimer; // the device measured - GETFileJob *_relativeLimitCurrentMeasuredJob; + GETJob *_relativeLimitCurrentMeasuredJob; // for measuring how much progress we made at start qint64 _relativeDownloadLimitProgressAtMeasuringRestart; diff --git a/src/libsync/configfile.cpp b/src/libsync/configfile.cpp index 295fafc1715..b99e80ac3b4 100644 --- a/src/libsync/configfile.cpp +++ b/src/libsync/configfile.cpp @@ -80,6 +80,9 @@ static const char newBigFolderSizeLimitC[] = "newBigFolderSizeLimit"; static const char useNewBigFolderSizeLimitC[] = "useNewBigFolderSizeLimit"; static const char confirmExternalStorageC[] = "confirmExternalStorage"; +static const char deltaSyncEnabledC[] = "DeltaSync/enabled"; +static const char deltaSyncMinimumFileSizeC[] = "DeltaSync/minFileSize"; + static const char maxLogLinesC[] = "Logging/maxLogLines"; const char certPath[] = "http_certificatePath"; @@ -676,6 +679,28 @@ void ConfigFile::setConfirmExternalStorage(bool isChecked) setValue(confirmExternalStorageC, isChecked); } +bool ConfigFile::deltaSyncEnabled() const +{ + QSettings settings(configFile(), QSettings::IniFormat); + return settings.value(QLatin1String(deltaSyncEnabledC), false).toBool(); // default to false +} + +void ConfigFile::setDeltaSyncEnabled(bool enabled) +{ + setValue(deltaSyncEnabledC, enabled); +} + +quint64 ConfigFile::deltaSyncMinFileSize() const +{ + QSettings settings(configFile(), QSettings::IniFormat); + return settings.value(QLatin1String(deltaSyncMinimumFileSizeC), 10 * 1024 * 1024).toLongLong(); // default to 10 MiB +} + +void ConfigFile::setDeltaSyncMinFileSize(quint64 bytes) +{ + setValue(deltaSyncMinimumFileSizeC, bytes); +} + bool ConfigFile::promptDeleteFiles() const { QSettings settings(configFile(), QSettings::IniFormat); diff --git a/src/libsync/configfile.h b/src/libsync/configfile.h index c4c66ac1a84..35a9ca89043 100644 --- a/src/libsync/configfile.h +++ b/src/libsync/configfile.h @@ -117,6 +117,12 @@ class OWNCLOUDSYNC_EXPORT ConfigFile void setNewBigFolderSizeLimit(bool isChecked, quint64 mbytes); bool confirmExternalStorage() const; void setConfirmExternalStorage(bool); + /** delta sync */ + bool deltaSyncEnabled() const; + void setDeltaSyncEnabled(bool enabled); + quint64 deltaSyncMinFileSize() const; // bytes + void setDeltaSyncMinFileSize(quint64 bytes); + static bool setConfDir(const QString &value); diff --git a/src/libsync/discoveryphase.cpp b/src/libsync/discoveryphase.cpp index eb0ce8d3e0f..3418f927b7a 100644 --- a/src/libsync/discoveryphase.cpp +++ b/src/libsync/discoveryphase.cpp @@ -275,7 +275,8 @@ void DiscoverySingleDirectoryJob::start() << "http://owncloud.org/ns:downloadURL" << "http://owncloud.org/ns:dDC" << "http://owncloud.org/ns:permissions" - << "http://owncloud.org/ns:checksums"; + << "http://owncloud.org/ns:checksums" + << "http://owncloud.org/ns:zsync"; if (_isRootPath) props << "http://owncloud.org/ns:data-fingerprint"; if (_account->serverVersionInt() >= Account::makeServerVersion(10, 0, 0)) { @@ -345,6 +346,8 @@ static std::unique_ptr propertyMapToFileStat(const QMapremotePerm.setPermission(RemotePermissions::IsShared); } + } else if (property == "zsync" && value.toUtf8() == "true") { + file_stat->remotePerm.setPermission(RemotePermissions::HasZSyncMetadata); } } return file_stat; diff --git a/src/libsync/propagatecommonzsync.cpp b/src/libsync/propagatecommonzsync.cpp new file mode 100644 index 00000000000..35b7f7d6dfb --- /dev/null +++ b/src/libsync/propagatecommonzsync.cpp @@ -0,0 +1,363 @@ +/* + * Copyright (C) by Ahmed Ammar + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#include "config.h" +#include "propagateupload.h" +#include "owncloudpropagator_p.h" +#include "networkjobs.h" +#include "account.h" +#include "common/syncjournaldb.h" +#include "common/syncjournalfilerecord.h" +#include "common/utility.h" +#include "filesystem.h" +#include "propagatorjobs.h" +#include "syncengine.h" +#include "propagateremotemove.h" +#include "propagateremotedelete.h" +#include "common/asserts.h" + +#include +#include +#include +#include +#include +#include + +#ifdef Q_OS_UNIX +#include +#include +#endif + +namespace OCC { + +Q_LOGGING_CATEGORY(lcZsyncSeed, "sync.propagate.zsync.seed", QtInfoMsg) +Q_LOGGING_CATEGORY(lcZsyncGenerate, "sync.propagate.zsync.generate", QtInfoMsg) +Q_LOGGING_CATEGORY(lcZsyncGet, "sync.networkjob.zsync.get", QtInfoMsg) +Q_LOGGING_CATEGORY(lcZsyncPut, "sync.networkjob.zsync.put", QtInfoMsg) + +bool isZsyncPropagationEnabled(OwncloudPropagator *propagator, const SyncFileItemPtr &item) +{ + if (propagator->account()->capabilities().zsyncSupportedVersion() != "1.0") { + qCInfo(lcPropagator) << "[zsync disabled] Lack of server support."; + return false; + } + if (item->_remotePerm.hasPermission(RemotePermissions::IsMounted) || item->_remotePerm.hasPermission(RemotePermissions::IsMountedSub)) { + qCInfo(lcPropagator) << "[zsync disabled] External storage not supported."; + return false; + } + if (!propagator->syncOptions()._deltaSyncEnabled) { + qCInfo(lcPropagator) << "[zsync disabled] Client configuration option."; + return false; + } + if (item->_size < propagator->syncOptions()._deltaSyncMinFileSize) { + qCInfo(lcPropagator) << "[zsync disabled] File size is smaller than minimum."; + return false; + } + + return true; +} + +QUrl zsyncMetadataUrl(OwncloudPropagator *propagator, const QString &path) +{ + QUrlQuery urlQuery; + QList> QueryItems({ { "zsync", nullptr } }); + urlQuery.setQueryItems(QueryItems); + return Utility::concatUrlPath(propagator->account()->davUrl(), propagator->_remoteFolder + path, urlQuery); +} + +void ZsyncSeedRunnable::run() +{ + // Create a temporary file to use with zsync_begin() + QTemporaryFile zsyncControlFile; + zsyncControlFile.open(); + zsyncControlFile.write(_zsyncData.constData(), _zsyncData.size()); + zsyncControlFile.flush(); + + int fileHandle = zsyncControlFile.handle(); + zsync_unique_ptr f(fdopen(dup(fileHandle), "r"), [](FILE *f) { + fclose(f); + }); + zsyncControlFile.close(); + rewind(f.get()); + + zsync_unique_ptr zs(zsync_parse(f.get()), [](struct zsync_state *zs) { + zsync_end(zs); + }); + if (!zs) { + QString errorString = tr("Unable to parse zsync file."); + emit failedSignal(errorString); + return; + } + + QByteArray tmp_file; + if (!_tmpFilePath.isEmpty()) { + tmp_file = _tmpFilePath.toLocal8Bit(); + } else { + QTemporaryFile tmpFile; + tmpFile.open(); + tmp_file = tmpFile.fileName().toLocal8Bit(); + tmpFile.close(); + } + + const char *tfname = tmp_file; + if (zsync_rename_file(zs.get(), tfname) != 0) { + QString errorString = tr("Unable to rename temporary file."); + emit failedSignal(errorString); + return; + } + + if (zsync_begin(zs.get(), f.get())) { + QString errorString = tr("Unable to begin zsync."); + emit failedSignal(errorString); + return; + } + + { + /* Simple uncompressed file - open it */ + QFile file(_zsyncFilePath); + if (!file.open(QIODevice::ReadOnly)) { + QString errorString = tr("Unable to open file."); + emit failedSignal(errorString); + return; + } + + /* Give the contents to libzsync to read, to find any content that + * is part of the target file. */ + qCInfo(lcZsyncSeed) << "Reading seed file:" << _zsyncFilePath; + int fileHandle = file.handle(); + zsync_unique_ptr f(fdopen(dup(fileHandle), "r"), [](FILE *f) { + fclose(f); + }); + file.close(); + rewind(f.get()); + zsync_submit_source_file(zs.get(), f.get(), false, _type == ZsyncMode::download ? false : true); + } + + emit finishedSignal(zs.release()); +} + +/** + * Exit with IO-related error message + */ +int ZsyncGenerateRunnable::stream_error(const char *func, FILE *stream) +{ + QString error = QString(func) + QString(": ") + QString(strerror(ferror(stream))); + emit failedSignal(error); + return -1; +} + +/** + * Copy the full block checksums from their temporary store file to the .zsync, + * stripping the hashes down to the desired lengths specified by the last 2 + * parameters. + */ +int ZsyncGenerateRunnable::fcopy_hashes(FILE *fin, FILE *fout, size_t rsum_bytes, size_t hash_bytes) +{ + unsigned char buf[CHECKSUM_SIZE + 4]; + size_t len; + + while ((len = fread(buf, 1, sizeof(buf), fin)) > 0) { + /* write trailing rsum_bytes of the rsum (trailing because the second part of the rsum is more useful in practice for hashing), and leading checksum_bytes of the checksum */ + if (fwrite(buf + 4 - rsum_bytes, 1, rsum_bytes, fout) < rsum_bytes) + break; + if (fwrite(buf + 4, 1, hash_bytes, fout) < hash_bytes) + break; + } + if (ferror(fin)) { + return stream_error("fread", fin); + } + if (ferror(fout)) { + return stream_error("fwrite", fout); + } + + return 0; +} + +/** + * Given one block of data, calculate the checksums for this block and write + * them (as raw bytes) to the given output stream + */ +int ZsyncGenerateRunnable::write_block_sums(unsigned char *buf, size_t got, FILE *f) +{ + struct rsum r; + unsigned char checksum[CHECKSUM_SIZE]; + + /* Pad for our checksum, if this is a short last block */ + if (got < _blocksize) + memset(buf + got, 0, _blocksize - got); + + /* Do rsum and checksum, and convert to network endian */ + r = rcksum_calc_rsum_block(buf, _blocksize); + rcksum_calc_checksum(&checksum[0], buf, _blocksize); + r.a = htons(r.a); + r.b = htons(r.b); + + /* Write them raw to the stream */ + if (fwrite(&r, sizeof r, 1, f) != 1) + return stream_error("fwrite", f); + if (fwrite(checksum, sizeof checksum, 1, f) != 1) + return stream_error("fwrite", f); + + return 0; +} + +/** + * Reads the data stream and writes to the zsync stream the blocksums for the + * given data. No compression handling. + */ +int ZsyncGenerateRunnable::read_stream_write_blocksums(FILE *fin, FILE *fout) +{ + unsigned char *buf = (unsigned char *)malloc(_blocksize); + + if (!buf) { + fprintf(stderr, "out of memory\n"); + exit(1); + } + + while (!feof(fin)) { + int got = fread(buf, 1, _blocksize, fin); + + if (got > 0) { + /* The SHA-1 sum, unlike our internal block-based sums, is on the whole file and nothing else - no padding */ + SHA1Update(&_shactx, buf, got); + + write_block_sums(buf, got, fout); + _len += got; + } else { + if (ferror(fin)) + return stream_error("fread", fin); + } + } + free(buf); + return 0; +} + +void ZsyncGenerateRunnable::run() +{ + // Create a temporary file to use with zsync_begin() + QTemporaryFile zsynctf, zsyncmeta; + zsyncmeta.open(); + zsyncmeta.setAutoRemove(false); + zsynctf.open(); + + int metaHandle = zsyncmeta.handle(); + zsync_unique_ptr meta(fdopen(dup(metaHandle), "w"), [](FILE *f) { + fclose(f); + }); + zsyncmeta.close(); + + int tfHandle = zsynctf.handle(); + zsync_unique_ptr tf(fdopen(dup(tfHandle), "w+"), [](FILE *f) { + fclose(f); + }); + zsynctf.close(); + + /* Ensure that metadata file is not buffered, since we are using handles directly */ + setvbuf(meta.get(), NULL, _IONBF, 0); + + int rsum_len, checksum_len, seq_matches; + qCDebug(lcZsyncGenerate) << "Starting generation of:" << _file; + + QByteArray fileString = _file.toLocal8Bit(); + zsync_unique_ptr in(fopen(fileString, "r"), [](FILE *f) { + fclose(f); + }); + if (!in) { + QString error = QString(tr("Failed to open input file:")) + _file; + FileSystem::remove(zsyncmeta.fileName()); + emit failedSignal(error); + return; + } + + /* Read the input file and construct the checksum of the whole file, and + * the per-block checksums */ + SHA1Init(&_shactx); + if (read_stream_write_blocksums(in.get(), tf.get())) { + QString error = QString(tr("Failed to write block sums:")) + _file; + FileSystem::remove(zsyncmeta.fileName()); + emit failedSignal(error); + return; + } + + { /* Decide how long a rsum hash and checksum hash per block we need for this file */ + seq_matches = 1; + rsum_len = ceil(((log(_len) + log(_blocksize)) / log(2) - 8.6) / 8); + /* For large files, the optimum weak checksum size can be more than + * what we have available. Switch to seq_matches for this case. */ + if (rsum_len > 4) { + /* seq_matches > 1 in theory would reduce the amount of rsum_len + * needed, since we get effectively rsum_len*seq_matches required + * to match before a strong checksum is calculated. In practice, + * consecutive blocks in the file can be highly correlated, so we + * want to keep the maximum available rsum_len as well. */ + // XXX: disabled: this seems to cause unmatched blocks at end of + // files with sizes which are unaligned to blocksize + // seq_matches = 2; + rsum_len = 4; + } + + /* min lengths of rsums to store */ + rsum_len = max(2, rsum_len); + + /* Now the checksum length; min of two calculations */ + checksum_len = max(ceil( + (20 + (log(_len) + log(1 + _len / _blocksize)) / log(2)) + / seq_matches / 8), + ceil((20 + log(1 + _len / _blocksize) / log(2)) / 8)); + + /* Keep checksum_len within 4-16 bytes */ + checksum_len = min(16, max(4, checksum_len)); + } + + /* Okay, start writing the zsync file */ + fprintf(meta.get(), "zsync: 0.6.3\n"); + fprintf(meta.get(), "Blocksize: %lu\n", _blocksize); + fprintf(meta.get(), "Length: %llu\n", _len); + fprintf(meta.get(), "Hash-Lengths: %d,%d,%d\n", seq_matches, rsum_len, + checksum_len); + + { /* Write out SHA1 checksum of the entire file */ + unsigned char digest[SHA1_DIGEST_LENGTH]; + unsigned int i; + + fputs("SHA-1: ", meta.get()); + + SHA1Final(digest, &_shactx); + + for (i = 0; i < sizeof digest; i++) + fprintf(meta.get(), "%02x", digest[i]); + fputc('\n', meta.get()); + } + + /* End of headers */ + fputc('\n', meta.get()); + + /* Now copy the actual block hashes to the .zsync */ + rewind(tf.get()); + if (fcopy_hashes(tf.get(), meta.get(), rsum_len, checksum_len)) { + QString error = QString(tr("Failed to copy hashes:")) + _file; + FileSystem::remove(zsyncmeta.fileName()); + emit failedSignal(error); + return; + } + + qCDebug(lcZsyncGenerate) << "Done generation of:" << zsyncmeta.fileName(); + + emit finishedSignal(zsyncmeta.fileName()); +} +} diff --git a/src/libsync/propagatecommonzsync.h b/src/libsync/propagatecommonzsync.h new file mode 100644 index 00000000000..e18bc466ca9 --- /dev/null +++ b/src/libsync/propagatecommonzsync.h @@ -0,0 +1,121 @@ +/* + * Copyright (C) by Ahmed Ammar + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once + +#include +#include +#include +#include + +extern "C" { +#include "librcksum/rcksum.h" +#include "libzsync/zmap.h" +#include "libzsync/sha1.h" +#include "libzsync/zsync.h" +} + +#define ZSYNC_BLOCKSIZE (1 * 1024 * 1024) // must be power of 2 + +namespace OCC { +Q_DECLARE_LOGGING_CATEGORY(lcZsyncPut) +Q_DECLARE_LOGGING_CATEGORY(lcZsyncGet) + +enum class ZsyncMode { download, + upload }; + +template +using zsync_unique_ptr = std::unique_ptr>; + +/** + * @ingroup libsync + * + * Helper function to know if we are allowed to attempt using zsync from configuration/command-line options. + * + */ +bool isZsyncPropagationEnabled(OwncloudPropagator *propagator, const SyncFileItemPtr &item); + +/** + * @ingroup libsync + * + * Helper function to get zsync metadata Url. + * + */ +QUrl zsyncMetadataUrl(OwncloudPropagator *propagator, const QString &path); + +/** + * @ingroup libsync + * + * Helper runnable to 'seed' the zsync_state by providing the downloaded metadata and seed file. + * This is needed for both upload and download since they both must seed the zsync_state to know which + * ranges to upload/download. + * + */ +class ZsyncSeedRunnable : public QObject, public QRunnable +{ + Q_OBJECT + QByteArray _zsyncData; + QString _zsyncFilePath; + QString _tmpFilePath; + ZsyncMode _type; + +public: + explicit ZsyncSeedRunnable(QByteArray &zsyncData, QString path, ZsyncMode type, QString tmpFilePath = nullptr) + : _zsyncData(zsyncData) + , _zsyncFilePath(path) + , _tmpFilePath(tmpFilePath) + , _type(type){}; + + void run(); + +signals: + void finishedSignal(void *zs); + void failedSignal(const QString &errorString); +}; + +/** + * @ingroup libsync + * + * Helper runnable to generate zsync metadata file when uploading. + * Takes an input file path and returns a zsync metadata file path finsihed. + * + */ +class ZsyncGenerateRunnable : public QObject, public QRunnable +{ + Q_OBJECT + size_t _blocksize = ZSYNC_BLOCKSIZE; + off_t _len = 0; + const QString _file; + SHA1_CTX _shactx; + + int fcopy_hashes(FILE *fin, FILE *fout, size_t rsum_bytes, size_t hash_bytes); + int write_block_sums(unsigned char *buf, size_t got, FILE *f); + int read_stream_write_blocksums(FILE *fin, FILE *fout); + int stream_error(const char *func, FILE *stream); + +public: + explicit ZsyncGenerateRunnable(const QString &file) + : _file(file){}; + + void run(); + +signals: + void finishedSignal(const QString &generatedFileName); + void failedSignal(const QString &errorString); +}; +} diff --git a/src/libsync/propagatedownload.cpp b/src/libsync/propagatedownload.cpp index 2b7bd764071..1837755f92b 100644 --- a/src/libsync/propagatedownload.cpp +++ b/src/libsync/propagatedownload.cpp @@ -69,42 +69,28 @@ QString OWNCLOUDSYNC_EXPORT createDownloadTmpFileName(const QString &previous) GETFileJob::GETFileJob(AccountPtr account, const QString &path, QFile *device, const QMap &headers, const QByteArray &expectedEtagForResume, quint64 resumeStart, QObject *parent) - : AbstractNetworkJob(account, path, parent) + : GETJob(account, path, parent) , _device(device) , _headers(headers) , _expectedEtagForResume(expectedEtagForResume) , _resumeStart(resumeStart) - , _errorStatus(SyncFileItem::NoStatus) - , _bandwidthLimited(false) - , _bandwidthChoked(false) - , _bandwidthQuota(0) - , _bandwidthManager(0) , _hasEmittedFinishedSignal(false) - , _lastModified() { } GETFileJob::GETFileJob(AccountPtr account, const QUrl &url, QFile *device, const QMap &headers, const QByteArray &expectedEtagForResume, quint64 resumeStart, QObject *parent) - - : AbstractNetworkJob(account, url.toEncoded(), parent) + : GETJob(account, url.toEncoded(), parent) , _device(device) , _headers(headers) , _expectedEtagForResume(expectedEtagForResume) , _resumeStart(resumeStart) - , _errorStatus(SyncFileItem::NoStatus) , _directDownloadUrl(url) - , _bandwidthLimited(false) - , _bandwidthChoked(false) - , _bandwidthQuota(0) - , _bandwidthManager(0) , _hasEmittedFinishedSignal(false) - , _lastModified() { } - void GETFileJob::start() { if (_resumeStart > 0) { @@ -229,24 +215,24 @@ void GETFileJob::slotMetaDataChanged() _saveBodyToFile = true; } -void GETFileJob::setBandwidthManager(BandwidthManager *bwm) +void GETJob::setBandwidthManager(BandwidthManager *bwm) { _bandwidthManager = bwm; } -void GETFileJob::setChoked(bool c) +void GETJob::setChoked(bool c) { _bandwidthChoked = c; QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection); } -void GETFileJob::setBandwidthLimited(bool b) +void GETJob::setBandwidthLimited(bool b) { _bandwidthLimited = b; QMetaObject::invokeMethod(this, "slotReadyRead", Qt::QueuedConnection); } -void GETFileJob::giveBandwidthQuota(qint64 q) +void GETJob::giveBandwidthQuota(qint64 q) { _bandwidthQuota = q; qCDebug(lcGetJob) << "Got" << q << "bytes"; @@ -322,7 +308,7 @@ void GETFileJob::slotReadyRead() } } -void GETFileJob::onTimedOut() +void GETJob::onTimedOut() { qCWarning(lcGetJob) << "Timeout" << (reply() ? reply()->request().url() : path()); if (!reply()) @@ -332,7 +318,7 @@ void GETFileJob::onTimedOut() reply()->abort(); } -QString GETFileJob::errorString() const +QString GETJob::errorString() const { if (!_errorString.isEmpty()) { return _errorString; @@ -413,7 +399,6 @@ void PropagateDownloadFile::startDownload() propagator()->reportProgress(*_item, 0); QString tmpFileName; - QByteArray expectedEtagForResume; const SyncJournalDb::DownloadInfo progressInfo = propagator()->_journal->getDownloadInfo(_item->_file); if (progressInfo._valid) { // if the etag has changed meanwhile, remove the already downloaded part. @@ -422,7 +407,7 @@ void PropagateDownloadFile::startDownload() propagator()->_journal->setDownloadInfo(_item->_file, SyncJournalDb::DownloadInfo()); } else { tmpFileName = progressInfo._tmpfile; - expectedEtagForResume = progressInfo._etag; + _expectedEtagForResume = progressInfo._etag; } } @@ -480,13 +465,60 @@ void PropagateDownloadFile::startDownload() propagator()->_journal->commit("download file start"); } + if (_item->_remotePerm.hasPermission(RemotePermissions::HasZSyncMetadata) && isZsyncPropagationEnabled(propagator(), _item)) { + if (_item->_previousSize) { + // Retrieve zsync metadata file from the server + qCInfo(lcZsyncGet) << "Retrieving zsync metadata for:" << _item->_file; + QNetworkRequest req; + req.setPriority(QNetworkRequest::LowPriority); + QUrl zsyncUrl = zsyncMetadataUrl(propagator(), _item->_file); + auto job = propagator()->account()->sendRequest("GET", zsyncUrl, req); + connect(job, &SimpleNetworkJob::finishedSignal, this, &PropagateDownloadFile::slotZsyncGetMetaFinished); + return; + } + qCInfo(lcZsyncGet) << "No local copy of:" << _item->_file; + } + + startFullDownload(); +} + + +void PropagateDownloadFile::slotZsyncGetMetaFinished(QNetworkReply *reply) +{ + int httpStatusCode = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + if (httpStatusCode / 100 != 2) { + /* Fall back to full download */ + qCWarning(lcZsyncGet) << "Failed to retrieve zsync metadata for:" << _item->_file; + startFullDownload(); + return; + } + + QByteArray zsyncData = reply->readAll(); + _expectedEtagForResume = getEtagFromReply(reply); + qCInfo(lcZsyncGet) << "Retrieved zsync metadata for:" << _item->_file << "size:" << zsyncData.size() + << "etag:" << _expectedEtagForResume; + + QMap headers; + _job = new GETFileZsyncJob(propagator(), _item, propagator()->_remoteFolder + _item->_file, + &_tmpFile, headers, _expectedEtagForResume, zsyncData, this); + connect(_job.data(), &GETJob::finishedSignal, this, &PropagateDownloadFile::slotGetFinished); + connect(qobject_cast(_job.data()), &GETFileZsyncJob::overallDownloadProgress, + this, &PropagateDownloadFile::slotDownloadProgress); + _job->setBandwidthManager(&propagator()->_bandwidthManager); + propagator()->_activeJobList.append(this); + _job->start(); + _isDeltaSyncDownload = true; +} + +void PropagateDownloadFile::startFullDownload() +{ QMap headers; if (_item->_directDownloadUrl.isEmpty()) { // Normal job, download from oC instance _job = new GETFileJob(propagator()->account(), propagator()->_remoteFolder + _item->_file, - &_tmpFile, headers, expectedEtagForResume, _resumeStart, this); + &_tmpFile, headers, _expectedEtagForResume, _resumeStart, this); } else { // We were provided a direct URL, use that one qCInfo(lcPropagateDownload) << "directDownloadUrl given for " << _item->_file << _item->_directDownloadUrl; @@ -498,11 +530,12 @@ void PropagateDownloadFile::startDownload() QUrl url = QUrl::fromUserInput(_item->_directDownloadUrl); _job = new GETFileJob(propagator()->account(), url, - &_tmpFile, headers, expectedEtagForResume, _resumeStart, this); + &_tmpFile, headers, _expectedEtagForResume, _resumeStart, this); } _job->setBandwidthManager(&propagator()->_bandwidthManager); - connect(_job.data(), &GETFileJob::finishedSignal, this, &PropagateDownloadFile::slotGetFinished); - connect(_job.data(), &GETFileJob::downloadProgress, this, &PropagateDownloadFile::slotDownloadProgress); + connect(_job.data(), &GETJob::finishedSignal, this, &PropagateDownloadFile::slotGetFinished); + connect(qobject_cast(_job.data()), &GETFileJob::downloadProgress, + this, &PropagateDownloadFile::slotDownloadProgress); propagator()->_activeJobList.append(this); _job->start(); } @@ -525,9 +558,31 @@ void PropagateDownloadFile::slotGetFinished() { propagator()->_activeJobList.removeOne(this); - GETFileJob *job = _job; + GETJob *job = _job; ASSERT(job); + SyncFileItem::Status status = job->errorStatus(); + + // Needed because GETFileZsyncJob may emit finishedSignal without any further network activity + if (!job->reply()) { + if (status == SyncFileItem::Success) { + _tmpFile.close(); + _tmpFile.flush(); + downloadFinished(); + return; + } + + FileSystem::remove(_tmpFile.fileName()); + if (status != SyncFileItem::NoStatus) { + done(status, job->errorString()); + return; + } + + ASSERT(false, "Download slot finished, but there was no reply!"); + done(SyncFileItem::FatalError, tr("Download slot finished, but there was no reply!")); + return; + } + QNetworkReply::NetworkError err = job->reply()->error(); if (err != QNetworkReply::NoError) { _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); @@ -615,7 +670,7 @@ void PropagateDownloadFile::slotGetFinished() const QByteArray sizeHeader("Content-Length"); quint64 bodySize = job->reply()->rawHeader(sizeHeader).toULongLong(); - if (!job->reply()->rawHeader(sizeHeader).isEmpty() && _tmpFile.size() > 0 && bodySize == 0) { + if (!_isDeltaSyncDownload && !job->reply()->rawHeader(sizeHeader).isEmpty() && _tmpFile.size() > 0 && bodySize == 0) { // Strange bug with broken webserver or webfirewall https://github.com/owncloud/client/issues/3373#issuecomment-122672322 // This happened when trying to resume a file. The Content-Range header was files, Content-Length was == 0 qCDebug(lcPropagateDownload) << bodySize << _item->_size << _tmpFile.size() << job->resumeStart(); @@ -624,7 +679,7 @@ void PropagateDownloadFile::slotGetFinished() return; } - if (bodySize > 0 && bodySize != _tmpFile.size() - job->resumeStart()) { + if (!_isDeltaSyncDownload && bodySize > 0 && bodySize != _tmpFile.size() - job->resumeStart()) { qCDebug(lcPropagateDownload) << bodySize << _tmpFile.size() << job->resumeStart(); propagator()->_anotherSyncNeeded = true; done(SyncFileItem::SoftError, tr("The file could not be downloaded completely.")); @@ -975,6 +1030,7 @@ void PropagateDownloadFile::slotDownloadProgress(qint64 received, qint64) if (!_job) return; _downloadProgress = received; + propagator()->reportProgress(*_item, _resumeStart + received); } diff --git a/src/libsync/propagatedownload.h b/src/libsync/propagatedownload.h index 632cb0c7949..6c099563435 100644 --- a/src/libsync/propagatedownload.h +++ b/src/libsync/propagatedownload.h @@ -15,33 +15,124 @@ #include "owncloudpropagator.h" #include "networkjobs.h" +#include "propagatecommonzsync.h" #include #include namespace OCC { +class GETJob : public AbstractNetworkJob +{ + Q_OBJECT +protected: + QByteArray _etag; + time_t _lastModified = 0; + QString _errorString; + SyncFileItem::Status _errorStatus = SyncFileItem::NoStatus; + bool _bandwidthLimited = false; // if _bandwidthQuota will be used + bool _bandwidthChoked = false; // if download is paused (won't read on readyRead()) + qint64 _bandwidthQuota = 0; + QPointer _bandwidthManager = nullptr; + +public: + GETJob(AccountPtr account, const QString &path, QObject *parent = 0) + : AbstractNetworkJob(account, path, parent) + { + } + + ~GETJob() + { + if (_bandwidthManager) { + _bandwidthManager->unregisterDownloadJob(this); + } + } + + virtual qint64 currentDownloadPosition() = 0; + virtual quint64 resumeStart() { return 0; } + + QByteArray &etag() { return _etag; } + time_t lastModified() { return _lastModified; } + + void setErrorString(const QString &s) { _errorString = s; } + QString errorString() const; + SyncFileItem::Status errorStatus() { return _errorStatus; } + void setErrorStatus(const SyncFileItem::Status &s) { _errorStatus = s; } + void setBandwidthManager(BandwidthManager *bwm); + void setChoked(bool c); + void setBandwidthLimited(bool b); + void giveBandwidthQuota(qint64 q); + void onTimedOut(); + +signals: + void finishedSignal(); +}; + /** - * @brief The GETFileJob class + * @brief Downloads the zsync metadata and uses the original file as a seed, then downloads needed ranges via GET * @ingroup libsync */ -class GETFileJob : public AbstractNetworkJob +class GETFileZsyncJob : public GETJob +{ + Q_OBJECT + QFile *_device; + SyncFileItemPtr _item; + OwncloudPropagator *_propagator; + QMap _headers; + QByteArray _expectedEtagForResume; + bool _hasEmittedFinishedSignal; + QByteArray _zsyncData; + int _nrange = 0; + int _current = 0; + off_t _pos = 0; + off_t _received = 0; + /* these must be in this order so the destructors are done in the right order */ + zsync_unique_ptr _zs = nullptr; + zsync_unique_ptr _zr = nullptr; + zsync_unique_ptr _zbyterange = nullptr; + +public: + // DOES NOT take ownership of the device. + GETFileZsyncJob(OwncloudPropagator *propagator, SyncFileItemPtr &item, const QString &path, QFile *device, + const QMap &headers, const QByteArray &expectedEtagForResume, + const QByteArray &zsyncData, QObject *parent = 0); + + qint64 currentDownloadPosition() override; + + void start() override; + bool finished() override; + +private: + void seedFinished(void *zs); + void seedFailed(const QString &errorString); + + void startCurrentRange(quint64 start = 0, quint64 end = 0); + +private slots: + void slotReadyRead(); + void slotMetaDataChanged(); + +public slots: + void slotOverallDownloadProgress(qint64, qint64); + +signals: + void overallDownloadProgress(qint64, qint64); +}; + + +/** + * @brief Downloads the remote file via GET + * @ingroup libsync + */ +class GETFileJob : public GETJob { Q_OBJECT QFile *_device; QMap _headers; - QString _errorString; QByteArray _expectedEtagForResume; quint64 _resumeStart; - SyncFileItem::Status _errorStatus; QUrl _directDownloadUrl; - QByteArray _etag; - bool _bandwidthLimited; // if _bandwidthQuota will be used - bool _bandwidthChoked; // if download is paused (won't read on readyRead()) - qint64 _bandwidthQuota; - QPointer _bandwidthManager; bool _hasEmittedFinishedSignal; - time_t _lastModified; /// Will be set to true once we've seen a 2xx response header bool _saveBodyToFile = false; @@ -55,22 +146,15 @@ class GETFileJob : public AbstractNetworkJob explicit GETFileJob(AccountPtr account, const QUrl &url, QFile *device, const QMap &headers, const QByteArray &expectedEtagForResume, quint64 resumeStart, QObject *parent = 0); - virtual ~GETFileJob() - { - if (_bandwidthManager) { - _bandwidthManager->unregisterDownloadJob(this); - } - } - virtual void start() Q_DECL_OVERRIDE; - virtual bool finished() Q_DECL_OVERRIDE + qint64 currentDownloadPosition() Q_DECL_OVERRIDE; + + void start() Q_DECL_OVERRIDE; + bool finished() Q_DECL_OVERRIDE { if (reply()->bytesAvailable()) { return false; } else { - if (_bandwidthManager) { - _bandwidthManager->unregisterDownloadJob(this); - } if (!_hasEmittedFinishedSignal) { emit finishedSignal(); } @@ -81,31 +165,17 @@ class GETFileJob : public AbstractNetworkJob void newReplyHook(QNetworkReply *reply) override; - void setBandwidthManager(BandwidthManager *bwm); - void setChoked(bool c); - void setBandwidthLimited(bool b); - void giveBandwidthQuota(qint64 q); - qint64 currentDownloadPosition(); - - QString errorString() const; - void setErrorString(const QString &s) { _errorString = s; } - - SyncFileItem::Status errorStatus() { return _errorStatus; } - void setErrorStatus(const SyncFileItem::Status &s) { _errorStatus = s; } - - void onTimedOut() Q_DECL_OVERRIDE; - - QByteArray &etag() { return _etag; } - quint64 resumeStart() { return _resumeStart; } - time_t lastModified() { return _lastModified; } - + quint64 resumeStart() Q_DECL_OVERRIDE + { + return _resumeStart; + } -signals: - void finishedSignal(); - void downloadProgress(qint64, qint64); private slots: void slotReadyRead(); void slotMetaDataChanged(); + +signals: + void downloadProgress(qint64, qint64); }; /** @@ -116,39 +186,54 @@ private slots: \code{.unparsed} start() - | + + | deleteExistingFolder() if enabled | +--> mtime and size identical? | then compute the local checksum - | done?-> conflictChecksumComputed() - | | - | checksum differs? | - +-> startDownload() <--------------------------+ - | | - +-> run a GETFileJob | checksum identical? - | - done?-> slotGetFinished() | - | | - +-> validate checksum header | - | - done?-> transmissionChecksumValidated() | - | | - +-> compute the content checksum | - | - done?-> contentChecksumComputed() | - | | - +-> downloadFinished() | - | | - +------------------+ | - | | - +-> updateMetadata() <-------------------------+ + | done?+> conflictChecksumComputed() + | + + | checksum differs? | + +-> startDownload() <--------------------------------------------+ + + | + +-> isZsyncPropagationEnabled()? | + + | + +-+ yes +> local file exists? | + | + | + | +-+ yes +------> run a GETFIleZsyncJob | + | | | + + + done? +------------+ | + no no | | + + + | | + | v | | + +-> startFullDownload() | | + + | | + +-> run a GETFileJob | | checksum identical? + | | + done?+> slotGetFinished() <--------+ | + + | + +-> validate checksum header | + | + done?+> transmissionChecksumValidated() | + + | + +-> compute the content checksum | + | + done?+> contentChecksumComputed() | + + | + +-> downloadFinished() | + + | + +------------------+ | + | | + +-> updateMetadata() <-------------------------------+ \endcode */ class PropagateDownloadFile : public PropagateItemJob { Q_OBJECT + QByteArray _expectedEtagForResume; + bool _isDeltaSyncDownload = false; + public: PropagateDownloadFile(OwncloudPropagator *propagator, const SyncFileItemPtr &item) : PropagateItemJob(propagator, item) @@ -180,8 +265,11 @@ private slots: void conflictChecksumComputed(const QByteArray &checksumType, const QByteArray &checksum); /// Called to start downloading the remote file void startDownload(); - /// Called when the GETFileJob finishes + void startFullDownload(); + /// Called when the GETJob finishes void slotGetFinished(); + /// Called when the we have finished getting the zsync metadata file + void slotZsyncGetMetaFinished(QNetworkReply *reply); /// Called when the download's checksum header was validated void transmissionChecksumValidated(const QByteArray &checksumType, const QByteArray &checksum); /// Called when the download's checksum computation is done @@ -199,7 +287,7 @@ private slots: quint64 _resumeStart; qint64 _downloadProgress; - QPointer _job; + QPointer _job; QFile _tmpFile; bool _deleteExisting; ConflictRecord _conflictRecord; diff --git a/src/libsync/propagatedownloadzsync.cpp b/src/libsync/propagatedownloadzsync.cpp new file mode 100644 index 00000000000..12cec1a8f9f --- /dev/null +++ b/src/libsync/propagatedownloadzsync.cpp @@ -0,0 +1,323 @@ +/* + * Copyright (C) by Ahmed Ammar + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#include "config.h" +#include "owncloudpropagator_p.h" +#include "propagatedownload.h" +#include "networkjobs.h" +#include "account.h" +#include "common/syncjournaldb.h" +#include "common/syncjournalfilerecord.h" +#include "common/utility.h" +#include "filesystem.h" +#include "propagatorjobs.h" +#include "propagateremotedelete.h" +#include "common/checksums.h" +#include "common/asserts.h" + +#include +#include +#include +#include +#include +#include + +#ifdef Q_OS_UNIX +#include +#endif + +namespace OCC { + +// DOES NOT take ownership of the device. +GETFileZsyncJob::GETFileZsyncJob(OwncloudPropagator *propagator, SyncFileItemPtr &item, + const QString &path, QFile *device, const QMap &headers, + const QByteArray &expectedEtagForResume, const QByteArray &zsyncData, QObject *parent) + : GETJob(propagator->account(), path, parent) + , _device(device) + , _item(item) + , _propagator(propagator) + , _headers(headers) + , _expectedEtagForResume(expectedEtagForResume) + , _hasEmittedFinishedSignal(false) + , _zsyncData(zsyncData) +{ +} + +void GETFileZsyncJob::startCurrentRange(quint64 start, quint64 end) +{ + _headers["Range"] = "bytes=" + QByteArray::number(start) + '-' + QByteArray::number(end); + + qCDebug(lcZsyncGet) << path() << "HTTP GET with range" << _headers["Range"]; + + QNetworkRequest req; + for (QMap::const_iterator it = _headers.begin(); it != _headers.end(); ++it) { + req.setRawHeader(it.key(), it.value()); + } + req.setPriority(QNetworkRequest::LowPriority); // Long downloads must not block non-propagation jobs. + + sendRequest("GET", makeDavUrl(path()), req); + + reply()->setReadBufferSize(16 * 1024); // keep low so we can easier limit the bandwidth + qCDebug(lcZsyncGet) << _bandwidthManager << _bandwidthChoked << _bandwidthLimited; + if (_bandwidthManager) { + _bandwidthManager->registerDownloadJob(this); + } + + if (reply()->error() != QNetworkReply::NoError) { + qCWarning(lcZsyncGet) << " Network error: " << errorString(); + } + + _pos = 0; + + connect(reply(), &QNetworkReply::downloadProgress, this, &GETFileZsyncJob::slotOverallDownloadProgress); + connect(reply(), &QIODevice::readyRead, this, &GETFileZsyncJob::slotReadyRead); + connect(reply(), &QNetworkReply::metaDataChanged, this, &GETFileZsyncJob::slotMetaDataChanged); + connect(this, &AbstractNetworkJob::networkActivity, account().data(), &Account::propagatorNetworkActivity); + + AbstractNetworkJob::start(); +} + +bool GETFileZsyncJob::finished() +{ + if (reply()->bytesAvailable()) { + return false; + } + + // zsync_receive_data will only complete once we have sent block aligned data + off_t range_size = _zbyterange.get()[(2 * _current) + 1] - _zbyterange.get()[(2 * _current)] + 1; + if (_pos < range_size) { + QByteArray fill(range_size - _pos, 0); + qCDebug(lcZsyncGet) << "About to zsync" << range_size - _pos << "filler bytes @" << _zbyterange.get()[2 * _current] << "pos:" << _pos << "of" << path(); + if (zsync_receive_data(_zr.get(), (const unsigned char *)fill.constData(), _zbyterange.get()[2 * _current] + _pos, range_size - _pos) != 0) { + _errorString = "Failed to receive data for: " + _propagator->getFilePath(_item->_file); + _errorStatus = SyncFileItem::NormalError; + qCWarning(lcZsyncGet) << "Error while writing to file:" << _errorString; + reply()->abort(); + emit finishedSignal(); + return true; + } + } + + // chain the next range if we still have some + if (_current < _nrange - 1) { + _current++; + startCurrentRange(_zbyterange.get()[2 * _current], _zbyterange.get()[(2 * _current) + 1]); + return false; + } + + if (!_hasEmittedFinishedSignal) { + emit finishedSignal(); + } + + _hasEmittedFinishedSignal = true; + + return true; // discard +} + +void GETFileZsyncJob::seedFinished(void *zs) +{ + _zs = zsync_unique_ptr(static_cast(zs), [](struct zsync_state *zs) { + zsync_complete(zs); + zsync_end(zs); + }); + if (!_zs) { + _errorString = tr("Unable to parse zsync."); + _errorStatus = SyncFileItem::NormalError; + qCDebug(lcZsyncGet) << _errorString; + emit finishedSignal(); + return; + } + + { /* And print how far we've progressed towards the target file */ + long long done, total; + + zsync_progress(_zs.get(), &done, &total); + qCInfo(lcZsyncGet).nospace() << "Done reading: " + << _propagator->getFilePath(_item->_file) + << " " << fixed << qSetRealNumberPrecision(1) << (100.0f * done) / total + << "% of target seeded."; + } + + /* Get a set of byte ranges that we need to complete the target */ + _zbyterange = zsync_unique_ptr(zsync_needed_byte_ranges(_zs.get(), &_nrange, 0), [](off_t *zbr) { + free(zbr); + }); + if (!_zbyterange) { + _errorString = tr("Failed to get zsync byte ranges."); + _errorStatus = SyncFileItem::NormalError; + qCDebug(lcZsyncGet) << _errorString; + emit finishedSignal(); + return; + } + + qCDebug(lcZsyncGet) << "Number of ranges:" << _nrange; + + /* If we have no ranges then we have equal files and we are done */ + if (_nrange == 0 && _item->_size == quint64(zsync_file_length(_zs.get()))) { + _propagator->reportFileTotal(*_item, 0); + _errorStatus = SyncFileItem::Success; + emit finishedSignal(); + return; + } + + _zr = zsync_unique_ptr(zsync_begin_receive(_zs.get(), 0), [](struct zsync_receiver *zr) { + zsync_end_receive(zr); + }); + if (!_zr) { + _errorString = tr("Failed to initialize zsync receive structure."); + _errorStatus = SyncFileItem::NormalError; + qCDebug(lcZsyncGet) << _errorString; + emit finishedSignal(); + return; + } + + quint64 totalBytes = 0; + for (int i = 0; i < _nrange; i++) { + totalBytes += _zbyterange.get()[(2 * i) + 1] - _zbyterange.get()[(2 * i)] + 1; + } + + qCDebug(lcZsyncGet) << "Total bytes:" << totalBytes; + _propagator->reportFileTotal(*_item, totalBytes); + + /* start getting bytes for first zsync byte range */ + startCurrentRange(_zbyterange.get()[0], _zbyterange.get()[1]); +} + +void GETFileZsyncJob::seedFailed(const QString &errorString) +{ + _errorString = errorString; + _errorStatus = SyncFileItem::NormalError; + + qCCritical(lcZsyncGet) << _errorString; + + /* delete remote zsync file */ + QUrl zsyncUrl = zsyncMetadataUrl(_propagator, _item->_file); + (new DeleteJob(_propagator->account(), zsyncUrl, this))->start(); + + emit finishedSignal(); +} + +void GETFileZsyncJob::start() +{ + ZsyncSeedRunnable *run = new ZsyncSeedRunnable(_zsyncData, _propagator->getFilePath(_item->_file), + ZsyncMode::download, _device->fileName()); + connect(run, &ZsyncSeedRunnable::finishedSignal, this, &GETFileZsyncJob::seedFinished); + connect(run, &ZsyncSeedRunnable::failedSignal, this, &GETFileZsyncJob::seedFailed); + + // Starts in a seperate thread + QThreadPool::globalInstance()->start(run); +} + +qint64 GETFileZsyncJob::currentDownloadPosition() +{ + return _received; +} + +void GETFileZsyncJob::slotReadyRead() +{ + if (!reply()) + return; + + int bufferSize = qMin(1024 * 8ll, reply()->bytesAvailable()); + QByteArray buffer(bufferSize, Qt::Uninitialized); + + while (reply()->bytesAvailable() > 0) { + if (_bandwidthChoked) { + qCWarning(lcZsyncGet) << "Download choked"; + return; + } + qint64 toRead = bufferSize; + if (_bandwidthLimited) { + toRead = qMin(qint64(bufferSize), _bandwidthQuota); + if (toRead == 0) { + qCWarning(lcZsyncGet) << "Out of quota"; + return; + } + _bandwidthQuota -= toRead; + } + + qint64 r = reply()->read(buffer.data(), toRead); + if (r < 0) { + _errorString = networkReplyErrorString(*reply()); + _errorStatus = SyncFileItem::NormalError; + qCWarning(lcZsyncGet) << "Error while reading from device: " << _errorString; + reply()->abort(); + return; + } + + if (!_nrange) { + qCWarning(lcZsyncGet) << "No ranges to fetch."; + _received += r; + _pos += r; + return; + } + + qCDebug(lcZsyncGet) << "About to zsync" << r << "bytes @" << _zbyterange.get()[2 * _current] << "pos:" << _pos << "of" << path(); + + if (zsync_receive_data(_zr.get(), (const unsigned char *)buffer.constData(), _zbyterange.get()[2 * _current] + _pos, r) != 0) { + _errorString = "Failed to receive data for: " + _propagator->getFilePath(_item->_file); + _errorStatus = SyncFileItem::NormalError; + qCWarning(lcZsyncGet) << "Error while writing to file:" << _errorString; + reply()->abort(); + return; + } + + _received += r; + _pos += r; + } +} + +void GETFileZsyncJob::slotMetaDataChanged() +{ + // For some reason setting the read buffer in GETFileJob::start doesn't seem to go + // through the HTTP layer thread(?) + reply()->setReadBufferSize(16 * 1024); + + int httpStatus = reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + + // If the status code isn't 2xx, don't write the reply body to the file. + // For any error: handle it when the job is finished, not here. + if (httpStatus / 100 != 2) { + _device->close(); + return; + } + if (reply()->error() != QNetworkReply::NoError) { + return; + } + _etag = getEtagFromReply(reply()); + + if (!_expectedEtagForResume.isEmpty() && _expectedEtagForResume != _etag) { + qCWarning(lcZsyncGet) << "We received a different E-Tag for delta!" + << _expectedEtagForResume << "vs" << _etag; + _errorString = tr("We received a different E-Tag for delta. Retrying next time."); + _errorStatus = SyncFileItem::NormalError; + reply()->abort(); + return; + } + + auto lastModified = reply()->header(QNetworkRequest::LastModifiedHeader); + if (!lastModified.isNull()) { + _lastModified = Utility::qDateTimeToTime_t(lastModified.toDateTime()); + } +} + +void GETFileZsyncJob::slotOverallDownloadProgress(qint64, qint64) +{ + emit overallDownloadProgress(_received, 0); +} +} diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index c156190b52f..ec3fd261226 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -15,12 +15,12 @@ #include "owncloudpropagator.h" #include "networkjobs.h" +#include "propagatecommonzsync.h" #include #include #include - namespace OCC { Q_DECLARE_LOGGING_CATEGORY(lcPutJob) @@ -344,9 +344,12 @@ class PropagateUploadFileNG : public PropagateUploadFileCommon private: quint64 _sent = 0; /// amount of data (bytes) that was already sent uint _transferId = 0; /// transfer id (part of the url) - int _currentChunk = 0; /// Id of the next chunk that will be sent + int _currentChunk = 0; /// id of the next chunk that will be sent quint64 _currentChunkSize = 0; /// current chunk size - bool _removeJobError = false; /// If not null, there was an error removing the job + bool _removeJobError = false; /// if not null, there was an error removing the job + bool _zsyncSupported = false; /// if zsync is supported this will be set to true + bool _isZsyncMetadataUploadRunning = false; // flag to ensure that zsync metadata upload is complete before job is + quint64 _bytesToUpload; // in case of zsync upload this will hold the actual bytes to upload, normal upload will be file size // Map chunk number with its size from the PROPFIND on resume. // (Only used from slotPropfindIterate/slotPropfindFinished because the LsColJob use signals to report data.) @@ -355,25 +358,37 @@ class PropagateUploadFileNG : public PropagateUploadFileCommon quint64 size; QString originalName; }; - QMap _serverChunks; + QMap _serverChunks; + + // Vector with expected PUT ranges. + struct UploadRangeInfo + { + quint64 start; + quint64 size; + }; + QVector _rangesToUpload; /** * Return the URL of a chunk. * If chunk == -1, returns the URL of the parent folder containing the chunks */ - QUrl chunkUrl(int chunk = -1); + QUrl chunkUrl(qint64 chunk = -1); + bool updateRanges(quint64 start, quint64 size); public: PropagateUploadFileNG(OwncloudPropagator *propagator, const SyncFileItemPtr &item) : PropagateUploadFileCommon(propagator, item) + , _bytesToUpload(item->_size) { } void doStartUpload() Q_DECL_OVERRIDE; private: + void doStartUploadNext(); void startNewUpload(); void startNextChunk(); + void doFinalMove(); public slots: void abort(AbortType abortType) Q_DECL_OVERRIDE; private slots: @@ -383,6 +398,12 @@ private slots: void slotDeleteJobFinished(); void slotMkColFinished(QNetworkReply::NetworkError); void slotPutFinished(); + void slotZsyncGetMetaFinished(QNetworkReply *reply); + void slotZsyncSeedFinished(void *zs); + void slotZsyncSeedFailed(const QString &errorString); + void slotZsyncGenerationFinished(const QString &fileName); + void slotZsyncGenerationFailed(const QString &errorString); + void slotZsyncMetadataUploadFinished(); void slotMoveJobFinished(); void slotUploadProgress(qint64, qint64); }; diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 0b2c14295fe..a2523756fb8 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -30,58 +30,213 @@ #include #include #include + #include #include namespace OCC { -QUrl PropagateUploadFileNG::chunkUrl(int chunk) +QUrl PropagateUploadFileNG::chunkUrl(qint64 chunk) { QString path = QLatin1String("remote.php/dav/uploads/") + propagator()->account()->davUser() + QLatin1Char('/') + QString::number(_transferId); - if (chunk >= 0) { + if (chunk != -1) { // We need to do add leading 0 because the server orders the chunk alphabetically - path += QLatin1Char('/') + QString::number(chunk).rightJustified(8, '0'); + path += QLatin1Char('/') + QString::number(quint64(chunk)).rightJustified(16, '0'); // 1e16 is 10 petabyte } return Utility::concatUrlPath(propagator()->account()->url(), path); } -/* - State machine: - - *----> doStartUpload() - Check the db: is there an entry? - / \ - no yes - / \ - / PROPFIND - startNewUpload() <-+ +----------------------------\ - | | | \ - MKCOL + slotPropfindFinishedWithError() slotPropfindFinished() - | Is there stale files to remove? - slotMkColFinished() | | - | no yes - | | | - | | DeleteJob - | | | - +-----+<------------------------------------------------------+<--- slotDeleteJobFinished() - | - +----> startNextChunk() ---finished? --+ - ^ | | - +---------------+ | - | - +----------------------------------------+ - | - +-> MOVE ------> moveJobFinished() ---> finalize() +void PropagateUploadFileNG::slotZsyncSeedFinished(void *_zs) +{ + zsync_unique_ptr zs(static_cast(_zs), [](struct zsync_state *zs) { + zsync_end(zs); + }); + { /* And print how far we've progressed towards the target file */ + long long done, total; + + zsync_progress(zs.get(), &done, &total); + qCInfo(lcZsyncPut).nospace() << "Done reading: " + << _item->_file << " " << fixed << qSetRealNumberPrecision(1) << (100.0f * done) / total + << "% of target seeded."; + } + + /* Get a set of byte ranges that we need to complete the target */ + int _nrange = 0; + zsync_unique_ptr zbyterange(zsync_needed_byte_ranges(zs.get(), &_nrange, 0), [](off_t *zbr) { + free(zbr); + }); + if (!zbyterange) { + abortWithError(SyncFileItem::NormalError, tr("Failed to get zsync byte ranges.")); + return; + } + + qCDebug(lcZsyncPut) << "Number of ranges:" << _nrange; + /* If we have no ranges then we have equal files and we are done */ + if (_nrange == 0 && _item->_size == quint64(zsync_file_length(zs.get()))) { + propagator()->reportFileTotal(*_item, 0); + finalize(); + return; + } + + /** + * If the `_item->size` is smaller than remote file then zbyterange is expected to have ranges that are + * outside it's size. This is because of the simplicity of the current upload algorithm in `zsync`. It + * currently will just return all the differences between the two files up to the remote file + * size. This is because of the case of `Moved` blocks within a file, blocks that are at the end of + * the remote file might be useful to the local file, they might just need to be moved them earlier in + * the file. + */ + int totalBytes = 0; + for (int i = 0; i < _nrange; i++) { + UploadRangeInfo rangeinfo = { quint64(zbyterange.get()[(2 * i)]), quint64(zbyterange.get()[(2 * i) + 1]) - quint64(zbyterange.get()[(2 * i)]) + 1 }; + if (rangeinfo.start < _item->_size) { + if (rangeinfo.start + rangeinfo.size > _item->_size) + rangeinfo.size = _item->_size - rangeinfo.start; + _rangesToUpload.append(rangeinfo); + totalBytes += rangeinfo.size; + } + } + + /** + * _item->_size here is the local file size, where as zsync_file_length will provide the size + * of the remote item according to the zsync metadata downloaded. So if we have more bytes than + * remote then we must assume we have to upload them. This is related to the simple implementation + * for upload path today, but is an area for future work. + */ + if (_item->_size > quint64(zsync_file_length(zs.get()))) { + quint64 appendedBytes = _item->_size - quint64(zsync_file_length(zs.get())); + UploadRangeInfo rangeinfo = { quint64(zsync_file_length(zs.get())), appendedBytes }; + _rangesToUpload.append(rangeinfo); + totalBytes += rangeinfo.size; + } + qCDebug(lcZsyncPut) << "Total bytes:" << totalBytes; + propagator()->reportFileTotal(*_item, totalBytes); + + _bytesToUpload = totalBytes; + + doStartUploadNext(); +} + +void PropagateUploadFileNG::slotZsyncSeedFailed(const QString &errorString) +{ + qCCritical(lcZsyncPut) << errorString; + + /* delete remote zsync file */ + QUrl zsyncUrl = zsyncMetadataUrl(propagator(), _item->_file); + (new DeleteJob(propagator()->account(), zsyncUrl, this))->start(); + + abortWithError(SyncFileItem::NormalError, errorString); +} + +/* +State machine: + + +---> doStartUpload() + isZsyncPropagationEnabled()? +--+ yes +---> Download and seed zsync metadata and set-up new _rangesToUpload + + + + |no | + | | + | | + +^--------------------------------------------------------------+ + v + doStartUploadNext() + isZsyncPropagationEnabled()? +--+ yes +---> Generate new zsync metadata file +--------------------+ + + + | + |no | | + | | Upload .zsync chunk + v | | + Check the db: is there an entry? <-------------------------+ | + + + | + |no |yes | + | v | + v PROPFIND | + startNewUpload() <-+ +-------------------------------------+ | + + | + | | + MKCOL + slotPropfindFinishedWithError() slotPropfindFinished() | + + Is there stale files to remove? | + slotMkColFinished() + + | + + no yes | + | + + | + | | DeleteJob | + | | + | + +-----+^------------------------------------------------------+^--+ slotDeleteJobFinished() | + | | + | +--------------------------------------------------------------------+ + | v + +----> startNextChunk() +-> finished? +- + ^ + | + +---------------+ | + | + +----------------------------------------+ + | + +-> MOVE +-----> moveJobFinished() +--> finalize() */ void PropagateUploadFileNG::doStartUpload() { propagator()->_activeJobList.append(this); + _zsyncSupported = isZsyncPropagationEnabled(propagator(), _item); + if (_zsyncSupported && _item->_remotePerm.hasPermission(RemotePermissions::HasZSyncMetadata)) { + // Retrieve zsync metadata file from the server + qCInfo(lcZsyncPut) << "Retrieving zsync metadata for:" << _item->_file; + QNetworkRequest req; + req.setPriority(QNetworkRequest::LowPriority); + QUrl zsyncUrl = zsyncMetadataUrl(propagator(), _item->_file); + auto job = propagator()->account()->sendRequest("GET", zsyncUrl, req); + connect(job, &SimpleNetworkJob::finishedSignal, this, &PropagateUploadFileNG::slotZsyncGetMetaFinished); + return; + } + + UploadRangeInfo rangeinfo = { 0, _item->_size }; + _rangesToUpload.append(rangeinfo); + _bytesToUpload = _item->_size; + doStartUploadNext(); +} + +void PropagateUploadFileNG::slotZsyncGetMetaFinished(QNetworkReply *reply) +{ + int httpStatusCode = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + if (httpStatusCode / 100 != 2) { + /* Fall back to full upload */ + qCWarning(lcZsyncPut) << "Failed to retrieve zsync metadata for:" << _item->_file; + _rangesToUpload.clear(); + UploadRangeInfo rangeinfo = { 0, _item->_size }; + _rangesToUpload.append(rangeinfo); + _bytesToUpload = _item->_size; + doStartUploadNext(); + return; + } + + QByteArray zsyncData = reply->readAll(); + + qCInfo(lcZsyncPut) << "Retrieved zsync metadata for:" << _item->_file << "size:" << zsyncData.size(); + + ZsyncSeedRunnable *run = new ZsyncSeedRunnable(zsyncData, propagator()->getFilePath(_item->_file), ZsyncMode::upload); + connect(run, &ZsyncSeedRunnable::finishedSignal, this, &PropagateUploadFileNG::slotZsyncSeedFinished); + connect(run, &ZsyncSeedRunnable::failedSignal, this, &PropagateUploadFileNG::slotZsyncSeedFailed); + + // Starts in a seperate thread + QThreadPool::globalInstance()->start(run); +} + +void PropagateUploadFileNG::doStartUploadNext() +{ + if (_zsyncSupported) { + _isZsyncMetadataUploadRunning = true; + + ZsyncGenerateRunnable *run = new ZsyncGenerateRunnable(propagator()->getFilePath(_item->_file)); + connect(run, &ZsyncGenerateRunnable::finishedSignal, this, &PropagateUploadFileNG::slotZsyncGenerationFinished); + connect(run, &ZsyncGenerateRunnable::failedSignal, this, &PropagateUploadFileNG::slotZsyncGenerationFailed); + + // Starts in a seperate thread + QThreadPool::globalInstance()->start(run); + } + const SyncJournalDb::UploadInfo progressInfo = propagator()->_journal->getUploadInfo(_item->_file); if (progressInfo._valid && progressInfo._modtime == _item->_modtime) { _transferId = progressInfo._transferid; @@ -123,6 +278,30 @@ void PropagateUploadFileNG::slotPropfindIterate(const QString &name, const QMap< } } + +/* + * Finds the range starting at 'start' and removes the first 'size' bytes from it. If it becomes + * empty, remove the range. + */ +bool PropagateUploadFileNG::updateRanges(quint64 start, quint64 size) +{ + bool found = false; + for (auto iter = _rangesToUpload.begin(); iter != _rangesToUpload.end(); ++iter) { + /* Only remove if they start at exactly the same chunk */ + if (iter->start == start && iter->size >= size) { + found = true; + iter->start += size; + iter->size -= size; + if (iter->size == 0) { + _rangesToUpload.erase(iter); + break; + } + } + } + + return found; +} + void PropagateUploadFileNG::slotPropfindFinished() { auto job = qobject_cast(sender()); @@ -131,13 +310,18 @@ void PropagateUploadFileNG::slotPropfindFinished() _currentChunk = 0; _sent = 0; - while (_serverChunks.contains(_currentChunk)) { - _sent += _serverChunks[_currentChunk].size; - _serverChunks.remove(_currentChunk); - ++_currentChunk; + + for (auto &chunkId : _serverChunks.keys()) { + if (updateRanges(chunkId, _serverChunks[chunkId].size)) { + _sent += _serverChunks[chunkId].size; + _serverChunks.remove(chunkId); + } } - if (_sent > _item->_size) { + if (!_rangesToUpload.isEmpty()) + _currentChunk = _rangesToUpload.first().start; + + if (_sent > _bytesToUpload) { // Normally this can't happen because the size is xor'ed with the transfer id, and it is // therefore impossible that there is more data on the server than on the file. qCCritical(lcPropagateUpload) << "Inconsistency while resuming " << _item->_file @@ -222,7 +406,6 @@ void PropagateUploadFileNG::startNewUpload() ASSERT(propagator()->_activeJobList.count(this) == 1); _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file); _sent = 0; - _currentChunk = 0; propagator()->reportProgress(*_item, 0); @@ -236,7 +419,6 @@ void PropagateUploadFileNG::startNewUpload() QMap headers; headers["OC-Total-Length"] = QByteArray::number(_item->_size); auto job = new MkColJob(propagator()->account(), chunkUrl(), headers, this); - connect(job, SIGNAL(finished(QNetworkReply::NetworkError)), this, SLOT(slotMkColFinished(QNetworkReply::NetworkError))); connect(job, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed); @@ -257,53 +439,69 @@ void PropagateUploadFileNG::slotMkColFinished(QNetworkReply::NetworkError) abortWithError(status, job->errorStringParsingBody()); return; } + startNextChunk(); } +void PropagateUploadFileNG::doFinalMove() +{ + // Still not finished metadata upload. + if (_isZsyncMetadataUploadRunning) + return; + + // Still not finished all ranges. + if (!_rangesToUpload.isEmpty()) + return; + + _finished = true; + // Finish with a MOVE + QString destination = QDir::cleanPath(propagator()->account()->url().path() + QLatin1Char('/') + + propagator()->account()->davPath() + propagator()->_remoteFolder + _item->_file); + auto headers = PropagateUploadFileCommon::headers(); + + // "If-Match applies to the source, but we are interested in comparing the etag of the destination + auto ifMatch = headers.take("If-Match"); + if (!ifMatch.isEmpty()) { + headers["If"] = "<" + destination.toUtf8() + "> ([" + ifMatch + "])"; + } + if (!_transmissionChecksumHeader.isEmpty()) { + headers[checkSumHeaderC] = _transmissionChecksumHeader; + } + headers["OC-Total-Length"] = QByteArray::number(_bytesToUpload); + headers["OC-Total-File-Length"] = QByteArray::number(_item->_size); + + QUrl source = _zsyncSupported ? Utility::concatUrlPath(chunkUrl(), "/.file.zsync") : Utility::concatUrlPath(chunkUrl(), "/.file"); + + auto job = new MoveJob(propagator()->account(), source, destination, headers, this); + _jobs.append(job); + connect(job, &MoveJob::finishedSignal, this, &PropagateUploadFileNG::slotMoveJobFinished); + connect(job, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed); + propagator()->_activeJobList.append(this); + job->start(); + return; +} + void PropagateUploadFileNG::startNextChunk() { if (propagator()->_abortRequested.fetchAndAddRelaxed(0)) return; - quint64 fileSize = _item->_size; - ENFORCE(fileSize >= _sent, "Sent data exceeds file size"); - - // prevent situation that chunk size is bigger then required one to send - _currentChunkSize = qMin(propagator()->_chunkSize, fileSize - _sent); + ENFORCE(_bytesToUpload >= _sent, "Sent data exceeds file size"); - if (_currentChunkSize == 0) { + // All ranges complete! + if (_rangesToUpload.isEmpty()) { Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore - _finished = true; - // Finish with a MOVE - QString destination = QDir::cleanPath(propagator()->account()->url().path() + QLatin1Char('/') - + propagator()->account()->davPath() + propagator()->_remoteFolder + _item->_file); - auto headers = PropagateUploadFileCommon::headers(); - - // "If-Match applies to the source, but we are interested in comparing the etag of the destination - auto ifMatch = headers.take("If-Match"); - if (!ifMatch.isEmpty()) { - headers["If"] = "<" + destination.toUtf8() + "> ([" + ifMatch + "])"; - } - if (!_transmissionChecksumHeader.isEmpty()) { - qCInfo(lcPropagateUpload) << destination << _transmissionChecksumHeader; - headers[checkSumHeaderC] = _transmissionChecksumHeader; - } - headers["OC-Total-Length"] = QByteArray::number(fileSize); - - auto job = new MoveJob(propagator()->account(), Utility::concatUrlPath(chunkUrl(), "/.file"), - destination, headers, this); - _jobs.append(job); - connect(job, &MoveJob::finishedSignal, this, &PropagateUploadFileNG::slotMoveJobFinished); - connect(job, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed); - propagator()->_activeJobList.append(this); - job->start(); + doFinalMove(); return; } + _currentChunk = _rangesToUpload.first().start; + _currentChunkSize = qMin(propagator()->_chunkSize, _rangesToUpload.first().size); + auto device = new UploadDevice(&propagator()->_bandwidthManager); const QString fileName = propagator()->getFilePath(_item->_file); - if (!device->prepareAndOpen(fileName, _sent, _currentChunkSize)) { + if (!device->prepareAndOpen(fileName, _currentChunk, _currentChunkSize)) { qCWarning(lcPropagateUpload) << "Could not prepare upload device: " << device->errorString(); // If the file is currently locked, we want to retry the sync @@ -317,13 +515,13 @@ void PropagateUploadFileNG::startNextChunk() } QMap headers; - headers["OC-Chunk-Offset"] = QByteArray::number(_sent); + headers["OC-Chunk-Offset"] = QByteArray::number(_currentChunk); _sent += _currentChunkSize; QUrl url = chunkUrl(_currentChunk); // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing - PUTFileJob *job = new PUTFileJob(propagator()->account(), url, device, headers, _currentChunk, this); + PUTFileJob *job = new PUTFileJob(propagator()->account(), url, device, headers, 0, this); _jobs.append(job); connect(job, &PUTFileJob::finishedSignal, this, &PropagateUploadFileNG::slotPutFinished); connect(job, &PUTFileJob::uploadProgress, @@ -333,7 +531,58 @@ void PropagateUploadFileNG::startNextChunk() connect(job, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed); job->start(); propagator()->_activeJobList.append(this); - _currentChunk++; + updateRanges(_currentChunk, _currentChunkSize); +} + +void PropagateUploadFileNG::slotZsyncGenerationFinished(const QString &generatedFileName) +{ + qCDebug(lcPropagateUpload) + << "Finished generation of:" << generatedFileName + << "size:" << FileSystem::getSize(generatedFileName); + + auto device = new UploadDevice(&propagator()->_bandwidthManager); + + if (!device->prepareAndOpen(generatedFileName, 0, FileSystem::getSize(generatedFileName))) { + qCWarning(lcPropagateUpload) << "Could not prepare generated file: " << generatedFileName << device->errorString(); + abortWithError(SyncFileItem::SoftError, device->errorString()); + return; + } + + QMap headers; + QUrl url = Utility::concatUrlPath(chunkUrl(), ".zsync"); + + _sent += FileSystem::getSize(generatedFileName); + _bytesToUpload += FileSystem::getSize(generatedFileName); + + qCDebug(lcPropagateUpload) << "Starting upload of .zsync"; + + // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing + PUTFileJob *job = new PUTFileJob(propagator()->account(), url, device, headers, 0, this); + _jobs.append(job); + connect(job, &PUTFileJob::finishedSignal, this, &PropagateUploadFileNG::slotZsyncMetadataUploadFinished); + connect(job, &PUTFileJob::uploadProgress, + this, &PropagateUploadFileNG::slotUploadProgress); + connect(job, &PUTFileJob::uploadProgress, + device, &UploadDevice::slotJobUploadProgress); + connect(job, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed); + job->start(); + propagator()->_activeJobList.append(this); + + FileSystem::remove(generatedFileName); +} + +void PropagateUploadFileNG::slotZsyncMetadataUploadFinished() +{ + qCDebug(lcPropagateUpload) << "Uploading of .zsync complete"; + _isZsyncMetadataUploadRunning = false; + doFinalMove(); +} + +void PropagateUploadFileNG::slotZsyncGenerationFailed(const QString &errorString) +{ + qCWarning(lcZsyncPut) << "Failed to generate zsync metadata file:" << errorString; + + abortWithError(SyncFileItem::SoftError, tr("Failed to generate zsync file.")); } void PropagateUploadFileNG::slotPutFinished() @@ -358,7 +607,7 @@ void PropagateUploadFileNG::slotPutFinished() return; } - ENFORCE(_sent <= _item->_size, "can't send more than size"); + ENFORCE(_sent <= _bytesToUpload, "can't send more than size"); // Adjust the chunk size for the time taken. // @@ -391,7 +640,7 @@ void PropagateUploadFileNG::slotPutFinished() << propagator()->_chunkSize << "bytes"; } - bool finished = _sent == _item->_size; + bool finished = _sent == _bytesToUpload; // Check if the file still exists const QString fullFilePath(propagator()->getFilePath(_item->_file)); diff --git a/src/libsync/syncengine.cpp b/src/libsync/syncengine.cpp index 4a1ca472182..5dbedd4c5f5 100644 --- a/src/libsync/syncengine.cpp +++ b/src/libsync/syncengine.cpp @@ -26,6 +26,7 @@ #include "propagateremotedelete.h" #include "propagatedownload.h" #include "common/asserts.h" +#include "configfile.h" #ifdef Q_OS_WIN #include diff --git a/src/libsync/syncengine.h b/src/libsync/syncengine.h index d07075c8f58..022350fe115 100644 --- a/src/libsync/syncengine.h +++ b/src/libsync/syncengine.h @@ -285,6 +285,7 @@ private slots: int _uploadLimit; int _downloadLimit; + SyncOptions _syncOptions; /// Hook for computing checksums from csync_update diff --git a/src/libsync/syncoptions.h b/src/libsync/syncoptions.h index f6565584cd4..9375fc6f797 100644 --- a/src/libsync/syncoptions.h +++ b/src/libsync/syncoptions.h @@ -55,6 +55,12 @@ struct SyncOptions /** Whether parallel network jobs are allowed. */ bool _parallelNetworkJobs = true; + + /** Whether delta-synchronization is enabled */ + bool _deltaSyncEnabled = false; + + /** What the minimum file size (in Bytes) is for delta-synchronization */ + quint64 _deltaSyncMinFileSize = 0; }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b23678543ea..619811c757a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,6 +1,7 @@ include_directories(${QT_INCLUDES} ${CMAKE_SOURCE_DIR}/src ${CMAKE_SOURCE_DIR}/src/3rdparty/qtokenizer + ${CMAKE_SOURCE_DIR}/src/3rdparty/zsync/c ${CMAKE_SOURCE_DIR}/src/csync ${CMAKE_SOURCE_DIR}/src/csync/std ${CMAKE_SOURCE_DIR}/src/gui @@ -50,6 +51,7 @@ owncloud_add_test(SyncMove "syncenginetestutils.h") owncloud_add_test(SyncConflict "syncenginetestutils.h") owncloud_add_test(SyncFileStatusTracker "syncenginetestutils.h") owncloud_add_test(ChunkingNg "syncenginetestutils.h") +owncloud_add_test(Zsync "syncenginetestutils.h") owncloud_add_test(UploadReset "syncenginetestutils.h") owncloud_add_test(AllFilesDeleted "syncenginetestutils.h") owncloud_add_test(FolderWatcher "${FolderWatcher_SRC}") diff --git a/test/syncenginetestutils.h b/test/syncenginetestutils.h index f7cf4eb4623..ea9939fea90 100644 --- a/test/syncenginetestutils.h +++ b/test/syncenginetestutils.h @@ -12,6 +12,7 @@ #include "filesystem.h" #include "syncengine.h" #include "common/syncjournaldb.h" +#include #include #include @@ -68,7 +69,8 @@ class FileModifier virtual void remove(const QString &relativePath) = 0; virtual void insert(const QString &relativePath, qint64 size = 64, char contentChar = 'W') = 0; virtual void setContents(const QString &relativePath, char contentChar) = 0; - virtual void appendByte(const QString &relativePath) = 0; + virtual void appendByte(const QString &relativePath, char contentChar = 0) = 0; + virtual void modifyByte(const QString &relativePath, quint64 offset, char contentChar) = 0; virtual void mkdir(const QString &relativePath) = 0; virtual void rename(const QString &relativePath, const QString &relativeDestinationDirectory) = 0; virtual void setModTime(const QString &relativePath, const QDateTime &modTime) = 0; @@ -107,14 +109,29 @@ class DiskFileModifier : public FileModifier file.open(QFile::WriteOnly); file.write(QByteArray{}.fill(contentChar, size)); } - void appendByte(const QString &relativePath) override { + void appendByte(const QString &relativePath, char contentChar) override + { QFile file{_rootDir.filePath(relativePath)}; QVERIFY(file.exists()); file.open(QFile::ReadWrite); - QByteArray contents = file.read(1); + QByteArray contents; + if (contentChar) + contents += contentChar; + else + contents = file.read(1); file.seek(file.size()); file.write(contents); } + void modifyByte(const QString &relativePath, quint64 offset, char contentChar) override + { + QFile file{ _rootDir.filePath(relativePath) }; + QVERIFY(file.exists()); + file.open(QFile::ReadWrite); + file.seek(offset); + file.write(&contentChar, 1); + file.close(); + } + void mkdir(const QString &relativePath) override { _rootDir.mkpath(relativePath); } @@ -190,12 +207,23 @@ class FileInfo : public FileModifier file->contentChar = contentChar; } - void appendByte(const QString &relativePath) override { + void appendByte(const QString &relativePath, char contentChar = 0) override + { + Q_UNUSED(contentChar); FileInfo *file = findInvalidatingEtags(relativePath); Q_ASSERT(file); file->size += 1; } + void modifyByte(const QString &relativePath, quint64 offset, char contentChar) override + { + Q_UNUSED(offset); + Q_UNUSED(contentChar); + FileInfo *file = findInvalidatingEtags(relativePath); + Q_ASSERT(file); + Q_ASSERT(!"unimplemented"); + } + void mkdir(const QString &relativePath) override { createDir(relativePath); } @@ -365,6 +393,7 @@ class FakePropfindReply : public QNetworkReply xml.writeTextElement(ocUri, QStringLiteral("permissions"), fileInfo.isShared ? QStringLiteral("SRDNVCKW") : QStringLiteral("RDNVCKW")); xml.writeTextElement(ocUri, QStringLiteral("id"), fileInfo.fileId); xml.writeTextElement(ocUri, QStringLiteral("checksums"), fileInfo.checksums); + xml.writeTextElement(ocUri, QStringLiteral("zsync"), QStringLiteral("true")); buffer.write(fileInfo.extraDavProperties); xml.writeEndElement(); // prop xml.writeTextElement(davUri, QStringLiteral("status"), "HTTP/1.1 200 OK"); @@ -593,6 +622,7 @@ class FakeGetReply : public QNetworkReply } void abort() override { + setError(OperationCanceledError, "Operation Canceled"); aborted = true; } qint64 bytesAvailable() const override { @@ -612,6 +642,87 @@ class FakeGetReply : public QNetworkReply using QNetworkReply::setRawHeader; }; +class FakeGetWithDataReply : public QNetworkReply +{ + Q_OBJECT +public: + const FileInfo *fileInfo; + const char *payload; + quint64 size; + quint64 offset = 0; + bool aborted = false; + + FakeGetWithDataReply(FileInfo &remoteRootFileInfo, const QByteArray &data, QNetworkAccessManager::Operation op, const QNetworkRequest &request, QObject *parent) + : QNetworkReply{ parent } + { + setRequest(request); + setUrl(request.url()); + setOperation(op); + open(QIODevice::ReadOnly); + + Q_ASSERT(!data.isEmpty()); + payload = data.data(); + size = data.length(); + QString fileName = getFilePathFromUrl(request.url()); + Q_ASSERT(!fileName.isEmpty()); + fileInfo = remoteRootFileInfo.find(fileName); + QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection); + + if (request.hasRawHeader("Range")) { + QByteArray range = request.rawHeader("Range"); + quint64 start, end; + const char *r = range.constData(); + int res = sscanf(r, "bytes=%llu-%llu", &start, &end); + if (res == 2) { + payload += start; + size -= start; + } + } + } + + Q_INVOKABLE void respond() + { + if (aborted) { + setError(OperationCanceledError, "Operation Canceled"); + emit metaDataChanged(); + emit finished(); + return; + } + setHeader(QNetworkRequest::ContentLengthHeader, size); + setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 200); + setRawHeader("OC-ETag", fileInfo->etag.toLatin1()); + setRawHeader("ETag", fileInfo->etag.toLatin1()); + setRawHeader("OC-FileId", fileInfo->fileId); + emit metaDataChanged(); + if (bytesAvailable()) + emit readyRead(); + emit finished(); + } + + void abort() override + { + setError(OperationCanceledError, "Operation Canceled"); + aborted = true; + } + qint64 bytesAvailable() const override + { + if (aborted) + return 0; + return size + QIODevice::bytesAvailable(); + } + + qint64 readData(char *data, qint64 maxlen) override + { + qint64 len = std::min(size, quint64(maxlen)); + std::memcpy(data, payload + offset, len); + size -= len; + offset += len; + return len; + } + + // useful to be public for testing + using QNetworkReply::setRawHeader; +}; class FakeChunkMoveReply : public QNetworkReply { @@ -630,27 +741,36 @@ class FakeChunkMoveReply : public QNetworkReply QString source = getFilePathFromUrl(request.url()); Q_ASSERT(!source.isEmpty()); - Q_ASSERT(source.endsWith("/.file")); - source = source.left(source.length() - qstrlen("/.file")); + Q_ASSERT(source.endsWith("/.file") || source.endsWith("/.file.zsync")); + if (source.endsWith("/.file")) + source = source.left(source.length() - qstrlen("/.file")); + if (source.endsWith("/.file.zsync")) + source = source.left(source.length() - qstrlen("/.file.zsync")); auto sourceFolder = uploadsFileInfo.find(source); Q_ASSERT(sourceFolder); Q_ASSERT(sourceFolder->isDir); int count = 0; int size = 0; + qlonglong prev = 0; char payload = '\0'; - do { - QString chunkName = QString::number(count).rightJustified(8, '0'); - if (!sourceFolder->children.contains(chunkName)) - break; + + // Ignore .zsync metadata + if (sourceFolder->children.contains(".zsync")) + sourceFolder->children.remove(".zsync"); + + for (auto chunkName : sourceFolder->children.keys()) { auto &x = sourceFolder->children[chunkName]; + if (chunkName.toLongLong() != prev) + break; Q_ASSERT(!x.isDir); Q_ASSERT(x.size > 0); // There should not be empty chunks size += x.size; Q_ASSERT(!payload || payload == x.contentChar); payload = x.contentChar; ++count; - } while(true); + prev = chunkName.toLongLong() + x.size; + } Q_ASSERT(count > 1); // There should be at least two chunks, otherwise why would we use chunking? QCOMPARE(sourceFolder->children.count(), count); // There should not be holes or extra files @@ -709,6 +829,97 @@ class FakeChunkMoveReply : public QNetworkReply qint64 readData(char *, qint64) override { return 0; } }; +class FakeChunkZsyncMoveReply : public QNetworkReply +{ + Q_OBJECT + FileInfo *fileInfo; + +public: + FakeChunkZsyncMoveReply(FileInfo &uploadsFileInfo, FileInfo &remoteRootFileInfo, + QNetworkAccessManager::Operation op, const QNetworkRequest &request, + quint64 delayMs, QVector &mods, QObject *parent) + : QNetworkReply{ parent } + { + setRequest(request); + setUrl(request.url()); + setOperation(op); + open(QIODevice::ReadOnly); + + Q_ASSERT(!mods.isEmpty()); + + QString source = getFilePathFromUrl(request.url()); + Q_ASSERT(!source.isEmpty()); + Q_ASSERT(source.endsWith("/.file.zsync")); + source = source.left(source.length() - qstrlen("/.file.zsync")); + auto sourceFolder = uploadsFileInfo.find(source); + Q_ASSERT(sourceFolder); + Q_ASSERT(sourceFolder->isDir); + int count = 0; + + // Ignore .zsync metadata + if (sourceFolder->children.contains(".zsync")) + sourceFolder->children.remove(".zsync"); + + for (auto chunkName : sourceFolder->children.keys()) { + auto &x = sourceFolder->children[chunkName]; + Q_ASSERT(!x.isDir); + Q_ASSERT(x.size > 0); // There should not be empty chunks + quint64 start = quint64(chunkName.toLongLong()); + auto it = mods.begin(); + while (it != mods.end()) { + if (*it >= start && *it < start + x.size) { + ++count; + mods.erase(it); + } else + ++it; + } + } + + Q_ASSERT(count > 0); // There should be at least one chunk + Q_ASSERT(mods.isEmpty()); // All files should match a modification + + QString fileName = getFilePathFromUrl(QUrl::fromEncoded(request.rawHeader("Destination"))); + Q_ASSERT(!fileName.isEmpty()); + + Q_ASSERT((fileInfo = remoteRootFileInfo.find(fileName))); + + QVERIFY(request.hasRawHeader("If")); // The client should put this header + if (request.rawHeader("If") != QByteArray("<" + request.rawHeader("Destination") + "> ([\"" + fileInfo->etag.toLatin1() + "\"])")) { + QMetaObject::invokeMethod(this, "respondPreconditionFailed", Qt::QueuedConnection); + return; + } + + if (!fileInfo) { + abort(); + return; + } + fileInfo->lastModified = OCC::Utility::qDateTimeFromTime_t(request.rawHeader("X-OC-Mtime").toLongLong()); + remoteRootFileInfo.find(fileName, /*invalidate_etags=*/true); + + QTimer::singleShot(delayMs, this, &FakeChunkZsyncMoveReply::respond); + } + + Q_INVOKABLE void respond() + { + setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 201); + setRawHeader("OC-ETag", fileInfo->etag.toLatin1()); + setRawHeader("ETag", fileInfo->etag.toLatin1()); + setRawHeader("OC-FileId", fileInfo->fileId); + emit metaDataChanged(); + emit finished(); + } + + Q_INVOKABLE void respondPreconditionFailed() + { + setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 412); + setError(InternalServerError, "Precondition Failed"); + emit metaDataChanged(); + emit finished(); + } + + void abort() override {} + qint64 readData(char *, qint64) override { return 0; } +}; class FakeErrorReply : public QNetworkReply { diff --git a/test/testzsync.cpp b/test/testzsync.cpp new file mode 100644 index 00000000000..f9c76ca84c3 --- /dev/null +++ b/test/testzsync.cpp @@ -0,0 +1,156 @@ +/* + * This software is in the public domain, furnished "as is", without technical + * support, and with no warranty, express or implied, as to its usefulness for + * any purpose. + * + */ + +#include +#include "syncenginetestutils.h" +#include +#include + +using namespace OCC; + +QStringList findConflicts(const FileInfo &dir) +{ + QStringList conflicts; + for (const auto &item : dir.children) { + if (item.name.contains("conflict")) { + conflicts.append(item.path()); + } + } + return conflicts; +} + +static quint64 blockstart_from_offset(quint64 offset) +{ + return offset & ~quint64(ZSYNC_BLOCKSIZE - 1); +} + +class TestZsync : public QObject +{ + Q_OBJECT + +private slots: + + void testFileDownloadSimple() + { + FakeFolder fakeFolder{ FileInfo::A12_B12_C12_S12() }; + fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ { "chunking", "1.0" }, { "zsync", "1.0" } } } }); + + SyncOptions opt; + opt._deltaSyncEnabled = true; + opt._deltaSyncMinFileSize = 0; + fakeFolder.syncEngine().setSyncOptions(opt); + + const int size = 100 * 1000 * 1000; + QByteArray metadata; + + // Test 1: NEW file upload with zsync metadata + fakeFolder.localModifier().insert("A/a0", size); + fakeFolder.localModifier().appendByte("A/a0", 'X'); + qsrand(QDateTime::currentDateTime().toTime_t()); + for (int i = 0; i < 10; i++) { + quint64 offset = qrand() % size; + fakeFolder.localModifier().modifyByte("A/a0", offset, 'Y'); + } + fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *data) -> QNetworkReply * { + if (op == QNetworkAccessManager::PutOperation && request.url().toString().endsWith(".zsync")) { + metadata = data->readAll(); + return new FakePutReply{ fakeFolder.uploadState(), op, request, metadata, this }; + } + + return nullptr; + }); + QVERIFY(fakeFolder.syncOnce()); + QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); + + // Keep hold of original file contents + QFile f(fakeFolder.localPath() + "/A/a0"); + f.open(QIODevice::ReadOnly); + QByteArray data = f.readAll(); + f.close(); + + // Test 2: update local file to unchanged version and download changes + fakeFolder.localModifier().remove("A/a0"); + fakeFolder.localModifier().insert("A/a0", size); + auto currentMtime = QDateTime::currentDateTimeUtc(); + fakeFolder.remoteModifier().setModTime("A/a0", currentMtime); + fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *) -> QNetworkReply * { + QUrlQuery query(request.url()); + if (op == QNetworkAccessManager::GetOperation) { + if (query.hasQueryItem("zsync")) { + return new FakeGetWithDataReply{ fakeFolder.remoteModifier(), metadata, op, request, this }; + } + + return new FakeGetWithDataReply{ fakeFolder.remoteModifier(), data, op, request, this }; + } + + return nullptr; + }); + QVERIFY(fakeFolder.syncOnce()); + auto conflicts = findConflicts(fakeFolder.currentLocalState().children["A"]); + QCOMPARE(conflicts.size(), 1); + for (auto c : conflicts) { + fakeFolder.localModifier().remove(c); + } + QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); + } + + void testFileUploadSimple() + { + FakeFolder fakeFolder{ FileInfo::A12_B12_C12_S12() }; + fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ { "chunking", "1.0" }, { "zsync", "1.0" } } } }); + + SyncOptions opt; + opt._deltaSyncEnabled = true; + opt._deltaSyncMinFileSize = 0; + fakeFolder.syncEngine().setSyncOptions(opt); + + const int size = 100 * 1000 * 1000; + QByteArray metadata; + + // Test 1: NEW file upload with zsync metadata + fakeFolder.localModifier().insert("A/a0", size); + fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *data) -> QNetworkReply * { + if (op == QNetworkAccessManager::PutOperation && request.url().toString().endsWith(".zsync")) { + metadata = data->readAll(); + return new FakePutReply{ fakeFolder.uploadState(), op, request, metadata, this }; + } + + return nullptr; + }); + QVERIFY(fakeFolder.syncOnce()); + QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); + + // Test 2: Modify local contents and ensure that modified chunks are sent + QVector mods; + qsrand(QDateTime::currentDateTime().toTime_t()); + fakeFolder.localModifier().appendByte("A/a0", 'X'); + mods.append(blockstart_from_offset(size + 1)); + for (int i = 0; i < 10; i++) { + quint64 offset = qrand() % size; + fakeFolder.localModifier().modifyByte("A/a0", offset, 'Y'); + mods.append(blockstart_from_offset(offset)); + } + fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *) -> QNetworkReply * { + QUrlQuery query(request.url()); + if (op == QNetworkAccessManager::GetOperation && query.hasQueryItem("zsync")) { + return new FakeGetWithDataReply{ fakeFolder.remoteModifier(), metadata, op, request, this }; + } + + if (request.attribute(QNetworkRequest::CustomVerbAttribute) == QLatin1String("MOVE")) { + return new FakeChunkZsyncMoveReply{ fakeFolder.uploadState(), fakeFolder.remoteModifier(), op, request, 0, mods, this }; + } + + return nullptr; + }); + QVERIFY(fakeFolder.syncOnce()); + fakeFolder.remoteModifier().appendByte("A/a0", 'X'); + QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); + } +}; + +QTEST_GUILESS_MAIN(TestZsync) +#include "testzsync.moc"