Kafka简单示例

Kafka简单示例

Kafka

安装

首先需要安装Java

sudo apt install openjdk-11-jdk

下载安装kafka

wget https://dlcdn.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0

启动zookeeper和kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

go连接Kafka使用

生产者

使用给定代理地址和配置创建一个同步生产者

// 使用给定代理地址和配置创建一个同步生产者
SyncProducer, err := sarama.NewSyncProducer(
    []string{conn},
    config,
)

config可以自由配置:

config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应
config.Producer.Return.Successes = true

构建发送的消息:

// 构建发送的消息
msg := &sarama.ProducerMessage{
    Topic: topic,
    Key:   sarama.StringEncoder(time.Now().String()),
    Value: sarama.StringEncoder(content),
}

生产者发送消息:

// SendMessage:该方法是生产者生产给定的消息
// 生产成功的时候返回该消息的分区和所在的偏移量
// 生产失败的时候返回error
partition, offset, err := SyncProducer.SendMessage(msg)

消费者

创建一个消费者的实例

config := sarama.NewConfig()
consumer, err := sarama.NewConsumer(c.Node, config)

查询这个 topic 有多少分区

partitions, err := consumer.Partitions(c.Topic)

每个分区开一个 goroutine 来消费

wg.Add(len(partitions))
// 然后每个分区开一个 goroutine 来消费
for _, partitionId := range partitions {
    //不开异步会导致一个消费完才会消费另外一个
    go c.consumeByPartition(consumer, c.Topic, partitionId, &wg)
}

消费

partitionConsumer, err := consumer.ConsumePartition(topic, partitionId, sarama.OffsetNewest)
// 然后可以通过partitionConsumer.Messages()打印得到的消息

主函数

func main() {
    Conn := "127.0.0.1:9092"
    topic := "test_log"

    var wg sync.WaitGroup
    wg.Add(2)

    // 消费者
    go func() {
        defer wg.Done()
        // 初始化consumer
        var kafkaConsumer = consumer.KafkaConsumer{
            Node:  []string{Conn},
            Topic: topic,
        }
        // 消费
        go kafkaConsumer.Consume()
    }()

    // 生产者
    go func() {
        defer wg.Done()

        index := 0
        for {
            // 生产者发送消息
            _, err := producer.Send(Conn, topic, fmt.Sprintf("lox_%d", index))
            if err != nil {
                log.Print("测试失败:" + err.Error())
                return
            }
            index++
            time.Sleep(1 * time.Second)
        }
    }()
    wg.Wait()
}

Kafka简单示例
https://zhangzhao219.github.io/2023/03/03/Backend/Kafka/
作者
Zhang Zhao
发布于
2023年3月3日
许可协议