Friday, 3 July 2020

LockRegistryLeaderInitiator stops working in tests when TestPropertySource changes

I am using three LockRegistryLeaderInitiator instances to manage three Candidates: BILLING, MODEL, and EXPORT. A JdbcLockRegistry points to my embedded H2 database.

I have three JUnit tests that exercise my BILLING, MODEL, and EXPORT candidates; they run sequentially.
All of them use @TestPropertySource, but the second one adds an extra property.

After the first test class has run, when I stop an initiator and try to restart it, it fails to start:

initiatorBilling.start();
while( !billing.isLeader() ) {
    Thread.sleep(500);
}

I tried replacing this code with:

initiatorBilling.start();
while( !initiatorBilling.getContext().isLeader() ) {
    Thread.sleep(500);
}

but this also doesn't work.
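Both loops above spin forever when leadership is never regained, which hangs the test run instead of failing it. A minimal sketch of a bounded wait follows, assuming a plain polling helper is acceptable here; LeaderWait and its await method are hypothetical names for illustration, not part of Spring Integration.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Spring API): polls a condition with a deadline
// so a test can fail cleanly instead of looping forever.
final class LeaderWait {

    private LeaderWait() {
    }

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interrupt.
     */
    static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }
}
```

In a test this could be used as assertTrue(LeaderWait.await(billing::isLeader, 30000, 500)); so that a candidate which never becomes leader produces a failure after 30 seconds rather than a hang.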

My LockRegistryLeaderInitiator instances seem to stop working: none of my candidates are leaders, and even after waiting 5 minutes they never become leaders. Normally a message is printed to the console each time a Candidate is granted or loses leadership.

My DataSource is an H2 embedded database. I don't delete any tables or data between JUnit test classes.

If I specify @DirtiesContext on these classes, it does seem to work. But I have other JUnit tests that use those Candidates and also fail, and it feels wrong to put @DirtiesContext on every single test class I have.

I get the impression that my initiators are not being recreated, or even destroyed, between tests. I would love to know whether I've architected this incorrectly or whether it's something silly.
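One possible explanation, offered as an assumption rather than a confirmed diagnosis: the Spring TestContext framework caches application contexts keyed by their merged configuration, which includes the @TestPropertySource properties. A test class with a different property set therefore gets a second context, while the first one stays open in the cache with its initiators still running. If both contexts use the same H2 database (an assumption about this setup), the first context's initiators may still own the lock rows. The plain-Java sketch below models only that interaction; ContextCacheSketch, FakeContext, contextFor, and lockTable are hypothetical stand-ins, not Spring APIs, and lock expiry is ignored.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model (assumption, not Spring code) of two cached test contexts
// competing for lock rows in one shared database.
final class ContextCacheSketch {

    // Stand-in for the shared lock table: lock key -> owning client id.
    static final Map<String, String> lockTable = new ConcurrentHashMap<>();

    // Stand-in for the test context cache: property set -> context.
    static final Map<Set<String>, FakeContext> cache = new HashMap<>();

    static final class FakeContext {
        // Each context gets its own random client id, as in the configuration shown.
        final String clientId = UUID.randomUUID().toString();

        // Succeeds only if the row is free or already owned by this context.
        boolean tryAcquire(String role) {
            String owner = lockTable.putIfAbsent(role, clientId);
            return owner == null || owner.equals(clientId);
        }

        // Closing the context releases every row it owns.
        void close() {
            lockTable.values().removeIf(clientId::equals);
        }
    }

    // A different property set yields a different context; the old one stays cached.
    static FakeContext contextFor(String... properties) {
        Set<String> key = new HashSet<>(Arrays.asList(properties));
        return cache.computeIfAbsent(key, k -> new FakeContext());
    }
}
```

In this model, the context created for the second property set cannot acquire BILLING while the first context is still open. Closing the first context, which is roughly what @DirtiesContext does, frees the rows so the second can acquire them.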

Using:

  • Spring Boot 1.5.22.RELEASE
  • spring-integration-jdbc 4.3.21.RELEASE

@Configuration
public class LeaderElectionConfiguration {
    
    @Bean
    public LockRegistry lockRegistry(LockRepository lockRepository) {
        return new JdbcLockRegistry(lockRepository);
    }

    @Bean
    public DefaultLockRepository lockRepository(DataSource dataSource) {
        return new FixedDefaultLockRepository(dataSource, UUID.randomUUID().toString());
    }

    @Bean(name = "billing")
    public LockRegistryLeaderInitiator leaderInitiatorBilling(LockRegistry lockRegistry, BillingLeaderCandidate billingCandidate) {
        LockRegistryLeaderInitiator lockRegistryLeaderInitiator = new LockRegistryLeaderInitiator(lockRegistry, billingCandidate);
        lockRegistryLeaderInitiator.start();
        return lockRegistryLeaderInitiator;
    }

    @Bean(name="export")
    public LockRegistryLeaderInitiator leaderInitiatorExport(LockRegistry lockRegistry, ExportLeaderCandidate exportCandidate) {
        LockRegistryLeaderInitiator lockRegistryLeaderInitiator = new LockRegistryLeaderInitiator(lockRegistry, exportCandidate);
        lockRegistryLeaderInitiator.start();
        return lockRegistryLeaderInitiator;
    }
    
    @Bean(name="model")
    public LockRegistryLeaderInitiator leaderInitiatorModel(LockRegistry lockRegistry, ModelSyncLeaderCandidate modelCandidate ) {
        LockRegistryLeaderInitiator lockRegistryLeaderInitiator = new LockRegistryLeaderInitiator(lockRegistry, modelCandidate);
        lockRegistryLeaderInitiator.start();
        return lockRegistryLeaderInitiator;
    }
    
    @Bean
    public ModelSyncLeaderCandidate createModelCandidate() {
        return new ModelSyncLeaderCandidate();
    }
    @Bean
    public BillingLeaderCandidate createBillingCandidate() {
        return new BillingLeaderCandidate();
    }
    @Bean
    public ExportLeaderCandidate createExportCandidate() {
        return new ExportLeaderCandidate();
    }
}

I wrote FixedDefaultLockRepository based on another user's issue, where apparently there was a bug when locks were being released. I suspect it isn't really needed anymore.

import javax.sql.DataSource;

import org.springframework.integration.jdbc.lock.DefaultLockRepository;

public class FixedDefaultLockRepository extends DefaultLockRepository {

    // Note: the id argument is accepted but not passed on to the superclass.
    public FixedDefaultLockRepository(DataSource dataSource, String id) {
        super(dataSource);
    }

    /**
     * Used to help fix a bug in the Spring implementation of DefaultLockRepository.
     */
    @Override
    public void delete(String lock) {
        try {
            super.delete(lock);
        } catch (Exception ex) {
            // Ignore exception on purpose.
        }
    }
}

public class ModelSyncLeaderCandidate extends GenericLeaderCandidate {

    public ModelSyncLeaderCandidate() {
        super(UUID.randomUUID().toString(), "MODEL_SYNC");
    }
}

public class BillingLeaderCandidate extends GenericLeaderCandidate {

    public BillingLeaderCandidate() {
        super(UUID.randomUUID().toString(), "BILLING");
    }
}

public class ExportLeaderCandidate extends GenericLeaderCandidate {

    public ExportLeaderCandidate() {
        super(UUID.randomUUID().toString(), "EXPORT_PREDICTIONS");
    }
}

JUnit tests:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = { "db.embedded=true" })
public class LeaderCandidatesTest {
    @Autowired
    private BillingLeaderCandidate billing;
    
    @Autowired
    private ModelSyncLeaderCandidate model;
    
    @Autowired
    private ExportLeaderCandidate export;
    
    @Autowired
    @Qualifier("billing")
    private LockRegistryLeaderInitiator initiatorBilling;
    
    @Autowired
    @Qualifier("model")
    private LockRegistryLeaderInitiator initiatorModel;
    
    @Autowired
    @Qualifier("export")
    private LockRegistryLeaderInitiator initiatorExport;

    @Test
    public void candidatesAreLeadersByDefault() {
        assertTrue(billing.isLeader());
        assertTrue(model.isLeader());
        assertTrue(export.isLeader());
    }

    @Test
    public void billingLeaderChanged() throws InterruptedException {
        try {
            // Simulate losing Leadership
            initiatorBilling.stop();

            Thread.sleep(500);

            assertFalse(billing.isLeader());
        } finally {
            initiatorBilling.start();
            while( !billing.isLeader() ) {
                Thread.sleep(500);
            }
        }
    }

    @Test
    public void modelLeaderChanged() throws InterruptedException {
        try {
            // Simulate losing Leadership
            initiatorModel.stop();

            Thread.sleep(500);

            assertFalse(model.isLeader());
        } finally {
            initiatorModel.start();
            while( !initiatorModel.getContext().isLeader() ) {
                Thread.sleep(500);
                Thread.yield();
            }
        }
    }

    @Test
    public void exportLeaderChanged() throws InterruptedException {
        try {
            // Simulate losing Leadership
            initiatorExport.stop();

            Thread.sleep(500);

            assertFalse(export.isLeader());
        } finally {
            initiatorExport.start();
            while( !export.isLeader() ) {
                Thread.sleep(500);
            }
        }
    }
}

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = { "db.embedded=true","newProperty=doesntMatter" })
public class LeaderCandidatesTest2 {
    @Autowired
    private BillingLeaderCandidate billing;
    
    @Autowired
    private ModelSyncLeaderCandidate model;
    
    @Autowired
    private ExportLeaderCandidate export;
    
    @Autowired
    @Qualifier("billing")
    private LockRegistryLeaderInitiator initiatorBilling;
    
    @Autowired
    @Qualifier("model")
    private LockRegistryLeaderInitiator initiatorModel;
    
    @Autowired
    @Qualifier("export")
    private LockRegistryLeaderInitiator initiatorExport;

    @Test
    public void candidatesAreLeadersByDefault() {
        assertTrue(billing.isLeader());
        assertTrue(model.isLeader());
        assertTrue(export.isLeader());
    }

    @Test
    public void billingLeaderChanged() throws InterruptedException {
        try {
            // Simulate losing Leadership
            initiatorBilling.stop();

            Thread.sleep(500);

            assertFalse(billing.isLeader());
        } finally {
            initiatorBilling.start();
            while( !billing.isLeader() ) {
                Thread.sleep(500);
            }
        }
    }

    @Test
    public void modelLeaderChanged() throws InterruptedException {
        try {
            // Simulate losing Leadership
            initiatorModel.stop();

            Thread.sleep(500);

            assertFalse(model.isLeader());
        } finally {
            initiatorModel.start();
            while( !model.isLeader() ) {
                Thread.sleep(500);
            }
        }
    }

    @Test
    public void exportLeaderChanged() throws InterruptedException {
        try {
            // Simulate losing Leadership
            initiatorExport.stop();

            Thread.sleep(500);

            assertFalse(export.isLeader());
        } finally {
            initiatorExport.start();
            while( !export.isLeader() ) {
                Thread.sleep(500);
            }
        }
    }
}
