error_inject_syacall.c 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #include <pthread.h>
  2. #include <signal.h>
  3. #include <stdio.h>
  4. #include <string.h>
  5. #include <stdint.h>
  6. #include <bpf/libbpf.h>
  7. #include <bpf/bpf.h>
  8. #include <errno.h>
  9. #include <unistd.h>
  10. #include <fcntl.h>
  11. #include <stdlib.h>
  12. #include <sys/mman.h>
  13. #include <stdatomic.h>
  14. #include <sys/syscall.h>
  15. #include <linux/futex.h>
  16. #include "error_inject_syscall.skel.h"
  17. #include "error_inject_syscall.h"
  18. #include <time.h>
  /* Name of the POSIX shared-memory object opened by map_shared_memory(). */
  #define SHM_NAME "/my_shared_memory"
  /* Capacity, in bytes, of the byte queue stored inside shared_memory_t. */
  #define BUFFER_SIZE 1024
  /*
   * Number of bytes to map/unmap for the shared-memory region.
   *
   * The region is accessed through shared_memory_t, which holds a futex word,
   * BUFFER_SIZE bytes of payload and two indices, so a fixed 1024 would be
   * shorter than the structure.  Deriving the size from the type keeps the
   * mapping in step with the layout.  A macro is expanded where it is used,
   * so this is valid as long as every use follows the shared_memory_t typedef.
   */
  #define SHM_SIZE (sizeof(shared_memory_t))

  /* Base address of the mapped shared-memory region; assigned in main(). */
  static void *shm_ptr;
  /* Statically initialised mutex; no visible code in this file locks it. */
  static pthread_mutex_t data_mutex = PTHREAD_MUTEX_INITIALIZER;

  /* Print a diagnostic to stderr using printf-style arguments. */
  #define warn(...) fprintf(stderr, __VA_ARGS__)

  /* Set by sig_handler() to ask the polling loop in main() to stop. */
  static volatile sig_atomic_t exiting = false;
  26. static void sig_handler(int sig) {
  27. if (sig == SIGINT || sig == SIGTERM) {
  28. exiting = true;
  29. printf("Signal received: %d. Exiting...\n", sig);
  30. }
  31. }
  32. // 定义 libbpf 的打印回调函数
  33. static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args) {
  34. return vfprintf(stderr, format, args);
  35. }
  36. typedef struct {
  37. atomic_int futex; // 用于同步的 futex 变量
  38. char buffer[BUFFER_SIZE];
  39. atomic_size_t write_index;
  40. atomic_size_t read_index;
  41. } shared_memory_t;
  42. void *map_shared_memory() {
  43. int shm_fd;
  44. shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
  45. if (shm_fd == -1) {
  46. warn("shm_open failed: %s\n", strerror(errno));
  47. return NULL;
  48. }
  49. void *shm_ptr = mmap(0, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
  50. if (shm_ptr == MAP_FAILED) {
  51. perror("mmap");
  52. close(shm_fd);
  53. return NULL;
  54. }
  55. close(shm_fd);
  56. return shm_ptr;
  57. }
  58. void unmap_shared_memory(void *shm_ptr) {
  59. if (munmap(shm_ptr, SHM_SIZE) == -1) {
  60. perror("munmap");
  61. }
  62. }
  /*
   * Thin wrapper around the futex system call with the FUTEX_WAIT operation.
   *
   * futexp:   address of the futex word.
   * expected: value passed as the operation's `val` argument.
   * timeout:  passed through unchanged; may be NULL.
   *
   * Returns the system call's result converted to int.
   */
  int futex_wait(atomic_int *futexp, int expected, const struct timespec *timeout) {
      long rc = syscall(SYS_futex, futexp, FUTEX_WAIT, expected, timeout, NULL, 0);

      return (int)rc;
  }
  /*
   * Thin wrapper around the futex system call with the FUTEX_WAKE operation.
   *
   * futexp:   address of the futex word.
   * num_wake: value passed as the operation's `val` argument.
   *
   * Returns the system call's result converted to int.
   */
  int futex_wake(atomic_int *futexp, int num_wake) {
      long rc = syscall(SYS_futex, futexp, FUTEX_WAKE, num_wake, NULL, NULL, 0);

      return (int)rc;
  }
  69. void write_data_to_queue(shared_memory_t *shm_ptr, const char *data, size_t size) {
  70. size_t write_index = atomic_load(&shm_ptr->write_index);
  71. size_t current_read_index = atomic_load(&shm_ptr->read_index);
  72. size_t next_write_index = (write_index + size) % BUFFER_SIZE;
  73. if (next_write_index == current_read_index) {
  74. fprintf(stderr, "Error: Buffer is full, cannot write data.\n");
  75. return;
  76. }
  77. size_t first_part_size = BUFFER_SIZE - write_index;
  78. if (size <= first_part_size) {
  79. memcpy(&shm_ptr->buffer[write_index], data, size);
  80. } else {
  81. memcpy(&shm_ptr->buffer[write_index], data, first_part_size);
  82. memcpy(&shm_ptr->buffer[0], data + first_part_size, size - first_part_size);
  83. }
  84. atomic_store(&shm_ptr->write_index, next_write_index);
  85. // 输出写入的数据和索引位置进行调试
  86. fprintf(stderr, "Data written to shared memory at index %zu: %s\n", write_index, data);
  87. // 设置 futex 变量为 1,通知 FUSE 文件系统有新数据
  88. atomic_store(&shm_ptr->futex, 1);
  89. futex_wake(&shm_ptr->futex, 1); // 唤醒等待的 FUSE 进程
  90. }
  91. static int read_event(void *ctx, void *data, size_t data_sz) {
  92. const struct event *e = data;
  93. printf("Event Pathname: %s, command: %s\n", e->pathname,e->command);
  94. // 获取共享内存的指针
  95. shared_memory_t *shared_mem = (shared_memory_t *)shm_ptr;
  96. // 将事件数据写入共享内存队列
  97. write_data_to_queue(shared_mem, (const char *)e, sizeof(struct event));
  98. return 0;
  99. }
  100. int main(int argc, char **argv) {
  101. struct ring_buffer *rb = NULL;
  102. struct error_inject_syscall_bpf *skel;
  103. int err;
  104. libbpf_set_print(libbpf_print_fn);
  105. signal(SIGINT, sig_handler);
  106. signal(SIGTERM, sig_handler);
  107. shm_ptr = map_shared_memory();
  108. if (!shm_ptr) {
  109. fprintf(stderr, "Failed to map shared memory\n");
  110. return 1;
  111. }
  112. skel = error_inject_syscall_bpf__open();
  113. if (!skel) {
  114. fprintf(stderr, "Failed to open BPF skeleton\n");
  115. unmap_shared_memory(shm_ptr);
  116. return 1;
  117. }
  118. err = error_inject_syscall_bpf__load(skel);
  119. if (err) {
  120. fprintf(stderr, "Failed to load BPF skeleton\n");
  121. error_inject_syscall_bpf__destroy(skel);
  122. unmap_shared_memory(shm_ptr);
  123. return 1;
  124. }
  125. err = error_inject_syscall_bpf__attach(skel);
  126. if (err) {
  127. fprintf(stderr, "Failed to attach BPF skeleton\n");
  128. error_inject_syscall_bpf__destroy(skel);
  129. unmap_shared_memory(shm_ptr);
  130. return 1;
  131. }
  132. rb = ring_buffer__new(bpf_map__fd(skel->maps.rb), read_event, NULL, NULL);
  133. if (!rb) {
  134. fprintf(stderr, "Failed to create ring buffer\n");
  135. error_inject_syscall_bpf__destroy(skel);
  136. unmap_shared_memory(shm_ptr);
  137. return 1;
  138. }
  139. while (!exiting) {
  140. err = ring_buffer__poll(rb, 100 /* timeout, ms */);
  141. if (err == -EINTR) {
  142. err = 0;
  143. break;
  144. }
  145. if (err < 0) {
  146. fprintf(stderr, "Error polling ring buffer: %d\n", err);
  147. break;
  148. }
  149. }
  150. // // 退出清理
  151. // exiting = true;
  152. // pthread_join(data_thread, NULL);
  153. cleanup:
  154. ring_buffer__free(rb);
  155. error_inject_syscall_bpf__destroy(skel);
  156. unmap_shared_memory(shm_ptr);
  157. return err < 0 ? -err : 0;
  158. }