diff --git a/spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java b/spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java new file mode 100644 index 00000000000..7aa4e8322df --- /dev/null +++ b/spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java @@ -0,0 +1,76 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.function.Function; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Benchmarks for {@link ConcurrentLruCache}. + * @author Brian Clozel + */ +@BenchmarkMode(Mode.Throughput) +public class ConcurrentLruCacheBenchmark { + + @Benchmark + public void lruCache(BenchmarkData data, Blackhole bh) { + for (String element : data.elements) { + String value = data.lruCache.get(element); + bh.consume(value); + } + } + + @State(Scope.Benchmark) + public static class BenchmarkData { + + ConcurrentLruCache lruCache; + + @Param({"100"}) + public int capacity; + + @Param({"0.1"}) + public float cacheMissRate; + + public List elements; + + public Function generator; + + @Setup(Level.Iteration) + public void setup() { + this.generator = key -> key + "value"; + this.lruCache = new ConcurrentLruCache<>(this.capacity, this.generator); + Assert.isTrue(this.cacheMissRate < 1, "cache miss rate should be < 1"); + Random random = new Random(); + int elementsCount = Math.round(this.capacity * (1 + this.cacheMissRate)); + this.elements = new ArrayList<>(elementsCount); + random.ints(elementsCount).forEach(value -> this.elements.add(String.valueOf(value))); + this.elements.sort(String::compareTo); + } + } +} diff --git a/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java b/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java index 1d99b4cd093..c4d02819bfb 100644 --- a/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java +++ b/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,116 +16,199 @@ package org.springframework.util; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import org.springframework.lang.Nullable; + + /** - * Simple LRU (Least Recently Used) cache, bounded by a specified cache limit. - * - *

This implementation is backed by a {@code ConcurrentHashMap} for storing - * the cached values and a {@code ConcurrentLinkedDeque} for ordering the keys - * and choosing the least recently used key when the cache is at full capacity. + * Simple LRU (Least Recently Used) cache, bounded by a specified cache capacity. + *

This is a simplified, opinionated implementation of a LRU cache for internal + * use in Spring Framework. It is inspired from + * ConcurrentLinkedHashMap. + *

Read and write operations are internally recorded in dedicated buffers, + * then drained at chosen times to avoid contention. * * @author Brian Clozel - * @author Juergen Hoeller + * @author Ben Manes * @since 5.3 * @param the type of the key used for cache retrieval - * @param the type of the cached values - * @see #get + * @param the type of the cached values, does not allow null values + * @see #get(Object) */ -public class ConcurrentLruCache { +@SuppressWarnings({"unchecked"}) +public final class ConcurrentLruCache { + + private final int capacity; - private final int sizeLimit; + private final AtomicInteger currentSize = new AtomicInteger(); + + private final ConcurrentMap> cache; private final Function generator; - private final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + private final ReadOperations readOperations; - private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); + private final WriteOperations writeOperations; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock evictionLock = new ReentrantLock(); - private volatile int size; + /* + * Queue that contains all ACTIVE cache entries, ordered with least recently used entries first. + * Read and write operations are buffered and periodically processed to reorder the queue. + */ + private final EvictionQueue evictionQueue = new EvictionQueue<>(); + private final AtomicReference drainStatus = new AtomicReference<>(DrainStatus.IDLE); /** - * Create a new cache instance with the given limit and generator function. - * @param sizeLimit the maximum number of entries in the cache + * Create a new cache instance with the given capacity and generator function. + * @param capacity the maximum number of entries in the cache * (0 indicates no caching, always generating a new value) * @param generator a function to generate a new value for a given key */ - public ConcurrentLruCache(int sizeLimit, Function generator) { - Assert.isTrue(sizeLimit >= 0, "Cache size limit must not be negative"); - Assert.notNull(generator, "Generator function must not be null"); - this.sizeLimit = sizeLimit; - this.generator = generator; + public ConcurrentLruCache(int capacity, Function generator) { + this(capacity, generator, 16); } + private ConcurrentLruCache(int capacity, Function generator, int concurrencyLevel) { + Assert.isTrue(capacity > 0, "Capacity should be > 0"); + this.capacity = capacity; + this.cache = new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel); + this.generator = generator; + this.readOperations = new ReadOperations<>(this.evictionQueue); + this.writeOperations = new WriteOperations(); + } /** - * Retrieve an entry from the cache, potentially triggering generation - * of the value. + * Retrieve an entry from the cache, potentially triggering generation of the value. * @param key the key to retrieve the entry for * @return the cached or newly generated value */ public V get(K key) { - if (this.sizeLimit == 0) { - return this.generator.apply(key); + final Node node = this.cache.get(key); + if (node == null) { + V value = this.generator.apply(key); + put(key, value); + return value; } + processRead(node); + return node.getValue(); + } - V cached = this.cache.get(key); - if (cached != null) { - if (this.size < this.sizeLimit) { - return cached; - } - this.lock.readLock().lock(); + private void put(K key, V value) { + Assert.notNull(key, "key should not be null"); + Assert.notNull(value, "value should not be null"); + final CacheEntry cacheEntry = new CacheEntry<>(value, CacheEntryState.ACTIVE); + final Node node = new Node<>(key, cacheEntry); + final Node prior = this.cache.put(node.key, node); + if (prior == null) { + processWrite(new AddTask(node)); + } + else { + processRead(prior); + } + } + + private void processRead(Node node) { + boolean drainRequested = this.readOperations.recordRead(node); + final DrainStatus status = this.drainStatus.get(); + if (status.shouldDrainBuffers(drainRequested)) { + drainOperations(); + } + } + + private void processWrite(Runnable task) { + this.writeOperations.add(task); + this.drainStatus.lazySet(DrainStatus.REQUIRED); + drainOperations(); + } + + private void drainOperations() { + if (this.evictionLock.tryLock()) { try { - if (this.queue.removeLastOccurrence(key)) { - this.queue.offer(key); - } - return cached; + this.drainStatus.lazySet(DrainStatus.PROCESSING); + this.readOperations.drain(); + this.writeOperations.drain(); } finally { - this.lock.readLock().unlock(); + this.drainStatus.compareAndSet(DrainStatus.PROCESSING, DrainStatus.IDLE); + this.evictionLock.unlock(); } } + } + + /** + * Return the maximum number of entries in the cache. + * @see #size() + */ + public int capacity() { + return this.capacity; + } - this.lock.writeLock().lock(); + /** + * Return the maximum number of entries in the cache. + * @deprecated in favor of {@link #capacity()} as of 6.0. + */ + @Deprecated + public int sizeLimit() { + return this.capacity; + } + + /** + * Return the current size of the cache. + * @see #capacity() + */ + public int size() { + return this.cache.size(); + } + + /** + * Immediately remove all entries from this cache. + */ + public void clear() { + this.evictionLock.lock(); try { - // Retrying in case of concurrent reads on the same key - cached = this.cache.get(key); - if (cached != null) { - if (this.queue.removeLastOccurrence(key)) { - this.queue.offer(key); - } - return cached; - } - // Generate value first, to prevent size inconsistency - V value = this.generator.apply(key); - if (this.size == this.sizeLimit) { - K leastUsed = this.queue.poll(); - if (leastUsed != null) { - this.cache.remove(leastUsed); - } + Node node; + while ((node = this.evictionQueue.poll()) != null) { + this.cache.remove(node.key, node); + markAsRemoved(node); } - this.queue.offer(key); - this.cache.put(key, value); - this.size = this.cache.size(); - return value; + this.readOperations.clear(); + this.writeOperations.drainAll(); } finally { - this.lock.writeLock().unlock(); + this.evictionLock.unlock(); + } + } + + /* + * Transition the node to the {@code removed} state and decrement the current size of the cache. + */ + private void markAsRemoved(Node node) { + for (; ; ) { + CacheEntry current = node.get(); + CacheEntry removed = new CacheEntry<>(current.value, CacheEntryState.REMOVED); + if (node.compareAndSet(current, removed)) { + this.currentSize.lazySet(this.currentSize.get() - 1); + return; + } } } /** * Determine whether the given key is present in this cache. * @param key the key to check for - * @return {@code true} if the key is present, - * {@code false} if there was no matching key + * @return {@code true} if the key is present, {@code false} if there was no matching key */ public boolean contains(K key) { return this.cache.containsKey(key); @@ -137,49 +220,393 @@ public class ConcurrentLruCache { * @return {@code true} if the key was present before, * {@code false} if there was no matching key */ + @Nullable public boolean remove(K key) { - this.lock.writeLock().lock(); - try { - boolean wasPresent = (this.cache.remove(key) != null); - this.queue.remove(key); - this.size = this.cache.size(); - return wasPresent; + final Node node = this.cache.remove(key); + if (node == null) { + return false; } - finally { - this.lock.writeLock().unlock(); + markForRemoval(node); + processWrite(new RemovalTask(node)); + return true; + } + + /* + * Transition the node from the {@code active} state to the {@code pending removal} state, + * if the transition is valid. + */ + private void markForRemoval(Node node) { + for (; ; ) { + final CacheEntry current = node.get(); + if (!current.isActive()) { + return; + } + final CacheEntry pendingRemoval = new CacheEntry<>(current.value, CacheEntryState.PENDING_REMOVAL); + if (node.compareAndSet(current, pendingRemoval)) { + return; + } } } /** - * Immediately remove all entries from this cache. + * Write operation recorded when a new entry is added to the cache. */ - public void clear() { - this.lock.writeLock().lock(); - try { - this.cache.clear(); - this.queue.clear(); - this.size = 0; + private final class AddTask implements Runnable { + final Node node; + + AddTask(Node node) { + this.node = node; } - finally { - this.lock.writeLock().unlock(); + + @Override + public void run() { + currentSize.lazySet(currentSize.get() + 1); + if (this.node.get().isActive()) { + evictionQueue.add(this.node); + evictEntries(); + } } + + private void evictEntries() { + while (currentSize.get() > capacity) { + final Node node = evictionQueue.poll(); + if (node == null) { + return; + } + cache.remove(node.key, node); + markAsRemoved(node); + } + } + } + /** - * Return the current size of the cache. - * @see #sizeLimit() + * Write operation recorded when an entry is removed to the cache. */ - public int size() { - return this.size; + private final class RemovalTask implements Runnable { + final Node node; + + RemovalTask(Node node) { + this.node = node; + } + + @Override + public void run() { + evictionQueue.remove(this.node); + markAsRemoved(this.node); + } } - /** - * Return the maximum number of entries in the cache - * (0 indicates no caching, always generating a new value). - * @see #size() + + /* + * Draining status for the read/write buffers. */ - public int sizeLimit() { - return this.sizeLimit; + private enum DrainStatus { + + /* + * No drain operation currently running. + */ + IDLE { + @Override + boolean shouldDrainBuffers(boolean delayable) { + return !delayable; + } + }, + + /* + * A drain operation is required due to a pending write modification. + */ + REQUIRED { + @Override + boolean shouldDrainBuffers(boolean delayable) { + return true; + } + }, + + /* + * A drain operation is in progress. + */ + PROCESSING { + @Override + boolean shouldDrainBuffers(boolean delayable) { + return false; + } + }; + + /** + * Determine whether the buffers should be drained. + * @param delayable if a drain should be delayed until required + * @return if a drain should be attempted + */ + abstract boolean shouldDrainBuffers(boolean delayable); + } + + private enum CacheEntryState { + ACTIVE, PENDING_REMOVAL, REMOVED + } + + private record CacheEntry(V value, CacheEntryState state) { + + boolean isActive() { + return this.state == CacheEntryState.ACTIVE; + } + } + + private static final class ReadOperations { + + private static final int BUFFER_COUNT = detectNumberOfBuffers(); + + private static int detectNumberOfBuffers() { + int availableProcessors = Runtime.getRuntime().availableProcessors(); + return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(availableProcessors - 1)); + } + + private static final int BUFFERS_MASK = BUFFER_COUNT - 1; + + private static final int MAX_PENDING_OPERATIONS = 32; + + private static final int MAX_DRAIN_COUNT = 2 * MAX_PENDING_OPERATIONS; + + private static final int BUFFER_SIZE = 2 * MAX_DRAIN_COUNT; + + private static final int BUFFER_INDEX_MASK = BUFFER_SIZE - 1; + + /* + * Number of operations recorded, for each buffer + */ + private final AtomicLong[] recordedCount = new AtomicLong[BUFFER_COUNT]; + + /* + * Number of operations read, for each buffer + */ + private final long[] readCount = new long[BUFFER_COUNT]; + + /* + * Number of operations processed, for each buffer + */ + private final AtomicLong[] processedCount = new AtomicLong[BUFFER_COUNT]; + + @SuppressWarnings("rawtypes") + private final AtomicReference>[][] buffers = new AtomicReference[BUFFER_COUNT][BUFFER_SIZE]; + + private final EvictionQueue evictionQueue; + + @SuppressWarnings("rawtypes") + ReadOperations(EvictionQueue evictionQueue) { + this.evictionQueue = evictionQueue; + for (int i = 0; i < BUFFER_COUNT; i++) { + this.recordedCount[i] = new AtomicLong(); + this.processedCount[i] = new AtomicLong(); + this.buffers[i] = new AtomicReference[BUFFER_SIZE]; + for (int j = 0; j < BUFFER_SIZE; j++) { + this.buffers[i][j] = new AtomicReference<>(); + } + } + } + + private static int getBufferIndex() { + return ((int) Thread.currentThread().getId()) & BUFFERS_MASK; + } + + boolean recordRead(Node node) { + int bufferIndex = getBufferIndex(); + final AtomicLong counter = this.recordedCount[bufferIndex]; + final long writeCount = counter.get(); + counter.lazySet(writeCount + 1); + final int index = (int) (writeCount & BUFFER_INDEX_MASK); + this.buffers[bufferIndex][index].lazySet(node); + final long pending = (writeCount - this.processedCount[bufferIndex].get()); + return (pending < MAX_PENDING_OPERATIONS); + } + + void drain() { + final int start = (int) Thread.currentThread().getId(); + final int end = start + BUFFER_COUNT; + for (int i = start; i < end; i++) { + drainReadBuffer(i & BUFFERS_MASK); + } + } + + void clear() { + for (AtomicReference>[] buffer : this.buffers) { + for (AtomicReference> slot : buffer) { + slot.lazySet(null); + } + } + } + + private void drainReadBuffer(int bufferIndex) { + final long writeCount = this.recordedCount[bufferIndex].get(); + for (int i = 0; i < MAX_DRAIN_COUNT; i++) { + final int index = (int) (this.readCount[bufferIndex] & BUFFER_INDEX_MASK); + final AtomicReference> slot = this.buffers[bufferIndex][index]; + final Node node = slot.get(); + if (node == null) { + break; + } + slot.lazySet(null); + this.evictionQueue.moveToBack(node); + this.readCount[bufferIndex]++; + } + this.processedCount[bufferIndex].lazySet(writeCount); + } + } + + private static final class WriteOperations { + + private static final int DRAIN_THRESHOLD = 16; + + private final Queue operations = new ConcurrentLinkedQueue<>(); + + public void add(Runnable task) { + this.operations.add(task); + } + + public void drain() { + for (int i = 0; i < DRAIN_THRESHOLD; i++) { + final Runnable task = this.operations.poll(); + if (task == null) { + break; + } + task.run(); + } + } + + public void drainAll() { + Runnable task; + while ((task = this.operations.poll()) != null) { + task.run(); + } + } + + } + + @SuppressWarnings("serial") + private static final class Node extends AtomicReference> { + final K key; + + @Nullable + Node prev; + + @Nullable + Node next; + + Node(K key, CacheEntry cacheEntry) { + super(cacheEntry); + this.key = key; + } + + @Nullable + public Node getPrevious() { + return this.prev; + } + + public void setPrevious(@Nullable Node prev) { + this.prev = prev; + } + + @Nullable + public Node getNext() { + return this.next; + } + + public void setNext(@Nullable Node next) { + this.next = next; + } + + V getValue() { + return get().value; + } + } + + + private static final class EvictionQueue { + + @Nullable + Node first; + + @Nullable + Node last; + + + @Nullable + Node poll() { + if (this.first == null) { + return null; + } + final Node f = this.first; + final Node next = f.getNext(); + f.setNext(null); + + this.first = next; + if (next == null) { + this.last = null; + } + else { + next.setPrevious(null); + } + return f; + } + + void add(Node e) { + if (contains(e)) { + return; + } + linkLast(e); + } + + private boolean contains(Node e) { + return (e.getPrevious() != null) + || (e.getNext() != null) + || (e == this.first); + } + + private void linkLast(final Node e) { + final Node l = this.last; + this.last = e; + + if (l == null) { + this.first = e; + } + else { + l.setNext(e); + e.setPrevious(l); + } + } + + private void unlink(Node e) { + final Node prev = e.getPrevious(); + final Node next = e.getNext(); + if (prev == null) { + this.first = next; + } + else { + prev.setNext(next); + e.setPrevious(null); + } + if (next == null) { + this.last = prev; + } + else { + next.setPrevious(prev); + e.setNext(null); + } + } + + void moveToBack(Node e) { + if (contains(e) && e != this.last) { + unlink(e); + linkLast(e); + } + } + + void remove(Node e) { + if (contains(e)) { + unlink(e); + } + } + } } diff --git a/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java b/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java index 31ec75944f3..5c62362c112 100644 --- a/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java +++ b/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; /** + * Tests for {@link ConcurrentLruCache}. * @author Juergen Hoeller */ class ConcurrentLruCacheTests { @@ -30,7 +31,7 @@ class ConcurrentLruCacheTests { @Test void getAndSize() { - assertThat(this.cache.sizeLimit()).isEqualTo(2); + assertThat(this.cache.capacity()).isEqualTo(2); assertThat(this.cache.size()).isEqualTo(0); assertThat(this.cache.get("k1")).isEqualTo("k1value"); assertThat(this.cache.size()).isEqualTo(1); diff --git a/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java b/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java index 3d3752b5bf5..5159ed16cd2 100644 --- a/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java +++ b/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java @@ -135,7 +135,7 @@ public class NamedParameterJdbcTemplate implements NamedParameterJdbcOperations * Return the maximum number of entries for this template's SQL cache. */ public int getCacheLimit() { - return this.parsedSqlCache.sizeLimit(); + return this.parsedSqlCache.capacity(); }