-
Notifications
You must be signed in to change notification settings - Fork 391
/
FileWriter.cpp
198 lines (189 loc) · 6.14 KB
/
FileWriter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/util/FileWriter.h>
#include <wdt/util/CommonImpl.h>
#include <fcntl.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sys/types.h>
#include <cerrno>
namespace facebook {
namespace wdt {
/**
 * Destructor: last-resort cleanup of the underlying descriptor.
 *
 * Callers are expected to invoke sync() and close() themselves so that they
 * can inspect the returned error codes; reaching the close() below means that
 * did not happen, which is reported as an error before closing.
 */
FileWriter::~FileWriter() {
  if (isClosed()) {
    // Expected path: the caller already closed the file.
    return;
  }
  WLOG(ERROR) << "File " << blockDetails_->fileName
              << " was not closed and needed to be closed in the dtor";
  // The return value is intentionally dropped; a destructor has nobody to
  // report it to.
  close();
}
/**
 * Opens the destination file for the current block and positions the file
 * offset at the start of the block.
 *
 * @return OK when writes are skipped, when the block is marked TO_BE_DELETED
 *         (in which case no descriptor is expected), or when the descriptor
 *         was obtained and the seek, if any, succeeded;
 *         FILE_WRITE_ERROR when the open or the seek failed.
 */
ErrorCode FileWriter::open() {
  if (threadCtx_.getOptions().skip_writes) {
    return OK;
  }
  // TODO: consider a working optimization for small files
  fd_ = fileCreator_->openForBlocks(threadCtx_, blockDetails_);
  if (blockDetails_->allocationStatus == TO_BE_DELETED) {
    // Nothing is written for such blocks, so no descriptor must be returned.
    WDT_CHECK_EQ(-1, fd_);
    return OK;
  }
  if (fd_ == -1) {
    WLOG(ERROR) << "File open/seek failed for " << blockDetails_->fileName;
    return FILE_WRITE_ERROR;
  }
  const bool needsSeek = (fd_ >= 0) && (blockDetails_->offset > 0);
  if (!needsSeek) {
    return OK;
  }
  int64_t seekResult;
  {
    // Scope limits the perf measurement to the lseek() call itself.
    PerfStatCollector statCollector(threadCtx_, PerfStatReport::FILE_SEEK);
    seekResult = lseek(fd_, blockDetails_->offset, SEEK_SET);
  }
  if (seekResult < 0) {
    WPLOG(ERROR) << "Unable to seek to " << blockDetails_->offset << " for "
                 << blockDetails_->fileName;
    close();
    return FILE_WRITE_ERROR;
  }
  return OK;
}
/**
 * Flushes the block to stable storage (when requested by the options) and,
 * where posix_fadvise is available and not disabled, advises the kernel that
 * the block's byte range is no longer needed.
 *
 * @return OK if the file is not open (never opened or already closed) or if
 *         every requested operation succeeded; FILE_WRITE_ERROR if fsync()
 *         or posix_fadvise() failed.
 */
ErrorCode FileWriter::sync() {
  if (fd_ < 0) {
    // File was either never opened or already closed
    return OK;
  }
  const auto &options = threadCtx_.getOptions();
  if (options.fsync || options.isLogBasedResumption()) {
    PerfStatCollector statCollector(threadCtx_, PerfStatReport::FSYNC_STATS);
    if (::fsync(fd_) < 0) {
      WPLOG(ERROR) << "Unable to fsync() fd " << fd_;
      return FILE_WRITE_ERROR;
    }
  }
#ifdef HAS_POSIX_FADVISE
  if (!options.skip_fadvise) {
    PerfStatCollector statCollector(threadCtx_, PerfStatReport::FADVISE);
    const int adviseStatus =
        posix_fadvise(fd_, blockDetails_->offset, blockDetails_->dataSize,
                      POSIX_FADV_DONTNEED);
    if (adviseStatus != 0) {
      // posix_fadvise() reports failure through its return value and does not
      // set errno. Publish the code through errno so that the errno-based
      // log line below describes this failure instead of a stale one.
      errno = adviseStatus;
      WPLOG(ERROR) << "posix_fadvise failed for " << blockDetails_->fileName
                   << " " << blockDetails_->offset << " "
                   << blockDetails_->dataSize;
      return FILE_WRITE_ERROR;
    }
  }
#endif
  return OK;
}
/**
 * Closes the underlying descriptor if one is open.
 *
 * fd_ is reset to -1 whether or not ::close() succeeds, so a second call is a
 * no-op.
 *
 * @return OK if nothing was open or the close succeeded; FILE_WRITE_ERROR if
 *         ::close() reported a failure.
 */
ErrorCode FileWriter::close() {
  if (fd_ < 0) {
    return OK;
  }
  PerfStatCollector statCollector(threadCtx_, PerfStatReport::FILE_CLOSE);
  const bool closeFailed = (::close(fd_) != 0);
  if (closeFailed) {
    // Log before fd_ is reset so the message carries the real descriptor.
    WPLOG(ERROR) << "Unable to close fd " << fd_;
  }
  fd_ = -1;
  if (closeFailed) {
    return FILE_WRITE_ERROR;
  }
  return OK;
}
/// @return true when no descriptor is currently held (fd_ is negative).
bool FileWriter::isClosed() {
  const bool holdsDescriptor = (fd_ >= 0);
  return !holdsDescriptor;
}
/**
 * Writes `size` bytes from `buf` to the open descriptor, looping over short
 * writes and retrying when ::write() is interrupted (EINTR). After the data
 * is written, syncFileRange() is invoked, forced when this call completes the
 * block (total written equals blockDetails_->dataSize).
 *
 * When the skip_writes option is set nothing touches the disk, but
 * totalWritten_ is still advanced by `size`.
 *
 * @param buf   start of the data to write
 * @param size  number of bytes to write
 * @return OK on success; FILE_WRITE_ERROR if ::write() fails with anything
 *         other than EINTR or if syncFileRange() fails.
 */
ErrorCode FileWriter::write(char *buf, int64_t size) {
  WDT_CHECK_NE(TO_BE_DELETED, blockDetails_->allocationStatus);
  auto &options = threadCtx_.getOptions();
  if (!options.skip_writes) {
    int64_t count = 0;
    while (count < size) {
      int64_t written;
      int writeErrno = 0;
      {
        PerfStatCollector statCollector(threadCtx_, PerfStatReport::FILE_WRITE);
        written = ::write(fd_, buf + count, size - count);
        if (written == -1) {
          // Snapshot errno while it is still guaranteed to belong to the
          // write() call: statCollector's destructor runs at the end of this
          // scope and is not known to leave errno untouched.
          writeErrno = errno;
        }
      }
      if (written == -1) {
        if (writeErrno == EINTR) {
          WVLOG(1) << "Disk write interrupted, retrying "
                   << blockDetails_->fileName;
          continue;
        }
        // Restore the saved value so the errno-based log line reports the
        // write() failure.
        errno = writeErrno;
        WPLOG(ERROR) << "File write failed for " << blockDetails_->fileName
                     << " fd : " << fd_ << " " << written << " " << count
                     << " " << size;
        return FILE_WRITE_ERROR;
      }
      count += written;
    }
    WVLOG(1) << "Successfully written " << count << " bytes to fd " << fd_
             << " for file " << blockDetails_->fileName;
    const bool finished = ((totalWritten_ + size) == blockDetails_->dataSize);
    if (!syncFileRange(count, finished /*forced*/)) {
      return FILE_WRITE_ERROR;
    }
  }
  totalWritten_ += size;
  return OK;
}
/**
 * Accumulates `written` into the count of not-yet-synced bytes and, once that
 * count exceeds the configured interval (or when `forced`), starts writeback
 * of the pending range with sync_file_range(SYNC_FILE_RANGE_WRITE). Where
 * posix_fadvise is available and not disabled, the same range is then marked
 * POSIX_FADV_DONTNEED. On success the pending range is advanced past the
 * synced bytes.
 *
 * The whole body is compiled only when HAS_SYNC_FILE_RANGE is defined;
 * otherwise the function does nothing and returns true. A negative
 * disk_sync_interval_mb option disables syncing as well.
 *
 * @param written  number of bytes written since the previous call
 * @param forced   sync even if the interval has not been reached
 * @return false if sync_file_range() or posix_fadvise() failed, true
 *         otherwise (including when nothing needed to be synced).
 */
bool FileWriter::syncFileRange(int64_t written, bool forced) {
#ifdef HAS_SYNC_FILE_RANGE
  const WdtOptions &options = threadCtx_.getOptions();
  if (options.disk_sync_interval_mb < 0) {
    return true;
  }
  const int64_t syncIntervalBytes = options.disk_sync_interval_mb * 1024 * 1024;
  writtenSinceLastSync_ += written;
  if (writtenSinceLastSync_ == 0) {
    // no need to sync
    WVLOG(1) << "skipping syncFileRange for " << blockDetails_->fileName
             << ". Data written " << written
             << " sync forced = " << std::boolalpha << forced;
    return true;
  }
  if (forced || writtenSinceLastSync_ > syncIntervalBytes) {
    // sync_file_range with flag SYNC_FILE_RANGE_WRITE is an asynchronous
    // operation. So, this is not that costly. Source :
    // http://yoshinorimatsunobu.blogspot.com/2014/03/how-syncfilerange-really-works.html
    int status;
    {
      PerfStatCollector statCollector(threadCtx_,
                                      PerfStatReport::SYNC_FILE_RANGE);
      status = sync_file_range(fd_, nextSyncOffset_, writtenSinceLastSync_,
                               SYNC_FILE_RANGE_WRITE);
    }
    if (status != 0) {
      WPLOG(ERROR) << "sync_file_range() failed for "
                   << blockDetails_->fileName << " fd " << fd_;
      return false;
    }
#ifdef HAS_POSIX_FADVISE
    if (!options.skip_fadvise) {
      int ret;
      {
        PerfStatCollector statCollector(threadCtx_, PerfStatReport::FADVISE);
        ret = posix_fadvise(fd_, nextSyncOffset_, writtenSinceLastSync_,
                            POSIX_FADV_DONTNEED);
      }
      if (ret != 0) {
        // posix_fadvise() returns the error number instead of setting errno;
        // copy it into errno so the errno-based log line below reports the
        // actual cause.
        errno = ret;
        WPLOG(ERROR) << "posix_fadvise failed for "
                     << blockDetails_->fileName
                     << " " << nextSyncOffset_ << " "
                     << writtenSinceLastSync_;
        return false;
      }
    }
#endif
    WVLOG(1) << "file range [" << nextSyncOffset_ << " "
             << writtenSinceLastSync_ << "] synced for file "
             << blockDetails_->fileName;
    nextSyncOffset_ += writtenSinceLastSync_;
    writtenSinceLastSync_ = 0;
  }
#endif
  return true;
}
}
}