Kafka Operations Guide
1. Topics Management
Here is a handy set of commands to manage your Kafka topics manually.
Create a new Kafka topic named my-topic as follows:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
Verify that the my-topic topic was successfully created by listing all available topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181
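To script this check instead of reading the list by eye, one option is to match the topic name exactly against the list output. This is a minimal sketch; topic_exists is a hypothetical helper name, not part of Kafka, and it assumes the list prints one topic name per line:

```shell
# Hypothetical helper: succeeds if the exact topic name appears on stdin,
# one topic per line (as printed by kafka-topics.sh --list).
# -x matches whole lines only, -F treats the name as a literal string.
topic_exists() {
  grep -qxF -- "$1"
}

# Usage (requires a running cluster):
# bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_exists my-topic && echo "found"
```

Matching whole lines avoids false positives such as my-topic-2 when looking for my-topic.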
Increase the number of partitions of my-topic to 16 as follows (the partition count can only be increased, never decreased):
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 16
To purge all messages from a topic, temporarily set its retention time to one second:
bin/kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
Then wait for the purge to take effect (about one minute). Once the topic is purged, restore the previous retention.ms value.
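The purge steps can be wrapped in a small script. The following is only a sketch under stated assumptions: KAFKA_TOPICS, ZK, run, and purge_topic are illustrative names, ZooKeeper is assumed to be at localhost:2181, and the previous retention value must be supplied by the caller because the script does not look it up:

```shell
# Illustrative defaults; override them to match your installation.
KAFKA_TOPICS="${KAFKA_TOPICS:-bin/kafka-topics.sh}"
ZK="${ZK:-localhost:2181}"

# With DRY_RUN=1 the commands are printed instead of executed.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# purge_topic TOPIC PREVIOUS_RETENTION_MS [WAIT_SECONDS]
purge_topic() {
  topic="$1"; previous_ms="$2"; wait_s="${3:-60}"
  # 1. Shrink retention so the broker deletes the existing log segments.
  run "$KAFKA_TOPICS" --zookeeper "$ZK" --alter --topic "$topic" --config retention.ms=1000
  # 2. Give the broker time to run its cleanup (about one minute).
  run sleep "$wait_s"
  # 3. Restore the retention that was in place before the purge.
  run "$KAFKA_TOPICS" --zookeeper "$ZK" --alter --topic "$topic" --config "retention.ms=$previous_ms"
}

# Usage (604800000 ms, i.e. 7 days, is only an example value):
# DRY_RUN=1 purge_topic my-topic 604800000
```

Running with DRY_RUN=1 first shows the exact commands before anything is changed on the cluster.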
You can also delete the topic. First verify that the following line is present in the server.properties file under the config folder:
delete.topic.enable=true
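This check can be done with grep. A minimal sketch, assuming the file lives at config/server.properties; delete_topic_enabled is a hypothetical helper name:

```shell
# Hypothetical helper: succeeds only if the given properties file contains
# an uncommented delete.topic.enable=true line.
delete_topic_enabled() {
  grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1"
}

# Usage:
# delete_topic_enabled config/server.properties || echo "delete.topic.enable=true is not set"
```

Anchoring the pattern at the start of the line means a commented-out entry such as #delete.topic.enable=true does not count.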
You can delete a topic named my-topic as follows:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic
You can find more details about a topic named cc_payments as follows:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic cc_payments
You can see the under-replicated partitions for all topics as follows:
bin/kafka-topics.sh --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions
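For monitoring, the output can be reduced to a single number. This is a rough sketch: count_partitions is a hypothetical helper, and it assumes that each under-replicated partition is reported on its own line containing the text "Partition:", which may differ between Kafka versions:

```shell
# Hypothetical helper: counts the partition lines read from stdin.
# grep -c exits non-zero when the count is 0, so "|| true" keeps the
# helper usable under "set -e".
count_partitions() {
  grep -c 'Partition:' || true
}

# Usage (requires a running cluster):
# urp=$(bin/kafka-topics.sh --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions | count_partitions)
# [ "$urp" -eq 0 ] || echo "WARNING: $urp under-replicated partition(s)"
```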
2. Producers
You can produce messages from standard input as follows:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
You can produce new messages from an existing file named messages.txt as follows:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt
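The console producer sends each input line as a separate message, so messages.txt should hold one message per line. A small sketch that creates such a file; the message contents are made up for illustration:

```shell
# Write three sample messages, one per line.
printf '%s\n' "first message" "second message" "third message" > messages.txt

# Then feed the file to the producer (requires a running broker):
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt
```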
You can produce Avro messages as follows:
kafka-avro-console-producer --broker-list localhost:9092 --topic my.Topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=http://localhost:8081
Once the producer is running, you can enter values that match the schema from the console, one JSON record per line, as follows:
{"f1": "value1"}
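Instead of typing records by hand, they can be generated and piped into the producer. A minimal sketch, assuming the myrecord schema shown above with its single string field f1; gen_records is a hypothetical helper name:

```shell
# Hypothetical helper: prints N JSON records matching the myrecord schema,
# one record per line.
gen_records() {
  i=1
  while [ "$i" -le "$1" ]; do
    printf '{"f1": "value%d"}\n' "$i"
    i=$((i + 1))
  done
}

# Usage (requires a running broker and Schema Registry):
# gen_records 3 | kafka-avro-console-producer --broker-list localhost:9092 --topic my.Topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=http://localhost:8081
```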
3. Consumers
3.1. Consume messages
You can start a consumer that reads from the beginning of the log as follows:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
You can consume a single message as follows:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --max-messages 1
You can consume a single message from __consumer_offsets as follows:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1
You can consume and specify a consumer group as follows:
bin/kafka-console-consumer.sh --topic my-topic --new-consumer --bootstrap-server localhost:9092 --consumer-property group.id=my-group
3.2. Consume Avro messages
You can consume 10 Avro messages from a topic named position-reports as follows:
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=http://localhost:8081 --max-messages 10
You can consume all existing Avro messages from a topic named position-reports as follows:
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=http://localhost:8081
4. Consumer admin operations
You can list all consumer groups as follows:
kafka-consumer-groups --new-consumer --list --bootstrap-server localhost:9092
You can describe a consumer group named testgroup as follows:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testgroup