Skip to content

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ public ConsumerInvokerProxy<T> createInvoker(ServiceInstance instance) {
value = invokerCache.get(key);
if (value == null || !value.isAvailable()) {
if (value != null && !value.isAvailable()) {
invokerCache.remove(key);
// CAS remove: only evict if the current cache slot is still the stale
// proxy we observed. Avoid blowing away a fresh proxy that another
// thread may have just installed (or that an earlier closeFuture hook
// would otherwise also race against).
invokerCache.remove(key, value);
}
RpcClient rpcClient;
try {
Expand All @@ -100,24 +104,30 @@ public ConsumerInvokerProxy<T> createInvoker(ServiceInstance instance) {
backendConfig.generateProtocolConfig(instance.getHost(), instance.getPort(),
backendConfig.getNetwork(), protocolType.getName(), backendConfig.getExtMap()));
ConsumerInvoker<T> originInvoker = rpcClient.createInvoker(consumerConfig);
value = new ConsumerInvokerProxy<>(FilterChain.buildConsumerChain(consumerConfig,
originInvoker), rpcClient);
invokerCache.put(key, value);
// When the rpcClient is cleaned up and closed during idle time,
// the corresponding map should also be cleaned up.
ConsumerInvokerProxy<T> created = new ConsumerInvokerProxy<>(
FilterChain.buildConsumerChain(consumerConfig, originInvoker), rpcClient);
invokerCache.put(key, created);
// Long-connection mode: the client is no longer evicted by an idle scanner.
// It is only closed on explicit shutdown or fatal transport error (or by
// the reconnect-check timer in RpcClusterClientManager). When that
// happens we still need to clean up the local invoker cache to avoid
// memory leak.
// Use CAS remove: if the cache slot has already been replaced by a newer
// proxy (e.g. after a series of rebuilds), this stale closeFuture must
// NOT evict that newer entry.
rpcClient.closeFuture().whenComplete((r, e) -> {
ConsumerInvokerProxy<T> remove = invokerCache.remove(key);
if (remove != null) {
logger.warn("Service [name=" + consumerConfig.getServiceInterface()
boolean removed = invokerCache.remove(key, created);
if (removed && logger.isDebugEnabled()) {
logger.debug("Service [name=" + consumerConfig.getServiceInterface()
.getName()
+ ", naming=" + backendConfig.getNamingOptions()
.getServiceNaming()
+ ")], remove rpc client invoker["
+ remove.getInvoker().getProtocolConfig().toSimpleString()
+ "], remove rpc client invoker["
+ created.getInvoker().getProtocolConfig().toSimpleString()
+ "], due to rpc client close");
}
});
return value;
return created;
} catch (Exception ex) {
throw TRpcException.newFrameException(ErrorCode.TRPC_INVOKE_UNKNOWN_ERR,
"Service(name=" + consumerConfig.getServiceInterface().getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ public class Constants {

public static final String DEFAULT_KEEP_ALIVE = "true";
/**
* Default shared IO thread pool true
* Default shared IO thread pool: true. The shared pool now supports both NIO and
* Epoll: each transport joins one of two reference-counted shared groups based on
* {@code Epoll.isAvailable() && config.useEpoll()}, so enabling epoll no longer forces
* the user to also flip {@code ioThreadGroupShare} off. Server-side transports do not
* consult this flag (they always own their own group).
*/
public static final String DEFAULT_IO_THREAD_GROUPSHARE = "true";
/**
Expand Down Expand Up @@ -107,9 +111,27 @@ public class Constants {
*/
public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
/**
* Default maximum number of connections 20480
*/
public static final String DEFAULT_MAX_CONNECTIONS = "20480";
* Default maximum number of connections.
*
* <p>200 covers the great majority of internal RPC workloads while keeping the worst-case
* resource footprint bounded:</p>
* <ul>
* <li>By Little's Law (concurrent in-flight = QPS × RT) 200 connections sustain
* e.g. {@code QPS=20000 × RT=10ms} or {@code QPS=2000 × RT=100ms} on the HTTP/1.1
* path where each connection serves one request at a time;</li>
* <li>Far below the Linux default ephemeral-port range (~28k usable) so a connection
* storm against an unreachable backend cannot exhaust the port pool;</li>
* <li>Friendly to backend LBs whose per-source keep-alive limits are typically in the
* low thousands.</li>
* </ul>
*
* <p>Workloads that genuinely need more (high-fanout aggregation services, very high QPS
* with long RT) should override per-{@code BackendConfig} via {@code maxConns}.</p>
*
* <p>Note: the trpc protocol path does not read this field — it sizes the connection pool
* via {@code connsPerAddr} (default 4). Only the HTTP / HTTP/2 paths consume it.</p>
*/
public static final String DEFAULT_MAX_CONNECTIONS = "200";
/**
* Default maximum payload limit 10M
*/
Expand All @@ -119,17 +141,69 @@ public class Constants {
*/
public static final String DEFAULT_BUFFER_SIZE = "16384";
/**
* Default client idle timeout 3 minutes
* Default client idle timeout 3 minutes. Drives the read-idle close handler installed
* by {@code NettyTcpClientTransport}: a long connection that has not received any
* inbound bytes for this duration is recycled by the client (the next request goes
* through lazy reconnect). Half-dead detection in faster paths is delegated to the
* configurable TCP keepalive parameters below; the read-idle handler is the slow
* universal fallback that also covers macOS / Windows where TCP keepalive tuning is
* not available.
*/
public static final String DEFAULT_IDLE_TIMEOUT = "180000";
/**
* Default TCP keepalive idle (Linux {@code TCP_KEEPIDLE}) in seconds: 30 s without
* traffic on the socket before the kernel starts emitting keepalive probes. Together
* with {@link #DEFAULT_TCP_KEEPALIVE_INTVL} and {@link #DEFAULT_TCP_KEEPALIVE_CNT}
* (Dubbo-style 30/10/3) this yields a half-dead detection window of roughly 60 s on
* Linux + epoll (30 + 10 × 3 = 60). Value 0 leaves the OS default (Linux: 7200 s).
*/
public static final String DEFAULT_TCP_KEEPALIVE_IDLE = "30";
/**
* Default TCP keepalive interval (Linux {@code TCP_KEEPINTVL}) in seconds between
* consecutive keepalive probes after the idle window has elapsed.
*/
public static final String DEFAULT_TCP_KEEPALIVE_INTVL = "10";
/**
* Default TCP keepalive probe count (Linux {@code TCP_KEEPCNT}): the number of
* unacknowledged keepalive probes after which the kernel marks the connection dead
* and emits a RST.
*/
public static final String DEFAULT_TCP_KEEPALIVE_CNT = "3";
/**
* Default server idle timeout 4 minutes
*/
public static final String DEFAULT_SERVER_IDLE_TIMEOUT = "240000";
/**
* Default number of connections per address 2
*/
public static final String DEFAULT_CONNECTIONS_PERADDR = "2";
* Default number of long-lived TCP connections to a single peer address.
*
* <p>Each business request is dispatched round-robin across these N channels
* ({@code channelIdx.getAndIncrement() % N}); on a single channel Netty multiplexes many
* in-flight RPCs via the protocol-level sequence id, so {@code N=1} would already saturate
* a 1Gbps link in pure throughput. The default of <b>4</b> is chosen so that:</p>
* <ul>
* <li>Multiple Netty {@code EventLoop} threads run in parallel for a single peer —
* no single EventLoop becomes a CPU bottleneck under high QPS;</li>
* <li>Failure-domain isolation: when a channel is invalidated by READ_IDLE / RST /
* half-close, the remaining 3 channels absorb traffic with minimal RT impact
* (compared to {@code N=2} which would double the load on the surviving channel);</li>
* <li>Resource cost stays bounded: 4 fd × ephemeral ports × ~16KB Netty buffer per
* peer is negligible even with hundreds of peer addresses.</li>
* </ul>
*
* <p>Tuning guidance (override per-{@code BackendConfig} via {@code connsPerAddr}):</p>
* <ul>
* <li>Low QPS / few peers: {@code 1~2} (saves resources);</li>
* <li>High QPS / few peers: {@code 8~16} (more EventLoop parallelism);</li>
* <li>Many peer IPs (hundreds+): leave at {@code 4}, the IP fan-out already
* provides parallelism &amp; failure isolation;</li>
* <li>Gateway / aggregation services: {@code 8}.</li>
* </ul>
*
* <p><b>Compatibility note</b>: this default was {@code 2} in earlier versions. Upgrades
* will see <b>per-peer inbound connections double</b> on the server side; review server
* fd ulimits and LB per-source connection limits before rollout.</p>
*/
public static final String DEFAULT_CONNECTIONS_PERADDR = "4";
public static final String DEFAULT_TRANSPORTER = TRANSPORTER_NETTY;
public static final String DEFAULT_IO_MODE = IO_MODE_EPOLL;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ public ProtocolConfig generateProtocolConfig(String host, int port, String netwo
config.setPort(port);
config.setNetwork(network);
config.setIdleTimeout(idleTimeout);
config.setTcpKeepAliveIdle(tcpKeepAliveIdle);
config.setTcpKeepAliveIntvl(tcpKeepAliveIntvl);
config.setTcpKeepAliveCnt(tcpKeepAliveCnt);
config.setCharset(charset);
config.setLazyinit(lazyinit);
config.setConnsPerAddr(connsPerAddr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@ public class BaseProtocolConfig implements Serializable, Cloneable {
*/
@ConfigProperty(value = Constants.DEFAULT_IDLE_TIMEOUT, type = Integer.class, override = true, moreZero = false)
protected Integer idleTimeout;
/**
* TCP keepalive idle in seconds (Linux {@code TCP_KEEPIDLE}). Applied only on
* Linux + epoll TCP client transports. Value 0 disables tuning and leaves the OS
* default in place (Linux ≈ 7200 s).
*/
@ConfigProperty(value = Constants.DEFAULT_TCP_KEEPALIVE_IDLE, type = Integer.class, override = true,
moreZero = false)
protected Integer tcpKeepAliveIdle;
/**
* TCP keepalive probe interval in seconds (Linux {@code TCP_KEEPINTVL}). Applied only
* on Linux + epoll TCP client transports. Value 0 disables tuning.
*/
@ConfigProperty(value = Constants.DEFAULT_TCP_KEEPALIVE_INTVL, type = Integer.class, override = true,
moreZero = false)
protected Integer tcpKeepAliveIntvl;
/**
* TCP keepalive probe count (Linux {@code TCP_KEEPCNT}). Applied only on Linux +
* epoll TCP client transports. Value 0 disables tuning.
*/
@ConfigProperty(value = Constants.DEFAULT_TCP_KEEPALIVE_CNT, type = Integer.class, override = true,
moreZero = false)
protected Integer tcpKeepAliveCnt;
/**
* Whether to delay initialization.
*/
Expand Down Expand Up @@ -303,6 +325,33 @@ public void setIdleTimeout(Integer idleTimeout) {
this.idleTimeout = idleTimeout;
}

public Integer getTcpKeepAliveIdle() {
return tcpKeepAliveIdle;
}

public void setTcpKeepAliveIdle(Integer tcpKeepAliveIdle) {
checkFiledModifyPrivilege();
this.tcpKeepAliveIdle = tcpKeepAliveIdle;
}

public Integer getTcpKeepAliveIntvl() {
return tcpKeepAliveIntvl;
}

public void setTcpKeepAliveIntvl(Integer tcpKeepAliveIntvl) {
checkFiledModifyPrivilege();
this.tcpKeepAliveIntvl = tcpKeepAliveIntvl;
}

public Integer getTcpKeepAliveCnt() {
return tcpKeepAliveCnt;
}

public void setTcpKeepAliveCnt(Integer tcpKeepAliveCnt) {
checkFiledModifyPrivilege();
this.tcpKeepAliveCnt = tcpKeepAliveCnt;
}

public Boolean getLazyinit() {
return lazyinit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.tencent.trpc.core.transport;

import com.google.common.collect.Lists;
import com.tencent.trpc.core.common.LifecycleBase;
import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.exception.LifecycleException;
Expand All @@ -24,6 +23,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -64,7 +64,11 @@ public abstract class AbstractClientTransport implements ClientTransport {
*/
protected AtomicInteger channelIdx = new AtomicInteger(0);
/**
* List of channels.
* Channel pool. Backed by {@link CopyOnWriteArrayList} so that the slot publication done by
* {@link #ensureChannelActive} (under {@link #connLock}) is visible to concurrent readers in
* {@link #getChannel0} with volatile semantics, eliminating the data race that an
* {@link java.util.ArrayList} would have. Size is bounded by {@code connsPerAddr}; writes
* are infrequent (slot rebuild on disconnect) so the COW copy cost is negligible.
*/
protected List<ChannelFutureItem> channels;
/**
Expand All @@ -80,7 +84,7 @@ public AbstractClientTransport(ProtocolConfig config, ChannelHandler handler,
this.config = Objects.requireNonNull(config, "config is null");
this.handler = Objects.requireNonNull(handler, "handler is null");
this.codec = clientCodec;
this.channels = Lists.newArrayListWithExpectedSize(config.getConnsPerAddr());
this.channels = new CopyOnWriteArrayList<>();
}

/**
Expand Down Expand Up @@ -253,21 +257,93 @@ protected void ensureChannelActive(int chIndex) {
ChannelFutureItem curChannelItem = channels.get(chIndex);
// initiate a new connection creation action when the connection is not yet established or the connection
// is broken
boolean futureIsDoneAndNotConnected =
!curChannelItem.isAvailable() && !curChannelItem.isConnecting();
// not available and not establishing a connection
if (futureIsDoneAndNotConnected || curChannelItem.isNotYetConnect()) {
if (!needsReconnect(curChannelItem)) {
return;
}
connLock.lock();
try {
// Double-check inside the lock: another thread may have already replaced this slot
// with a fresh ChannelFutureItem (either still connecting or already connected).
// Without this check, a thundering-herd of requests arriving right after a
// disconnect would each rebuild the slot, producing a connect/disconnect storm
// against the peer and a burst of short-lived TIME_WAIT sockets.
ChannelFutureItem latest = channels.get(chIndex);
if (!needsReconnect(latest)) {
return;
}
channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
try {
latest.close();
} catch (Exception ex) {
logger.error("close " + latest + " exception", ex);
}
} finally {
connLock.unlock();
}
}

/**
* A slot needs a fresh connection when it is either uninitialized (lazy-init not yet
* triggered) or its previous future has finished without producing a connected channel.
* A slot whose future is still in flight ({@code isConnecting()}) is left alone — the
* in-flight {@code bootstrap.connect} will publish the result into the same item.
*/
private static boolean needsReconnect(ChannelFutureItem item) {
if (item.isNotYetConnect()) {
return true;
}
return !item.isAvailable() && !item.isConnecting();
}

/**
* Invalidate the slot holding {@code target}: replace it with a blank placeholder
* ({@code channelFuture==null}, i.e. {@code isNotYetConnect=true}) so the next request
* unconditionally goes through {@link #ensureChannelActive} and rebuilds a fresh
* connection. The previous item is closed best-effort.
* <p>Called by the client-side idle handler so that <em>before</em> the actual
* {@code channel.close()} runs (asynchronously in the EventLoop), the request thread
* already sees the slot as needing a reconnect — eliminating the "request lands on a
* channel that is about to be closed" race window.</p>
*/
@Override
public void invalidateChannel(Channel target) {
if (target == null || channels == null || channels.isEmpty()) {
return;
}
// The list is bounded by connsPerAddr; a linear scan is fine.
for (int i = 0; i < channels.size(); i++) {
ChannelFutureItem item = channels.get(i);
if (item == null || item.channelFuture == null || !item.channelFuture.isDone()
|| item.channelFuture.isCompletedExceptionally()) {
continue;
}
Channel ch;
try {
ch = item.channelFuture.join();
} catch (Throwable ignore) {
continue;
}
if (ch != target) {
continue;
}
connLock.lock();
try {
channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
// Re-read under the lock to avoid clobbering a slot another thread already
// refreshed (e.g. concurrent reconnect).
ChannelFutureItem latest = channels.get(i);
if (latest != item) {
return;
}
channels.set(i, new ChannelFutureItem(null, config));
try {
curChannelItem.close();
item.close();
} catch (Exception ex) {
logger.error("close " + curChannelItem + " exception", ex);
logger.error("close invalidated " + item + " exception", ex);
}
} finally {
connLock.unlock();
}
return;
}
}

Expand Down
Loading
Loading