producer.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package kafka
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. "strconv"
  6. "time"
  7. )
  8. // 消息生产者
  9. func RunProducer() {
  10. //获取配置类
  11. config := sarama.NewConfig() //配置类实例(指针类型)
  12. config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)
  13. config.Producer.Partitioner = sarama.NewRandomPartitioner //生成用于选择要发送消息的分区的分区(默认为散列消息键)。
  14. config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。
  15. //获取客户端对象
  16. client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  17. if err != nil {
  18. //获取客户端失败
  19. fmt.Println("producer close, err:", err)
  20. return
  21. }
  22. //延迟执行,类似于栈,等到其他代码都执行完毕后再执行
  23. defer client.Close()
  24. //一直循环
  25. for {
  26. //获取Message对象
  27. msg := &sarama.ProducerMessage{}
  28. //设置topic
  29. msg.Topic = "go_kafka"
  30. //设置Message值
  31. msg.Value = sarama.StringEncoder("this is a good test, my message is good")
  32. //发送消息,返回pid、片偏移
  33. pid, offset, err := client.SendMessage(msg)
  34. //发送失败
  35. if err != nil {
  36. fmt.Println("send message failed,", err)
  37. return
  38. }
  39. //打印返回结果
  40. fmt.Printf("pid:%v offset:%v\n", pid, offset)
  41. //线程休眠下
  42. time.Sleep(10 * time.Second)
  43. }
  44. }
  45. func ProducerSend(str string, key int64) {
  46. //获取配置类
  47. config := sarama.NewConfig() //配置类实例(指针类型)
  48. config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)
  49. config.Producer.Partitioner = sarama.NewRandomPartitioner //生成用于选择要发送消息的分区的分区(默认为散列消息键)。
  50. config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。
  51. //获取客户端对象
  52. client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  53. if err != nil {
  54. //获取客户端失败
  55. fmt.Println("producer close, err:", err)
  56. return
  57. }
  58. //延迟执行,类似于栈,等到其他代码都执行完毕后再执行
  59. defer client.Close()
  60. //一直循环
  61. //获取Message对象
  62. msg := &sarama.ProducerMessage{}
  63. //设置topic
  64. msg.Topic = "go_kafka"
  65. //设置Message值
  66. msg.Value = sarama.StringEncoder(str + " " + strconv.FormatInt(key, 10))
  67. //发送消息,返回pid、片偏移
  68. pid, offset, err := client.SendMessage(msg)
  69. //发送失败
  70. if err != nil {
  71. fmt.Println("send message failed,", err)
  72. return
  73. }
  74. //打印返回结果
  75. fmt.Printf("pid:%v offset:%v\n", pid, offset)
  76. //线程休眠下
  77. }