Offset commit

May 15, 2023

커밋 전략

Producer에 의해 브로커에 발행된 레코드에는 오프셋과 타임스탬프 값이 존재하며, 브로커는 각 토픽, 파티션에 대해 어떤 컨슈머 그룹이 몇 번째 레코드까지 읽어갔는지를 기록한다. 이는 컨슈머의 커밋에 의해 기록되며, 컨슈머에서 커밋을 위한 전략이 몇 가지 존재한다.

Auto commit

@Slf4j
public class SimpleConsumer {
	private static final String TOPIC_NAME = "simple.1";
	private static final String BOOTSTRAP_SERVERS = "http://localhost:9092";
	private static final String GROUP_ID = "group.1";

	public static void main(String[] args) throws Exception {
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties())) {
			consumer.subscribe(List.of(TOPIC_NAME));

			while (true) {
				ConsumerRecords<String, String> kafkaRecords = consumer.poll(Duration.ofSeconds(1));
				for (var kafkaRecord : kafkaRecords) {
					log.info("{}", kafkaRecord);
				}
			}
		}
	}

	private static Properties getProperties() {
		Properties properties = new Properties();

		properties.put(
			ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
			BOOTSTRAP_SERVERS
		);

		properties.put(
			ConsumerConfig.GROUP_ID_CONFIG,
			GROUP_ID
		);

		properties.put(
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		return properties;
	}
}

카프카 컨슈머의 기본 전략이다. 오토 커밋은 기본적으로 활성화된 상태이며 5초 간격으로 커밋을 하게 된다.

auto-commit

별도의 설정이 필요 없고, 간편하게 사용할 수 있다. 한 가지 주의해야할 점은 커밋을 위해 별도의 스레드가 돌거나 하는 것은 아니고 poll하는 시점에 동작한다. 위 케이스가 1초마다 poll이 발생하고 오토 커밋 간격은 5초라고 했을 때 동작은 아래와 같을 것이다.

Time Action
1 poll
2 poll
3 poll
4 poll
5 poll, commit
6 poll
7 poll
8 poll
9 poll
10 poll, commit
11 poll
12 poll

하지만 poll 간격이 3초라면 오토 커밋 간격이 5초라도 아래와 같이 동작할 것이다.

Time Action
1 -
2 -
3 poll
4 -
5 -
6 poll, commit
7 -
8 -
9 poll
10 -
11 -
12 poll, commit

코드는 간결해질 수 있으나 상세한 핸들링을 하지 못해 프로덕션에서 사용되는 일은 거의 없다.

SyncCommit

오토 커밋을 비활성화하고 명시적인 커밋을 사용할 수도 있다. 아래와 같이 커밋하면 poll을 통해 가져온 가장 마지막 오프셋을 기준으로 커밋한다.

@Slf4j
public class SyncCommitConsumer {
	private static final String TOPIC_NAME = "simple.1";
	private static final String BOOTSTRAP_SERVERS = "http://localhost:9092";
	private static final String GROUP_ID = "group.1";

	public static void main(String[] args) {
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties())) {
			consumer.subscribe(List.of(TOPIC_NAME));

			while (true) {
				ConsumerRecords<String, String> kafkaRecords = consumer.poll(Duration.ofSeconds(1));
				for (var kafkaRecord : kafkaRecords) {
					log.info("{}", kafkaRecord);
				}

				consumer.commitSync();
			}
		}
	}

	private static Properties getProperties() {
		Properties properties = new Properties();

		properties.put(
			ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
			BOOTSTRAP_SERVERS
		);

		properties.put(
			ConsumerConfig.GROUP_ID_CONFIG,
			GROUP_ID
		);

		properties.put(
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
			false
		);

		return properties;
	}
}

