Você não pode selecionar mais de 25 tópicos. Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

rtmp-server-forward-aio-test.cpp 7.0KB

10 meses atrás

  1. //
  2. // TODO: add packet queue for player
  3. //
  4. #include "aio-rtmp-server.h"
  5. #include "aio-timeout.h"
  6. #include "aio-worker.h"
  7. #include "sys/sync.hpp"
  8. #include "flv-writer.h"
  9. #include "flv-proto.h"
  10. #include "flv-muxer.h"
  11. #include "flv-demuxer.h"
  12. #include <string.h>
  13. #include <assert.h>
  14. #include <stdio.h>
  15. #include "cpm/shared_ptr.h"
  16. #include <string>
  17. #include <list>
  18. #include <map>
  19. struct rtmp_player_t
  20. {
  21. // TODO: add packet queue
  22. aio_rtmp_session_t* rtmp;
  23. struct flv_muxer_t* muxer;
  24. rtmp_player_t(aio_rtmp_session_t* rtmp) : rtmp(rtmp)
  25. {
  26. muxer = flv_muxer_create(&handler, this);
  27. }
  28. ~rtmp_player_t()
  29. {
  30. if(muxer)
  31. flv_muxer_destroy(muxer);
  32. }
  33. private:
  34. static int handler(void* param, int type, const void* data, size_t bytes, uint32_t timestamp)
  35. {
  36. rtmp_player_t* player = (rtmp_player_t*)param;
  37. switch (type)
  38. {
  39. case FLV_TYPE_SCRIPT:
  40. return aio_rtmp_server_send_script(player->rtmp, data, bytes, timestamp);
  41. case FLV_TYPE_AUDIO:
  42. return aio_rtmp_server_send_audio(player->rtmp, data, bytes, timestamp);
  43. case FLV_TYPE_VIDEO:
  44. return aio_rtmp_server_send_video(player->rtmp, data, bytes, timestamp);
  45. default:
  46. assert(0);
  47. return -1;
  48. }
  49. }
  50. };
  51. struct rtmp_source_t
  52. {
  53. ThreadLocker locker;
  54. struct flv_demuxer_t* demuxer;
  55. std::list<std::shared_ptr<rtmp_player_t> > players;
  56. rtmp_source_t()
  57. {
  58. demuxer = flv_demuxer_create(handler, this);
  59. }
  60. ~rtmp_source_t()
  61. {
  62. if (demuxer)
  63. flv_demuxer_destroy(demuxer);
  64. }
  65. private:
  66. static int handler(void* param, int codec, const void* data, size_t bytes, uint32_t pts, uint32_t dts, int flags)
  67. {
  68. int r = 0;
  69. rtmp_source_t* s = (rtmp_source_t*)param;
  70. AutoThreadLocker locker(s->locker);
  71. for (auto it = s->players.begin(); it != s->players.end(); ++it)
  72. {
  73. // TODO: push to packet queue
  74. switch (codec)
  75. {
  76. case FLV_VIDEO_H264:
  77. r = flv_muxer_avc((*it)->muxer, data, bytes, pts, dts);
  78. break;
  79. case FLV_VIDEO_H265:
  80. r = flv_muxer_hevc((*it)->muxer, data, bytes, pts, dts);
  81. break;
  82. case FLV_AUDIO_AAC:
  83. r = flv_muxer_aac((*it)->muxer, data, bytes, pts, dts);
  84. break;
  85. case FLV_AUDIO_MP3:
  86. r = flv_muxer_mp3((*it)->muxer, data, bytes, pts, dts);
  87. break;
  88. case FLV_VIDEO_AVCC:
  89. case FLV_VIDEO_HVCC:
  90. case FLV_AUDIO_ASC:
  91. break; // ignore
  92. default:
  93. assert(0);
  94. }
  95. }
  96. return 0; // ignore error
  97. }
  98. };
  99. static ThreadLocker s_locker;
  100. static std::map<std::string, std::shared_ptr<rtmp_source_t> > s_lives;
  101. static aio_rtmp_userptr_t aio_rtmp_server_onpublish(void* param, aio_rtmp_session_t* /*session*/, const char* app, const char* stream, const char* type)
  102. {
  103. printf("aio_rtmp_server_onpublish(%s, %s, %s)\n", app, stream, type);
  104. std::string key(app);
  105. key += "/";
  106. key += stream;
  107. std::shared_ptr<rtmp_source_t> source(new rtmp_source_t);
  108. AutoThreadLocker locker(s_locker);
  109. assert(s_lives.find(key) == s_lives.end());
  110. s_lives[key] = source;
  111. return source.get();
  112. }
  113. static int aio_rtmp_server_onscript(aio_rtmp_userptr_t ptr, const void* script, size_t bytes, uint32_t timestamp)
  114. {
  115. struct rtmp_source_t* s = (struct rtmp_source_t*)ptr;
  116. AutoThreadLocker locker(s->locker);
  117. return flv_demuxer_input(s->demuxer, FLV_TYPE_SCRIPT, script, bytes, timestamp);
  118. }
  119. static int aio_rtmp_server_onvideo(aio_rtmp_userptr_t ptr, const void* data, size_t bytes, uint32_t timestamp)
  120. {
  121. struct rtmp_source_t* s = (struct rtmp_source_t*)ptr;
  122. AutoThreadLocker locker(s->locker);
  123. return flv_demuxer_input(s->demuxer, FLV_TYPE_VIDEO, data, bytes, timestamp);
  124. }
  125. static int aio_rtmp_server_onaudio(aio_rtmp_userptr_t ptr, const void* data, size_t bytes, uint32_t timestamp)
  126. {
  127. struct rtmp_source_t* s = (struct rtmp_source_t*)ptr;
  128. AutoThreadLocker locker(s->locker);
  129. return flv_demuxer_input(s->demuxer, FLV_TYPE_AUDIO, data, bytes, timestamp);
  130. }
  131. static void aio_rtmp_server_onsend(aio_rtmp_userptr_t /*ptr*/, size_t /*bytes*/)
  132. {
  133. }
  134. static void aio_rtmp_server_onclose(aio_rtmp_userptr_t ptr)
  135. {
  136. AutoThreadLocker locker(s_locker);
  137. for (auto it = s_lives.begin(); it != s_lives.end(); ++it)
  138. {
  139. std::shared_ptr<struct rtmp_source_t>& s = it->second;
  140. if (ptr == s.get())
  141. {
  142. s_lives.erase(it);
  143. return;
  144. }
  145. AutoThreadLocker l(s->locker);
  146. for (auto j = s->players.begin(); j != s->players.end(); ++j)
  147. {
  148. if (j->get() == ptr)
  149. {
  150. s->players.erase(j);
  151. return;
  152. }
  153. }
  154. }
  155. }
  156. static aio_rtmp_userptr_t aio_rtmp_server_onplay(void* /*param*/, aio_rtmp_session_t* session, const char* app, const char* stream, double start, double duration, uint8_t reset)
  157. {
  158. printf("aio_rtmp_server_onplay(%s, %s, %f, %f, %d)\n", app, stream, start, duration, (int)reset);
  159. std::string key(app);
  160. key += "/";
  161. key += stream;
  162. std::shared_ptr<struct rtmp_source_t> s;
  163. {
  164. AutoThreadLocker locker(s_locker);
  165. auto it = s_lives.find(key);
  166. if (it == s_lives.end())
  167. {
  168. printf("source(%s, %s) not found\n", app, stream);
  169. return NULL;
  170. }
  171. s = it->second;
  172. for (auto j = s->players.begin(); j != s->players.end(); ++j)
  173. {
  174. if (j->get()->rtmp == session)
  175. {
  176. printf("rtmp session(%s, %s) exist\n", app, stream);
  177. return j->get();
  178. }
  179. }
  180. }
  181. std::shared_ptr<rtmp_player_t> player(new rtmp_player_t(session));
  182. AutoThreadLocker locker(s->locker);
  183. s->players.push_back(player);
  184. return player.get();
  185. }
  186. static int aio_rtmp_server_onpause(aio_rtmp_userptr_t /*ptr*/, int pause, uint32_t ms)
  187. {
  188. printf("aio_rtmp_server_onpause(%d, %u)\n", pause, (unsigned int)ms);
  189. return 0;
  190. }
  191. static int aio_rtmp_server_onseek(aio_rtmp_userptr_t /*ptr*/, uint32_t ms)
  192. {
  193. printf("aio_rtmp_server_onseek(%u)\n", (unsigned int)ms);
  194. return 0;
  195. }
  196. void rtmp_server_forward_aio_test(const char* ip, int port)
  197. {
  198. aio_rtmp_server_t* rtmp;
  199. struct aio_rtmp_server_handler_t handler;
  200. memset(&handler, 0, sizeof(handler));
  201. handler.onsend = aio_rtmp_server_onsend;
  202. handler.onplay = aio_rtmp_server_onplay;
  203. handler.onpause = aio_rtmp_server_onpause;
  204. handler.onseek = aio_rtmp_server_onseek;
  205. handler.onpublish = aio_rtmp_server_onpublish;
  206. handler.onscript = aio_rtmp_server_onscript;
  207. handler.onaudio = aio_rtmp_server_onaudio;
  208. handler.onvideo = aio_rtmp_server_onvideo;
  209. handler.onclose = aio_rtmp_server_onclose;
  210. aio_worker_init(8);
  211. rtmp = aio_rtmp_server_create(ip, port, &handler, NULL);
  212. while ('q' != getchar())
  213. {
  214. }
  215. aio_rtmp_server_destroy(rtmp);
  216. aio_worker_clean(8);
  217. }