1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package kafka
- import (
- "fmt"
- "github.com/Shopify/sarama"
- "strconv"
- "time"
- )
- // 消息生产者
- func RunProducer() {
- //获取配置类
- config := sarama.NewConfig() //配置类实例(指针类型)
- config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)
- config.Producer.Partitioner = sarama.NewRandomPartitioner //生成用于选择要发送消息的分区的分区(默认为散列消息键)。
- config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。
- //获取客户端对象
- client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
- if err != nil {
- //获取客户端失败
- fmt.Println("producer close, err:", err)
- return
- }
- //延迟执行,类似于栈,等到其他代码都执行完毕后再执行
- defer client.Close()
- //一直循环
- for {
- //获取Message对象
- msg := &sarama.ProducerMessage{}
- //设置topic
- msg.Topic = "go_kafka"
- //设置Message值
- msg.Value = sarama.StringEncoder("this is a good test, my message is good")
- //发送消息,返回pid、片偏移
- pid, offset, err := client.SendMessage(msg)
- //发送失败
- if err != nil {
- fmt.Println("send message failed,", err)
- return
- }
- //打印返回结果
- fmt.Printf("pid:%v offset:%v\n", pid, offset)
- //线程休眠下
- time.Sleep(10 * time.Second)
- }
- }
- func ProducerSend(str string, key int64) {
- //获取配置类
- config := sarama.NewConfig() //配置类实例(指针类型)
- config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)
- config.Producer.Partitioner = sarama.NewRandomPartitioner //生成用于选择要发送消息的分区的分区(默认为散列消息键)。
- config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。
- //获取客户端对象
- client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
- if err != nil {
- //获取客户端失败
- fmt.Println("producer close, err:", err)
- return
- }
- //延迟执行,类似于栈,等到其他代码都执行完毕后再执行
- defer client.Close()
- //一直循环
- //获取Message对象
- msg := &sarama.ProducerMessage{}
- //设置topic
- msg.Topic = "go_kafka"
- //设置Message值
- msg.Value = sarama.StringEncoder(str + " " + strconv.FormatInt(key, 10))
- //发送消息,返回pid、片偏移
- pid, offset, err := client.SendMessage(msg)
- //发送失败
- if err != nil {
- fmt.Println("send message failed,", err)
- return
- }
- //打印返回结果
- fmt.Printf("pid:%v offset:%v\n", pid, offset)
- //线程休眠下
- }
|