레코드 별로 커밋할 수도 있다. 다만 이 경우에는 커밋에 대한 정보를 직접 채워주는 것이 필요하다.

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties())) {
  consumer.subscribe(List.of(TOPIC_NAME));

  while (true) {
    ConsumerRecords<String, String> kafkaRecords = consumer.poll(Duration.ofSeconds(1));
    for (var kafkaRecord : kafkaRecords) {
      log.info("{}", kafkaRecord);

      consumer.commitSync(
        Map.of(
          new TopicPartition(kafkaRecord.topic(), kafkaRecord.partition()),
          new OffsetAndMetadata(kafkaRecord.offset() + 1, null)
        )
      );
    }
  }
}

AsyncCommit

동기식 커밋의 경우에는 커밋 응답을 기다리는 동안 블록되며, 이는 처리량 감소로 이어질 수 있다. 명시적 커밋이지만 응답을 기다리지 않고 비동기로 커밋 요청할 수 있으며 커밋에 대한 결과는 콜백을 통해 처리한다.

@Slf4j
public class SyncCommitConsumer {
	private static final String TOPIC_NAME = "simple.1";
	private static final String BOOTSTRAP_SERVERS = "http://localhost:9092";
	private static final String GROUP_ID = "group.1";

	public static void main(String[] args) {
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties())) {
			consumer.subscribe(List.of(TOPIC_NAME));

			while (true) {
				ConsumerRecords<String, String> kafkaRecords = consumer.poll(Duration.ofSeconds(1));
				for (var kafkaRecord : kafkaRecords) {
					log.info("{}", kafkaRecord);
				}

				consumer.commitAsync(
					(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) -> {
						if (e == null) {
							log.info("success!");
						} else {
							log.error("error!, {}", e.getMessage());
						}
					}
				);
			}
		}
	}

	private static Properties getProperties() {
		Properties properties = new Properties();

		properties.put(
			ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
			BOOTSTRAP_SERVERS
		);

		properties.put(
			ConsumerConfig.GROUP_ID_CONFIG,
			GROUP_ID
		);

		properties.put(
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
			false
		);

		return properties;
	}
}

Manual Committing offset in Spring Kafka

Spring Kafka 사용 시 디폴트로는 enable.auto.commit이 false로 Auto commit을 활성화 하지 않는다. 그리고 여러 개의 AckMode를 제공하며, 이를 통해 offset commit을 할 수 있다.

AckMode Description
RECORD Commit the offset when the listener returns after processing the record.
BATCH Commit the offset when all the records returned by the poll() have been processed.
TIME Commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.
COUNT Commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.
COUNT_TIME Similar to TIME and COUNT, but the commit is performed if either condition is true.
MANUAL The message listener is responsible to acknowledge() the Acknowledgment. After that, the same semantics as BATCH are applied.
MANUAL_IMMEDIATE Commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.

Rebalncing

각 파티션은 최대 하나의 컨슈머에 할당될 수 있다. 따라서 컨슈머가 스케일 다운되거나 노드 자체가 비정상이라고 판단되면 해당 컨슈머가 처리하던 파티션을 처리할 컨슈머를 새롭게 선정해야하며, 컨슈머가 추가되는 경우에는 이 친구에게 할당될 파티션을 선정하는 작업이 필요하다. 이러한 작업을 리밸런싱이라 한다. 리밸런싱이 일어나면 해당 컨슈머 그룹 관리하는 코디네이터가 기존 컨슈머의 모든 소유권을 박탈하고 재조정하는 작업을 진행한다.

리밸런싱 이벤트가 발생하면 모든 컨슈머는 리밸런싱이 끝나기 전 까지 poll()을 할 수 없다. 이는 처리량 감소로 이어지기에 너무 잦은 리밸런싱은 경계해야 한다.

또한, 리밸런싱은 메시지의 중복 처리를 야기할 수 있다. 만약 아래와 같은 시점에 리밸런싱이 발생했다고 해보자. 오프셋 7에 해당하는 데이터를 읽고 처리했으나 리밸런싱으로 인해 이 처리는 커밋되지 못한다. 결국 리밸런싱 이 후 마지막 커밋은 6이므로 레코드 7은 재처리 된다.

