[WIP] feat: use buffered fwrite to write binlog instead of Mmap#2778
cheniujh wants to merge 5 commits into OpenAtomFoundation:unstable
Walkthrough
The update enhances the
Changes
Sequence Diagram(s)
sequenceDiagram
participant User
participant Binlog
participant BufferedWritableFile
participant TimerTaskThread
User->>Binlog: Initialize Binlog with Buffer
Binlog->>BufferedWritableFile: Create BufferedWritableFile
Binlog->>TimerTaskThread: Start Timer Task for Flush
loop Periodically
TimerTaskThread->>Binlog: Invoke FlushBufferedFile
Binlog->>BufferedWritableFile: Flush Data to Disk
end
User->>Binlog: Write Data
Binlog->>BufferedWritableFile: Append Data to Buffer
User->>Binlog: Destroy Binlog
Binlog->>TimerTaskThread: Stop Timer Task
Binlog->>BufferedWritableFile: Final Flush and Close
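The periodic-flush flow in the diagram can be sketched with the standard library alone. This is a minimal illustration, not the PR's implementation: `PeriodicFlusher` is a hypothetical stand-in for the timer task thread, and the callback passed to it plays the role of `FlushBufferedFile`.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for the PR's timer task: runs `task` once per
// `interval` on a background thread until Stop() is called.
class PeriodicFlusher {
 public:
  PeriodicFlusher(std::chrono::milliseconds interval, std::function<void()> task)
      : interval_(interval), task_(std::move(task)), worker_([this] { Run(); }) {}

  ~PeriodicFlusher() { Stop(); }

  PeriodicFlusher(const PeriodicFlusher&) = delete;
  PeriodicFlusher& operator=(const PeriodicFlusher&) = delete;

  // Wakes the worker and waits for it to exit. Calling it again later is a no-op.
  void Stop() {
    {
      std::lock_guard<std::mutex> guard(mu_);
      stopped_ = true;
    }
    cv_.notify_all();
    if (worker_.joinable()) {
      worker_.join();
    }
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mu_);
    while (!stopped_) {
      // wait_for returns true when the predicate holds, i.e. Stop() was called.
      if (cv_.wait_for(lock, interval_, [this] { return stopped_; })) {
        break;
      }
      // Run the task without holding mu_, so Stop() is never blocked by it.
      lock.unlock();
      task_();
      lock.lock();
    }
  }

  std::chrono::milliseconds interval_;
  std::function<void()> task_;
  std::mutex mu_;
  std::condition_variable cv_;
  bool stopped_ = false;
  std::thread worker_;  // declared last so the other members exist before it starts
};
```

In the diagram's terms, the owner would call `Stop()` first and only then perform the final flush and close, so the background thread can no longer touch the file.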
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- include/pika_binlog.h (3 hunks)
- include/pika_define.h (1 hunks)
- src/pika_binlog.cc (5 hunks)
- src/pstd/include/env.h (2 hunks)
- src/pstd/src/env.cc (3 hunks)
Additional comments not posted (21)
include/pika_binlog.h (3)
16-16: Header inclusion looks good. The inclusion of the dispatch_thread.h header is appropriate for supporting the TimerTaskThread class.
80-80: Method declaration looks good. The FlushBufferedFile method declaration is appropriate for the Binlog class.
113-114: Member addition looks good. The addition of the timer_task_thread_ member to the Binlog class is appropriate for managing periodic tasks.
src/pstd/include/env.h (1)
71-76: Function additions look good. The functions NewBufferedWritableFile and BufferedAppendableFile are appropriate for handling buffered writable files with specified buffer sizes and offsets.
include/pika_define.h (1)
35-35: Constant addition looks good. The addition of the constant FWRITE_USER_SPACE_BUF_SIZE with a value of 4KB is appropriate for defining the buffer size for buffered writable files.
src/pika_binlog.cc (8)
84-84: Buffered writable file creation looks good. The use of NewBufferedWritableFile to create a new buffered writable file with the specified buffer size is appropriate and aligns with the PR objectives.
115-115: Buffered appendable file creation looks good. The use of BufferedAppendableFile to create a buffered appendable file with the specified buffer size and offset is appropriate and aligns with the PR objectives.
125-127: Periodic flush timer task addition looks good. The addition of a timer task to flush the buffered file every 500 milliseconds is appropriate and aligns with the PR objectives.
132-132: Stopping the timer task thread in the destructor looks good. Calling StopThread on the timer_task_thread_ member in the Binlog destructor ensures proper cleanup.
136-144: FlushBufferedFile method implementation looks good. The FlushBufferedFile method locks the mutex, checks if the file is open, and flushes the buffer if it is, ensuring thread safety and proper flushing.
227-227: Buffered writable file creation during log file rollover looks good. The use of NewBufferedWritableFile to create a new buffered writable file with the specified buffer size during log file rollover is appropriate and aligns with the PR objectives.
404-404: Buffered writable file creation in SetProducerStatus method looks good. The use of NewBufferedWritableFile to create a new buffered writable file with the specified buffer size in the SetProducerStatus method is appropriate and aligns with the PR objectives.
443-443: Buffered appendable file creation in Truncate method looks good. The use of BufferedAppendableFile to create a buffered appendable file with the specified buffer size and offset in the Truncate method is appropriate and aligns with the PR objectives.
src/pstd/src/env.cc (8)
494-501: Destructor should ensure resources are released. The destructor should ensure that the file is closed and the buffer is freed.
535-541: Handle fclose error properly in Close method. The Close method should handle the case where fclose fails and ensure the file pointer is set to nullptr.
543-548: Ensure proper handling of fflush errors in Flush method. The Flush method should handle the case where fflush fails and return an appropriate error status.
550-560: Ensure proper handling of fsync errors in Sync method. The Sync method should handle the case where fsync fails and return an appropriate error status.
562-564: Trim method implementation is currently a no-op. The Trim method is currently a no-op. If trimming functionality is required, implement the method; otherwise, document why it's a no-op.
564-565: Filesize method looks good. The Filesize method correctly returns the current file size.
743-785: Handle file errors and buffer allocation failures properly. The function correctly handles file errors and buffer allocation failures. Ensure that resources are properly released in case of errors.
787-824: Handle file errors and buffer allocation failures properly. The function correctly handles file errors and buffer allocation failures. Ensure that resources are properly released in case of errors.
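The Close, Flush, and Sync comments above all ask for the same pattern: check the return value, capture errno immediately, and report it. A minimal sketch of that pattern follows. `IoResult`, `MakeError`, `FlushFile`, `SyncFile`, and `CloseFile` are hypothetical names used only for illustration (the PR itself returns a `Status`), and `fsync`/`fileno` assume a POSIX platform.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <string>
#include <unistd.h>  // fsync (POSIX)

// Hypothetical result type standing in for the Status used in the PR.
struct IoResult {
  bool ok;
  std::string message;
};

inline IoResult MakeError(const std::string& what, int err) {
  return {false, what + ": " + std::strerror(err)};
}

// Pushes stdio's user-space buffer down to the kernel.
inline IoResult FlushFile(FILE* file, const std::string& name) {
  if (file == nullptr) {
    return {false, name + " is not open"};
  }
  if (std::fflush(file) != 0) {
    return MakeError("fflush " + name, errno);
  }
  return {true, ""};
}

// Flushes first, then asks the kernel to write the data to stable storage.
inline IoResult SyncFile(FILE* file, const std::string& name) {
  IoResult flushed = FlushFile(file, name);
  if (!flushed.ok) {
    return flushed;
  }
  if (fsync(fileno(file)) != 0) {
    return MakeError("fsync " + name, errno);
  }
  return {true, ""};
}

// Closes the stream and nulls the pointer whether or not fclose succeeds,
// because the stream must not be used again after fclose in either case.
inline IoResult CloseFile(FILE*& file, const std::string& name) {
  if (file == nullptr) {
    return {true, ""};
  }
  int rc = std::fclose(file);
  int err = errno;  // capture before any other call can overwrite it
  file = nullptr;
  if (rc != 0) {
    return MakeError("fclose " + name, err);
  }
  return {true, ""};
}
```

`fflush` only moves data from the user-space buffer to the kernel; durability across a machine crash needs the `fsync` step as well, which is why the two are kept as separate calls here.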
Status Append(const Slice& data) override {
  if (!file_) {
    return IOError("fwrite target: " + filename_ + " is not opened", errno);
  }
  const char* src = data.data();
  size_t left = data.size();
  int32_t max_retries = 4;
  int retry_count = 0;

  while (left > 0) {
    size_t written = fwrite(src, sizeof(char), left, file_);
    if (written == 0) {
      if (ferror(file_)) {
        int err_num = errno;
        clearerr(file_);
        return IOError("fwrite error with " + filename_, err_num);
      }
      if (errno == ENOSPC || ++retry_count > max_retries) {
        return IOError(filename_, errno);
      }
    }
    src += written;
    left -= written;
    curr_file_size_ += written;
    retry_count = 0;
  }

  return Status::OK();
}
Handle retry logic and error conditions in Append method.
The Append method includes retry logic but does not handle the case where fwrite fails due to reasons other than ENOSPC. Consider adding a retry limit and handling other error scenarios.
Status Append(const Slice& data) override {
  if (!file_) {
    return IOError("fwrite target: " + filename_ + " is not opened", errno);
  }
  const char* src = data.data();
  size_t left = data.size();
  int32_t max_retries = 4;
  int retry_count = 0;
  while (left > 0) {
    size_t written = fwrite(src, sizeof(char), left, file_);
    if (written == 0) {
      if (ferror(file_)) {
        int err_num = errno;
        clearerr(file_);
        return IOError("fwrite error with " + filename_, err_num);
      }
      if (errno == ENOSPC || ++retry_count > max_retries) {
        return IOError(filename_, errno);
      }
    }
    src += written;
    left -= written;
    curr_file_size_ += written;
    retry_count = 0;
  }
  return Status::OK();
}
Committable suggestion was skipped due to low confidence.
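One detail worth noting in the loop above is that `retry_count` is reset to 0 at the end of every iteration, including iterations that wrote nothing, so the retry limit appears unreachable. The sketch below shows one possible shape for a bounded-retry write loop. It is an illustration under assumptions, not the PR's code: `AppendAll` is a hypothetical free function, it returns an errno-style code instead of a `Status`, and it treats only `EINTR` as retryable.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdio>

// Hypothetical helper: writes all `left` bytes or returns an errno-style code.
// `total_written` (optional) is advanced by the number of bytes accepted.
inline int AppendAll(FILE* file, const char* src, std::size_t left,
                     std::size_t* total_written) {
  assert(file != nullptr);
  const int kMaxRetries = 4;
  int retry_count = 0;
  while (left > 0) {
    errno = 0;
    std::size_t written = std::fwrite(src, 1, left, file);
    src += written;
    left -= written;
    if (total_written != nullptr) {
      *total_written += written;
    }
    if (left == 0) {
      break;
    }
    // fwrite returns fewer items than requested only when a write error occurred.
    int err = errno;
    std::clearerr(file);
    if (written > 0) {
      retry_count = 0;  // reset only when the call made progress
    }
    if (err != EINTR || ++retry_count > kMaxRetries) {
      return err != 0 ? err : EIO;
    }
  }
  return 0;
}
```

Errors such as `ENOSPC` are returned immediately, since retrying cannot fix them; whether retrying a buffered stream after `EINTR` is safe on a given libc is itself an assumption that would need checking.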
class BufferedWritableFile : public WritableFile {
 private:
  std::string filename_;
  FILE* file_;
  int32_t user_space_buf_size_;
  uint64_t curr_file_size_;
  char* buffer_;

 public:
  BufferedWritableFile() = delete;
  BufferedWritableFile(const BufferedWritableFile&) = delete;
  BufferedWritableFile& operator=(const BufferedWritableFile&) = delete;

  // caller must ensure passing-in 'file' is not nullptr
  BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
      : filename_(std::move(file_name)),
        file_(file),
        user_space_buf_size_(user_space_buf_size),
        curr_file_size_(curr_file_size),
        buffer_(buf) {
    assert(file_ && "file_ can not be nullptr");
  }
Ensure proper buffer management in the constructor.
The constructor takes ownership of the buffer but does not handle the case where setvbuf fails after buffer allocation.
- BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
+ BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
      : filename_(std::move(file_name)),
        file_(file),
        user_space_buf_size_(user_space_buf_size),
        curr_file_size_(curr_file_size),
        buffer_(buf) {
    assert(file_ && "file_ can not be nullptr");
    if (user_space_buf_size_ > 0 && setvbuf(file_, buffer_, _IOFBF, user_space_buf_size_) != 0) {
      free(buffer_);
      buffer_ = nullptr;
      user_space_buf_size_ = 0;
      // Handle error appropriately, e.g., throw an exception or set an error status
    }
  }
Committable suggestion
class BufferedWritableFile : public WritableFile {
 private:
  std::string filename_;
  FILE* file_;
  int32_t user_space_buf_size_;
  uint64_t curr_file_size_;
  char* buffer_;
 public:
  BufferedWritableFile() = delete;
  BufferedWritableFile(const BufferedWritableFile&) = delete;
  BufferedWritableFile& operator=(const BufferedWritableFile&) = delete;
  // caller must ensure passing-in 'file' is not nullptr
  BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
      : filename_(std::move(file_name)),
        file_(file),
        user_space_buf_size_(user_space_buf_size),
        curr_file_size_(curr_file_size),
        buffer_(buf) {
    assert(file_ && "file_ can not be nullptr");
    if (user_space_buf_size_ > 0 && setvbuf(file_, buffer_, _IOFBF, user_space_buf_size_) != 0) {
      free(buffer_);
      buffer_ = nullptr;
      user_space_buf_size_ = 0;
      // Handle error appropriately, e.g., throw an exception or set an error status
    }
  }
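An alternative to handling a `setvbuf` failure inside the constructor is to do the open, allocation, and `setvbuf` call in a factory function that can report failure before any object exists. The sketch below illustrates that idea only; `BufferedFile`, `FileCloser`, and `OpenBuffered` are hypothetical names and are not the PR's `NewBufferedWritableFile`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <string>

// Closes a FILE* when the owning unique_ptr is destroyed.
struct FileCloser {
  void operator()(FILE* f) const {
    if (f != nullptr) {
      std::fclose(f);
    }
  }
};

// Hypothetical holder. Members are destroyed in reverse declaration order,
// so the stream is closed before the buffer it uses is freed.
struct BufferedFile {
  std::unique_ptr<char[]> buffer;
  std::unique_ptr<FILE, FileCloser> file;
};

// Opens `path` for appending with a fully buffered user-space buffer of
// `buf_size` bytes. Returns nullptr if fopen or setvbuf fails; everything
// acquired so far is released automatically on those paths.
inline std::unique_ptr<BufferedFile> OpenBuffered(const std::string& path,
                                                  std::size_t buf_size) {
  assert(buf_size > 0);
  std::unique_ptr<BufferedFile> out(new BufferedFile());
  out->buffer.reset(new char[buf_size]);  // throws std::bad_alloc on failure
  out->file.reset(std::fopen(path.c_str(), "ab"));
  if (!out->file) {
    return nullptr;
  }
  // setvbuf must be called before any other operation on the stream.
  if (std::setvbuf(out->file.get(), out->buffer.get(), _IOFBF, buf_size) != 0) {
    return nullptr;
  }
  return out;
}
```

The design choice here is ownership by RAII: no error path needs a manual `free` or `fclose`, and the buffer is guaranteed to outlive the stream that writes through it.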
A timer task has now been added that proactively flushes every 500 ms. This usage pattern still needs to be investigated.
2 remove auto Flush() for every write
db8848f to 009952c
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- include/pika_define.h (1 hunks)
- src/pika_binlog.cc (6 hunks)
Additional comments not posted (9)
include/pika_define.h (1)
35-35: LGTM! The new constant FWRITE_USER_SPACE_BUF_SIZE is correctly defined as 512LL << 10 (512KB).
src/pika_binlog.cc (8)
84-84: LGTM! The use of pstd::NewBufferedWritableFile with the buffer size FWRITE_USER_SPACE_BUF_SIZE is correct.
115-115: LGTM! The use of pstd::BufferedAppendableFile with the buffer size FWRITE_USER_SPACE_BUF_SIZE is correct.
124-127: LGTM! The addition of the timer task to flush the buffered file every 500 milliseconds is correctly implemented.
132-132: LGTM! The timer task thread is correctly stopped in the destructor to prevent resource leaks.
136-144: LGTM! The FlushBufferedFile method is correctly implemented and thread-safe.
227-227: LGTM! The use of pstd::NewBufferedWritableFile with the buffer size FWRITE_USER_SPACE_BUF_SIZE in the Put method is correct.
280-282: Verify the commenting out of the flush operation. The flush operation in the EmitPhysicalRecord method is commented out. Ensure this change is intentional and does not affect functionality.
404-404: LGTM! The use of pstd::NewBufferedWritableFile with the buffer size FWRITE_USER_SPACE_BUF_SIZE in the SetProducerStatus method is correct.
Because crash scenarios and lock-free optimization need to be taken into account, the approach was rewritten and moved to a new PR: #2848