How can I stop listening to transactions?

Once I have received the data I need, I want to stop listening. What is the correct way to implement this?
I tried calling ledgerClient.close(), but I get an error.

flow.doOnError(e-> {log.info("do nothing");})
                .forEach(events -> {
                    events.getEvents().forEach(
                            event -> {
                                if (event instanceof CreatedEvent) {
                                    CreatedEvent createdEvent = (CreatedEvent) event;
                                    ProductDividend productDividend = ProductDividend.valueDecoder().decode(createdEvent.getArguments());
                                    ProductDividend.Contract productDividendContract = new ProductDividend.Contract(new ProductDividend.ContractId(createdEvent.getContractId()), productDividend, createdEvent.getAgreementText(), Optional.empty(), createdEvent.getSignatories(), createdEvent.getObservers());
                                    if (ServiceNameEnum.FF.getPoolType().equals(productDividend.productId) || ServiceNameEnum.QTT.getPoolType().equals(productDividend.productId)) {
                                        if (wageringPool.productOfferId.equals(productDividend.productOfferId)) {
                                            //query latest wageringPool by productOfferId
                                            WageringPool.Contract lastWageringPool = wageringPoolService.getWageringPoolByProOfferId(productDividend.productOfferId,partyHk);
                                            if (lastWageringPool.id.contractId.equals(poolContractId)) {
                                                poolPayoutStartedProcess(productDividend.productOfferId,productDividendContract,payoutSummary,ledgerClient,partyHk,wageringPool,wageringPoolOffset,poolContractId);
                                            }
                                            try {
                                                ledgerClient2.close();
                                            } catch (Exception e) {
                                                throw new RuntimeException(e);
                                            }
                                        }
                                    }

                                }
                            }
                    );
                });

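Instead of forEach, use takeWhile and return a boolean from its predicate to say whether to keep listening: true to continue, false to stop. Assuming flow is an RxJava Flowable, the stream completes as soon as the predicate returns false, so you should not need to close the ledger client from inside the callback.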
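
A minimal sketch of the takeWhile pattern, not your actual ledger code. It uses java.util.stream.Stream from the standard library as a stand-in for the transaction flow so that it runs without any dependencies; the class name, the listenUntil method, and the string offer IDs are all hypothetical. RxJava's Flowable.takeWhile takes a predicate of the same shape, so the body of the lambda should carry over, but check that against your bindings version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class StopListeningSketch {

    // Inspects offer IDs one by one and stops as soon as the wanted one is seen.
    // Returns the IDs that were actually inspected, to show where the stream stopped.
    static List<String> listenUntil(Stream<String> offerIds, String wantedOfferId) {
        List<String> inspected = new ArrayList<>();
        offerIds
            .takeWhile(offerId -> {
                inspected.add(offerId);
                boolean found = wantedOfferId.equals(offerId);
                if (found) {
                    // Hypothetical: this is where the payout handling would run.
                }
                // true = keep listening, false = stop the stream.
                return !found;
            })
            // A terminal operation is still required to drive the stream.
            .forEach(offerId -> { });
        return inspected;
    }

    public static void main(String[] args) {
        List<String> inspected = listenUntil(Stream.of("A", "B", "C"), "B");
        System.out.println(inspected); // prints [A, B]
    }
}
```

Note that the element for which the predicate returns false is not passed downstream, which is why the handling happens inside the predicate itself rather than in forEach.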

Thank you, I'll try that.