Kafka Operations Guide

1. Topics Management

Here is a handful set of commands to manage your Kafka topics manually.

Create a new Kafka topic named my-topic as follows:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic

Verify that the my-topic topic was successfully created by listing all available topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Add more partitions as follows:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

Temporarily update the retention time on the topic to one second:

bin/kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000

then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms value.

You can also try to delete the topic. Verify first that the following line is present in server.properties file under config folder:

delete.topic.enable=true

You can delete a topic named my-topic as follows:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic

You can find more details about a topic named cc_payments as follows:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic cc_payments

You can see the under-replicated partitions for all topics as follows:

bin/kafka-topics.sh --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions

2. Producers

You can produce messages from standard input as follows:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

You can produce new messages from an existing file named messages.txt as follows:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt

You can produce Avro messages as follows:

kafka-avro-console-producer --broker-list localhost:9092 --topic my.Topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=http://localhost:8081

You can enter a few new values from the console as follows:

{"f1": "value1"}

3. Consumers

3.1. Consume messages

You can begin a consumer from the beginning of the log as follows:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

You can consume a single message as follows:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic  --max-messages 1

You can consume a single message from __consumer_offsets as follows:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1

You can consume and specify a consumer group as follows:

bin/kafka-console-consumer.sh --topic my-topic --new-consumer --bootstrap-server localhost:9092 --consumer-property group.id=my-group

3.2. Consume Avro messages

You can consume 10 Avro messages from a topic named position-reports as follows:
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081 --max-messages 10

You can consume all existing Avro messages from a topic named position-reports as follows:

kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081

4. Consumers admin operations

You can list all groups as follows:

kafka-consumer-groups --new-consumer --list --bootstrap-server localhost:9092

You can describe a Group named testgroup as follows:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testgroup