Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ replication-num : 0
# The default value of consensus-level is 0, which means this feature is not enabled.
consensus-level : 0

# Batch processing configuration (used by both command collection and consensus mechanism)
# The maximum number of items in a batch (both command collection and consensus)
# Default: 100
batch-size : 100

# Batch processing configuration (used by both command collection and consensus mechanism)
# The maximum waiting batch for (both command collection and consensus)
# Default: 5
batch-max-wait-time : 5

# The timeout in milliseconds for waiting for a batch ACK from a slave.
# Default: 500
replication-ack-timeout : 500

# Enable command batch processing for better performance
# When enabled, write commands will be collected and processed in batches
# Default: no
command-batch-enabled : yes

# The Prefix of dump file's name.
# All the files that generated by command "bgsave" will be name with this prefix.
dump-prefix :
Expand Down
4 changes: 3 additions & 1 deletion include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable {

void debug() {
std::shared_lock l(rwlock_);
printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_);
printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_);
}

private:
Expand All @@ -61,6 +61,8 @@ class Binlog : public pstd::noncopyable {
* Set Producer pro_num and pro_offset with lock
*/
pstd::Status SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);
// Force sync data to disk
pstd::Status Sync();
// Need to hold Lock();
pstd::Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);

Expand Down
2 changes: 2 additions & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <bitset>
#include <utility>
#include <future>

#include "acl.h"
#include "include/pika_command.h"
Expand Down Expand Up @@ -52,6 +53,7 @@ class PikaClientConn : public net::RedisConn {
bool cache_miss_in_rtc_;
};


struct TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
Expand Down
192 changes: 192 additions & 0 deletions include/pika_command_collector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_COMMAND_COLLECTOR_H_
#define PIKA_COMMAND_COLLECTOR_H_

#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include <chrono>
#include <optional>

#include "include/pika_command.h"
#include "include/pika_define.h"
#include "pstd/include/pstd_status.h"

#include "include/pika_consensus.h"

/**
* @brief PikaCommandCollector is used to collect write commands and process them in batches
*
* Main functions:
* 1. Collect write commands and process them in optimized batches after reaching the threshold
* 2. Handle the conflict of the same key (the later command will overwrite the earlier command)
* 3. Send commands in batches to the consensus coordinator with batch-level synchronization
* 4. Support asynchronous callback notification of command processing results
* 5. Track performance metrics for batch processing
* 6. Provide intelligent retry mechanisms for failed batches
*/
class PikaCommandCollector {
public:
// Callback function type after command processing is completed
using CommandCallback = std::function<void(const LogOffset& offset, pstd::Status status)>;

/**
* @brief constructor
* @param coordinator consensus coordinator reference
* @param batch_size batch size (number of commands)
* @param batch_max_wait_time forced flush interval (milliseconds)
*/
// Constructor with raw pointer (original)
PikaCommandCollector(ConsensusCoordinator* coordinator, size_t batch_size = 100, int batch_max_wait_time = 5);

// Constructor with shared_ptr (for compatibility with make_shared calls)
PikaCommandCollector(std::shared_ptr<ConsensusCoordinator> coordinator, size_t batch_size = 100, int batch_max_wait_time = 5);


~PikaCommandCollector();

/**
* @brief Add command to collector
* @param cmd_ptr command pointer
* @param callback callback function after processing is completed
* @return whether the addition was successful
*/
bool AddCommand(const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback);

/**
* @brief Called periodically by external systems to process batches
* @param force Force processing even if batch is not full or timeout not reached
* @return Number of commands processed
*/

/**
* @brief Immediately process all currently collected commands
* @return The number of commands processed
*/
size_t FlushCommands(bool force = false);


/**
* @brief Get the current number of pending commands
* @return number of commands
*/
size_t PendingCommands() const;

/**
* @brief Set the batch size
* @param batch_size batch size
*/
void SetBatchSize(size_t batch_size);

/**
* @brief Set the batch max wait time
* @param batch_max_wait_time maximum wait time in milliseconds
*/
void SetBatchMaxWaitTime(int batch_max_wait_time);

/**
* @brief Get batch processing statistics
* @return Pair of (total_processed_commands, total_batches)
*/
std::pair<uint64_t, uint64_t> GetBatchStats() const;

/**
* @brief Get average batch processing time in milliseconds
* @return Average processing time or nullopt if no batches processed
*/
std::optional<double> GetAverageBatchTime() const;

private:

/**
* @brief batch processing command
* @param batch command batch
* @return Whether the processing is successful
*/
pstd::Status ProcessBatch(const std::vector<std::shared_ptr<Cmd>>& commands,
const std::vector<CommandCallback>& callbacks);

/**
* @brief Check for conflicts based on command type and key name
* @param cmd_ptr command pointer
* @return true if there is a conflict (should be replaced), false if there is no conflict
*/
bool CheckConflict(const std::shared_ptr<Cmd>& cmd_ptr) const;

/**
* @brief Handle key conflicts and remove conflicting commands
* @param cmd_ptr new command
*/
void HandleConflict(const std::shared_ptr<Cmd>& cmd_ptr);

/**
* @brief Retry batch processing commands
* @param commands List of commands to retry
* @param callbacks Corresponding callback function list
* @param priority Priority level for the retry (higher means more urgent)
* @return Whether the commands were successfully requeued
*/
bool RetryBatch(const std::vector<std::shared_ptr<Cmd>>& commands,
const std::vector<CommandCallback>& callbacks,
int priority = 100);

private:
//Consensus coordinator reference
ConsensusCoordinator* coordinator_;

// Batch processing configuration
std::atomic<size_t> batch_size_;
std::atomic<int> batch_max_wait_time_;

// Retry configuration
std::atomic<int> max_retry_attempts_{3};
std::atomic<int> retry_backoff_ms_{50};

// Command collection and processing
mutable std::mutex mutex_;

// Pending command queue and callbacks
std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>> pending_commands_;

// Priority queue for retries
std::deque<std::tuple<int, std::vector<std::shared_ptr<Cmd>>, std::vector<CommandCallback>>> retry_queue_;

// Command key mapping, used to handle same-key conflicts
std::unordered_map<std::string, std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>>::iterator> key_map_;

// Batch statistics
std::atomic<uint64_t> total_processed_{0};
std::atomic<uint64_t> total_batches_{0};
std::atomic<uint64_t> total_retries_{0};
std::atomic<uint64_t> total_conflicts_{0};
std::atomic<uint64_t> total_batch_time_ms_{0};
std::chrono::time_point<std::chrono::steady_clock> batch_start_time_;

// Performance tracking
struct BatchMetrics {
uint64_t batch_size;
uint64_t processing_time_ms;
uint64_t wait_time_ms;
bool successful;
};

// Circular buffer for recent batch metrics
static constexpr size_t kMetricsBufferSize = 100;
std::vector<BatchMetrics> recent_metrics_;
std::mutex metrics_mutex_;
};

