Monday, March 22, 2021

Testing a Kafka consumer in Spring Boot

I need help: how can I test my project with JUnit? I use Kafka and Spring Boot. My consumer configuration looks like this:

@EnableKafka
@Configuration
public class ConsumerCAPMDRConfir {

    @Autowired
    PropertyConfig propertyConfig;

    private static final String SASL_PROTOCOL = "SASL_SSL";
    private static final String SCRAM_SHA_256 = "SCRAM-SHA-256";
    private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    private final String consJaasCfg = String.format(jaasTemplate, "user", "pass");

    @Bean
    public ConsumerFactory<String, Topic_CAP_MDR> DtoConsumerCapMDR() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propertyConfig.getBootstrapServer());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propertyConfig.getGroupId());

        // String keys, Avro values deserialized into the generated Topic_CAP_MDR class
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class.getName());
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        // Security, truststore, and schema registry settings are applied only when the flag is set
        if (propertyConfig.getFlag()) {
            props.put("sasl.mechanism", SCRAM_SHA_256);
            props.put("sasl.jaas.config", consJaasCfg);
            props.put("security.protocol", SASL_PROTOCOL);
            props.put("ssl.truststore.location", propertyConfig.getTruststore_location());
            props.put("ssl.truststore.password", propertyConfig.getPasswordTrustore());
            props.put("ssl.endpoint.identification.algorithm", "");
            props.put("schema.registry.url", propertyConfig.getSchema_registry());
        }

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Topic_CAP_MDR> TopicCAPMDRListener() {
        ConcurrentKafkaListenerContainerFactory<String, Topic_CAP_MDR> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(DtoConsumerCapMDR());
        return factory;
    }
}

My service class with the listener looks like this:

@Service
public class CAPMDRServices {
    
    private static final Logger LOGGER = LogManager.getLogger(CAPMDRServices.class);
    
    @Autowired
    DataBaseConfig databaseconfig;
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    
    @KafkaListener(topics = "topic", containerFactory = "TopicCAPMDRListener")
    public void publish(Topic_CAP_MDR CAPMDR) {
        // INSERT HERE DB
    }
}

At this point, I don't know how to write this test, in particular how to connect to the topic. Do I need to declare a producer in order to test the consumer?
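
One possible starting point, offered as a minimal sketch rather than a confirmed answer: the method annotated with @KafkaListener is an ordinary Java method, so its logic can be unit-tested by calling it directly, with no topic, broker, or producer involved. The sketch below uses only the standard library so that it runs on its own. `CapMdrMessage`, `CapMdrStore`, `CapMdrHandler`, and `InMemoryCapMdrStore` are hypothetical names that do not appear in the project above: the first stands in for the Avro-generated Topic_CAP_MDR, the second for the JdbcTemplate insert, and the null check is an assumption about the desired behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Small driver that plays the role of a JUnit test method.
class CapMdrHandlerDemo {
    public static void main(String[] args) {
        InMemoryCapMdrStore store = new InMemoryCapMdrStore();
        CapMdrHandler handler = new CapMdrHandler(store);

        // Call the listener method directly; no Kafka producer is needed here.
        handler.publish(new CapMdrMessage("42", "hello"));
        handler.publish(null);

        System.out.println("rows inserted: " + store.inserted.size()); // prints "rows inserted: 1"
    }
}

// Hypothetical stand-in for the Avro-generated Topic_CAP_MDR class.
final class CapMdrMessage {
    final String id;
    final String payload;

    CapMdrMessage(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

// Hypothetical abstraction over the database insert (JdbcTemplate in the real service).
interface CapMdrStore {
    void insert(CapMdrMessage message);
}

// Same shape as CAPMDRServices; in the real class, publish() carries @KafkaListener.
final class CapMdrHandler {
    private final CapMdrStore store;

    CapMdrHandler(CapMdrStore store) {
        this.store = store;
    }

    void publish(CapMdrMessage message) {
        if (message == null) {
            return; // assumption: null payloads are skipped rather than stored
        }
        store.insert(message);
    }
}

// In-memory fake that records what would have been written to the database.
final class InMemoryCapMdrStore implements CapMdrStore {
    final List<CapMdrMessage> inserted = new ArrayList<>();

    @Override
    public void insert(CapMdrMessage message) {
        inserted.add(message);
    }
}
```

This only covers the listener logic. A test that also exercises the consumer configuration and deserialization would need a broker to talk to, for example the in-memory one that spring-kafka-test provides through `@EmbeddedKafka`, and in that case a producer such as a `KafkaTemplate` would be needed to put a record on the topic.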
