From ab2b842f5642115d9e9cd834b13392a62751f60e Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Wed, 13 May 2026 23:44:48 +0000 Subject: [PATCH] Add a metric that exposes IO thread utilization This makes it possible to spot cases where EVCache is overloaded, and supports basic prediction of whether an instance might become overloaded under a significant traffic increase or whether the thread pool could be tuned. --- .../netflix/evcache/test/EVCacheTestDI.java | 35 +++++ .../metrics/EVCacheMetricsFactory.java | 1 + .../netflix/evcache/pool/EVCacheClient.java | 15 ++ .../evcache/pool/EVCacheLoopProbe.java | 133 ++++++++++++++++++ .../net/spy/memcached/EVCacheConnection.java | 20 +++ .../spy/memcached/EVCacheMemcachedClient.java | 5 + .../evcache/pool/EVCacheLoopProbeTest.java | 120 ++++++++++++++++ evcache-core/src/test/java/test-suite.xml | 1 + 8 files changed, 330 insertions(+) create mode 100644 evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheLoopProbe.java create mode 100644 evcache-core/src/test/java/com/netflix/evcache/pool/EVCacheLoopProbeTest.java diff --git a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java index e69280e3..63cc7677 100644 --- a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java +++ b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java @@ -11,12 +11,17 @@ import com.netflix.evcache.EVCacheException; import com.netflix.evcache.EVCacheGetOperationListener; import com.netflix.evcache.EVCacheLatch; +import com.netflix.evcache.metrics.EVCacheMetricsFactory; import com.netflix.evcache.operation.EVCacheOperationFuture; import com.netflix.evcache.pool.EVCacheClient; import com.netflix.evcache.pool.ServerGroup; import com.netflix.evcache.test.transcoder.Movie; import com.netflix.evcache.test.transcoder.MovieTranscoder; import com.netflix.evcache.util.KeyHasher; +import com.netflix.spectator.api.Gauge; +import com.netflix.spectator.api.Id; 
+import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.patterns.PolledMeter; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -76,6 +81,36 @@ public void testEVCache() { assertNotNull(evCache); } + @Test(dependsOnMethods = { "testGet" }) + public void testLoopCpuUtilizationMetricRegistered() throws Exception { + final Registry registry = EVCacheMetricsFactory.getInstance().getRegistry(); + final Map<ServerGroup, List<EVCacheClient>> clientsByServerGroup = manager.getEVCacheClientPool(appName).getAllInstancesByServerGroup(); + assertFalse(clientsByServerGroup.isEmpty(), "expected EVCache clients for " + appName); + + PolledMeter.update(registry); + for (List<EVCacheClient> clients : clientsByServerGroup.values()) { + for (EVCacheClient client : clients) { + final Id id = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_CPU_UTILIZATION, client.getTagList()); + assertTrue(registry.state().containsKey(id), "expected loop CPU utilization meter for client " + client); + } + } + + boolean nonZero = false; + for (int attempt = 0; attempt < 10 && !nonZero; attempt++) { + get(0, evCache); + Thread.sleep(1_100); + PolledMeter.update(registry); + for (List<EVCacheClient> clients : clientsByServerGroup.values()) { + for (EVCacheClient client : clients) { + final Id id = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_CPU_UTILIZATION, client.getTagList()); + final Gauge gauge = registry.gauge(id); + nonZero |= gauge.value() > 0.0; + } + } + } + assertTrue(nonZero, "expected loop CPU utilization meter to report a non-zero value"); + } + @Test(dependsOnMethods = { "testEVCache" }) public void testKeySizeCheck() throws Exception { final String key = "This is an invalid key"; diff --git a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java index eeb7941d..2de0adda 100644 --- 
a/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java +++ b/evcache-core/src/main/java/com/netflix/evcache/metrics/EVCacheMetricsFactory.java @@ -284,6 +284,7 @@ public String getStatusCode(StatusCode sc) { public static final String INTERNAL_EXECUTOR = "internal.evc.client.executor"; public static final String INTERNAL_EXECUTOR_SCHEDULED = "internal.evc.client.scheduledExecutor"; public static final String INTERNAL_POOL_INIT_ERROR = "internal.evc.client.init.error"; + public static final String INTERNAL_LOOP_CPU_UTILIZATION = "internal.evc.client.loop.cpuUtilization"; public static final String INTERNAL_NUM_CHUNK_SIZE = "internal.evc.client.chunking.numOfChunks"; public static final String INTERNAL_CHUNK_DATA_SIZE = "internal.evc.client.chunking.dataSize"; diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 3a0dbcd9..6895389f 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -20,7 +20,10 @@ import com.netflix.evcache.util.KeyHasher.HashingAlgorithm; import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; +import com.netflix.spectator.api.Id; +import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Tag; +import com.netflix.spectator.api.patterns.PolledMeter; import java.io.BufferedInputStream; import java.io.IOException; import java.io.PrintWriter; @@ -102,6 +105,7 @@ public class EVCacheClient { private final Property ignoreTouch; private List tags; private final Map counterMap = new ConcurrentHashMap(); + private final Id loopCpuUtilizationId; private final Property hashingAlgo; protected final Counter operationsCounter; private final boolean isDuetClient; @@ -133,6 +137,8 @@ public class EVCacheClient { tagList.add(new BasicTag(EVCacheMetricsFactory.STAT_NAME, 
EVCacheMetricsFactory.POOL_OPERATIONS)); operationsCounter = EVCacheMetricsFactory.getInstance().getCounter(EVCacheMetricsFactory.INTERNAL_STATS, tagList); + final Registry registry = EVCacheMetricsFactory.getInstance().getRegistry(); + this.enableChunking = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName()+ ".chunk.data", Boolean.class).orElseGet(appName + ".chunk.data").orElse(false); this.chunkSize = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".chunk.size", Integer.class).orElseGet(appName + ".chunk.size").orElse(1180); this.writeBlock = EVCacheConfig.getInstance().getPropertyRepository().get(appName + "." + this.serverGroup.getName() + ".write.block.duration", Integer.class).orElseGet(appName + ".write.block.duration").orElse(25); @@ -141,10 +147,14 @@ public class EVCacheClient { this.ignoreTouch = EVCacheConfig.getInstance().getPropertyRepository().get(appName + "." + this.serverGroup.getName() + ".ignore.touch", Boolean.class).orElseGet(appName + ".ignore.touch").orElse(false); this.connectionFactory = pool.getEVCacheClientPoolManager().getConnectionFactoryProvider().getConnectionFactory(this); + loopCpuUtilizationId = EVCacheMetricsFactory.getInstance().getId(EVCacheMetricsFactory.INTERNAL_LOOP_CPU_UTILIZATION, this.tags); this.connectionObserver = new EVCacheConnectionObserver(this); this.ignoreInactiveNodes = EVCacheConfig.getInstance().getPropertyRepository().get(appName + ".ignore.inactive.nodes", Boolean.class).orElse(true); this.evcacheMemcachedClient = new EVCacheMemcachedClient(connectionFactory, memcachedNodesInZone, readTimeout, this); + PolledMeter.using(registry) + .withId(loopCpuUtilizationId) + .monitorValue(this.evcacheMemcachedClient.getLoopProbe(), EVCacheLoopProbe::sampleUtilization); this.evcacheMemcachedClient.addObserver(connectionObserver); this.decodingTranscoder = new EVCacheSerializingTranscoder(Integer.MAX_VALUE); @@ -1342,6 +1352,11 @@ public boolean 
shutdown(long timeout, TimeUnit unit) { if(shutdown) return true; shutdown = true; + try { + PolledMeter.remove(EVCacheMetricsFactory.getInstance().getRegistry(), loopCpuUtilizationId); + } catch(Throwable t) { + log.warn("Exception while removing loop CPU utilization meter", t); + } try { evcacheMemcachedClient.shutdown(timeout, unit); } catch(Throwable t) { diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheLoopProbe.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheLoopProbe.java new file mode 100644 index 00000000..9b19c7d4 --- /dev/null +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheLoopProbe.java @@ -0,0 +1,133 @@ +package com.netflix.evcache.pool; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Publishes event-loop CPU utilization from the loop thread itself. + * + *

<p>The loop thread periodically publishes a never-mutated {@code long[]} snapshot + * containing {@code {threadCpuNs, wallNs}}. Spectator's polling thread reads the + * latest snapshot and computes the delta ratio without performing cross-thread + * ThreadMXBean lookups.</p>

+ */ +public final class EVCacheLoopProbe { + private static final Logger log = LoggerFactory.getLogger(EVCacheLoopProbe.class); + private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean(); + private static final long PUBLISH_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(1_000); + private static final int CPU_UTILIZATION_WARNING_THRESHOLD = 3; + + private final AtomicReference<long[]> snapshot = new AtomicReference<long[]>(new long[] { 0L, 0L }); + private final boolean cpuTimeAvailable; + + // Loop-thread-private throttle state. + private long nextPublishNs; + private boolean tickFailureLogged; + private boolean negativeCpuTimeLogged; + + // PolledMeter-reader-private state. + private long prevCpuNs; + private long prevWallNs; + private int aboveOneSamples; + private boolean aboveOneLogged; + + public EVCacheLoopProbe() { + this.cpuTimeAvailable = isCurrentThreadCpuTimeAvailable(); + if (!cpuTimeAvailable) { + log.warn("Thread CPU time is not available; EVCache loop CPU utilization will report NaN"); + } + } + + /** + * Publish the current thread's CPU time and wall time at most every second. + * 

<p>This method is intentionally no-throw: it is called from the EVCache IO + * loop in a finally block and must never terminate the loop.</p>

+ */ + public void tick() { + try { + tickInternal(); + } catch (Throwable t) { + if (!tickFailureLogged) { + tickFailureLogged = true; + try { + log.warn("EVCache loop CPU utilization probe failed; suppressing future probe errors", t); + } catch (Throwable ignored) { + // Keep the event loop alive even if logging fails. + } + } + } + } + + private void tickInternal() { + if (!cpuTimeAvailable) return; + + final long now = System.nanoTime(); + if (nextPublishNs != 0L && now - nextPublishNs < 0L) return; + nextPublishNs = now + PUBLISH_INTERVAL_NS; + + final long cpuNs = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + if (cpuNs < 0L) { + if (!negativeCpuTimeLogged) { + negativeCpuTimeLogged = true; + log.warn("Thread CPU time returned a negative value; skipping EVCache loop CPU utilization publish"); + } + return; + } + + snapshot.lazySet(new long[] { cpuNs, now }); + } + + /** + * Return loop-thread CPU utilization over the interval since the previous poll. + */ + public double sampleUtilization() { + if (!cpuTimeAvailable) return Double.NaN; + + final long[] s = snapshot.get(); + final long cpuNs = s[0]; + final long wallNs = s[1]; + if (prevWallNs == 0L) { + prevCpuNs = cpuNs; + prevWallNs = wallNs; + return Double.NaN; + } + + final long dWall = wallNs - prevWallNs; + if (dWall <= 0L) return 0.0; + + final long dCpu = cpuNs - prevCpuNs; + prevCpuNs = cpuNs; + prevWallNs = wallNs; + + double utilization = (double) dCpu / (double) dWall; + if (utilization < 0.0) return 0.0; + + if (utilization > 1.0) { + aboveOneSamples++; + if (aboveOneSamples >= CPU_UTILIZATION_WARNING_THRESHOLD && !aboveOneLogged) { + aboveOneLogged = true; + log.warn("EVCache loop CPU utilization exceeded 1.0 for {} consecutive samples; latest value={}", + CPU_UTILIZATION_WARNING_THRESHOLD, utilization); + } + } else { + aboveOneSamples = 0; + } + + return Math.min(utilization, 1.05); + } + + private static boolean isCurrentThreadCpuTimeAvailable() { + try { + return 
THREAD_MX_BEAN.isThreadCpuTimeSupported() && THREAD_MX_BEAN.isThreadCpuTimeEnabled(); + } catch (Throwable t) { + log.warn("Unable to determine ThreadMXBean CPU-time capability", t); + return false; + } + } +} diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheConnection.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheConnection.java index 19784073..ed5489db 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheConnection.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheConnection.java @@ -11,6 +11,8 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; +import com.netflix.evcache.pool.EVCacheLoopProbe; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +20,7 @@ public class EVCacheConnection extends MemcachedConnection { private static final Logger log = LoggerFactory.getLogger(EVCacheConnection.class); + private EVCacheLoopProbe probe; private final net.spy.memcached.compat.log.Logger spyLogger; public EVCacheConnection(String name, int bufSize, ConnectionFactory f, @@ -28,6 +31,21 @@ public EVCacheConnection(String name, int bufSize, ConnectionFactory f, spyLogger = super.getLogger(); } + @Override + public synchronized void start() { + // MemcachedConnection starts the thread from its constructor. Initialize + // the probe before super.start() so Thread.start() safely publishes it + // to run() without requiring a volatile read on the event-loop path. 
+ if (probe == null) { + probe = new EVCacheLoopProbe(); + } + super.start(); + } + + public EVCacheLoopProbe getProbe() { + return probe; + } + @Override public void shutdown() throws IOException { try { @@ -63,6 +81,8 @@ public void run() { } catch (Throwable e) { log.error("SEVERE EVCACHE ISSUE.", e);// This ensures the thread // doesn't die + } finally { + probe.tick(); } } if (log.isDebugEnabled()) log.debug(toString() + " : Shutdown"); diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 8272f0cb..2d196dea 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -15,6 +15,7 @@ import com.netflix.evcache.operation.EVCacheOperationFuture; import com.netflix.evcache.pool.EVCacheClient; import com.netflix.evcache.pool.EVCacheClientUtil; +import com.netflix.evcache.pool.EVCacheLoopProbe; import com.netflix.evcache.pool.EVCacheValue; import com.netflix.evcache.util.EVCacheConfig; import com.netflix.spectator.api.BasicTag; @@ -123,6 +124,10 @@ public NodeLocator getNodeLocator() { return this.mconn.getLocator(); } + public EVCacheLoopProbe getLoopProbe() { + return ((EVCacheConnection) this.mconn).getProbe(); + } + public MemcachedNode getEVCacheNode(String key) { return this.mconn.getLocator().getPrimary(key); } diff --git a/evcache-core/src/test/java/com/netflix/evcache/pool/EVCacheLoopProbeTest.java b/evcache-core/src/test/java/com/netflix/evcache/pool/EVCacheLoopProbeTest.java new file mode 100644 index 00000000..20e1d134 --- /dev/null +++ b/evcache-core/src/test/java/com/netflix/evcache/pool/EVCacheLoopProbeTest.java @@ -0,0 +1,120 @@ +package com.netflix.evcache.pool; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; + +import com.netflix.evcache.metrics.EVCacheMetricsFactory; +import 
com.netflix.spectator.api.DefaultRegistry; +import com.netflix.spectator.api.Gauge; +import com.netflix.spectator.api.Id; +import com.netflix.spectator.api.Registry; +import com.netflix.spectator.api.patterns.PolledMeter; + +import org.testng.SkipException; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class EVCacheLoopProbeTest { + + @Test + public void firstSampleEstablishesBaseline() throws Exception { + assumeThreadCpuTimeAvailable(); + EVCacheLoopProbe probe = new EVCacheLoopProbe(); + + probe.tick(); + + assertTrue(Double.isNaN(probe.sampleUtilization())); + } + + @Test + public void reportsCpuUtilizationForCurrentThread() throws Exception { + assumeThreadCpuTimeAvailable(); + EVCacheLoopProbe probe = new EVCacheLoopProbe(); + + probe.tick(); + assertTrue(Double.isNaN(probe.sampleUtilization())); + + double utilization = samplePositiveUtilization(probe); + assertTrue(utilization > 0.0, "expected positive utilization, got " + utilization); + assertTrue(utilization <= 1.05, "expected utilization to be clamped, got " + utilization); + } + + @Test + public void returnsZeroWhenNoNewSampleWasPublished() throws Exception { + assumeThreadCpuTimeAvailable(); + EVCacheLoopProbe probe = new EVCacheLoopProbe(); + + probe.tick(); + assertTrue(Double.isNaN(probe.sampleUtilization())); + + assertEquals(probe.sampleUtilization(), 0.0); + } + + @Test + public void polledMeterPublishesProbeUtilization() throws Exception { + assumeThreadCpuTimeAvailable(); + Registry registry = new DefaultRegistry(); + Id id = registry.createId(EVCacheMetricsFactory.INTERNAL_LOOP_CPU_UTILIZATION, "evc.connection.id", "0", "ipc.server.asg", "test"); + EVCacheLoopProbe probe = new EVCacheLoopProbe(); + + try { + PolledMeter.using(registry) + .withId(id) + .monitorValue(probe, EVCacheLoopProbe::sampleUtilization); + + probe.tick(); + PolledMeter.update(registry); + Gauge gauge = registry.gauge(id); + 
assertTrue(Double.isNaN(gauge.value())); + + double value = pollPositiveUtilization(registry, probe, gauge); + assertTrue(value > 0.0, "expected polled meter to publish positive utilization, got " + value); + assertTrue(value <= 1.05, "expected utilization to be clamped, got " + value); + } finally { + PolledMeter.remove(registry, id); + } + } + + private static double samplePositiveUtilization(EVCacheLoopProbe probe) throws Exception { + double utilization = 0.0; + for (int i = 0; i < 10 && utilization <= 0.0; i++) { + Thread.sleep(1_100); + busySpinForAtLeastMillis(50); + probe.tick(); + utilization = probe.sampleUtilization(); + } + return utilization; + } + + private static double pollPositiveUtilization(Registry registry, EVCacheLoopProbe probe, Gauge gauge) throws Exception { + double utilization = 0.0; + for (int i = 0; i < 10 && utilization <= 0.0; i++) { + Thread.sleep(1_100); + busySpinForAtLeastMillis(50); + probe.tick(); + PolledMeter.update(registry); + utilization = gauge.value(); + } + return utilization; + } + + private static void assumeThreadCpuTimeAvailable() { + ThreadMXBean tmx = ManagementFactory.getThreadMXBean(); + if (!tmx.isThreadCpuTimeSupported() || !tmx.isThreadCpuTimeEnabled()) { + throw new SkipException("thread CPU time is not available on this JVM"); + } + } + + private static void busySpinForAtLeastMillis(long millis) { + long deadline = System.nanoTime() + millis * 1_000_000L; + long value = 0L; + while (System.nanoTime() < deadline) { + value += System.nanoTime(); + } + if (value == 42L) { + throw new AssertionError("unreachable"); + } + } +} diff --git a/evcache-core/src/test/java/test-suite.xml b/evcache-core/src/test/java/test-suite.xml index f031a615..7d7568a6 100644 --- a/evcache-core/src/test/java/test-suite.xml +++ b/evcache-core/src/test/java/test-suite.xml @@ -3,6 +3,7 @@ +