#endif // PIKA_COMMAND_COLLECTOR_H_
62 changes: 60 additions & 2 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return sync_thread_num_;
}

bool command_batch_enabled() {
std::shared_lock l(rwlock_);
return command_batch_enabled_;
}

int batch_size() {
std::shared_lock l(rwlock_);
return batch_size_;
}

int batch_max_wait_time() {
std::shared_lock l(rwlock_);
return batch_max_wait_time_;
}
int sync_binlog_thread_num() {
std::shared_lock l(rwlock_);
return sync_binlog_thread_num_;
Expand Down Expand Up @@ -350,6 +365,16 @@ class PikaConf : public pstd::BaseConf {
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }
int consensus_level() { return consensus_level_.load(); }
int replication_num() { return replication_num_.load(); }
int replication_ack_timeout() {
std::shared_lock l(rwlock_);
return replication_ack_timeout_;
}

// Function to set replication acknowledgment timeout (used by batch system)
void SetReplicationAckTimeout(int timeout) {
std::lock_guard l(rwlock_);
replication_ack_timeout_ = timeout;
}
int rate_limiter_mode() {
std::shared_lock l(rwlock_);
return rate_limiter_mode_;
Expand Down Expand Up @@ -436,7 +461,6 @@ class PikaConf : public pstd::BaseConf {
bool is_admin_cmd(const std::string& cmd) {
return admin_cmd_set_.find(cmd) != admin_cmd_set_.end();
}

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; }
Expand All @@ -462,6 +486,23 @@ class PikaConf : public pstd::BaseConf {
std::lock_guard l(rwlock_);
thread_num_ = value;
}

void SetCommandBatchEnabled(const bool value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("command-batch-enabled", value ? "yes" : "no");
command_batch_enabled_ = value;
}

void SetCommandBatchSize(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("batch-size", std::to_string(value));
batch_size_ = value;
}
void SetCommandBatchMaxWaitTime(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("batch-max-wait-time", std::to_string(value));
batch_max_wait_time_ = value;
}
void SetTimeout(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("timeout", std::to_string(value));
Expand Down Expand Up @@ -665,6 +706,17 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value));
max_conn_rbuf_size_.store(value);
}
void SetConsensusBatchSize(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("batch-size", std::to_string(value));
batch_size_ = value;
}
// This method is used by config update system
void UpdateReplicationAckTimeout(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("replication-ack-timeout", std::to_string(value));
replication_ack_timeout_ = value;
}
void SetMaxCacheFiles(const int& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-cache-files", std::to_string(value));
Expand Down Expand Up @@ -929,6 +981,12 @@ class PikaConf : public pstd::BaseConf {
std::string server_id_;
std::string run_id_;
std::string replication_id_;

// 命令批处理相关配置
bool command_batch_enabled_ = true;
int batch_size_ = 100;
int batch_max_wait_time_ = 5;
int replication_ack_timeout_ = 5000;
std::string requirepass_;
std::string masterauth_;
std::string userpass_;
Expand Down Expand Up @@ -1047,7 +1105,7 @@ class PikaConf : public pstd::BaseConf {
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;

/*
kUninitialized = 0, // unknown setting
kDisable = 1, // disable perf stats
Expand Down
Loading