vendredi 6 novembre 2020

can't get testcontainers Kafka with sasl.jaas.config to test ACLs to work

I'm trying to leverage testcontainers to test Kafka locally in some automated unit tests. I'm having trouble testing authorization.

My goal is to test

(1) if there are no ACLs in this test container that no KafkaProducer should be allowed to write to it (currently, even with no ACLs created as long as a producer is configured correctly, it can send to the topic - I thought setting the kafka env variable of allow.everyone.if.no.acl.found to false would do the trick - but doesn't seem to be the case)

(2) to test if the KafkaProducer is not using the correct sasl.jaas.config (i.e incorrect apiKey and pasword) that it gets denied access to write to the test topic, even if an ACL is setup for all principals.

Below is my code. I can get it to "work" but testing the above two scenarios I haven't been able to figure out. I think I might not be actually creating ACLs, as when I add a line after I create the ACLs (adminClient.describeAcls(AclBindingFilter.ANY).values().get(); I get a No Authorizer is configured on the broker error) -> looking at posts similar to this I thought this implies that no ACL binding had actually been created.

        String topicName = "this-is-a-topic";
        String confluentVersion = "5.5.1";
        network = Network.newNetwork();
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required %s=\"%s\" %s=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, "username", "apiKey", "password", "apiPassword");
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + confluentVersion))
                .withNetwork(network)
                .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
                .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false")
                .withEnv("KAFKA_SUPER_USERS", "User:OnlySuperUser")
                .withEnv("KAFKA_SASL_MECHANISM", "PLAIN")
                .withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "http")
                .withEnv("KAFKA_SASL_JAAS_CONFIG", jaasConfig);

        kafka.start();
        schemaRegistryContainer = new SchemaRegistryContainer(confluentVersion).withKafka(kafka);
        schemaRegistryContainer.start();

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        properties.put("input.topic.name", topicName);
        properties.put("input.topic.partitions", "1");
        adminClient = KafkaAdminClient.create(properties);
        AclBinding ACL = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "differentTopic", PatternType.LITERAL),
                new AccessControlEntry( "User:*", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        var acls = adminClient.createAcls(List.of(ACL)).values();


        List<NewTopic> topics = new ArrayList<>();
        topics.add(
                new NewTopic(topicName,
                        Integer.parseInt(properties.getProperty("input.topic.partitions")),
                        Short.parseShort(properties.getProperty("input.topic.replication.factor")))
        );
        adminClient.createTopics(topics);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put("input.topic.name", topicName);
        props.put("security.protocol", "PLAINTEXT");
        props.put("input.topic.partitions", "1");
        props.put("input.topic.replication.factor", "1");
        props.put("metadata.fetch.timeout.ms", "10000");
        props.put("sasl.jaas.config", jaasConfig);

        producer = new KafkaProducer<>(props);

        String key = "testContainers";
        String value = "AreAwesome";
        ProducerRecord<String, String> record = new ProducerRecord<>(
                        (String) props.get("input.topic.name"), key, value);
        try {
             RecordMetadata o = (RecordMetadata) producer.send(record).get();
             System.out.println(o.toString());
        } catch (Exception e) {
             e.printStackTrace();
        }

Aucun commentaire:

Enregistrer un commentaire