@ -20,7 +20,9 @@ import java.util.ArrayList;
@@ -20,7 +20,9 @@ import java.util.ArrayList;
import java.util.List ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.function.Supplier ;
import java.util.stream.Collectors ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import org.springframework.http.HttpCookie ;
@ -39,13 +41,21 @@ import org.springframework.util.MultiValueMap;
@@ -39,13 +41,21 @@ import org.springframework.util.MultiValueMap;
* /
public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
/ * *
* COMMITTING - > COMMITTED is the period after doCommit is called but before
* the response status and headers have been applied to the underlying
* response during which time pre - commit actions can still make changes to
* the response status and headers .
* /
private enum State { NEW , COMMITTING , COMMITTED }
private final HttpHeaders headers ;
private final MultiValueMap < String , HttpCookie > cookies ;
private AtomicReference < State > state = new AtomicReference < > ( State . NEW ) ;
private final List < Supplier < ? extends Mono < Void > > > beforeCommitActions = new ArrayList < > ( 4 ) ;
private final List < Supplier < ? extends Mono < Void > > > c ommitActions = new ArrayList < > ( 4 ) ;
public AbstractClientHttpRequest ( ) {
@ -61,7 +71,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@@ -61,7 +71,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@Override
public HttpHeaders getHeaders ( ) {
if ( State . COMITTED . equals ( this . state . get ( ) ) ) {
if ( State . COMM ITTED . equals ( this . state . get ( ) ) ) {
return HttpHeaders . readOnlyHttpHeaders ( this . headers ) ;
}
return this . headers ;
@ -69,42 +79,66 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@@ -69,42 +79,66 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@Override
public MultiValueMap < String , HttpCookie > getCookies ( ) {
if ( State . COMITTED . equals ( this . state . get ( ) ) ) {
if ( State . COMM ITTED . equals ( this . state . get ( ) ) ) {
return CollectionUtils . unmodifiableMultiValueMap ( this . cookies ) ;
}
return this . cookies ;
}
protected Mono < Void > applyBeforeCommit ( ) {
Mono < Void > mono = Mono . empty ( ) ;
if ( this . state . compareAndSet ( State . NEW , State . COMMITTING ) ) {
for ( Supplier < ? extends Mono < Void > > action : this . beforeCommitActions ) {
mono = mono . then ( ( ) - > action . get ( ) ) ;
}
return mono
. otherwise ( ex - > {
// Ignore errors from beforeCommit actions
return Mono . empty ( ) ;
} )
. then ( ( ) - > {
this . state . set ( State . COMITTED ) ;
writeHeaders ( ) ;
writeCookies ( ) ;
return Mono . empty ( ) ;
} ) ;
/ * *
* A variant of { @link # doCommit ( Supplier ) } for a request without body .
* @return a completion publisher
* /
protected Mono < Void > doCommit ( ) {
return ( this . state . get ( ) = = State . NEW ? doCommit ( null ) : Mono . empty ( ) ) ;
}
/ * *
* Apply { @link # beforeCommit ( Supplier ) beforeCommit } actions , apply the
* request headers / cookies , and write the request body .
* @param writeAction the action to write the request body or { @code null }
* @return a completion publisher
* /
protected Mono < Void > doCommit ( Supplier < ? extends Mono < Void > > writeAction ) {
if ( ! this . state . compareAndSet ( AbstractClientHttpRequest . State . NEW , AbstractClientHttpRequest . State . COMMITTING ) ) {
return Mono . empty ( ) ;
}
this . commitActions . add ( ( ) - > {
applyHeaders ( ) ;
applyCookies ( ) ;
this . state . set ( AbstractClientHttpRequest . State . COMMITTED ) ;
return Mono . empty ( ) ;
} ) ;
if ( writeAction ! = null ) {
this . commitActions . add ( writeAction ) ;
}
return mono ;
List < ? extends Mono < Void > > actions = this . commitActions . stream ( )
. map ( Supplier : : get ) . collect ( Collectors . toList ( ) ) ;
return Flux . concat ( actions ) . next ( ) ;
}
@Override
public void beforeCommit ( Supplier < ? extends Mono < Void > > action ) {
Assert . notNull ( action ) ;
this . beforeCommitActions . add ( action ) ;
this . c ommitActions. add ( action ) ;
}
protected abstract void writeHeaders ( ) ;
/ * *
* Implement this method to apply header changes from { @link # getHeaders ( ) }
* to the underlying response . This method is called once only .
* /
protected abstract void applyHeaders ( ) ;
/ * *
* Implement this method to add cookies from { @link # getHeaders ( ) } to the
* underlying response . This method is called once only .
* /
protected abstract void applyCookies ( ) ;
protected abstract void writeCookies ( ) ;
private enum State { NEW , COMMITTING , COMITTED }
}