I need help, how i can test my project with junit? i use Kafka and springBoot I have my consumer config with this:
@EnableKafka
@Configuration
@Component
public class ConsumerCAPMDRConfir {
@Autowired
PropertyConfig propertyConfig;
private final static String SASL_PROTOCOL = "SASL_SSL";
private final static String SCRAM_SHA_256 = "SCRAM-SHA-256";
private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
private final String consJaasCfg = String.format(jaasTemplate, "user", "pass");
@Bean
public ConsumerFactory<String, Topic_CAP_MDR> DtoConsumerCapMDR() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propertyConfig.getBootstrapServer());
props.put(ConsumerConfig.GROUP_ID_CONFIG, propertyConfig.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
if(propertyConfig.getFlag())
{
props.put("sasl.mechanism", SCRAM_SHA_256);
props.put("sasl.jaas.config", consJaasCfg);
props.put("security.protocol", SASL_PROTOCOL);
props.put("ssl.truststore.location", propertyConfig.getTruststore_location());
props.put("ssl.truststore.password", propertyConfig.getPasswordTrustore());
props.put("ssl.endpoint.identification.algorithm", "");
props.put("schema.registry.url", propertyConfig.getSchema_registry());
}
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Topic_CAP_MDR> TopicCAPMDRListener() {
ConcurrentKafkaListenerContainerFactory<String, Topic_CAP_MDR> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(DtoConsumerCapMDR());
return factory;
}
}
my class service with the listener with this:
@Service
public class CAPMDRServices {
private static final Logger LOGGER = LogManager.getLogger(CAPMDRServices.class);
@Autowired
DataBaseConfig databaseconfig;
@Autowired
private JdbcTemplate jdbcTemplate;
@KafkaListener(topics = "topic", containerFactory = "TopicCAPMDRListener")
public void publish(Topic_CAP_MDR CAPMDR) {
//INSERT HERE DB
}
}
In this point, i don't know how i can realize this test: connection with topic... i need declared a producer for consumer test?
Aucun commentaire:
Enregistrer un commentaire