According to https://stackoverflow.com/a/36009859/9911256, Kafka commits/auto-commits can fail if the consumer dies abruptly. My Kafka application works fine in production, but during tests I SOMETIMES hit this recurring issue (until I reboot Kafka): the consumer keeps reading from the same offset.
My unit test (one Java producer sending 10 messages to one Java consumer, on one broker, one topic, one partition, one group) sends 10 messages, and I check them starting from the first one (a simplified sketch of the producer side follows the first run's output below):
SENT: (0) NAME:person-001; UUID:352c1f8e-c141-4446-8ac7-18eb044a6b92
SENT: (1) NAME:person-001; UUID:81681a30-83e1-4f85-b07f-da140cfdb874
SENT: (2) NAME:person-001; UUID:3b9db497-460a-4a1c-86b9-f724af1a0449
SENT: (3) NAME:person-001; UUID:63c0edf9-ec00-4ef7-b81a-4b1b8919a42d
SENT: (4) NAME:person-001; UUID:346f265c-1964-4460-97de-1a7b43285c06
SENT: (5) NAME:person-001; UUID:2d1bb49c-03ce-4762-abb3-2bbb963e87d1
SENT: (6) NAME:person-001; UUID:3c8ddda0-6cb8-45b4-b1d2-3a99ba57a48a
SENT: (7) NAME:person-001; UUID:3f819408-41d5-4cad-ad39-322616a86b99
SENT: (8) NAME:person-001; UUID:1db09bc1-4c90-4a0d-8efc-d6ea8a791985
SENT: (9) NAME:person-001; UUID:705a3a3c-fd15-45a9-a96c-556350f1f79a
Exception in thread "Thread-2" org.opentest4j.AssertionFailedError: expected: <352c1f8e-c141-4446-8ac7-18eb044a6b92> but was: <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12>
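For reference, the producer side of the test is roughly the following sketch; the broker address, topic name and serializer settings are illustrative, not my exact code:

    import java.util.Properties;
    import java.util.UUID;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch of the producer side: send 10 messages and print each one as "SENT: (i) ...".
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
        for (int i = 0; i < 10; i++) {
            String value = "NAME:person-001; UUID:" + UUID.randomUUID();
            producer.send(new ProducerRecord<>("person-topic", value));
            System.out.println("SENT: (" + i + ") " + value);
        }
    }
    // The consumer thread polls the same topic; the test keeps the first UUID sent
    // and asserts that the first value received carries that UUID.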
And if I run the test again:
SENT: (0) NAME:person-001; UUID:d171e7ee-fa73-4cb4-826e-f7bffdef9e92
SENT: (1) NAME:person-001; UUID:25da6b6e-57e9-4f8a-a3ff-1099f94fcaf5
SENT: (2) NAME:person-001; UUID:d05b4693-ba60-4db2-a5ae-30dcd44ce5b7
SENT: (3) NAME:person-001; UUID:fbd75ee7-6f34-4ab1-abda-d31ee91d0ff8
SENT: (4) NAME:person-001; UUID:798fe246-f10e-4fc3-90c9-df3e181bb641
SENT: (5) NAME:person-001; UUID:26b33a19-7e65-49ec-b54d-3379ef76b797
SENT: (6) NAME:person-001; UUID:45ecef46-69f5-4bff-99b5-c7c2dce67ec8
SENT: (7) NAME:person-001; UUID:464df926-cd66-4cfa-b282-36047522dfe8
SENT: (8) NAME:person-001; UUID:982c82c0-c669-400c-a70f-62c57e3552a4
SENT: (9) NAME:person-001; UUID:ecdbfce6-d378-496d-9e0b-30f16b7cf484
Exception in thread "Thread-2" org.opentest4j.AssertionFailedError: expected: <d171e7ee-fa73-4cb4-826e-f7bffdef9e92> but was: <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12>
- Notice that the message <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12> comes back on every attempt, even though these are separate runs that I launch manually.
I use:
properties.put("auto.offset.reset", "latest");
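For completeness, the consumer configuration around that property looks roughly like this sketch (broker address and deserializers are the standard ones and are illustrative; group.id 0 matches the group shown in the logs below; imports from java.util and org.apache.kafka.clients.consumer are assumed):

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("group.id", "0");  // the single group used by the test (group 0 in the logs)
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("auto.offset.reset", "latest");   // no committed offset -> start from the end
    properties.put("enable.auto.commit", "true");    // I have also tried the other setting

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList("person-topic"));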
I've already tried the auto-commit option, with equivalent results. Worst of all, this happens (or not) each time I restart the Kafka server:
- sometimes I restart it and the tests pass, even if I repeat them any number of times;
- sometimes it seems to enter this failure state and will ALWAYS fail;
- as said, when consumers don't die and large flows of messages are handled, this issue does not appear (the consumer thread is sketched below);
- I've also noticed that if the server has just been rebooted and all Kafka log and data directories have been deleted, the first tests can be delayed and fail.
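Roughly, the consumer thread in the test looks like the sketch below (the received list, firstSentUuid and extractUuid are illustrative helpers; poll(Duration) is the kafka-clients 2.x API, and assertEquals comes from JUnit 5). Note that if the assertion throws, consumer.close() is never reached, which seems consistent with the heartbeat-expiration removals in the logs further down:

    // Sketch of the consumer thread used by the test (helper names are illustrative).
    List<String> received = new ArrayList<>();
    while (received.size() < 10) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            received.add(record.value());
        }
    }
    assertEquals(firstSentUuid, extractUuid(received.get(0)));  // this is the assertion that fails
    consumer.close();  // never reached when the assertion above throws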
My logs show this:
2019-01-25T12:20:02.874119+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:02,850] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 26 (__consumer_offsets-48) (reason: Adding new member consumer-1-a0b94a2a-0cae-4ba8-85f0-9a84030f4beb) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:02.874566+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:02,851] INFO [GroupCoordinator 1001]: Stabilized group 0 generation 27 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:02.874810+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:02,858] INFO [GroupCoordinator 1001]: Assignment received from leader for group 0 for generation 27 (kafka.coordinator.group.GroupCoordinator)
13 seconds after the test:
2019-01-25T12:20:15.894185+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:15,871] INFO [GroupCoordinator 1001]: Member consumer-2-79f97c80-294c-438b-8a8a-3745f4a57010 in group 0 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:15.894522+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:15,871] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 27 (__consumer_offsets-48) (reason: removing member consumer-2-79f97c80-294c-438b-8a8a-3745f4a57010 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:17.897272+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:17,865] INFO [GroupCoordinator 1001]: Member consumer-1-a0b94a2a-0cae-4ba8-85f0-9a84030f4beb in group 0 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:17.897579+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:17,866] INFO [GroupCoordinator 1001]: Group 0 with generation 28 is now empty (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
What I can conclude from the linked answer is that Kafka works fine as long as the consumer stays permanently connected (as in production). But that is not possible during tests! What is the problem here?