@ -15,10 +15,21 @@
@@ -15,10 +15,21 @@
* /
package org.springframework.data.jdbc.repository ;
import static org.assertj.core.api.Assertions.* ;
import junit.framework.AssertionFailedError ;
import lombok.AllArgsConstructor ;
import lombok.Getter ;
import lombok.With ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.List ;
import java.util.concurrent.CopyOnWriteArrayList ;
import java.util.concurrent.CountDownLatch ;
import java.util.function.UnaryOperator ;
import org.junit.Before ;
import org.junit.ClassRule ;
import org.junit.Rule ;
import org.junit.Test ;
@ -29,34 +40,20 @@ import org.springframework.context.annotation.Import;
@@ -29,34 +40,20 @@ import org.springframework.context.annotation.Import;
import org.springframework.dao.IncorrectUpdateSemanticsDataAccessException ;
import org.springframework.data.annotation.Id ;
import org.springframework.data.jdbc.repository.support.JdbcRepositoryFactory ;
import org.springframework.data.jdbc.testing.DatabaseProfileValueSource ;
import org.springframework.data.jdbc.testing.TestConfiguration ;
import org.springframework.data.repository.CrudRepository ;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate ;
import org.springframework.test.annotation.IfProfileValue ;
import org.springframework.test.annotation.ProfileValueSourceConfiguration ;
import org.springframework.test.context.ContextConfiguration ;
import org.springframework.test.context.junit4.rules.SpringClassRule ;
import org.springframework.test.context.junit4.rules.SpringMethodRule ;
import org.springframework.transaction.PlatformTransactionManager ;
import org.springframework.transaction.support.TransactionTemplate ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.List ;
import java.util.concurrent.CopyOnWriteArrayList ;
import java.util.concurrent.CountDownLatch ;
import static org.assertj.core.api.Assertions.assertThat ;
/ * * Tests that highly concurrent update operations of an entity don ' t cause deadlocks .
/ * *
* Tests that highly concurrent update operations of an entity don ' t cause deadlocks .
*
* @author Myeonghyeon Lee
* @author Jens Schauder
* /
@ContextConfiguration
@ProfileValueSourceConfiguration ( DatabaseProfileValueSource . class )
@IfProfileValue ( name = "current.database.is.not.mysql" , value = "false" )
public class JdbcRepositoryConcurrencyIntegrationTests {
@Configuration
@ -83,38 +80,37 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
@@ -83,38 +80,37 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
@Autowired DummyEntityRepository repository ;
@Autowired PlatformTransactionManager transactionManager ;
@Test // DATAJDBC-488
public void updateConcurrencyWithEmptyReferences ( ) throws Exception {
List < DummyEntity > concurrencyEntities ;
DummyEntity entity ;
DummyEntity entity = createDummyEntity ( ) ;
entity = repository . save ( entity ) ;
TransactionTemplate transactionTemplate ;
List < Exception > exceptions ;
@Before
public void before ( ) {
entity = repository . save ( createDummyEntity ( ) ) ;
assertThat ( entity . getId ( ) ) . isNotNull ( ) ;
List < DummyEntity > concurrencyEntities = createEntityStates ( entity ) ;
concurrencyEntities = createEntityStates ( entity ) ;
TransactionTemplate transactionTemplate = new TransactionTemplate ( this . transactionManager ) ;
transactionTemplate = new TransactionTemplate ( this . transactionManager ) ;
List < Exception > exceptions = new CopyOnWriteArrayList < > ( ) ;
CountDownLatch startLatch = new CountDownLatch ( concurrencyEntities . size ( ) ) ; // latch for all threads to wait on.
CountDownLatch doneLatch = new CountDownLatch ( concurrencyEntities . size ( ) ) ; // latch for main thread to wait on until all threads are done.
exceptions = new CopyOnWriteArrayList < > ( ) ;
}
concurrencyEntities . stream ( ) //
. map ( e - > new Thread ( ( ) - > {
@Test // DATAJDBC-488
public void updateConcurrencyWithEmptyReferences ( ) throws Exception {
try {
// latch for all threads to wait on.
CountDownLatch startLatch = new CountDownLatch ( concurrencyEntities . size ( ) ) ;
// latch for main thread to wait on until all threads are done.
CountDownLatch doneLatch = new CountDownLatch ( concurrencyEntities . size ( ) ) ;
startLatch . countDown ( ) ;
startLatch . await ( ) ;
UnaryOperator < DummyEntity > action = e - > repository . save ( e ) ;
transactionTemplate . execute ( status - > repository . save ( e ) ) ;
} catch ( Exception ex ) {
exceptions . add ( ex ) ;
} finally {
doneLatch . countDown ( ) ;
}
} ) ) //
. forEach ( Thread : : start ) ;
concurrencyEntities . forEach ( e - > executeInParallel ( startLatch , doneLatch , action , e ) ) ;
doneLatch . await ( ) ;
@ -124,62 +120,31 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
@@ -124,62 +120,31 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
}
@Test // DATAJDBC-493
public void updateConcurrencyWithDelete ( ) throws Exception {
DummyEntity entity = createDummyEntity ( ) ;
entity = repository . save ( entity ) ;
Long targetId = entity . getId ( ) ;
assertThat ( targetId ) . isNotNull ( ) ;
List < DummyEntity > concurrencyEntities = createEntityStates ( entity ) ;
TransactionTemplate transactionTemplate = new TransactionTemplate ( this . transactionManager ) ;
public void concurrentUpdateAndDelete ( ) throws Exception {
List < Exception > exceptions = new CopyOnWriteArrayList < > ( ) ;
CountDownLatch startLatch = new CountDownLatch ( concurrencyEntities . size ( ) + 1 ) ; // latch for all threads to wait on.
CountDownLatch doneLatch = new CountDownLatch ( concurrencyEntities . size ( ) + 1 ) ; // latch for main thread to wait on until all threads are done.
// update
concurrencyEntities . stream ( ) //
. map ( e - > new Thread ( ( ) - > {
try {
startLatch . countDown ( ) ;
startLatch . await ( ) ;
transactionTemplate . execute ( status - > repository . save ( e ) ) ;
} catch ( Exception ex ) {
// When the delete execution is complete, the Update execution throws an IncorrectUpdateSemanticsDataAccessException.
if ( ex . getCause ( ) instanceof IncorrectUpdateSemanticsDataAccessException ) {
return ;
}
exceptions . add ( ex ) ;
} finally {
doneLatch . countDown ( ) ;
}
} ) ) //
. forEach ( Thread : : start ) ;
// delete
new Thread ( ( ) - > {
CountDownLatch doneLatch = new CountDownLatch ( concurrencyEntities . size ( ) + 1 ) ; // latch for main thread to wait on
// until all threads are done.
UnaryOperator < DummyEntity > updateAction = e - > {
try {
startLatch . countDown ( ) ;
startLatch . await ( ) ;
transactionTemplate . execute ( status - > {
repository . deleteById ( targetId ) ;
return null ;
} ) ;
return repository . save ( e ) ;
} catch ( Exception ex ) {
exceptions . add ( ex ) ;
} finally {
doneLatch . countDown ( ) ;
// When the delete execution is complete, the Update execution throws an
// IncorrectUpdateSemanticsDataAccessException.
if ( ex . getCause ( ) instanceof IncorrectUpdateSemanticsDataAccessException ) {
return null ;
}
throw ex ;
}
} ) . start ( ) ;
} ;
UnaryOperator < DummyEntity > deleteAction = e - > {
repository . deleteById ( entity . id ) ;
return null ;
} ;
concurrencyEntities . forEach ( e - > executeInParallel ( startLatch , doneLatch , updateAction , e ) ) ;
executeInParallel ( startLatch , doneLatch , deleteAction , entity ) ;
doneLatch . await ( ) ;
@ -188,42 +153,41 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
@@ -188,42 +153,41 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
}
@Test // DATAJDBC-493
public void updateConcurrencyWithDeleteAll ( ) throws Exception {
DummyEntity entity = createDummyEntity ( ) ;
entity = repository . save ( entity ) ;
List < DummyEntity > concurrencyEntities = createEntityStates ( entity ) ;
TransactionTemplate transactionTemplate = new TransactionTemplate ( this . transactionManager ) ;
public void concurrentUpdateAndDeleteAll ( ) throws Exception {
List < Exception > exceptions = new CopyOnWriteArrayList < > ( ) ;
CountDownLatch startLatch = new CountDownLatch ( concurrencyEntities . size ( ) + 1 ) ; // latch for all threads to wait on.
CountDownLatch doneLatch = new CountDownLatch ( concurrencyEntities . size ( ) + 1 ) ; // latch for main thread to wait on until all threads are done.
CountDownLatch doneLatch = new CountDownLatch ( concurrencyEntities . size ( ) + 1 ) ; // latch for main thread to wait on
// until all threads are done.
// update
concurrencyEntities . stream ( ) //
. map ( e - > new Thread ( ( ) - > {
UnaryOperator < DummyEntity > updateAction = e - > {
try {
return repository . save ( e ) ;
} catch ( Exception ex ) {
// When the delete execution is complete, the Update execution throws an
// IncorrectUpdateSemanticsDataAccessException.
if ( ex . getCause ( ) instanceof IncorrectUpdateSemanticsDataAccessException ) {
return null ;
}
throw ex ;
}
} ;
try {
UnaryOperator < DummyEntity > deleteAction = e - > {
repository . deleteAll ( ) ;
return null ;
} ;
startLatch . countDown ( ) ;
startLatch . await ( ) ;
concurrencyEntities . forEach ( e - > executeInParallel ( startLatch , doneLatch , updateAction , e ) ) ;
executeInParallel ( startLatch , doneLatch , deleteAction , entity ) ;
transactionTemplate . execute ( status - > repository . save ( e ) ) ;
} catch ( Exception ex ) {
// When the delete execution is complete, the Update execution throws an IncorrectUpdateSemanticsDataAccessException.
if ( ex . getCause ( ) instanceof IncorrectUpdateSemanticsDataAccessException ) {
return ;
}
doneLatch . await ( ) ;
exceptions . add ( ex ) ;
} finally {
doneLatch . countDown ( ) ;
}
} ) ) //
. forEach ( Thread : : start ) ;
assertThat ( exceptions ) . isEmpty ( ) ;
assertThat ( repository . count ( ) ) . isEqualTo ( 0 ) ;
}
private void executeInParallel ( CountDownLatch startLatch , CountDownLatch doneLatch ,
UnaryOperator < DummyEntity > deleteAction , DummyEntity entity ) {
// delete
new Thread ( ( ) - > {
try {
@ -231,21 +195,13 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
@@ -231,21 +195,13 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
startLatch . countDown ( ) ;
startLatch . await ( ) ;
transactionTemplate . execute ( status - > {
repository . deleteAll ( ) ;
return null ;
} ) ;
transactionTemplate . execute ( status - > deleteAction . apply ( entity ) ) ;
} catch ( Exception ex ) {
exceptions . add ( ex ) ;
} finally {
doneLatch . countDown ( ) ;
}
} ) . start ( ) ;
doneLatch . await ( ) ;
assertThat ( exceptions ) . isEmpty ( ) ;
assertThat ( repository . count ( ) ) . isEqualTo ( 0 ) ;
}
private List < DummyEntity > createEntityStates ( DummyEntity entity ) {
@ -254,7 +210,7 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
@@ -254,7 +210,7 @@ public class JdbcRepositoryConcurrencyIntegrationTests {
Element element1 = new Element ( null , 1L ) ;
Element element2 = new Element ( null , 2L ) ;
for ( int i = 0 ; i < 10 0; i + + ) {
for ( int i = 0 ; i < 5 0; i + + ) {
List < Element > newContent = Arrays . asList ( element1 . withContent ( element1 . content + i + 2 ) ,
element2 . withContent ( element2 . content + i + 2 ) ) ;