// consumer.go (4.2 KB)

  1. package kafka
  2. import (
  3. "fmt"
  4. "github.com/RaymondCode/simple-demo/config"
  5. "github.com/RaymondCode/simple-demo/dao"
  6. "github.com/RaymondCode/simple-demo/service"
  7. "github.com/RaymondCode/simple-demo/util"
  8. "github.com/Shopify/sarama"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  // wg is a package-level sync.WaitGroup.
  //
  // A WaitGroup is a struct used to wait for a collection of goroutines to
  // finish: Add sets the number of goroutines to wait for (and must be called
  // before the goroutine is started), each goroutine calls Done when it has
  // finished, and Wait blocks until all of them are done.
  var wg sync.WaitGroup
  21. func RunConsumer() {
  22. //获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割
  23. consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil)
  24. //获取失败
  25. if err != nil {
  26. fmt.Println("Failed to start consumer: %s", err)
  27. return
  28. }
  29. //对该topic进行监听
  30. partitionList, err := consumer.Partitions("go_kafka")
  31. if err != nil {
  32. fmt.Println("Failed to get the list of partitions: ", err)
  33. return
  34. }
  35. //打印分区
  36. fmt.Println(partitionList)
  37. //获取分区和片偏移
  38. for partition := range partitionList {
  39. pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest)
  40. if err != nil {
  41. fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
  42. return
  43. }
  44. //延迟执行
  45. defer pc.AsyncClose()
  46. //启动多线程
  47. go func(pc sarama.PartitionConsumer) {
  48. wg.Add(1)
  49. //获得message的信息
  50. for msg := range pc.Messages() {
  51. fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
  52. fmt.Println()
  53. list := strings.Split(string(msg.Value), " ")
  54. finalName := list[0]
  55. key, _ := strconv.ParseInt(list[1], 10, 64)
  56. if err != nil {
  57. fmt.Println("bug")
  58. }
  59. util.GetSnapshot(config.PROJECTPATH+config.VIDEO_ADDR+finalName, config.PROJECTPATH+config.COVER_ADDR+finalName[0:len(finalName)-4], 10)
  60. fmt.Println("封面图生成完成")
  61. publishToDB(finalName, key)
  62. fmt.Println("审核完成#")
  63. }
  64. wg.Done()
  65. }(pc)
  66. }
  67. //线程休眠
  68. time.Sleep(10 * time.Second)
  69. wg.Wait()
  70. consumer.Close()
  71. }
  72. func Consumerget() {
  73. //获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割
  74. consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil)
  75. //获取失败
  76. if err != nil {
  77. fmt.Println("Failed to start consumer: %s", err)
  78. return
  79. }
  80. //对该topic进行监听
  81. partitionList, err := consumer.Partitions("go_kafka")
  82. if err != nil {
  83. fmt.Println("Failed to get the list of partitions: ", err)
  84. return
  85. }
  86. //打印分区
  87. fmt.Println(partitionList)
  88. //获取分区和片偏移
  89. for partition := range partitionList {
  90. pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest)
  91. if err != nil {
  92. fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
  93. return
  94. }
  95. //延迟执行
  96. defer pc.AsyncClose()
  97. //启动多线程
  98. go func(pc sarama.PartitionConsumer) {
  99. wg.Add(1)
  100. //获得message的信息
  101. for msg := range pc.Messages() {
  102. fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
  103. fmt.Println()
  104. }
  105. wg.Done()
  106. }(pc)
  107. }
  108. //线程休眠
  109. time.Sleep(10 * time.Second)
  110. wg.Wait()
  111. consumer.Close()
  112. }
  113. func publishToDB(filename string, id int64) {
  114. /* vd :=entities.Video2{
  115. AuthorId: id,
  116. PlayUrl: "http://pathcystore.oss-cn-shanghai.aliyuncs.com/video/"+filename,
  117. CoverUrl: "http://pathcystore.oss-cn-shanghai.aliyuncs.com/cover/"+filename[0:len(filename)-4]+".jpeg",
  118. FavoriteCount: 0,
  119. CommentCount: 0,
  120. IsFavorite: false,
  121. }*/
  122. dao.Db.Table("video").Where("id", id).Updates(map[string]interface{}{"play_url": config.CONFIG.OssConfig.Endpoint + "/video/" + filename, "cover_url": config.CONFIG.OssConfig.Endpoint + "/cover/" + filename[0:len(filename)-4] + ".jpeg"})
  123. var wg sync.WaitGroup
  124. wg.Add(2)
  125. go service.UploadFile(filename, id, &wg) //向OSS上传文件
  126. go service.UploadCover(filename[0:len(filename)-4], id, &wg) //向OSS上传封面
  127. wg.Wait()
  128. }