選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

172 行
4.4KB

  1. #include "sockutil.h"
  2. #include "sys/atomic.h"
  3. #include "sys/thread.h"
  4. #include "sys/system.h"
  5. #include "sys/sync.hpp"
  6. #include "aio-worker.h"
  7. #include "flv-reader.h"
  8. #include "flv-proto.h"
  9. #include "aio-rtmp-server.h"
  10. #include <string.h>
  11. #include <assert.h>
  // Path of the FLV file streamed to every player. Assigned once by
  // rtmp_server_vod_aio_test() before the server is created and only read
  // afterwards by the worker threads.
  static const char* s_file = NULL;
  13. struct rtmp_server_vod_t
  14. {
  15. int ref;
  16. ThreadLocker locker;
  17. aio_rtmp_session_t* session;
  18. uint8_t packet[4 * 1024 * 1024];
  19. };
  20. static int STDCALL aio_rtmp_server_worker(void* param)
  21. {
  22. int r, type;
  23. size_t taglen;
  24. uint32_t timestamp;
  25. uint32_t s_timestamp = 0;
  26. uint32_t diff = 0;
  27. uint64_t clock;
  28. //uint64_t clock0 = system_clock() - 3000; // send more data, open fast
  29. rtmp_server_vod_t* vod = (rtmp_server_vod_t*)param;
  30. while (1)
  31. {
  32. void* f = flv_reader_create(s_file);
  33. clock = system_clock(); // timestamp start from 0
  34. while (1 == flv_reader_read(f, &type, &timestamp, &taglen, vod->packet, sizeof(vod->packet)))
  35. {
  36. assert(taglen < sizeof(vod->packet));
  37. uint64_t t = system_clock();
  38. if (clock + timestamp > t && clock + timestamp < t + 3 * 1000)
  39. system_sleep(clock + timestamp - t);
  40. else if (clock + timestamp > t + 3 * 1000)
  41. clock = t - timestamp;
  42. timestamp += diff;
  43. s_timestamp = timestamp > s_timestamp ? timestamp : s_timestamp;
  44. AutoThreadLocker locker(vod->locker);
  45. if (NULL == vod->session)
  46. break;
  47. while (aio_rtmp_server_get_unsend(vod->session) > 8 * 1024 * 1024)
  48. {
  49. vod->locker.Unlock();
  50. system_sleep(1000); // can't send?
  51. vod->locker.Lock();
  52. }
  53. if (FLV_TYPE_AUDIO == type)
  54. {
  55. r = aio_rtmp_server_send_audio(vod->session, vod->packet, taglen, timestamp);
  56. }
  57. else if (FLV_TYPE_VIDEO == type)
  58. {
  59. r = aio_rtmp_server_send_video(vod->session, vod->packet, taglen, timestamp);
  60. }
  61. else if (FLV_TYPE_SCRIPT == type)
  62. {
  63. r = aio_rtmp_server_send_script(vod->session, vod->packet, taglen, timestamp);
  64. }
  65. else
  66. {
  67. //assert(0);
  68. r = 0;
  69. }
  70. if (0 != r)
  71. {
  72. assert(0);
  73. break; // TODO: handle send failed
  74. }
  75. }
  76. flv_reader_destroy(f);
  77. diff = s_timestamp + 30;
  78. }
  79. if(0 == atomic_decrement32(&vod->ref))
  80. delete vod;
  81. return 0;
  82. }
  83. static aio_rtmp_userptr_t aio_rtmp_server_onplay(void* /*param*/, aio_rtmp_session_t* session, const char* app, const char* stream, double start, double duration, uint8_t reset)
  84. {
  85. printf("aio_rtmp_server_onplay(%s, %s, %f, %f, %d)\n", app, stream, start, duration, (int)reset);
  86. rtmp_server_vod_t* vod = new rtmp_server_vod_t;
  87. vod->session = session;
  88. vod->ref = 2;
  89. pthread_t t;
  90. thread_create(&t, aio_rtmp_server_worker, vod);
  91. thread_detach(t);
  92. return vod;
  93. }
  94. static int aio_rtmp_server_onpause(aio_rtmp_userptr_t /*ptr*/, int pause, uint32_t ms)
  95. {
  96. printf("aio_rtmp_server_onpause(%d, %u)\n", pause, (unsigned int)ms);
  97. return 0;
  98. }
  99. static int aio_rtmp_server_onseek(aio_rtmp_userptr_t /*ptr*/, uint32_t ms)
  100. {
  101. printf("aio_rtmp_server_onseek(%u)\n", (unsigned int)ms);
  102. return 0;
  103. }
  // Duration callback: reports a fixed duration for every app/stream.
  //
  // duration: output; receives 1800 (30 * 60 - presumably seconds, i.e.
  //           30 minutes; confirm the unit against the server API).
  // Returns 0 always. `param`, `app` and `stream` are not used.
  static int aio_rtmp_server_ongetduration(void* param, const char* app, const char* stream, double* duration)
  {
      const int fixed_duration = 30 * 60;
      *duration = fixed_duration;
      return 0;
  }
  109. static void aio_rtmp_server_onsend(aio_rtmp_userptr_t /*ptr*/, size_t /*bytes*/)
  110. {
  111. }
  112. static void aio_rtmp_server_onclose(aio_rtmp_userptr_t ptr)
  113. {
  114. // close thread
  115. rtmp_server_vod_t* vod = (rtmp_server_vod_t*)ptr;
  116. {
  117. AutoThreadLocker locker(vod->locker);
  118. vod->session = NULL;
  119. }
  120. if (0 == atomic_decrement32(&vod->ref))
  121. delete vod;
  122. }
  123. void rtmp_server_vod_aio_test(const char* flv)
  124. {
  125. aio_rtmp_server_t* rtmp;
  126. struct aio_rtmp_server_handler_t handler;
  127. memset(&handler, 0, sizeof(handler));
  128. handler.onsend = aio_rtmp_server_onsend;
  129. handler.onplay = aio_rtmp_server_onplay;
  130. handler.onpause = aio_rtmp_server_onpause;
  131. handler.onseek = aio_rtmp_server_onseek;
  132. handler.onclose = aio_rtmp_server_onclose;
  133. handler.ongetduration = aio_rtmp_server_ongetduration;
  134. aio_worker_init(8);
  135. s_file = flv;
  136. rtmp = aio_rtmp_server_create(NULL, 1935, &handler, NULL);
  137. while ('q' != getchar())
  138. {
  139. }
  140. aio_rtmp_server_destroy(rtmp);
  141. aio_worker_clean(8);
  142. }