Skip to content

Commit b871b34

Browse files
committed
feat: cherry-picked commits about HA from apache-ratis branch
1 parent 6b93231 commit b871b34

13 files changed

Lines changed: 234 additions & 48 deletions

File tree

engine/src/main/java/com/arcadedb/GlobalConfiguration.java

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -526,9 +526,16 @@ Enable diagnostic logging during vector graph build progress (heap/off-heap memo
526526
"Example: localhost:2434:2480:10,192.168.0.1:2434:2480:0",
527527
String.class, ""),
528528

529+
HA_SERVER_ROLE("arcadedb.ha.serverRole", SCOPE.SERVER,
530+
"Enforces a role in a cluster. 'any' (default) means this node can be elected leader. "
531+
+ "'replica' sets the Raft peer priority to 0 so the node is never elected leader "
532+
+ "(useful for read-scale or witness deployments).",
533+
String.class, "any", Set.of("any", "replica")),
534+
529535
HA_QUORUM("arcadedb.ha.quorum", SCOPE.SERVER,
530-
"Default quorum between 'none', 'one', 'two', 'three', 'majority' and 'all' servers. Default is majority", String.class, "majority",
531-
Set.of("none", "one", "two", "three", "majority", "all")),
536+
"Write quorum: 'majority' (standard Raft, default) or 'all' (every configured peer must acknowledge). "
537+
+ "Legacy values 'none', 'one', 'two', 'three' are no longer supported.",
538+
String.class, "majority", Set.of("majority", "all")),
532539

533540
HA_QUORUM_TIMEOUT("arcadedb.ha.quorumTimeout", SCOPE.SERVER, "Timeout waiting for the quorum", Long.class, 10000),
534541

