Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spring boot integration test is failing #60

Open
habeep opened this issue Oct 28, 2021 · 1 comment
Open

spring boot integration test is failing #60

habeep opened this issue Oct 28, 2021 · 1 comment

Comments

@habeep
Copy link

habeep commented Oct 28, 2021

Hello,
We are trying to write an integration/unit test, but it is failing and not working as we expected. Can you please help us?

dependency details:

<java.version>1.8</java.version>

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.10.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-spring-boot2</artifactId>
        <version>1.7.0</version>
    </dependency>

Basically, I tried to run the integration test, but it is failing. Can you please help me resolve this issue?

Do you have a sample that I can refer to in order to correct the issue?

@habeep
Copy link
Author

habeep commented Oct 28, 2021

package com.processor.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.processor.model.PrimaryKeyColumn;
import com.processor.model.ValueColumn;
import com.processor.exception.UDTFlowException;
import com.processor.mapping.configuration.UDTMappingDefinition;
import com.processor.mapping.model.UDTPayload;
import com.processor.mapping.model.UDTRequest;
import com.processor.model.UDTFlowMessage;
import com.processor.model.UDTResponseDTO;
import com.processor.service.UDTWSClient;
import com.processor.service.impl.ControlPersistenceServiceImpl;
import com.processor.common.enums.RequestControlStatus;
import com.processor.abc.web.service.models.axon.ControlMessage;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.reactive.function.client.WebClient;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.processor.common.common.Constants.CONTROL_ID_AXON_MESSAGE_KEY;
import static com.processor.common.common.Constants.CONTROL_OPERATION_AXON_MESSAGE_KEY;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Integration test for {@code AbcIntegrationFlowConfig#sendToUDTWs} that sends the request through
 * the Spring-managed {@link UDTWSClient} (whose {@code sendUDTRequest} carries
 * {@code @CircuitBreaker(name = "enterprise_circuitbreaker")}) and then inspects the metrics of that
 * circuit breaker in the injected {@link CircuitBreakerRegistry}.
 *
 * <p>Only JUnit Jupiter annotations are used. A JUnit 4 {@code @RunWith} on a class whose test
 * methods are Jupiter {@code @Test} methods is ignored by the Jupiter engine and can make the
 * Vintage engine report the class as having no runnable methods.
 *
 * <p>NOTE(review): the {@code @CircuitBreaker} annotation is only honoured when the resilience4j
 * aspect is part of the test application context and uses the same registry bean - confirm that the
 * configured context classes pull in the resilience4j auto-configuration.
 *
 * <p>NOTE(review): the request goes to the real endpoint configured by {@code udt.ws.uri}; the test
 * therefore depends on that endpoint being reachable and answering with a decodable body.
 */
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {Resilience4JConfig.class, UDTWSClient.class, UDTWebClientConfig.class, CommonConfigData.class})
@SpringBootTest(classes = {UDTWSClient.class, CommonConfigData.class}, properties = {
        "pcf.enabled=false",
        "udt.ws.uri=http://webhook.site/56352051-90cc-4187-929b-38decbb9ee13"
})
@Slf4j
class AbcIntegrationFlowConfigTestWithCircuitBreaker {

    /** Name used by the {@code @CircuitBreaker} annotation on {@code UDTWSClient#sendUDTRequest}. */
    private static final String CIRCUIT_BREAKER_NAME = "enterprise_circuitbreaker";

    @InjectMocks
    private AbcIntegrationFlowConfig abcIntegrationFlowConfig;

    @Autowired
    private WebClient webClient;

    /** Real, Spring-managed client; it is NOT a Mockito mock and must not be stubbed or verified. */
    @Autowired
    private UDTWSClient udtwsClient;

    @Mock
    private ControlPersistenceServiceImpl controlPersistenceService;

    @Mock
    private ObjectMapper mapper;

    private UDTFlowMessage udtFlowMessage = new UDTFlowMessage();
    private String payload = "[{\"primaryKeyColumns\":[{\"name\": \"account_range\"," +
            "\"value\": \"5555600000000000-5555600000000099\"}]},{\"valueColumns\": [{\"name\": \"threshold\"," +
            "\"value\": 50}]}]";

