Kafka Useful Cmds

Start Kafka

confluent start

OR

zookeeper-server-start ~/Downloads/confluent-4.0.0/etc/kafka/zookeeper.properties&
kafka-server-start ~/Downloads/confluent-4.0.0/etc/kafka/server.properties&

If using Avro:

schema-registry-start ~/Downloads/confluent-4.0.0/etc/schema-registry/schema-registry.properties&

Topics

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics --delete --zookeeper localhost:2181 --topic test
kafka-topics --list --zookeeper localhost:2181

To show a topic's details (leader, number of partitions, replicas, etc.)

kafka-topics --describe --zookeeper localhost:2181 --topic test

Console

kafka-console-producer --broker-list localhost:9092 --topic test
kafka-console-consumer --bootstrap-server localhost:9092 --topic test

If the consumer needs explicit key/value deserializers, use:

kafka-console-consumer --bootstrap-server localhost:9092 \
--topic benchmark-test-out \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

kafka-avro-console-producer \
        --broker-list localhost:9092 --topic avro-test \
        --property value.schema='{"namespace": "com.WireFilter.avro",
"type": "record",
"name": "LogRecord",
"fields": [
    {"name": "timestamp", "type": "double"},
    {"name": "ipSource", "type": "string"},
    {"name": "tcpStatus", "type": "string"},
    {"name": "requestType", "type": "string"},
    {"name": "destinationUrl", "type": "string"},
    {"name": "destinationIp", "type": "string"}
]
}'   < /Users/malsayed/workspace/access-log-reporter/access-log.avro
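
To read the Avro records back, the matching console consumer can be used. A minimal sketch, assuming Schema Registry is running on its default http://localhost:8081:

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --topic avro-test \
    --from-beginning \
    --property schema.registry.url=http://localhost:8081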

Important settings

Change the log retention time so the broker deletes old log segments:

vi ~/Downloads/confluent-4.0.0/etc/kafka/server.properties

add:

log.retention.ms=1000

then:

kafka-server-stop
kafka-server-start ~/Downloads/confluent-4.0.0/etc/kafka/server.properties&

OR clean the Kafka log directory (specified by the log.dirs property in the Kafka config file) as well as the ZooKeeper data directory; see the sketch below.
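
A minimal cleanup sketch, assuming the stock data directories (log.dirs=/tmp/kafka-logs in server.properties, dataDir=/tmp/zookeeper in zookeeper.properties); adjust the paths to whatever your config actually uses:

# stop the broker and ZooKeeper first
kafka-server-stop
zookeeper-server-stop

# wipe the broker log segments and ZooKeeper data (assumed default paths)
rm -rf /tmp/kafka-logs /tmp/zookeeper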

Monitoring Kafka with JMX

https://www.robustperception.io/monitoring-kafka-with-prometheus/

Running the application

java -cp target/access-log-reporter-1.0-SNAPSHOT.jar com.WireFilter.app.App

OR

mvn exec:java -Dexec.mainClass="com.WireFilter.app.App"

Running with the Prometheus JMX exporter agent

Start the broker with the agent attached. The KAFKA_OPTS assignment must prefix the start command, so keep the line continuation with no blank line in between:

KAFKA_OPTS="$KAFKA_OPTS -javaagent:$PWD/Downloads/prometheus_kafka/jmx_prometheus_javaagent-0.6.jar=7071:$PWD/Downloads/prometheus_kafka/kafka-0-8-2.yml" \
kafka-server-start ~/Downloads/confluent-4.0.0/etc/kafka/server.properties&

Run the application, optionally with its own exporter agent:

java -jar /Users/malsayed/workspace/access-log-reporter/target/access-log-reporter-1.0-SNAPSHOT.jar

java -javaagent:/Users/malsayed/Downloads/prometheus_kafka/jmx_prometheus_javaagent-0.6.jar=8080:/Users/malsayed/Downloads/prometheus_kafka/my-application.yml -jar /Users/malsayed/workspace/access-log-reporter/target/access-log-reporter-1.0-SNAPSHOT.jar com.WireFilter.app.StreamAnalyzer
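
A quick check that the exporter agents are serving metrics, assuming the ports used above (7071 for the broker, 8080 for the application):

curl -s http://localhost:7071/metrics | head
curl -s http://localhost:8080/metrics | head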



Logging dependencies

slf4j dependencies for the application's pom.xml (slf4j-api is the logging facade, slf4j-simple is a minimal console backend):

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>