From e117ebe35dc7c35d476e0d4fb2e1a5a756e34700 Mon Sep 17 00:00:00 2001 From: sriram-rangarajan Date: Wed, 13 Jul 2022 07:39:41 -0700 Subject: [PATCH] router write changes. No writing to all replicas (#129) Co-authored-by: Sriram Rangarajan --- .../java/com/netflix/evcache/EVCacheImpl.java | 57 +++++++++++++------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index e6c04124..90ae8f65 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -96,6 +96,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean { private final EVCacheTranscoder evcacheValueTranscoder; private final Property maxReadDuration, maxWriteDuration; private final Property clientReadRetry; + private final Property clientWriteToAllReplicas; protected final EVCacheClientPoolManager _poolManager; private final Map timerMap = new ConcurrentHashMap(); @@ -171,6 +172,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean { }); this.clientReadRetry = propertyRepository.get(appName + ".router.client.read.shouldRetry", Boolean.class).orElse(true); + this.clientWriteToAllReplicas = propertyRepository.get(appName + ".router.client.write.shouldWriteToAllReplicas", Boolean.class).orElse(true); _pool.pingServers(); @@ -1377,7 +1379,7 @@ public EVCacheLatch touch(String key, int timeToLive, Policy policy) throws checkTTL(timeToLive, Call.TOUCH); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.TOUCH); if (throwExc) throw new EVCacheException("Could not find a client to set the data"); @@ -1405,7 +1407,7 @@ public EVCacheLatch touch(String key, int timeToLive, Policy policy) throws String status = EVCacheMetricsFactory.SUCCESS; final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); try { - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName); touchData(evcKey, timeToLive, clients, latch); if (event != null) { @@ -1436,7 +1438,7 @@ public EVCacheLatch touch(String key, int timeToLive, Policy policy) throws } private void touchData(EVCacheKey evcKey, int timeToLive) throws Exception { - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); touchData(evcKey, timeToLive, clients); } @@ -1925,8 +1927,8 @@ public EVCacheLatch set(String key, T value, Transcoder tc, EVCacheLatch. } public EVCacheLatch set(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { - EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); - return this.set(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length); + final EVCacheClient[] clients = getClientsForWrite(); + return this.set(key, value, tc, timeToLive, policy, clients, getLatchCount(clients)); } protected EVCacheLatch set(String key, T value, Transcoder tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException { @@ -2023,7 +2025,7 @@ public EVCacheFuture[] append(String key, T value, Transcoder tc, int tim checkTTL(timeToLive, Call.APPEND); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND); if (throwExc) throw new EVCacheException("Could not find a client to set the data"); @@ -2143,7 +2145,7 @@ protected EVCacheLatch deleteInternal(String key, Policy policy, boolean isO if (key == null) throw new IllegalArgumentException("Key cannot be null"); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DELETE); if (throwExc) throw new EVCacheException("Could not find a client to delete the keyAPP " + _appName @@ -2171,7 +2173,7 @@ protected EVCacheLatch deleteInternal(String key, Policy policy, boolean isO String status = EVCacheMetricsFactory.SUCCESS; final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName); try { for (int i = 0; i < clients.length; i++) { Future future = clients[i].delete(isOriginalKeyHashed ? evcKey.getKey() : evcKey.getDerivedKey(clients[i].isDuetClient(), clients[i].getHashingAlgorithm(), clients[i].shouldEncodeHashKey(), clients[i].getMaxDigestBytes(), clients[i].getMaxHashLength(), clients[i].getBaseEncoder()), latch); @@ -2210,12 +2212,27 @@ public int getDefaultTTL() { return _timeToLive; } + private EVCacheClient[] getClientsForWrite() { + EVCacheClient[] clients; + if (clientWriteToAllReplicas.get()) { + clients = _pool.getEVCacheClientForWrite(); + } else { + EVCacheClient client = _pool.getEVCacheClientForRead(); + if (client == null) { + clients = new EVCacheClient[]{}; + } else { + clients = new EVCacheClient[]{client}; + } + } + return clients; + } + public long incr(String key, long by, long defaultVal, int timeToLive) throws EVCacheException { if ((null == key) || by < 0 || defaultVal < 0 || timeToLive < 0) throw new IllegalArgumentException(); checkTTL(timeToLive, Call.INCR); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.INCR); if (log.isDebugEnabled() && shouldLog()) log.debug("INCR : " + _metricPrefix + ":NULL_CLIENT"); @@ -2299,7 +2316,7 @@ public long decr(String key, long by, long defaultVal, int timeToLive) throws EV checkTTL(timeToLive, Call.DECR); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.DECR); if (log.isDebugEnabled() && shouldLog()) log.debug("DECR : " + _metricPrefix + ":NULL_CLIENT"); @@ -2394,6 +2411,14 @@ public EVCacheLatch replace(String key, T value, int timeToLive, Policy pol return replace(key, value, (Transcoder)_transcoder, timeToLive, policy); } + private int getLatchCount(EVCacheClient[] clients) { + if (clientWriteToAllReplicas.get()) { + return clients.length - _pool.getWriteOnlyEVCacheClients().length; + } else { + return clients.length; + } + } + @Override public EVCacheLatch replace(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { @@ -2401,7 +2426,7 @@ public EVCacheLatch replace(String key, T value, Transcoder tc, int timeT checkTTL(timeToLive, Call.REPLACE); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.REPLACE); if (throwExc) throw new EVCacheException("Could not find a client to set the data"); @@ -2428,7 +2453,7 @@ public EVCacheLatch replace(String key, T value, Transcoder tc, int timeT final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); String status = EVCacheMetricsFactory.SUCCESS; - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,getLatchCount(clients) , _appName); try { final EVCacheFuture[] futures = new EVCacheFuture[clients.length]; CachedData cd = null; @@ -2494,7 +2519,7 @@ public EVCacheLatch appendOrAdd(String key, T value, Transcoder tc, int t checkTTL(timeToLive, Call.APPEND_OR_ADD); final boolean throwExc = doThrowException(); - final EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); + final EVCacheClient[] clients = getClientsForWrite(); if (clients.length == 0) { incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.APPEND_OR_ADD); if (throwExc) throw new EVCacheException("Could not find a client to appendOrAdd the data"); @@ -2519,7 +2544,7 @@ public EVCacheLatch appendOrAdd(String key, T value, Transcoder tc, int t startEvent(event); } final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime(); - final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length - _pool.getWriteOnlyEVCacheClients().length, _appName); + final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, getLatchCount(clients), _appName); String status = EVCacheMetricsFactory.SUCCESS; try { CachedData cd = null; @@ -2598,8 +2623,8 @@ public boolean add(String key, T value, Transcoder tc, int timeToLive) th @Override public EVCacheLatch add(String key, T value, Transcoder tc, int timeToLive, Policy policy) throws EVCacheException { - EVCacheClient[] clients = _pool.getEVCacheClientForWrite(); - return this.add(key, value, tc, timeToLive, policy, clients, clients.length - _pool.getWriteOnlyEVCacheClients().length); + final EVCacheClient[] clients = getClientsForWrite(); + return this.add(key, value, tc, timeToLive, policy, clients, getLatchCount(clients)); } protected EVCacheLatch add(String key, T value, Transcoder tc, int timeToLive, Policy policy, EVCacheClient[] clients, int latchCount) throws EVCacheException {