    @Mock
    Acknowledgment acknowledgment;

    @Mock
    Message<UDTFlowMessage> message;

    @Mock
    MessageHeaders messageHeaders;

    @Mock
    AxonConfig axonConfig;

    @Autowired
    Resilience4JConfig resilience4JConfig;

    @Autowired
    CircuitBreakerRegistry circuitBreakerRegistry;

    /**
     * Creates the mocks, wires the real client into the object under test and prepares the message
     * that every test sends through the flow.
     *
     * @throws ReflectiveOperationException if the {@code udtwsClient} field cannot be set
     */
    @BeforeEach
    public void setUp() throws ReflectiveOperationException {
        MockitoAnnotations.initMocks(this);

        // @InjectMocks only supplies @Mock/@Spy fields, so the autowired client would otherwise be
        // left null inside the flow config; wire the Spring bean in by hand.
        java.lang.reflect.Field clientField = AbcIntegrationFlowConfig.class.getDeclaredField("udtwsClient");
        clientField.setAccessible(true);
        clientField.set(abcIntegrationFlowConfig, udtwsClient);

        ControlMessage controlMessage = new ControlMessage();
        Map<String, String> metaData = new HashMap<>();
        metaData.put(CONTROL_ID_AXON_MESSAGE_KEY, "123456");
        metaData.put(CONTROL_OPERATION_AXON_MESSAGE_KEY, "UPDATE");
        controlMessage.setMetaData(metaData);

        udtFlowMessage.setControlMessage(controlMessage);
        udtFlowMessage.setUdtRequests(Collections.singletonList(buildUdtRequest()));

        when(message.getPayload()).thenReturn(udtFlowMessage);
        when(message.getHeaders()).thenReturn(messageHeaders);
        when(messageHeaders.get(any(), any())).thenReturn(acknowledgment);
    }

    /** Builds a single UDT request with one primary-key column, one value column and a PUT URI. */
    private UDTRequest buildUdtRequest() {
        PrimaryKeyColumn accountRange = new PrimaryKeyColumn();
        accountRange.setName("account_range");
        accountRange.setValue("5555600000000000-5555600000000099");
        List<PrimaryKeyColumn> primaryKeys = new ArrayList<>();
        primaryKeys.add(accountRange);

        ValueColumn threshold = new ValueColumn();
        threshold.setName("threshold");
        threshold.setValue("50");
        List<ValueColumn> values = new ArrayList<>();
        values.add(threshold);

        UDTMappingDefinition mappingDefinition = new UDTMappingDefinition();
        mappingDefinition.setId("90000");
        mappingDefinition.setUdtURIs(Collections.singletonMap("PUT", "/udt/test"));

        UDTRequest request = new UDTRequest();
        request.setPayload((new UDTPayload.UDTPayloadBuilder()).withUDTRecord(primaryKeys, values).build());
        request.setUdtMappingDefinition(mappingDefinition);
        return request;
    }

    @Test
    void testSendToUDTWs_ShouldCallControlPersistenceAndAck_WhenUpdateRequestIsSuccessful() throws JsonProcessingException, UDTFlowException {
        // Arrange
        // Use the breaker from the injected registry: a registry created locally inside the test is
        // never seen by the code under test, so its metrics would always stay at zero.
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME);
        // Start CLOSED with empty statistics; an OPEN breaker would reject the call outright.
        circuitBreaker.reset();
        when(mapper.writeValueAsString(any())).thenReturn(payload);

        // Act
        assertDoesNotThrow(() -> {
            abcIntegrationFlowConfig.sendToUDTWs(message);
        });

        // Assert
        // Read the metrics after the call so they belong to the breaker's current state.
        CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
        assertEquals(1, metrics.getNumberOfBufferedCalls());
        assertEquals(0, metrics.getNumberOfFailedCalls());
        assertEquals(1, metrics.getNumberOfSuccessfulCalls());
        // The client is a real bean, so its invocation is proven by the breaker metrics above rather
        // than by Mockito verification (verify() on a non-mock throws NotAMockException).
        verify(controlPersistenceService, times(1)).updateControlOperationStatusInDb(anyString(), any(RequestControlStatus.class), isNull(), anyBoolean(), isNull());
        verify(controlPersistenceService, never()).deleteControlById(anyString());
        verify(acknowledgment, times(1)).acknowledge();
        verify(acknowledgment, never()).nack(anyLong());
    }

}


