You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

SrtSession.cpp 4.9KB

5 kuukautta sitten
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. #include "SrtSession.hpp"
  2. #include "Packet.hpp"
  3. #include "SrtTransportImp.hpp"
  4. #include "Common/config.h"
  5. namespace SRT {
  6. using namespace mediakit;
  7. SrtSession::SrtSession(const Socket::Ptr &sock)
  8. : Session(sock) {
  9. socklen_t addr_len = sizeof(_peer_addr);
  10. memset(&_peer_addr, 0, addr_len);
  11. // TraceL<<"before addr len "<<addr_len;
  12. getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len);
  13. // TraceL<<"after addr len "<<addr_len<<" family "<<_peer_addr.ss_family;
  14. }
  15. SrtSession::~SrtSession() {
  16. InfoP(this);
  17. }
  18. EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) {
  19. uint8_t *data = (uint8_t *)buffer->data();
  20. size_t size = buffer->size();
  21. if (DataPacket::isDataPacket(data, size)) {
  22. uint32_t socket_id = DataPacket::getSocketID(data, size);
  23. auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
  24. return trans ? trans->getPoller() : nullptr;
  25. }
  26. if (HandshakePacket::isHandshakePacket(data, size)) {
  27. auto type = HandshakePacket::getHandshakeType(data, size);
  28. if (type == HandshakePacket::HS_TYPE_INDUCTION) {
  29. // 握手第一阶段
  30. return nullptr;
  31. } else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
  32. // 握手第二阶段
  33. uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
  34. auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
  35. return trans ? trans->getPoller() : nullptr;
  36. } else {
  37. WarnL << " not reach there";
  38. }
  39. } else {
  40. uint32_t socket_id = ControlPacket::getSocketID(data, size);
  41. auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
  42. return trans ? trans->getPoller() : nullptr;
  43. }
  44. return nullptr;
  45. }
  46. void SrtSession::attachServer(const toolkit::Server &server) {
  47. SockUtil::setRecvBuf(getSock()->rawFD(), 1024 * 1024);
  48. }
  49. void SrtSession::onRecv(const Buffer::Ptr &buffer) {
  50. uint8_t *data = (uint8_t *)buffer->data();
  51. size_t size = buffer->size();
  52. if (_find_transport) {
  53. //只允许寻找一次transport
  54. _find_transport = false;
  55. if (DataPacket::isDataPacket(data, size)) {
  56. uint32_t socket_id = DataPacket::getSocketID(data, size);
  57. auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
  58. if (trans) {
  59. _transport = std::move(trans);
  60. } else {
  61. WarnL << " data packet not find transport ";
  62. }
  63. }
  64. if (HandshakePacket::isHandshakePacket(data, size)) {
  65. auto type = HandshakePacket::getHandshakeType(data, size);
  66. if (type == HandshakePacket::HS_TYPE_INDUCTION) {
  67. // 握手第一阶段
  68. _transport = std::make_shared<SrtTransportImp>(getPoller());
  69. } else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
  70. // 握手第二阶段
  71. uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
  72. auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
  73. if (trans) {
  74. _transport = std::move(trans);
  75. } else {
  76. WarnL << " hanshake packet not find transport ";
  77. }
  78. } else {
  79. WarnL << " not reach there";
  80. }
  81. } else {
  82. uint32_t socket_id = ControlPacket::getSocketID(data, size);
  83. auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
  84. if (trans) {
  85. _transport = std::move(trans);
  86. } else {
  87. WarnL << " not find transport";
  88. }
  89. }
  90. if (_transport) {
  91. _transport->setSession(shared_from_this());
  92. }
  93. InfoP(this);
  94. }
  95. _ticker.resetTime();
  96. if (_transport) {
  97. _transport->inputSockData(data, size, &_peer_addr);
  98. } else {
  99. // WarnL<< "ingore data";
  100. }
  101. }
  102. void SrtSession::onError(const SockException &err) {
  103. // udp链接超时,但是srt链接不一定超时,因为可能存在udp链接迁移的情况
  104. //在udp链接迁移时,新的SrtSession对象将接管SrtSession对象的生命周期
  105. //本SrtSession对象将在超时后自动销毁
  106. WarnP(this) << err.what();
  107. if (!_transport) {
  108. return;
  109. }
  110. // 防止互相引用导致不释放
  111. auto transport = std::move(_transport);
  112. getPoller()->async(
  113. [transport] {
  114. //延时减引用,防止使用transport对象时,销毁对象
  115. //transport->onShutdown(err);
  116. },
  117. false);
  118. }
  119. void SrtSession::onManager() {
  120. GET_CONFIG(float, timeoutSec, kTimeOutSec);
  121. if (_ticker.elapsedTime() > timeoutSec * 1000) {
  122. shutdown(SockException(Err_timeout, "srt connection timeout"));
  123. return;
  124. }
  125. }
  126. } // namespace SRT