@@ -544,6 +551,21 @@ Enable diagnostic logging during vector graph build progress (heap/off-heap memo
544551
HA_APPEND_BUFFER_SIZE("arcadedb.ha.appendBufferSize", SCOPE.SERVER,
545552
"AppendEntries batch byte limit for replication (e.g. '4MB')", String.class, "4MB"),
546553

554+
HA_WRITE_BUFFER_SIZE("arcadedb.ha.writeBufferSize", SCOPE.SERVER,
555+
"Raft log write buffer size (e.g. '8MB'). Must be at least appendBufferSize + 8 bytes, "
556+
+ "otherwise the server fails to start with ConfigurationException.",
557+
String.class, "8MB"),
558+
559+
HA_LOG_PURGE_GAP("arcadedb.ha.logPurgeGap", SCOPE.SERVER,
560+
"Number of Raft log entries retained after a snapshot as a buffer for slightly lagging followers. "
561+
+ "Lower values free disk faster but raise the chance a slow follower needs a full snapshot resync.",
562+
Integer.class, 1024),
563+
564+
HA_LOG_PURGE_UPTO_SNAPSHOT("arcadedb.ha.logPurgeUptoSnapshot", SCOPE.SERVER,
565+
"When true (default), deletes old Raft log segments after each snapshot to bound disk growth. "
566+
+ "Set to false to retain full log history for debugging/auditing.",
567+
Boolean.class, true),
568+
547569
HA_REPLICATION_CHUNK_MAXSIZE("arcadedb.ha.replicationChunkMaxSize", SCOPE.SERVER,
548570
"Maximum channel chunk size for replicating messages between servers. Default is 16777216", Integer.class, 16384 * 1024),
549571

@@ -574,19 +596,30 @@ Enable diagnostic logging during vector graph build progress (heap/off-heap memo
574596
"set to true for durable deployments.",
575597
Boolean.class, false),
576598

577-
HA_RAFT_SNAPSHOT_THRESHOLD("arcadedb.ha.raftSnapshotThreshold", SCOPE.SERVER,
578-
"Number of Raft log entries after which the leader automatically takes a snapshot. " +
579-
"Lower values cause more frequent snapshots and earlier log compaction.",
580-
Long.class, 10000L),
599+
HA_SNAPSHOT_THRESHOLD("arcadedb.ha.snapshotThreshold", SCOPE.SERVER,
600+
"Number of Raft log entries after which the leader automatically takes a snapshot. "
601+
+ "Lower values cause more frequent snapshots and earlier log compaction.",
602+
Long.class, 100_000L),
581603

582604
HA_LOG_VERBOSE("arcadedb.ha.logVerbose", SCOPE.SERVER,
583605
"HA verbose logging level: 0=off, 1=basic (elections, leader changes), 2=detailed (replication, forwarding), 3=trace (every state machine apply)",
584606
Integer.class, 0),
585607

586-
HA_RAFT_GROUP_COMMIT_BATCH_SIZE("arcadedb.ha.raftGroupCommitBatchSize", SCOPE.SERVER,
587-
"Maximum number of Raft log entries to batch in a single group commit flush. Higher values improve throughput under concurrent load.",
608+
HA_GROUP_COMMIT_BATCH_SIZE("arcadedb.ha.groupCommitBatchSize", SCOPE.SERVER,
609+
"Maximum number of Raft log entries to batch in a single group commit flush. "
610+
+ "Higher values improve throughput under concurrent load.",
588611
Integer.class, 500),
589612

613+
HA_GROUP_COMMIT_QUEUE_SIZE("arcadedb.ha.groupCommitQueueSize", SCOPE.SERVER,
614+
"Maximum pending transactions allowed in the Raft group-commit queue. "
615+
+ "When the queue is full, the server applies backpressure by throwing ReplicationQueueFullException "
616+
+ "(a NeedRetryException that clients can retry).",
617+
Integer.class, 10_000),
618+
619+
HA_GROUP_COMMIT_OFFER_TIMEOUT("arcadedb.ha.groupCommitOfferTimeout", SCOPE.SERVER,
620+
"Timeout in ms waiting for space in the group-commit queue before throwing ReplicationQueueFullException.",
621+
Integer.class, 100),
622+
590623
HA_CLUSTER_TOKEN("arcadedb.ha.clusterToken", SCOPE.SERVER,
591624
"Shared secret for inter-node request forwarding authentication. " +
592625
"Must be identical on all cluster nodes. " +
@@ -605,6 +638,10 @@ Enable diagnostic logging during vector graph build progress (heap/off-heap memo
605638
"Maximum number of concurrent snapshot downloads served by the leader. Requests over this limit receive HTTP 503.",
606639
Integer.class, 2),
607640

641+
HA_SNAPSHOT_DOWNLOAD_TIMEOUT("arcadedb.ha.snapshotDownloadTimeout", SCOPE.SERVER,
642+
"Read timeout in ms for downloading a database snapshot from the leader during follower resync.",
643+
Integer.class, 300_000),
644+
608645
HA_SNAPSHOT_INSTALL_RETRIES("arcadedb.ha.snapshotInstallRetries", SCOPE.SERVER,
609646
"Maximum retry attempts for snapshot download from the leader during snapshot installation.",
610647
Integer.class, 3),
@@ -633,10 +670,17 @@ Enable diagnostic logging during vector graph build progress (heap/off-heap memo
633670
"Delay in milliseconds between RemoteDatabase election retries.",
634671
Long.class, 2000L),
635672

673+
HA_RATIS_RESTART_MAX_RETRIES("arcadedb.ha.ratisRestartMaxRetries", SCOPE.SERVER,
674+
"Maximum consecutive Ratis restart attempts by the health monitor before the server shuts down "
675+
+ "for cluster-level recovery. Raise when partition-recovery scenarios cause legitimate rapid restarts.",
676+
Integer.class, 10),
677+
636678
HA_STOP_SERVER_ON_REPLICATION_FAILURE("arcadedb.ha.stopServerOnReplicationFailure", SCOPE.SERVER,
637-
"If true, stops the JVM after exhausting step-down retries on a phase-2 replication failure. "
638-
+ "If false, logs CRITICAL but leaves the server running (useful for debugging).",
639-
Boolean.class, true),
679+
"After a phase-2 local commit fails on the leader while followers have applied the entry, step-down "
680+
+ "is attempted first. If every step-down fails and this flag is true, the JVM exits so an "
681+
+ "orchestrator can restart and let Raft log replay correct the state. "
682+
+ "Default is false: the server keeps running and logs CRITICAL, useful for debugging without an orchestrator.",
683+
Boolean.class, false),
640684

641685
HA_SNAPSHOT_WRITE_TIMEOUT("arcadedb.ha.snapshotWriteTimeout", SCOPE.SERVER,
642686
"Timeout in milliseconds for writing a snapshot to a follower. "
@@ -664,6 +708,12 @@ Enable diagnostic logging during vector graph build progress (heap/off-heap memo
664708
"Maximum number of entries in the HTTP idempotency cache. Oldest entry is evicted when full.",
665709
Integer.class, 10_000),
666710

711+
HA_PEER_ALLOWLIST_ENABLED("arcadedb.ha.peerAllowlist.enabled", SCOPE.SERVER,
712+
"Reject inbound Raft gRPC connections whose remote address does not resolve to a host in "
713+
+ "arcadedb.ha.serverList. Loopback is always allowed. Does not provide peer identity or encryption: "
714+
+ "use mTLS on untrusted networks.",
715+
Boolean.class, true),
716+
667717
HA_GRPC_ALLOWLIST_REFRESH_MS("arcadedb.ha.grpcAllowlistRefreshMs", SCOPE.SERVER,
668718
"Rate-limiting interval in milliseconds for DNS re-resolution in the gRPC peer address allowlist filter.",
669719
Long.class, 30_000L),

ha-raft/src/main/java/com/arcadedb/server/ha/raft/RaftGroupCommitter.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.arcadedb.log.LogManager;
2222
import com.arcadedb.network.binary.QuorumNotReachedException;
23+
import com.arcadedb.network.binary.ReplicationQueueFullException;
2324
import org.apache.ratis.client.RaftClient;
2425
import org.apache.ratis.proto.RaftProtos;
2526
import org.apache.ratis.protocol.Message;
@@ -41,24 +42,32 @@
4142
*/
4243
class RaftGroupCommitter {
4344

44-
private final RaftClient raftClient;
45-
private final Quorum quorum;
46-
private final long quorumTimeout;
47-
private final int maxBatchSize;
48-
private final LinkedBlockingQueue<CancellablePendingEntry> queue = new LinkedBlockingQueue<>();
49-
private final Thread flusher;
50-
private volatile boolean running = true;
45+
private final RaftClient raftClient;
46+
private final Quorum quorum;
47+
private final long quorumTimeout;
48+
private final int maxBatchSize;
49+
private final int offerTimeoutMs;
50+
private final LinkedBlockingQueue<CancellablePendingEntry> queue;
51+
private final Thread flusher;
52+
private volatile boolean running = true;
5153

5254
RaftGroupCommitter(final RaftClient raftClient, final Quorum quorum, final long quorumTimeout) {
53-
this(raftClient, quorum, quorumTimeout, 500);
55+
this(raftClient, quorum, quorumTimeout, 500, 10_000, 100);
5456
}
5557

5658
RaftGroupCommitter(final RaftClient raftClient, final Quorum quorum, final long quorumTimeout,
5759
final int maxBatchSize) {
60+
this(raftClient, quorum, quorumTimeout, maxBatchSize, 10_000, 100);
61+
}
62+
63+
RaftGroupCommitter(final RaftClient raftClient, final Quorum quorum, final long quorumTimeout,
64+
final int maxBatchSize, final int maxQueueSize, final int offerTimeoutMs) {
5865
this.raftClient = raftClient;
5966
this.quorum = quorum;
6067
this.quorumTimeout = quorumTimeout;
6168
this.maxBatchSize = maxBatchSize;
69+
this.offerTimeoutMs = offerTimeoutMs;
70+
this.queue = new LinkedBlockingQueue<>(maxQueueSize);
6271
this.flusher = new Thread(this::flushLoop, "arcadedb-raft-group-committer");
6372
this.flusher.setDaemon(true);
6473
this.flusher.start();
@@ -67,7 +76,15 @@ class RaftGroupCommitter {
6776
void submitAndWait(final byte[] entry) {
6877
final long timeoutMs = 2 * quorumTimeout;
6978
final CancellablePendingEntry pending = new CancellablePendingEntry(entry);
70-
queue.add(pending);
79+
try {
80+
if (!queue.offer(pending, offerTimeoutMs, TimeUnit.MILLISECONDS))
81+
throw new ReplicationQueueFullException(
82+
"Replication queue is full (" + queue.remainingCapacity() + " remaining of " + (queue.size()
83+
+ queue.remainingCapacity()) + " max). Server is overloaded, retry later");
84+
} catch (final InterruptedException e) {
85+
Thread.currentThread().interrupt();
86+
throw new ReplicationQueueFullException("Interrupted while waiting for replication queue space");
87+
}
7188

7289
try {
7390
final Exception error = pending.future.get(timeoutMs, TimeUnit.MILLISECONDS);

ha-raft/src/main/java/com/arcadedb/server/ha/raft/RaftHAServer.java

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public class RaftHAServer implements HealthMonitor.HealthTarget {
106106
private volatile LifeCycle.State forcedStateForTesting = null;
107107
private HealthMonitor healthMonitor;
108108
private ClusterTokenProvider tokenProvider;
109+
private volatile int restartFailureCount = 0;
109110

110111
public RaftHAServer(final ArcadeDBServer arcadeServer, final ContextConfiguration configuration) {
111112
this.arcadeServer = arcadeServer;
@@ -118,11 +119,28 @@ public RaftHAServer(final ArcadeDBServer arcadeServer, final ContextConfiguratio
118119
final int raftPort = configuration.getValueAsInteger(GlobalConfiguration.HA_RAFT_PORT);
119120

120121
final RaftPeerAddressResolver.ParsedPeerList parsed = RaftPeerAddressResolver.parsePeerList(serverList, raftPort);
121-
final List<RaftPeer> peers = parsed.peers();
122+
List<RaftPeer> peers = parsed.peers();
122123
final String serverName = arcadeServer.getServerName();
123124

124125
this.httpAddresses.putAll(parsed.httpAddresses());
125126
this.localPeerId = RaftPeerAddressResolver.findLocalPeerId(peers, serverName, arcadeServer);
127+
128+
// If this node is configured as a replica, override its Raft peer priority to 0
129+
// so Ratis never elects it as leader (useful for read-scale or witness nodes).
130+
final String serverRole = configuration.getValueAsString(GlobalConfiguration.HA_SERVER_ROLE);
131+
if ("replica".equalsIgnoreCase(serverRole)) {
132+
final List<RaftPeer> rebuilt = new ArrayList<>(peers.size());
133+
for (final RaftPeer p : peers) {
134+
if (p.getId().equals(localPeerId)) {
135+
rebuilt.add(RaftPeer.newBuilder().setId(p.getId()).setAddress(p.getAddress()).setPriority(0).build());
136+
LogManager.instance().log(this, Level.INFO,
137+
"Node configured as replica (priority=0, will not become leader): %s", localPeerId);
138+
} else
139+
rebuilt.add(p);
140+
}
141+
peers = Collections.unmodifiableList(rebuilt);
142+
}
143+
126144
this.raftGroup = RaftGroup.valueOf(
127145
RaftGroupId.valueOf(UUID.nameUUIDFromBytes(clusterName.getBytes(StandardCharsets.UTF_8))),
128146
peers);
@@ -232,8 +250,10 @@ public void start() throws IOException {
232250
LogManager.instance()
233251
.log(this, Level.INFO, "Raft cluster joined: %d nodes %s", peerDisplayNames.size(), peerDisplayNames.values());
234252

235-
final int batchSize = configuration.getValueAsInteger(GlobalConfiguration.HA_RAFT_GROUP_COMMIT_BATCH_SIZE);
236-
transactionBroker = new RaftTransactionBroker(raftClient, quorum, quorumTimeout, batchSize);
253+
final int batchSize = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_BATCH_SIZE);
254+
final int queueSize = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_QUEUE_SIZE);
255+
final int offerTimeout = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_OFFER_TIMEOUT);
256+
transactionBroker = new RaftTransactionBroker(raftClient, quorum, quorumTimeout, batchSize, queueSize, offerTimeout);
237257

238258
// K8s auto-join: if running in Kubernetes with no existing storage, try to join an existing cluster
239259
if (configuration.getValueAsBoolean(GlobalConfiguration.HA_K8S) && !hadExistingStorage)
@@ -287,6 +307,19 @@ public void restartRatisIfNeeded() {
287307
return;
288308
}
289309

310+
final int maxRetries = configuration.getValueAsInteger(GlobalConfiguration.HA_RATIS_RESTART_MAX_RETRIES);
311+
if (restartFailureCount >= maxRetries) {
312+
LogManager.instance().log(this, Level.SEVERE,
313+
"Ratis restart failed %d consecutive times (max=%d). Stopping server for cluster-level recovery",
314+
restartFailureCount, maxRetries);
315+
final Thread stopThread = new Thread(() -> {
316+
try { arcadeServer.stop(); } catch (final Exception ignored) {}
317+
}, "arcadedb-restart-failure-stop");
318+
stopThread.setDaemon(true);
319+
stopThread.start();
320+
return;
321+
}
322+
290323
final RaftClient oldClient = this.raftClient;
291324
final RaftServer oldServer = this.raftServer;
292325
final RaftTransactionBroker oldBroker = this.transactionBroker;
@@ -330,12 +363,21 @@ public void restartRatisIfNeeded() {
330363
this.raftProperties = properties;
331364
this.raftClient = buildRaftClient(raftGroup, properties);
332365

333-
final int batchSize = configuration.getValueAsInteger(GlobalConfiguration.HA_RAFT_GROUP_COMMIT_BATCH_SIZE);
334-
this.transactionBroker = new RaftTransactionBroker(raftClient, quorum, quorumTimeout, batchSize);
366+
final int batchSize = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_BATCH_SIZE);
367+
final int queueSize = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_QUEUE_SIZE);
368+
final int offerTimeout = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_OFFER_TIMEOUT);
369+
this.transactionBroker = new RaftTransactionBroker(raftClient, quorum, quorumTimeout, batchSize, queueSize,
370+
offerTimeout);
335371

372+
restartFailureCount = 0;
336373
HALog.log(this, HALog.BASIC, "Ratis recovered successfully");
337374
} catch (final Throwable t) {
338-
LogManager.instance().log(this, Level.SEVERE, "HealthMonitor recovery failed: %s", t, t.getMessage());
375+
restartFailureCount++;
376+
LogManager.instance().log(this, Level.SEVERE,
377+
"HealthMonitor recovery failed (attempt %d/%d): %s",
378+
t, restartFailureCount,
379+
configuration.getValueAsInteger(GlobalConfiguration.HA_RATIS_RESTART_MAX_RETRIES),
380+
t.getMessage());
339381
}
340382
}
341383
}
@@ -491,9 +533,12 @@ public synchronized void refreshRaftClient(final RaftPeerId knownLeaderId) {
491533
raftClient = buildRaftClient(raftGroup, raftProperties, knownLeaderId);
492534

493535
if (transactionBroker != null) {
494-
final int batchSize = configuration.getValueAsInteger(GlobalConfiguration.HA_RAFT_GROUP_COMMIT_BATCH_SIZE);
536+
final int batchSize = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_BATCH_SIZE);
537+
final int queueSize = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_QUEUE_SIZE);
538+
final int offerTimeout = configuration.getValueAsInteger(GlobalConfiguration.HA_GROUP_COMMIT_OFFER_TIMEOUT);
495539
final RaftTransactionBroker oldBroker = transactionBroker;
496-
transactionBroker = new RaftTransactionBroker(raftClient, quorum, quorumTimeout, batchSize);
540+
transactionBroker = new RaftTransactionBroker(raftClient, quorum, quorumTimeout, batchSize, queueSize,
541+
offerTimeout);
497542
oldBroker.stop();
498543
}
499544

@@ -917,14 +962,20 @@ void stopLagMonitor() {
917962
* connections from IPs not listed in {@code arcadedb.ha.serverList}.
918963
*/
919964
private static Parameters buildParameters(final ContextConfiguration configuration) {
965+
final Parameters parameters = new Parameters();
966+
if (!configuration.getValueAsBoolean(GlobalConfiguration.HA_PEER_ALLOWLIST_ENABLED))
967+
return parameters;
968+
920969
final String serverList = configuration.getValueAsString(GlobalConfiguration.HA_SERVER_LIST);
921970
final long refreshMs = configuration.getValueAsLong(GlobalConfiguration.HA_GRPC_ALLOWLIST_REFRESH_MS);
922971
final List<String> peerHosts = PeerAddressAllowlistFilter.extractPeerHosts(serverList);
923-
final Parameters parameters = new Parameters();
924-
if (!peerHosts.isEmpty()) {
925-
final PeerAddressAllowlistFilter allowlistFilter = new PeerAddressAllowlistFilter(peerHosts, refreshMs);
926-
GrpcConfigKeys.Server.setServicesCustomizer(parameters, new RaftGrpcServicesCustomizer(allowlistFilter));
972+
if (peerHosts.isEmpty()) {
973+
LogManager.instance().log(RaftHAServer.class, Level.WARNING,
974+
"arcadedb.ha.peerAllowlist.enabled=true but arcadedb.ha.serverList is empty; allowlist not installed");
975+
return parameters;
927976
}
977+
final PeerAddressAllowlistFilter allowlistFilter = new PeerAddressAllowlistFilter(peerHosts, refreshMs);
978+
GrpcConfigKeys.Server.setServicesCustomizer(parameters, new RaftGrpcServicesCustomizer(allowlistFilter));
928979
return parameters;
929980
}
930981

0 commit comments

Comments
 (0)