package com.processor.config;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.circuitbreaker.event.CircuitBreakerEvent;
import io.github.resilience4j.circuitbreaker.event.CircuitBreakerOnStateTransitionEvent;
import io.github.resilience4j.core.registry.EntryAddedEvent;
import io.github.resilience4j.core.registry.EntryRemovedEvent;
import io.github.resilience4j.core.registry.EntryReplacedEvent;
import io.github.resilience4j.core.registry.RegistryEventConsumer;
import io.github.resilience4j.ratelimiter.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

import static io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType.COUNT_BASED;

/**
 * Spring configuration that provides the application's {@link CircuitBreakerRegistry} together with
 * registry event consumers that log rate-limiter and circuit-breaker events.
 */
@Slf4j
@Configuration
public class Resilience4JConfig {

    /**
     * Registers a debug-level event logger on every {@link RateLimiter} that is added to, removed
     * from, or replaced in the rate-limiter registry.
     *
     * @return the registry event consumer for rate limiters
     */
    @Bean
    public RegistryEventConsumer<RateLimiter> abcACPRateLimiterRegistryEventConsumer() {
        return new RegistryEventConsumer<RateLimiter>() {

            @Override
            public void onEntryAddedEvent(EntryAddedEvent<RateLimiter> entryAddedEvent) {
                entryAddedEvent.getAddedEntry().getEventPublisher().onEvent(e -> log.debug("{}", e));
            }

            @Override
            public void onEntryRemovedEvent(EntryRemovedEvent<RateLimiter> entryRemoveEvent) {
                entryRemoveEvent.getRemovedEntry().getEventPublisher().onEvent(e -> log.debug("{}", e));
            }

            @Override
            public void onEntryReplacedEvent(EntryReplacedEvent<RateLimiter> entryReplacedEvent) {
                entryReplacedEvent.getNewEntry().getEventPublisher().onEvent(e -> log.debug("{}", e));
                entryReplacedEvent.getOldEntry().getEventPublisher().onEvent(e -> log.debug("{}", e));
            }
        };
    }

    /**
     * Creates the circuit-breaker registry with a single default configuration: a count-based
     * sliding window of 100 calls, at least 3 calls before rates are evaluated, a failure-rate
     * threshold of 5 and a slow-call-rate threshold of 70 (both percentages), calls slower than
     * 1000 ms counted as slow, 1000 ms spent in the open state, and 5 permitted calls while
     * half-open.
     *
     * <p>NOTE(review): only {@code RequestNotPermitted} is ignored here. If 4xx responses wrapped in
     * the project's {@code WebClientCBIgnoreException} are also meant to be ignored by the breaker,
     * that exception has to be configured as well - confirm where that is done.
     *
     * @return the registry used to look up circuit breakers by name
     */
    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
                .slidingWindowType(COUNT_BASED)
                .minimumNumberOfCalls(3)
                .waitDurationInOpenState(Duration.ofMillis(1000))
                .failureRateThreshold(5)
                .permittedNumberOfCallsInHalfOpenState(5)
                .slowCallRateThreshold(70)
                .slowCallDurationThreshold(Duration.ofMillis(1000))
                .slidingWindowSize(100)
                .ignoreExceptions(io.github.resilience4j.ratelimiter.RequestNotPermitted.class)
                .build();
        return CircuitBreakerRegistry.of(circuitBreakerConfig);
    }

    /**
     * Registers info-level logging of circuit-breaker events and state transitions. The bean exists
     * to make circuit-breaker state changes and configuration observable when validating the
     * breaker; it is not needed for normal functionality.
     *
     * @return the registry event consumer for circuit breakers
     */
    @Bean
    public RegistryEventConsumer<CircuitBreaker> abcACPCircuitBreakerRegistryEventConsumer() {
        return new RegistryEventConsumer<CircuitBreaker>() {
            @Override
            public void onEntryAddedEvent(EntryAddedEvent<CircuitBreaker> entryAddedEvent) {
                CircuitBreaker cb = entryAddedEvent.getAddedEntry();
                cb.getEventPublisher().onStateTransition(this::onStateTransition);
                cb.getEventPublisher().onEvent(event -> onEvent(cb, event));
            }

            /** Logs a state transition of the named circuit breaker. */
            private void onStateTransition(CircuitBreakerOnStateTransitionEvent event) {
                log.info("{} state : {}", event.getCircuitBreakerName(), event);
            }

            /** Logs any circuit-breaker event together with the breaker's state at logging time. */
            private void onEvent(CircuitBreaker cb, CircuitBreakerEvent event) {
                log.info("{} event({}) : {}", event.getCircuitBreakerName(), cb.getState(), event);
            }

            @Override
            public void onEntryRemovedEvent(EntryRemovedEvent<CircuitBreaker> entryRemoveEvent) {
                entryRemoveEvent.getRemovedEntry().getEventPublisher().onEvent(event -> log.info("{}", event));
            }

            @Override
            public void onEntryReplacedEvent(EntryReplacedEvent<CircuitBreaker> entryReplacedEvent) {
                entryReplacedEvent.getOldEntry().getEventPublisher().onEvent(event -> log.info("{}", event));
                entryReplacedEvent.getNewEntry().getEventPublisher().onEvent(event -> log.info("{}", event));
            }
        };
    }

}

