Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

před 5 měsíci

  1. #include "Util/onceToken.h"
  2. #include "Util/mini.h"
  3. #include <iterator>
  4. #include <stdlib.h>
  5. #include "Ack.hpp"
  6. #include "Packet.hpp"
  7. #include "SrtTransport.hpp"
  8. namespace SRT {
  9. #define SRT_FIELD "srt."
  10. // srt 超时时间
  11. const std::string kTimeOutSec = SRT_FIELD "timeoutSec";
  12. // srt 单端口udp服务器
  13. const std::string kPort = SRT_FIELD "port";
  14. const std::string kLatencyMul = SRT_FIELD "latencyMul";
  15. const std::string kPktBufSize = SRT_FIELD "pktBufSize";
  16. static onceToken token([]() {
  17. mINI::Instance()[kTimeOutSec] = 5;
  18. mINI::Instance()[kPort] = 9000;
  19. mINI::Instance()[kLatencyMul] = 4;
  20. mINI::Instance()[kPktBufSize] = 8192;
  21. });
  22. static std::atomic<uint32_t> s_srt_socket_id_generate { 125 };
  23. //////////// SrtTransport //////////////////////////
  24. SrtTransport::SrtTransport(const EventPoller::Ptr &poller)
  25. : _poller(poller) {
  26. _start_timestamp = SteadyClock::now();
  27. _socket_id = s_srt_socket_id_generate.fetch_add(1);
  28. _pkt_recv_rate_context = std::make_shared<PacketRecvRateContext>(_start_timestamp);
  29. //_recv_rate_context = std::make_shared<RecvRateContext>(_start_timestamp);
  30. _estimated_link_capacity_context = std::make_shared<EstimatedLinkCapacityContext>(_start_timestamp);
  31. }
  32. SrtTransport::~SrtTransport() {
  33. TraceL << " ";
  34. }
  35. const EventPoller::Ptr &SrtTransport::getPoller() const {
  36. return _poller;
  37. }
  38. void SrtTransport::setSession(Session::Ptr session) {
  39. _history_sessions.emplace(session.get(), session);
  40. if (_selected_session) {
  41. InfoL << "srt network changed: " << _selected_session->get_peer_ip() << ":"
  42. << _selected_session->get_peer_port() << " -> " << session->get_peer_ip() << ":"
  43. << session->get_peer_port() << ", id:" << _selected_session->getIdentifier();
  44. }
  45. _selected_session = session;
  46. }
  47. const Session::Ptr &SrtTransport::getSession() const {
  48. return _selected_session;
  49. }
  50. void SrtTransport::switchToOtherTransport(uint8_t *buf, int len, uint32_t socketid, struct sockaddr_storage *addr) {
  51. BufferRaw::Ptr tmp = BufferRaw::create();
  52. struct sockaddr_storage tmp_addr = *addr;
  53. tmp->assign((char *)buf, len);
  54. auto trans = SrtTransportManager::Instance().getItem(std::to_string(socketid));
  55. if (trans) {
  56. trans->getPoller()->async([tmp, tmp_addr, trans] {
  57. trans->inputSockData((uint8_t *)tmp->data(), tmp->size(), (struct sockaddr_storage *)&tmp_addr);
  58. });
  59. }
  60. }
  61. void SrtTransport::createTimerForCheckAlive(){
  62. std::weak_ptr<SrtTransport> weak_self = std::static_pointer_cast<SrtTransport>(shared_from_this());
  63. auto timeoutSec = getTimeOutSec();
  64. _timer = std::make_shared<Timer>(
  65. timeoutSec/ 2,
  66. [weak_self,timeoutSec]() {
  67. auto strong_self = weak_self.lock();
  68. if (!strong_self) {
  69. return false;
  70. }
  71. if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) {
  72. strong_self->onShutdown(SockException(Err_timeout, "接收srt数据超时"));
  73. }
  74. return true;
  75. },
  76. getPoller());
  77. }
  78. void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  79. _alive_ticker.resetTime();
  80. if(!_timer){
  81. createTimerForCheckAlive();
  82. }
  83. using srt_control_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr);
  84. static std::unordered_map<uint16_t, srt_control_handler> s_control_functions;
  85. static onceToken token([]() {
  86. s_control_functions.emplace(ControlPacket::HANDSHAKE, &SrtTransport::handleHandshake);
  87. s_control_functions.emplace(ControlPacket::KEEPALIVE, &SrtTransport::handleKeeplive);
  88. s_control_functions.emplace(ControlPacket::ACK, &SrtTransport::handleACK);
  89. s_control_functions.emplace(ControlPacket::NAK, &SrtTransport::handleNAK);
  90. s_control_functions.emplace(ControlPacket::CONGESTIONWARNING, &SrtTransport::handleCongestionWarning);
  91. s_control_functions.emplace(ControlPacket::SHUTDOWN, &SrtTransport::handleShutDown);
  92. s_control_functions.emplace(ControlPacket::ACKACK, &SrtTransport::handleACKACK);
  93. s_control_functions.emplace(ControlPacket::DROPREQ, &SrtTransport::handleDropReq);
  94. s_control_functions.emplace(ControlPacket::PEERERROR, &SrtTransport::handlePeerError);
  95. s_control_functions.emplace(ControlPacket::USERDEFINEDTYPE, &SrtTransport::handleUserDefinedType);
  96. });
  97. _now = SteadyClock::now();
  98. // 处理srt数据
  99. if (DataPacket::isDataPacket(buf, len)) {
  100. uint32_t socketId = DataPacket::getSocketID(buf, len);
  101. if (socketId == _socket_id) {
  102. if(_handleshake_timer){
  103. _handleshake_timer.reset();
  104. }
  105. _pkt_recv_rate_context->inputPacket(_now,len+UDP_HDR_SIZE);
  106. //_recv_rate_context->inputPacket(_now, len);
  107. handleDataPacket(buf, len, addr);
  108. checkAndSendAckNak();
  109. } else {
  110. WarnL<<"DataPacket switch to other transport: "<<socketId;
  111. switchToOtherTransport(buf, len, socketId, addr);
  112. }
  113. } else {
  114. if (ControlPacket::isControlPacket(buf, len)) {
  115. uint32_t socketId = ControlPacket::getSocketID(buf, len);
  116. uint16_t type = ControlPacket::getControlType(buf, len);
  117. if (type != ControlPacket::HANDSHAKE && socketId != _socket_id && _socket_id != 0) {
  118. // socket id not same
  119. WarnL<<"ControlPacket: "<< (int)type <<" switch to other transport: "<<socketId;
  120. switchToOtherTransport(buf, len, socketId, addr);
  121. return;
  122. }
  123. //_pkt_recv_rate_context->inputPacket(_now,len);
  124. //_estimated_link_capacity_context->inputPacket(_now);
  125. //_recv_rate_context->inputPacket(_now, len);
  126. auto it = s_control_functions.find(type);
  127. if (it == s_control_functions.end()) {
  128. WarnL << " not support type ignore" << ControlPacket::getControlType(buf, len);
  129. return;
  130. } else {
  131. (this->*(it->second))(buf, len, addr);
  132. }
  133. if(_is_handleshake_finished && isPusher()){
  134. checkAndSendAckNak();
  135. }
  136. } else {
  137. // not reach
  138. WarnL << "not reach this";
  139. }
  140. }
  141. }
  142. void SrtTransport::handleHandshakeInduction(HandshakePacket &pkt, struct sockaddr_storage *addr) {
  143. // Induction Phase
  144. if (_handleshake_res) {
  145. if(_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_INDUCTION){
  146. if(pkt.srt_socket_id == _handleshake_res->dst_socket_id){
  147. TraceL << getIdentifier() <<" Induction repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
  148. sendControlPacket(_handleshake_res, true);
  149. }else{
  150. TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
  151. onShutdown(SockException(Err_other, "client new connection"));
  152. }
  153. return;
  154. }else if(_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION){
  155. if(_handleshake_res->dst_socket_id != pkt.srt_socket_id){
  156. TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
  157. onShutdown(SockException(Err_other, "client new connection"));
  158. }
  159. return;
  160. }else{
  161. WarnL<<"not reach this";
  162. }
  163. return;
  164. }else{
  165. TraceL << getIdentifier() <<" Induction from "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
  166. }
  167. _induction_ts = _now;
  168. _start_timestamp = _now;
  169. _init_seq_number = pkt.initial_packet_sequence_number;
  170. _max_window_size = pkt.max_flow_window_size;
  171. _mtu = pkt.mtu;
  172. _last_pkt_seq = _init_seq_number - 1;
  173. _estimated_link_capacity_context->setLastSeq(_last_pkt_seq);
  174. _peer_socket_id = pkt.srt_socket_id;
  175. HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
  176. res->dst_socket_id = _peer_socket_id;
  177. res->timestamp = DurationCountMicroseconds(_start_timestamp.time_since_epoch());
  178. res->mtu = _mtu;
  179. res->max_flow_window_size = _max_window_size;
  180. res->initial_packet_sequence_number = _init_seq_number;
  181. res->version = 5;
  182. res->encryption_field = HandshakePacket::NO_ENCRYPTION;
  183. res->extension_field = 0x4A17;
  184. res->handshake_type = HandshakePacket::HS_TYPE_INDUCTION;
  185. res->srt_socket_id = _peer_socket_id;
  186. res->syn_cookie = HandshakePacket::generateSynCookie(addr, _start_timestamp);
  187. _sync_cookie = res->syn_cookie;
  188. memcpy(res->peer_ip_addr, pkt.peer_ip_addr, sizeof(pkt.peer_ip_addr) * sizeof(pkt.peer_ip_addr[0]));
  189. _handleshake_res = res;
  190. res->storeToData();
  191. registerSelfHandshake();
  192. sendControlPacket(res, true);
  193. _handleshake_timer = std::make_shared<Timer>(0.2,[this]()->bool{
  194. sendControlPacket(_handleshake_res, true);
  195. return true;
  196. },getPoller());
  197. }
  198. void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockaddr_storage *addr) {
  199. if (!_handleshake_res) {
  200. ErrorL << "must Induction Phase for handleshake ";
  201. return;
  202. }
  203. if (_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_INDUCTION) {
  204. // first
  205. HSExtMessage::Ptr req;
  206. HSExtStreamID::Ptr sid;
  207. uint32_t srt_flag = 0xbf;
  208. uint16_t delay = DurationCountMicroseconds(_now - _induction_ts) * getLatencyMul() / 1000;
  209. if (delay <= 120) {
  210. delay = 120;
  211. }
  212. for (auto& ext : pkt.ext_list) {
  213. // TraceL << getIdentifier() << " ext " << ext->dump();
  214. if (!req) {
  215. req = std::dynamic_pointer_cast<HSExtMessage>(ext);
  216. }
  217. if (!sid) {
  218. sid = std::dynamic_pointer_cast<HSExtStreamID>(ext);
  219. }
  220. }
  221. if (sid) {
  222. _stream_id = sid->streamid;
  223. }
  224. if (req) {
  225. if (req->srt_flag != srt_flag) {
  226. WarnL << " flag " << req->srt_flag;
  227. }
  228. srt_flag = req->srt_flag;
  229. delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay;
  230. }
  231. TraceL << getIdentifier() << " CONCLUSION Phase from"<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);;
  232. HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
  233. res->dst_socket_id = _peer_socket_id;
  234. res->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  235. res->mtu = _mtu;
  236. res->max_flow_window_size = _max_window_size;
  237. res->initial_packet_sequence_number = _init_seq_number;
  238. res->version = 5;
  239. res->encryption_field = HandshakePacket::NO_ENCRYPTION;
  240. res->extension_field = HandshakePacket::HS_EXT_FILED_HSREQ;
  241. res->handshake_type = HandshakePacket::HS_TYPE_CONCLUSION;
  242. res->srt_socket_id = _socket_id;
  243. res->syn_cookie = 0;
  244. res->assignPeerIP(addr);
  245. HSExtMessage::Ptr ext = std::make_shared<HSExtMessage>();
  246. ext->extension_type = HSExt::SRT_CMD_HSRSP;
  247. ext->srt_version = srtVersion(1, 5, 0);
  248. ext->srt_flag = srt_flag;
  249. ext->recv_tsbpd_delay = ext->send_tsbpd_delay = delay;
  250. res->ext_list.push_back(std::move(ext));
  251. res->storeToData();
  252. _handleshake_res = res;
  253. unregisterSelfHandshake();
  254. registerSelf();
  255. sendControlPacket(res, true);
  256. TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number
  257. << " latency=" << delay;
  258. _recv_buf = std::make_shared<PacketRecvQueue>(getPktBufSize(), _init_seq_number, delay * 1e3,srt_flag);
  259. _send_buf = std::make_shared<PacketSendQueue>(getPktBufSize(), delay * 1e3,srt_flag);
  260. _send_packet_seq_number = _init_seq_number;
  261. _buf_delay = delay;
  262. onHandShakeFinished(_stream_id, addr);
  263. if(!isPusher()){
  264. _handleshake_timer.reset();
  265. }
  266. } else {
  267. if(_handleshake_res->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION){
  268. if(_handleshake_res->dst_socket_id != pkt.srt_socket_id){
  269. TraceL << getIdentifier() <<" new connection fron client "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
  270. onShutdown(SockException(Err_other, "client new connection"));
  271. }else{
  272. TraceL << getIdentifier() <<" CONCLUSION repeate "<<SockUtil::inet_ntoa((struct sockaddr *)addr) << ":" << SockUtil::inet_port((struct sockaddr *)addr);
  273. sendControlPacket(_handleshake_res, true);
  274. }
  275. }else{
  276. WarnL<<"not reach this";
  277. }
  278. return;
  279. }
  280. _last_ack_pkt_seq = _init_seq_number;
  281. }
  282. void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  283. HandshakePacket pkt;
  284. if(!pkt.loadFromData(buf, len)){
  285. WarnL<<"is not vaild HandshakePacket";
  286. return;
  287. }
  288. if (pkt.handshake_type == HandshakePacket::HS_TYPE_INDUCTION) {
  289. handleHandshakeInduction(pkt, addr);
  290. } else if (pkt.handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) {
  291. handleHandshakeConclusion(pkt, addr);
  292. } else {
  293. WarnL << " not support handshake type = " << pkt.handshake_type;
  294. WarnL <<pkt.dump();
  295. }
  296. _ack_ticker.resetTime(_now);
  297. _nak_ticker.resetTime(_now);
  298. }
  299. void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  300. // TraceL;
  301. sendKeepLivePacket();
  302. }
  303. void SrtTransport::sendKeepLivePacket() {
  304. KeepLivePacket::Ptr pkt = std::make_shared<KeepLivePacket>();
  305. pkt->dst_socket_id = _peer_socket_id;
  306. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  307. pkt->storeToData();
  308. sendControlPacket(pkt, true);
  309. }
  310. void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  311. // TraceL;
  312. ACKPacket ack;
  313. if (!ack.loadFromData(buf, len)) {
  314. return;
  315. }
  316. ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>();
  317. pkt->dst_socket_id = _peer_socket_id;
  318. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  319. pkt->ack_number = ack.ack_number;
  320. pkt->storeToData();
  321. _send_buf->drop(ack.last_ack_pkt_seq_number);
  322. sendControlPacket(pkt, true);
  323. // TraceL<<"ack number "<<ack.ack_number;
  324. }
  325. void SrtTransport::sendMsgDropReq(uint32_t first, uint32_t last) {
  326. MsgDropReqPacket::Ptr pkt = std::make_shared<MsgDropReqPacket>();
  327. pkt->dst_socket_id = _peer_socket_id;
  328. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  329. pkt->first_pkt_seq_num = first;
  330. pkt->last_pkt_seq_num = last;
  331. pkt->storeToData();
  332. sendControlPacket(pkt, true);
  333. }
  334. void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  335. // TraceL;
  336. NAKPacket pkt;
  337. pkt.loadFromData(buf, len);
  338. bool empty = false;
  339. bool flush = false;
  340. for (auto& it : pkt.lost_list) {
  341. if (pkt.lost_list.back() == it) {
  342. flush = true;
  343. }
  344. empty = true;
  345. auto re_list = _send_buf->findPacketBySeq(it.first, it.second - 1);
  346. for (auto& pkt : re_list) {
  347. pkt->R = 1;
  348. pkt->storeToHeader();
  349. sendPacket(pkt, flush);
  350. empty = false;
  351. }
  352. if (empty) {
  353. sendMsgDropReq(it.first, it.second - 1);
  354. }
  355. }
  356. }
  357. void SrtTransport::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  358. TraceL;
  359. }
  360. void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  361. TraceL;
  362. onShutdown(SockException(Err_shutdown, "peer close connection"));
  363. }
  364. void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  365. MsgDropReqPacket pkt;
  366. pkt.loadFromData(buf, len);
  367. std::list<DataPacket::Ptr> list;
  368. // TraceL<<"drop "<<pkt.first_pkt_seq_num<<" last "<<pkt.last_pkt_seq_num;
  369. _recv_buf->drop(pkt.first_pkt_seq_num, pkt.last_pkt_seq_num, list);
  370. //checkAndSendAckNak();
  371. if (list.empty()) {
  372. return;
  373. }
  374. // uint32_t max_seq = 0;
  375. for (auto& data : list) {
  376. // max_seq = data->packet_seq_number;
  377. if (_last_pkt_seq + 1 != data->packet_seq_number) {
  378. TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number;
  379. }
  380. _last_pkt_seq = data->packet_seq_number;
  381. onSRTData(std::move(data));
  382. }
  383. /*
  384. _recv_nack.drop(max_seq);
  385. auto lost = _recv_buf->getLostSeq();
  386. _recv_nack.update(_now, lost);
  387. lost.clear();
  388. _recv_nack.getLostList(_now, _rtt, _rtt_variance, lost);
  389. if (!lost.empty()) {
  390. sendNAKPacket(lost);
  391. // TraceL << "check lost send nack";
  392. }
  393. */
  394. }
  395. void SrtTransport::checkAndSendAckNak(){
  396. auto nak_interval = (_rtt + _rtt_variance * 4) / 2;
  397. if (nak_interval <= 20 * 1000) {
  398. nak_interval = 20 * 1000;
  399. }
  400. if (_nak_ticker.elapsedTime(_now) > nak_interval) {
  401. auto lost = _recv_buf->getLostSeq();
  402. if (!lost.empty()) {
  403. sendNAKPacket(lost);
  404. }
  405. _nak_ticker.resetTime(_now);
  406. }
  407. if (_ack_ticker.elapsedTime(_now) > 10 * 1000) {
  408. _light_ack_pkt_count = 0;
  409. _ack_ticker.resetTime(_now);
  410. // send a ack per 10 ms for receiver
  411. if(_last_ack_pkt_seq != _recv_buf->getExpectedSeq()){
  412. //TraceL<<"send a ack packet";
  413. sendACKPacket();
  414. }else{
  415. //TraceL<<" ignore repeate ack packet";
  416. }
  417. } else {
  418. if (_light_ack_pkt_count >= 64) {
  419. // for high bitrate stream send light ack
  420. // TODO
  421. sendLightACKPacket();
  422. TraceL << "send light ack";
  423. }
  424. _light_ack_pkt_count = 0;
  425. }
  426. _light_ack_pkt_count++;
  427. }
  428. void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  429. TraceL;
  430. }
  431. void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  432. // TraceL;
  433. ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>();
  434. pkt->loadFromData(buf, len);
  435. if(_ack_send_timestamp.find(pkt->ack_number)!=_ack_send_timestamp.end()){
  436. uint32_t rtt = DurationCountMicroseconds(_now - _ack_send_timestamp[pkt->ack_number]);
  437. _rtt_variance = (3 * _rtt_variance + abs((long)_rtt - (long)rtt)) / 4;
  438. _rtt = (7 * rtt + _rtt) / 8;
  439. // TraceL<<" rtt:"<<_rtt<<" rtt variance:"<<_rtt_variance;
  440. _ack_send_timestamp.erase(pkt->ack_number);
  441. if(_last_recv_ackack_seq_num < pkt->ack_number){
  442. _last_recv_ackack_seq_num = pkt->ack_number;
  443. }else{
  444. if((_last_recv_ackack_seq_num-pkt->ack_number)>(MAX_TS>>1)){
  445. _last_recv_ackack_seq_num = pkt->ack_number;
  446. }
  447. }
  448. if(_ack_send_timestamp.size()>1000){
  449. // clear data
  450. for(auto it = _ack_send_timestamp.begin(); it != _ack_send_timestamp.end();){
  451. if(DurationCountMicroseconds(_now-it->second)>5e6){
  452. // 超过五秒没有ackack 丢弃
  453. it = _ack_send_timestamp.erase(it);
  454. }else{
  455. it++;
  456. }
  457. }
  458. }
  459. }
  460. }
  461. void SrtTransport::handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  462. TraceL;
  463. }
  464. void SrtTransport::sendACKPacket() {
  465. uint32_t recv_rate = 0;
  466. ACKPacket::Ptr pkt = std::make_shared<ACKPacket>();
  467. pkt->dst_socket_id = _peer_socket_id;
  468. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  469. pkt->ack_number = ++_ack_number_count;
  470. pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq();
  471. pkt->rtt = _rtt;
  472. pkt->rtt_variance = _rtt_variance;
  473. pkt->available_buf_size = _recv_buf->getAvailableBufferSize();
  474. pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(recv_rate);
  475. pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity();
  476. pkt->recv_rate = recv_rate;
  477. if(0){
  478. TraceL<<pkt->pkt_recv_rate<<" pkt/s "<<recv_rate<<" byte/s "<<pkt->estimated_link_capacity<<" pkt/s (cap) "<<pkt->available_buf_size<<" available buf";
  479. //TraceL<<_pkt_recv_rate_context->dump();
  480. //TraceL<<"recv estimated:";
  481. //TraceL<< _pkt_recv_rate_context->dump();
  482. //TraceL<<"recv queue:";
  483. //TraceL<<_recv_buf->dump();
  484. }
  485. if(pkt->available_buf_size<2){
  486. pkt->available_buf_size = 2;
  487. }
  488. pkt->storeToData();
  489. _ack_send_timestamp[pkt->ack_number] = _now;
  490. _last_ack_pkt_seq = pkt->last_ack_pkt_seq_number;
  491. sendControlPacket(pkt, true);
  492. // TraceL<<"send ack "<<pkt->dump();
  493. // TraceL<<_recv_buf->dump();
  494. }
  495. void SrtTransport::sendLightACKPacket() {
  496. ACKPacket::Ptr pkt = std::make_shared<ACKPacket>();
  497. pkt->dst_socket_id = _peer_socket_id;
  498. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  499. pkt->ack_number = 0;
  500. pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq();
  501. pkt->rtt = 0;
  502. pkt->rtt_variance = 0;
  503. pkt->available_buf_size = 0;
  504. pkt->pkt_recv_rate = 0;
  505. pkt->estimated_link_capacity = 0;
  506. pkt->recv_rate = 0;
  507. pkt->storeToData();
  508. _last_ack_pkt_seq = pkt->last_ack_pkt_seq_number;
  509. sendControlPacket(pkt, true);
  510. TraceL << "send ack " << pkt->dump();
  511. }
  512. void SrtTransport::sendNAKPacket(std::list<PacketQueue::LostPair> &lost_list) {
  513. NAKPacket::Ptr pkt = std::make_shared<NAKPacket>();
  514. std::list<PacketQueue::LostPair> tmp;
  515. auto size = NAKPacket::getCIFSize(lost_list);
  516. size_t paylaod_size = getPayloadSize();
  517. if (size > paylaod_size) {
  518. WarnL << "loss report cif size " << size;
  519. size_t num = paylaod_size / 8;
  520. size_t msgNum = (lost_list.size() + num - 1) / num;
  521. decltype(lost_list.begin()) cur, next;
  522. for (size_t i = 0; i < msgNum; ++i) {
  523. cur = lost_list.begin();
  524. std::advance(cur, i * num);
  525. if (i == msgNum - 1) {
  526. next = lost_list.end();
  527. } else {
  528. next = lost_list.begin();
  529. std::advance(next, (i + 1) * num);
  530. }
  531. tmp.assign(cur, next);
  532. pkt->dst_socket_id = _peer_socket_id;
  533. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  534. pkt->lost_list = tmp;
  535. pkt->storeToData();
  536. sendControlPacket(pkt, true);
  537. }
  538. } else {
  539. pkt->dst_socket_id = _peer_socket_id;
  540. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  541. pkt->lost_list = lost_list;
  542. pkt->storeToData();
  543. sendControlPacket(pkt, true);
  544. }
  545. // TraceL<<"send NAK "<<pkt->dump();
  546. }
  547. void SrtTransport::sendShutDown() {
  548. ShutDownPacket::Ptr pkt = std::make_shared<ShutDownPacket>();
  549. pkt->dst_socket_id = _peer_socket_id;
  550. pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
  551. pkt->storeToData();
  552. sendControlPacket(pkt, true);
  553. }
  554. void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr) {
  555. DataPacket::Ptr pkt = std::make_shared<DataPacket>();
  556. pkt->loadFromData(buf, len);
  557. _estimated_link_capacity_context->inputPacket(_now,pkt);
  558. std::list<DataPacket::Ptr> list;
  559. //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
  560. //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
  561. _recv_buf->inputPacket(pkt, list);
  562. if (list.empty()) {
  563. // when no data ok send nack to sender immediately
  564. } else {
  565. // uint32_t last_seq;
  566. for (auto& data : list) {
  567. // last_seq = data->packet_seq_number;
  568. if (_last_pkt_seq + 1 != data->packet_seq_number) {
  569. TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number;
  570. }
  571. _last_pkt_seq = data->packet_seq_number;
  572. onSRTData(std::move(data));
  573. }
  574. //_recv_nack.drop(last_seq);
  575. }
  576. /*
  577. auto lost = _recv_buf->getLostSeq();
  578. _recv_nack.update(_now, lost);
  579. lost.clear();
  580. _recv_nack.getLostList(_now, _rtt, _rtt_variance, lost);
  581. if (!lost.empty()) {
  582. // TraceL << "check lost send nack immediately";
  583. sendNAKPacket(lost);
  584. }
  585. */
  586. /*
  587. auto nak_interval = (_rtt + _rtt_variance * 4) / 2;
  588. if (nak_interval <= 20 * 1000) {
  589. nak_interval = 20 * 1000;
  590. }
  591. if (_nak_ticker.elapsedTime(_now) > nak_interval) {
  592. // Periodic NAK reports
  593. auto lost = _recv_buf->getLostSeq();
  594. if (!lost.empty()) {
  595. sendNAKPacket(lost);
  596. // TraceL<<"send NAK";
  597. } else {
  598. // TraceL<<"lost is empty";
  599. }
  600. _nak_ticker.resetTime(_now);
  601. }
  602. if (_ack_ticker.elapsedTime(_now) > 10 * 1000) {
  603. _light_ack_pkt_count = 0;
  604. _ack_ticker.resetTime(_now);
  605. // send a ack per 10 ms for receiver
  606. sendACKPacket();
  607. } else {
  608. if (_light_ack_pkt_count >= 64) {
  609. // for high bitrate stream send light ack
  610. // TODO
  611. sendLightACKPacket();
  612. TraceL << "send light ack";
  613. }
  614. _light_ack_pkt_count = 0;
  615. }
  616. _light_ack_pkt_count++;
  617. */
  618. // bufCheckInterval();
  619. }
  620. void SrtTransport::sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush) {
  621. pkt->storeToData((uint8_t *)buf, len);
  622. sendPacket(pkt, flush);
  623. _send_buf->inputPacket(pkt);
  624. }
  625. void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) {
  626. sendPacket(pkt, flush);
  627. }
  628. void SrtTransport::sendPacket(Buffer::Ptr pkt, bool flush) {
  629. if (_selected_session) {
  630. auto tmp = _packet_pool.obtain2();
  631. tmp->assign(pkt->data(), pkt->size());
  632. _selected_session->setSendFlushFlag(flush);
  633. _selected_session->send(std::move(tmp));
  634. } else {
  635. WarnL << "not reach this";
  636. }
  637. }
  638. std::string SrtTransport::getIdentifier() {
  639. return _selected_session ? _selected_session->getIdentifier() : "";
  640. }
  641. void SrtTransport::registerSelfHandshake() {
  642. SrtTransportManager::Instance().addHandshakeItem(std::to_string(_sync_cookie), shared_from_this());
  643. }
  644. void SrtTransport::unregisterSelfHandshake() {
  645. if (_sync_cookie == 0) {
  646. return;
  647. }
  648. SrtTransportManager::Instance().removeHandshakeItem(std::to_string(_sync_cookie));
  649. }
  650. void SrtTransport::registerSelf() {
  651. if (_socket_id == 0) {
  652. return;
  653. }
  654. SrtTransportManager::Instance().addItem(std::to_string(_socket_id), shared_from_this());
  655. }
  656. void SrtTransport::unregisterSelf() {
  657. SrtTransportManager::Instance().removeItem(std::to_string(_socket_id));
  658. }
  659. void SrtTransport::onShutdown(const SockException &ex) {
  660. sendShutDown();
  661. WarnL << ex.what();
  662. unregisterSelfHandshake();
  663. unregisterSelf();
  664. for (auto &pr : _history_sessions) {
  665. auto session = pr.second.lock();
  666. if (session) {
  667. session->shutdown(ex);
  668. }
  669. }
  670. }
  671. size_t SrtTransport::getPayloadSize() {
  672. size_t ret = (_mtu - 28 - 16) / 188 * 188;
  673. return ret;
  674. }
  675. void SrtTransport::onSendTSData(const Buffer::Ptr &buffer, bool flush) {
  676. // TraceL;
  677. DataPacket::Ptr pkt;
  678. size_t payloadSize = getPayloadSize();
  679. size_t size = buffer->size();
  680. char *ptr = buffer->data();
  681. char *end = buffer->data() + size;
  682. while (ptr < end && size >= payloadSize) {
  683. pkt = std::make_shared<DataPacket>();
  684. pkt->f = 0;
  685. pkt->packet_seq_number = _send_packet_seq_number & 0x7fffffff;
  686. _send_packet_seq_number = (_send_packet_seq_number + 1) & 0x7fffffff;
  687. pkt->PP = 3;
  688. pkt->O = 0;
  689. pkt->KK = 0;
  690. pkt->R = 0;
  691. pkt->msg_number = _send_msg_number++;
  692. pkt->dst_socket_id = _peer_socket_id;
  693. pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp);
  694. sendDataPacket(pkt, ptr, (int)payloadSize, flush);
  695. ptr += payloadSize;
  696. size -= payloadSize;
  697. }
  698. if (size > 0 && ptr < end) {
  699. pkt = std::make_shared<DataPacket>();
  700. pkt->f = 0;
  701. pkt->packet_seq_number = _send_packet_seq_number & 0x7fffffff;
  702. _send_packet_seq_number = (_send_packet_seq_number + 1) & 0x7fffffff;
  703. pkt->PP = 3;
  704. pkt->O = 0;
  705. pkt->KK = 0;
  706. pkt->R = 0;
  707. pkt->msg_number = _send_msg_number++;
  708. pkt->dst_socket_id = _peer_socket_id;
  709. pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp);
  710. sendDataPacket(pkt, ptr, (int)size, flush);
  711. }
  712. }
  713. //////////// SrtTransportManager //////////////////////////
  714. SrtTransportManager &SrtTransportManager::Instance() {
  715. static SrtTransportManager s_instance;
  716. return s_instance;
  717. }
  718. void SrtTransportManager::addItem(const std::string &key, const SrtTransport::Ptr &ptr) {
  719. std::lock_guard<std::mutex> lck(_mtx);
  720. _map[key] = ptr;
  721. }
  722. SrtTransport::Ptr SrtTransportManager::getItem(const std::string &key) {
  723. if (key.empty()) {
  724. return nullptr;
  725. }
  726. std::lock_guard<std::mutex> lck(_mtx);
  727. auto it = _map.find(key);
  728. if (it == _map.end()) {
  729. return nullptr;
  730. }
  731. return it->second.lock();
  732. }
  733. void SrtTransportManager::removeItem(const std::string &key) {
  734. std::lock_guard<std::mutex> lck(_mtx);
  735. _map.erase(key);
  736. }
  737. void SrtTransportManager::addHandshakeItem(const std::string &key, const SrtTransport::Ptr &ptr) {
  738. std::lock_guard<std::mutex> lck(_handshake_mtx);
  739. _handshake_map[key] = ptr;
  740. }
  741. void SrtTransportManager::removeHandshakeItem(const std::string &key) {
  742. std::lock_guard<std::mutex> lck(_handshake_mtx);
  743. _handshake_map.erase(key);
  744. }
  745. SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const std::string &key) {
  746. if (key.empty()) {
  747. return nullptr;
  748. }
  749. std::lock_guard<std::mutex> lck(_handshake_mtx);
  750. auto it = _handshake_map.find(key);
  751. if (it == _handshake_map.end()) {
  752. return nullptr;
  753. }
  754. return it->second.lock();
  755. }
  756. } // namespace SRT