You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

416 line
8.8KB

  1. // RFC2326 A.1 RTP Data Header Validity Checks
  2. #include "rtp-queue.h"
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <assert.h>
  6. #include <errno.h>
  7. #define MAX_PACKET 3000
  8. #define RTP_MISORDER 300
  9. #define RTP_DROPOUT 1000
  10. #define RTP_SEQUENTIAL 3
  11. #define RTP_SEQMOD (1 << 16)
  12. struct rtp_item_t
  13. {
  14. struct rtp_packet_t* pkt;
  15. // uint64_t clock;
  16. };
  17. struct rtp_queue_t
  18. {
  19. struct rtp_item_t* items;
  20. int capacity;
  21. int size;
  22. int pos; // ring buffer read position
  23. int probation;
  24. int cycles;
  25. uint16_t last_seq;
  26. uint16_t first_seq;
  27. int bad_count;
  28. uint16_t bad_seq;
  29. struct rtp_item_t bad_items[RTP_SEQUENTIAL+1];
  30. int threshold;
  31. int frequency;
  32. void (*free)(void*, struct rtp_packet_t*);
  33. void* param;
  34. };
  35. static void rtp_queue_reset(struct rtp_queue_t* q);
  36. static int rtp_queue_find(struct rtp_queue_t* q, uint16_t seq);
  37. static int rtp_queue_insert(struct rtp_queue_t* q, int position, struct rtp_packet_t* pkt);
  38. struct rtp_queue_t* rtp_queue_create(int threshold, int frequency, void(*freepkt)(void*, struct rtp_packet_t*), void* param)
  39. {
  40. struct rtp_queue_t* q;
  41. q = (struct rtp_queue_t*)calloc(1, sizeof(*q));
  42. if(!q)
  43. return NULL;
  44. rtp_queue_reset(q);
  45. q->threshold = threshold;
  46. q->frequency = frequency;
  47. q->free = freepkt;
  48. q->param = param;
  49. return q;
  50. }
  51. int rtp_queue_destroy(struct rtp_queue_t* q)
  52. {
  53. rtp_queue_reset(q);
  54. if (q->items)
  55. {
  56. assert(q->capacity > 0);
  57. free(q->items);
  58. q->items = 0;
  59. }
  60. free(q);
  61. return 0;
  62. }
  63. static inline void rtp_queue_reset_bad_items(struct rtp_queue_t* q)
  64. {
  65. int i;
  66. struct rtp_packet_t* pkt;
  67. for (i = 0; i < q->bad_count; i++)
  68. {
  69. pkt = q->bad_items[i].pkt;
  70. q->free(q->param, pkt);
  71. }
  72. q->bad_seq = 0;
  73. q->bad_count = 0;
  74. }
  75. static void rtp_queue_reset(struct rtp_queue_t* q)
  76. {
  77. int i;
  78. struct rtp_packet_t* pkt;
  79. rtp_queue_reset_bad_items(q);
  80. for (i = 0; i < q->size; i++)
  81. {
  82. pkt = q->items[(q->pos + i) % q->capacity].pkt;
  83. q->free(q->param, pkt);
  84. }
  85. q->pos = 0;
  86. q->size = 0;
  87. q->probation = RTP_SEQUENTIAL;
  88. }
  89. static int rtp_queue_find(struct rtp_queue_t* q, uint16_t seq)
  90. {
  91. uint16_t v;
  92. uint16_t vi;
  93. int l, r, i;
  94. l = q->pos;
  95. r = q->pos + q->size;
  96. v = q->last_seq - seq;
  97. while (l < r)
  98. {
  99. i = (l + r) / 2;
  100. vi = (uint16_t)q->last_seq - (uint16_t)q->items[i % q->capacity].pkt->rtp.seq;
  101. if (vi == v)
  102. {
  103. return -1; // duplicate
  104. }
  105. else if (vi < v)
  106. {
  107. r = i;
  108. }
  109. else
  110. {
  111. assert(vi > v);
  112. l = i + 1;
  113. }
  114. }
  115. return l; // insert position
  116. }
  117. static int rtp_queue_insert(struct rtp_queue_t* q, int position, struct rtp_packet_t* pkt)
  118. {
  119. void* p;
  120. int i, capacity;
  121. assert(position >= q->pos && position <= q->pos + q->size);
  122. if (q->size >= q->capacity)
  123. {
  124. if (q->size + 1 > MAX_PACKET)
  125. return -E2BIG;
  126. capacity = q->capacity + 250;
  127. p = realloc(q->items, capacity * sizeof(struct rtp_item_t));
  128. if (NULL == p)
  129. return -ENOMEM;
  130. q->items = (struct rtp_item_t*)p;
  131. if (q->pos + q->size > q->capacity)
  132. {
  133. // move to tail
  134. assert(q->pos < q->capacity);
  135. memmove(&q->items[q->pos + capacity - q->capacity], &q->items[q->pos], (q->capacity - q->pos) * sizeof(struct rtp_item_t));
  136. q->pos += capacity - q->capacity;
  137. position += capacity - q->capacity;
  138. }
  139. q->capacity = capacity;
  140. }
  141. // move items
  142. for (i = q->pos + q->size; i > position; i--)
  143. memcpy(&q->items[i % q->capacity], &q->items[(i - 1) % q->capacity], sizeof(struct rtp_item_t));
  144. q->items[position % q->capacity].pkt = pkt;
  145. // q->items[position % q->capacity].clock = 0;
  146. q->size++;
  147. return 1;
  148. }
  149. /*
  150. first last
  151. ^ ^
  152. ---too late---|------------------|----max drop---|-----another sequential---
  153. --------------|------queue-------|-------------------------------------------->
  154. */
  155. int rtp_queue_write(struct rtp_queue_t* q, struct rtp_packet_t* pkt)
  156. {
  157. int i, idx;
  158. uint16_t delta;
  159. if (q->probation)
  160. {
  161. if (q->size > 0 && (uint16_t)pkt->rtp.seq == q->last_seq + 1)
  162. {
  163. q->probation--;
  164. if (0 == q->probation)
  165. q->first_seq = (uint16_t)q->items[q->pos].pkt->rtp.seq;
  166. }
  167. else
  168. {
  169. rtp_queue_reset(q);
  170. }
  171. q->last_seq = (uint16_t)pkt->rtp.seq;
  172. return rtp_queue_insert(q, q->pos + q->size, pkt);
  173. }
  174. else
  175. {
  176. delta = (uint16_t)(pkt->rtp.seq - q->last_seq);
  177. if (delta > 0 && delta < RTP_DROPOUT)
  178. {
  179. if (pkt->rtp.seq < q->last_seq)
  180. q->cycles += RTP_SEQMOD;
  181. rtp_queue_reset_bad_items(q);
  182. q->last_seq = (uint16_t)pkt->rtp.seq;
  183. return rtp_queue_insert(q, q->pos + q->size, pkt);
  184. }
  185. else if ( (int16_t)delta <= 0 && (int16_t)delta >= (int16_t)(q->first_seq - q->last_seq) )
  186. {
  187. // pkt->rtp.seq - q->first_seq < q->last_seq - q->first_seq
  188. // duplicate or reordered packet
  189. idx = rtp_queue_find(q, (uint16_t)pkt->rtp.seq);
  190. if (-1 == idx)
  191. return -1;
  192. rtp_queue_reset_bad_items(q);
  193. return rtp_queue_insert(q, idx, pkt);
  194. }
  195. else if ((uint16_t)(q->first_seq - pkt->rtp.seq) < RTP_MISORDER)
  196. {
  197. // too late: pkt->req.seq < q->first_seq
  198. return -1;
  199. }
  200. else
  201. {
  202. if (q->bad_count > 0 && q->bad_seq == pkt->rtp.seq)
  203. {
  204. if (q->bad_count >= RTP_SEQUENTIAL)
  205. {
  206. // Two sequential packets -- assume that the other side
  207. // restarted without telling us so just re-sync
  208. // (i.e., pretend this was the first packet).
  209. //rtp_queue_reset(q);
  210. // copy saved items
  211. for (i = 0; i < q->bad_count; i++)
  212. rtp_queue_insert(q, q->pos + q->size, q->bad_items[i].pkt);
  213. q->bad_count = 0;
  214. q->last_seq = (uint16_t)pkt->rtp.seq;
  215. return rtp_queue_insert(q, q->pos + q->size, pkt);
  216. }
  217. }
  218. else
  219. {
  220. rtp_queue_reset_bad_items(q);
  221. }
  222. q->bad_seq = (pkt->rtp.seq + 1) % (RTP_SEQMOD-1);
  223. q->bad_items[q->bad_count++].pkt = pkt;
  224. return 1;
  225. }
  226. }
  227. // for safety
  228. return -1;
  229. }
  230. struct rtp_packet_t* rtp_queue_read(struct rtp_queue_t* q)
  231. {
  232. uint32_t threshold;
  233. struct rtp_packet_t* pkt;
  234. if (q->size < 1 || q->probation)
  235. return NULL;
  236. assert(q->pos < q->capacity);
  237. pkt = q->items[q->pos].pkt;
  238. if (q->first_seq == pkt->rtp.seq)
  239. {
  240. q->first_seq++;
  241. q->size--;
  242. q->pos = (q->pos + 1) % q->capacity;
  243. return pkt;
  244. }
  245. else
  246. {
  247. threshold = (q->items[(q->pos + q->size - 1) % q->capacity].pkt->rtp.timestamp - pkt->rtp.timestamp) / ((uint32_t)q->frequency / 1000);
  248. if (threshold < (uint32_t)q->threshold)
  249. return NULL;
  250. q->first_seq = (uint16_t)(pkt->rtp.seq + 1);
  251. q->size--;
  252. q->pos = (q->pos + 1) % q->capacity;
  253. return pkt;
  254. }
  255. }
  256. #if defined(_DEBUG) || defined(DEBUG)
  257. #include <stdio.h>
  258. static void rtp_queue_dump(struct rtp_queue_t* q)
  259. {
  260. int i;
  261. printf("[%02d/%02d]: ", q->pos, q->size);
  262. for (i = 0; i < q->size; i++)
  263. {
  264. printf("%u\t", (unsigned int)q->items[(i + q->pos) % q->capacity].pkt->rtp.seq);
  265. }
  266. printf("\n");
  267. }
  268. static void rtp_packet_free(void* param, struct rtp_packet_t* pkt)
  269. {
  270. free(pkt); (void)param;
  271. }
  272. static int rtp_queue_packet(rtp_queue_t* q, uint16_t seq)
  273. {
  274. struct rtp_packet_t* pkt;
  275. pkt = (struct rtp_packet_t*)malloc(sizeof(*pkt));
  276. if (pkt)
  277. {
  278. memset(pkt, 0, sizeof(*pkt));
  279. pkt->rtp.seq = seq;
  280. if (0 == rtp_queue_write(q, pkt))
  281. free(pkt);
  282. }
  283. return 0;
  284. }
  285. static void rtp_queue_test2(void)
  286. {
  287. int i;
  288. uint16_t seq;
  289. rtp_queue_t* q;
  290. struct rtp_packet_t* pkt;
  291. static uint16_t s_seq[1000];
  292. q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
  293. for(i = 0; i < sizeof(s_seq)/sizeof(s_seq[0]); i++)
  294. s_seq[i] = 45000 + i;
  295. // 45460, 45461, 45462, 45464, 45465, 45466, ...,
  296. // 45490, 45491, 45492, 45503, 45504, 45505, 45463,
  297. // 45506, 45507, 45493, 45494, 45495, 45496, 45497,
  298. // 45498, 45499, 45500, 45501, 45502, 45508, 45509, ...
  299. memmove(s_seq + 463, s_seq + 464, sizeof(s_seq[0]) * (509 - 464)); // lost 45463
  300. s_seq[492] = 45503;
  301. s_seq[493] = 45504;
  302. s_seq[494] = 45505;
  303. s_seq[495] = 45463;
  304. s_seq[496] = 45506;
  305. s_seq[497] = 45507;
  306. s_seq[498] = 45493;
  307. s_seq[499] = 45494;
  308. s_seq[500] = 45495;
  309. s_seq[501] = 45496;
  310. s_seq[502] = 45497;
  311. s_seq[503] = 45498;
  312. s_seq[504] = 45499;
  313. s_seq[505] = 45500;
  314. s_seq[506] = 45501;
  315. s_seq[507] = 45502;
  316. s_seq[508] = 45508;
  317. seq = s_seq[0];
  318. for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
  319. {
  320. rtp_queue_packet(q, s_seq[i]);
  321. pkt = rtp_queue_read(q);
  322. if (pkt)
  323. {
  324. //printf("%u ", pkt->rtp.seq);
  325. assert(0 == pkt->rtp.seq - seq++);
  326. free(pkt);
  327. }
  328. }
  329. rtp_queue_destroy(q);
  330. }
  331. void rtp_queue_test(void)
  332. {
  333. int i;
  334. rtp_queue_t* q;
  335. struct rtp_packet_t* pkt;
  336. static uint16_t s_seq[] = {
  337. 836, 837, 859, 860, 822, 823, 824, 825,
  338. 826, 822, 830, 827, 831, 828, 829, 830,
  339. 832, 833, 834, 6000, 840, 841, 842, 843,
  340. 835, 836, 837, 838, 838, 844, 859, 811,
  341. };
  342. rtp_queue_test2();
  343. q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
  344. for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
  345. {
  346. rtp_queue_packet(q, s_seq[i]);
  347. rtp_queue_dump(q);
  348. pkt = rtp_queue_read(q);
  349. if (pkt) free(pkt);
  350. rtp_queue_dump(q);
  351. }
  352. rtp_queue_destroy(q);
  353. }
  354. #endif