When I get the desired data, I want to stop listening. How should I implement it correctly?
I try use ledgerClient.close(),but have some error
flow.doOnError(e-> {log.info("do nothing");})
.forEach(events -> {
events.getEvents().forEach(
event -> {
if (event instanceof CreatedEvent) {
CreatedEvent createdEvent = (CreatedEvent) event;
ProductDividend productDividend = ProductDividend.valueDecoder().decode(createdEvent.getArguments());
ProductDividend.Contract productDividendContract = new ProductDividend.Contract(new ProductDividend.ContractId(createdEvent.getContractId()), productDividend, createdEvent.getAgreementText(), Optional.empty(), createdEvent.getSignatories(), createdEvent.getObservers());
if (ServiceNameEnum.FF.getPoolType().equals(productDividend.productId) || ServiceNameEnum.QTT.getPoolType().equals(productDividend.productId)) {
if (wageringPool.productOfferId.equals(productDividend.productOfferId)) {
//query latest wageringPool by productOfferId
WageringPool.Contract lastWageringPool = wageringPoolService.getWageringPoolByProOfferId(productDividend.productOfferId,partyHk);
if(lastWageringPool.id.contractId .equals(poolContractId)){
poolPayoutStartedProcess(productDividend.productOfferId,productDividendContract,payoutSummary,ledgerClient,partyHk,wageringPool,wageringPoolOffset,poolContractId);
}
try {
ledgerClient2.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
);
});