auto-commit-rebalancing

이를 해결하기 위해서는 리밸런싱에 대한 방어 코드를 작성할 수 있다. 리밸런싱 이벤트를 감지할 수 있는 리스너인 ConsumerRebalanceListener 제공하며, 리밸런싱 발생 시 현재까지 처리한 내용을 커밋하면 된다.

@Slf4j
public class RebalanceListenConsumer {
	private static final String TOPIC_NAME = "simple.2";
	private static final String BOOTSTRAP_SERVERS = "http://localhost:9092";
	private static final String GROUP_ID = "group.1";
	private static final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

	public static void main(String[] args) {
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties())) {
			consumer.subscribe(List.of(TOPIC_NAME), new ConsumerRebalanceListener() {
				@Override
				public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
					log.warn("Partitions are assigned.");
				}

				@Override
				public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
					log.warn("Partitions are revoked");
					consumer.commitSync(offsets);
				}
			});

			while (true) {
				ConsumerRecords<String, String> kafkaRecords = consumer.poll(Duration.ofSeconds(1));
				for (var kafkaRecord : kafkaRecords) {
					log.info("{}", kafkaRecord);

					offsets.put(
						new TopicPartition(kafkaRecord.topic(), kafkaRecord.partition()),
						new OffsetAndMetadata(kafkaRecord.offset() + 1, null)
					);
				}

				consumer.commitSync();
			}
		}
	}

	private static Properties getProperties() {
		Properties properties = new Properties();

		properties.put(
			ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
			BOOTSTRAP_SERVERS
		);

		properties.put(
			ConsumerConfig.GROUP_ID_CONFIG,
			GROUP_ID
		);

		properties.put(
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
			false
		);

		properties.put(
			ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
			10
		);

		return properties;
	}
}

Graceful Shutdown

애플리케이션의 종료는 어떻게 처리해야할까? 컨슈머가 비정상 종료되는 경우에는 세션 타임아웃이 발생하기 전까지 컨슈머 그룹에 남아 있게되므로 종료 처리를 잘 해주는 것도 필요하다. 이 경우 wakeup() 메서드가 제공되는데 해당 메서드가 호출된 컨슈머는 poll() 시도 시 WakeupException이 발생하게 된다. wakeup() 메서드는 ShutdownHook에서 호출하게 설정하고, Exception 처리 부분에 리소스 해제를 하면 된다.

ShutdownHook 설정을 하더라도 SIGTERM이나 SIGINT에서 먹지 SIGKILL은 핸들링하지 못한다.

@Slf4j
public class GracefulShutdownConsumer {
	private static final String TOPIC_NAME = "simple.2";
	private static final String BOOTSTRAP_SERVERS = "http://localhost:9092";
	private static final String GROUP_ID = "group.1";
	private static final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
	private static final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties());

	public static void main(String[] args) {
		Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

		try {
			consumer.subscribe(List.of(TOPIC_NAME));

			while (true) {
				ConsumerRecords<String, String> kafkaRecords = consumer.poll(Duration.ofSeconds(1));
				for (var kafkaRecord : kafkaRecords) {
					log.info("{}", kafkaRecord);

					offsets.put(
						new TopicPartition(kafkaRecord.topic(), kafkaRecord.partition()),
						new OffsetAndMetadata(kafkaRecord.offset() + 1, null)
					);
				}

				consumer.commitSync();
			}
		} catch (WakeupException e) {
			log.warn("wakeup exception.");
		} finally {
			consumer.close();
		}
	}

	private static Properties getProperties() {
		Properties properties = new Properties();

		properties.put(
			ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
			BOOTSTRAP_SERVERS
		);

		properties.put(
			ConsumerConfig.GROUP_ID_CONFIG,
			GROUP_ID
		);

		properties.put(
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class.getName()
		);

		properties.put(
			ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
			false
		);

		return properties;
	}
}

참고


songmk 🙁