package com.processor.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.netty.handler.timeout.TimeoutException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.dsl.KafkaMessageDrivenChannelAdapterSpec;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.web.reactive.function.client.WebClientResponseException;

import java.util.Objects;
import java.util.UUID;

/**
 * Spring Integration configuration whose service activator forwards the UDT requests of an incoming
 * message to the UDT web service through {@link UDTWSClient}.
 *
 * <p>NOTE(review): this listing looks abridged - {@code mapper}, {@code UDT_INPUT_CHANNEL} and
 * several types used below are not declared or imported in the visible code. Confirm against the
 * full class before relying on the constructor generated by {@code @AllArgsConstructor}.
 */
@Configuration
@EnableIntegration
@AllArgsConstructor
@IntegrationComponentScan
@Slf4j
public class AbcIntegrationFlowConfig {

    private UDTWSClient udtwsClient;

    /**
     * Sends every UDT request carried by {@code message} to the UDT web service, using the
     * operation derived from the control message's metadata.
     *
     * <p>Failures are translated as follows: JSON serialization problems, non-5xx
     * {@code WebClientResponseException}s and any other throwable become
     * {@code UDTFlowUnexpectedException}; {@code WebClientCBIgnoreException}, 5xx responses and
     * timeout / call-not-permitted / request-not-permitted failures become {@code UDTFlowException}
     * with differing flag combinations.
     *
     * @param message message whose payload holds the control message and the UDT requests
     * @throws UDTFlowException if sending one of the requests fails
     */
    @ServiceActivator(inputChannel = UDT_INPUT_CHANNEL)
    public void sendToUDTWs(Message<UDTFlowMessage> message) throws UDTFlowException {
        UDTFlowMessage udtFlowMessage = message.getPayload();
        String controlOperation = udtFlowMessage.getControlMessage().getMetaData()
                .get(CONTROL_OPERATION_AXON_MESSAGE_KEY);
        String udtOperation = UDTRequestOperation.valueOf(controlOperation).getUdtOperation();

        try {
            // The first failing request aborts the loop; later requests are not attempted.
            for (UDTRequest r : udtFlowMessage.getUdtRequests()) {
                udtwsClient.sendUDTRequest(r.getUdtMappingDefinition().getUdtURIs().get(udtOperation),
                        mapper.writeValueAsString(r.getPayload().getRecords()), udtOperation);
            }
        } catch (JsonProcessingException jsonProcessingException) {
            throw new UDTFlowUnexpectedException(jsonProcessingException);
        } catch (WebClientCBIgnoreException exception) {
            // NOTE(review): the meaning of the three boolean flags is defined by UDTFlowException,
            // which is not visible here.
            throw new UDTFlowException(true, true, false, exception);
        } catch (WebClientResponseException exception) {
            if (exception.getStatusCode().is5xxServerError()) {
                throw new UDTFlowException(false, true, true, exception);
            } else {
                throw new UDTFlowUnexpectedException(exception);
            }
        } catch (TimeoutException | CallNotPermittedException | RequestNotPermitted e) {
            throw new UDTFlowException(false, false, false, e);
        } catch (Throwable throwable) {
            throw new UDTFlowUnexpectedException(throwable);
        }
    }

}


