Kafka之消费与激情
| throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } // poll for new data until the timeout expires long elapsedTime = 0L; do { client.maybeTriggerWakeup(); final long metadataEnd; if (includeMetadataInTimeout) { final long metadataStart = time.milliseconds(); if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { return ConsumerRecords.empty(); } metadataEnd = time.milliseconds(); elapsedTime += metadataEnd - metadataStart; } else { while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) { log.warn("Still waiting for metadata"); } metadataEnd = time.milliseconds(); } final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches (编辑:扬州站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与站长联系删除相关内容! | 