package com.processor.service;

import com.processor.common.Constants;
import com.processor.exception.WebClientCBIgnoreException;
import com.processor.model.UDTHealthCheckResponse;
import com.processor.model.UDTResponseDTO;
import com.processor.model.UDTResponseEntity;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.vavr.collection.Seq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;

import java.util.Collections;

import static com.processor.common.Constants.ENTERPRISE_RATELIMITER;

/**
 * HTTP client for the UDT web service, built on a blocking use of {@link WebClient}.
 * {@link #sendUDTRequest} is guarded by the circuit breaker named {@code enterprise_circuitbreaker}.
 */
@Component
@Slf4j
public class UDTWSClient {

    private final String udtWsUri;
    private final WebClient webClient;

    @Autowired
    CircuitBreakerRegistry circuitBreakerRegistry;

    /**
     * @param udtWsUri  base URI of the UDT web service, taken from the {@code udt.ws.uri} property
     * @param webClient client used for all outgoing calls
     */
    @Autowired
    public UDTWSClient(@Value("${udt.ws.uri}") String udtWsUri,
                       WebClient webClient) {
        this.udtWsUri = udtWsUri;
        this.webClient = webClient;
    }

    /**
     * Performs a blocking GET against the given health-check URI.
     *
     * @param udtWsHealthUri URI of the health endpoint; used as given, without the base URI
     * @return the response entity with the decoded health-check body
     */
    public ResponseEntity<UDTHealthCheckResponse> udtWebServiceHealthCheck(String udtWsHealthUri) {
        return webClient.get()
                .uri(udtWsHealthUri)
                .retrieve()
                .toEntity(UDTHealthCheckResponse.class)
                .block();
    }

    /**
     * Sends {@code payload} to the UDT web service and blocks until the response arrives.
     *
     * @param uri     path appended to the configured base URI
     * @param payload request body, sent with a JSON content type
     * @param method  HTTP method name, resolved with {@link HttpMethod#valueOf(String)}
     * @return the response status and, when a body is present, its result text
     * @throws WebClientCBIgnoreException  if the service answers with a 4xx status
     * @throws WebClientResponseException  if the service answers with any other error status
     */
    @CircuitBreaker(name = "enterprise_circuitbreaker")
    public UDTResponseDTO sendUDTRequest(String uri, String payload, String method) {

        String requestUri = udtWsUri + uri;
        log.debug("Sending UDT Request [{}:{}] with payload: {}", method, requestUri, payload);
        ResponseEntity<UDTResponseEntity> response;

        try {
            response = webClient
                    .method(HttpMethod.valueOf(method))
                    .uri(requestUri)
                    .headers(headers -> headers.addAll(initializeHeaders()))
                    .body(BodyInserters.fromValue(payload))
                    .retrieve()
                    .toEntity(UDTResponseEntity.class)
                    .block();
        } catch (WebClientResponseException exception) {
            // Client errors are re-thrown as a dedicated type so callers can tell them apart from
            // server-side failures; everything else propagates unchanged.
            if (exception.getStatusCode().is4xxClientError()) {
                throw new WebClientCBIgnoreException(exception.getMessage(), exception);
            }
            throw exception;
        }

        UDTResponseDTO responseDTO = new UDTResponseDTO(response.getStatusCode(),
                response.getBody() == null ? null : response.getBody().getResult());

        log.info("Response from UDT WS: Status=[{}] Body=[{}]", responseDTO.getHttpStatus(), responseDTO.getMessage());
        return responseDTO;
    }

    /** Builds the headers sent with every UDT request: JSON content type, JSON accept, client id. */
    private HttpHeaders initializeHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.add(Constants.OPENAPI_CLIENT_ID, Constants.DSE_CLIENT_ID);
        headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
        return headers;
    }

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant