|
// RFC3550 A.1 RTP Data Header Validity Checks
-
- #include "rtp-queue.h"
- #include <stdlib.h>
- #include <string.h>
- #include <assert.h>
- #include <errno.h>
-
// Upper bound on the number of packets the ring buffer may hold;
// rtp_queue_insert refuses to grow past it.
#define MAX_PACKET 3000

// Sequence-number windows used by rtp_queue_write.
#define RTP_MISORDER 300	// packets this far behind first_seq are dropped as "too late"
#define RTP_DROPOUT 1000	// largest forward jump still accepted as in-order
#define RTP_SEQUENTIAL 3	// consecutive packets required to (re)synchronize
#define RTP_SEQMOD 0x10000	// size of the 16-bit sequence number space
-
// One slot of the queue: a pointer to a stored packet.
struct rtp_item_t
{
	struct rtp_packet_t *pkt;
	// uint64_t clock; (unused per-item clock field, kept disabled)
};
-
- struct rtp_queue_t
- {
- struct rtp_item_t* items;
- int capacity;
- int size;
- int pos; // ring buffer read position
-
- int probation;
- int cycles;
- uint16_t last_seq;
- uint16_t first_seq;
-
- int bad_count;
- uint16_t bad_seq;
- struct rtp_item_t bad_items[RTP_SEQUENTIAL+1];
-
- int threshold;
- int frequency;
- void (*free)(void*, struct rtp_packet_t*);
- void* param;
- };
-
// Internal helpers, defined further down in this file.
static int rtp_queue_insert(struct rtp_queue_t *q, int position, struct rtp_packet_t *pkt);
static int rtp_queue_find(struct rtp_queue_t *q, uint16_t seq);
static void rtp_queue_reset(struct rtp_queue_t *q);
-
- struct rtp_queue_t* rtp_queue_create(int threshold, int frequency, void(*freepkt)(void*, struct rtp_packet_t*), void* param)
- {
- struct rtp_queue_t* q;
- q = (struct rtp_queue_t*)calloc(1, sizeof(*q));
- if(!q)
- return NULL;
-
- rtp_queue_reset(q);
- q->threshold = threshold;
- q->frequency = frequency;
- q->free = freepkt;
- q->param = param;
- return q;
- }
-
- int rtp_queue_destroy(struct rtp_queue_t* q)
- {
- rtp_queue_reset(q);
-
- if (q->items)
- {
- assert(q->capacity > 0);
- free(q->items);
- q->items = 0;
- }
- free(q);
- return 0;
- }
-
- static inline void rtp_queue_reset_bad_items(struct rtp_queue_t* q)
- {
- int i;
- struct rtp_packet_t* pkt;
-
- for (i = 0; i < q->bad_count; i++)
- {
- pkt = q->bad_items[i].pkt;
- q->free(q->param, pkt);
- }
-
- q->bad_seq = 0;
- q->bad_count = 0;
- }
-
- static void rtp_queue_reset(struct rtp_queue_t* q)
- {
- int i;
- struct rtp_packet_t* pkt;
-
- rtp_queue_reset_bad_items(q);
-
- for (i = 0; i < q->size; i++)
- {
- pkt = q->items[(q->pos + i) % q->capacity].pkt;
- q->free(q->param, pkt);
- }
-
- q->pos = 0;
- q->size = 0;
- q->probation = RTP_SEQUENTIAL;
- }
-
- static int rtp_queue_find(struct rtp_queue_t* q, uint16_t seq)
- {
- uint16_t v;
- uint16_t vi;
- int l, r, i;
-
- l = q->pos;
- r = q->pos + q->size;
- v = q->last_seq - seq;
- while (l < r)
- {
- i = (l + r) / 2;
- vi = (uint16_t)q->last_seq - (uint16_t)q->items[i % q->capacity].pkt->rtp.seq;
- if (vi == v)
- {
- return -1; // duplicate
- }
- else if (vi < v)
- {
- r = i;
- }
- else
- {
- assert(vi > v);
- l = i + 1;
- }
- }
-
- return l; // insert position
- }
-
- static int rtp_queue_insert(struct rtp_queue_t* q, int position, struct rtp_packet_t* pkt)
- {
- void* p;
- int i, capacity;
-
- assert(position >= q->pos && position <= q->pos + q->size);
-
- if (q->size >= q->capacity)
- {
- if (q->size + 1 > MAX_PACKET)
- return -E2BIG;
-
- capacity = q->capacity + 250;
- p = realloc(q->items, capacity * sizeof(struct rtp_item_t));
- if (NULL == p)
- return -ENOMEM;
-
- q->items = (struct rtp_item_t*)p;
- if (q->pos + q->size > q->capacity)
- {
- // move to tail
- assert(q->pos < q->capacity);
- memmove(&q->items[q->pos + capacity - q->capacity], &q->items[q->pos], (q->capacity - q->pos) * sizeof(struct rtp_item_t));
- q->pos += capacity - q->capacity;
- position += capacity - q->capacity;
- }
-
- q->capacity = capacity;
- }
-
- // move items
- for (i = q->pos + q->size; i > position; i--)
- memcpy(&q->items[i % q->capacity], &q->items[(i - 1) % q->capacity], sizeof(struct rtp_item_t));
-
- q->items[position % q->capacity].pkt = pkt;
- // q->items[position % q->capacity].clock = 0;
- q->size++;
- return 1;
- }
-
- /*
- first last
- ^ ^
- ---too late---|------------------|----max drop---|-----another sequential---
- --------------|------queue-------|-------------------------------------------->
- */
- int rtp_queue_write(struct rtp_queue_t* q, struct rtp_packet_t* pkt)
- {
- int i, idx;
- uint16_t delta;
-
- if (q->probation)
- {
- if (q->size > 0 && (uint16_t)pkt->rtp.seq == q->last_seq + 1)
- {
- q->probation--;
- if (0 == q->probation)
- q->first_seq = (uint16_t)q->items[q->pos].pkt->rtp.seq;
- }
- else
- {
- rtp_queue_reset(q);
- }
-
- q->last_seq = (uint16_t)pkt->rtp.seq;
- return rtp_queue_insert(q, q->pos + q->size, pkt);
- }
- else
- {
- delta = (uint16_t)(pkt->rtp.seq - q->last_seq);
- if (delta > 0 && delta < RTP_DROPOUT)
- {
- if (pkt->rtp.seq < q->last_seq)
- q->cycles += RTP_SEQMOD;
-
- rtp_queue_reset_bad_items(q);
- q->last_seq = (uint16_t)pkt->rtp.seq;
- return rtp_queue_insert(q, q->pos + q->size, pkt);
- }
- else if ( (int16_t)delta <= 0 && (int16_t)delta >= (int16_t)(q->first_seq - q->last_seq) )
- {
- // pkt->rtp.seq - q->first_seq < q->last_seq - q->first_seq
-
- // duplicate or reordered packet
- idx = rtp_queue_find(q, (uint16_t)pkt->rtp.seq);
- if (-1 == idx)
- return -1;
-
- rtp_queue_reset_bad_items(q);
- return rtp_queue_insert(q, idx, pkt);
- }
- else if ((uint16_t)(q->first_seq - pkt->rtp.seq) < RTP_MISORDER)
- {
- // too late: pkt->req.seq < q->first_seq
- return -1;
- }
- else
- {
- if (q->bad_count > 0 && q->bad_seq == pkt->rtp.seq)
- {
- if (q->bad_count >= RTP_SEQUENTIAL)
- {
- // Two sequential packets -- assume that the other side
- // restarted without telling us so just re-sync
- // (i.e., pretend this was the first packet).
-
- //rtp_queue_reset(q);
-
- // copy saved items
- for (i = 0; i < q->bad_count; i++)
- rtp_queue_insert(q, q->pos + q->size, q->bad_items[i].pkt);
-
- q->bad_count = 0;
- q->last_seq = (uint16_t)pkt->rtp.seq;
- return rtp_queue_insert(q, q->pos + q->size, pkt);
- }
- }
- else
- {
- rtp_queue_reset_bad_items(q);
- }
-
- q->bad_seq = (pkt->rtp.seq + 1) % (RTP_SEQMOD-1);
- q->bad_items[q->bad_count++].pkt = pkt;
- return 1;
- }
- }
-
- // for safety
- return -1;
- }
-
- struct rtp_packet_t* rtp_queue_read(struct rtp_queue_t* q)
- {
- uint32_t threshold;
- struct rtp_packet_t* pkt;
- if (q->size < 1 || q->probation)
- return NULL;
-
- assert(q->pos < q->capacity);
- pkt = q->items[q->pos].pkt;
- if (q->first_seq == pkt->rtp.seq)
- {
- q->first_seq++;
- q->size--;
- q->pos = (q->pos + 1) % q->capacity;
- return pkt;
- }
- else
- {
- threshold = (q->items[(q->pos + q->size - 1) % q->capacity].pkt->rtp.timestamp - pkt->rtp.timestamp) / ((uint32_t)q->frequency / 1000);
- if (threshold < (uint32_t)q->threshold)
- return NULL;
-
- q->first_seq = (uint16_t)(pkt->rtp.seq + 1);
- q->size--;
- q->pos = (q->pos + 1) % q->capacity;
- return pkt;
- }
- }
-
- #if defined(_DEBUG) || defined(DEBUG)
- #include <stdio.h>
- static void rtp_queue_dump(struct rtp_queue_t* q)
- {
- int i;
- printf("[%02d/%02d]: ", q->pos, q->size);
- for (i = 0; i < q->size; i++)
- {
- printf("%u\t", (unsigned int)q->items[(i + q->pos) % q->capacity].pkt->rtp.seq);
- }
- printf("\n");
- }
-
// Free callback used by the self-tests: packets come from the C heap,
// the user parameter is not needed.
static void rtp_packet_free(void* param, struct rtp_packet_t* pkt)
{
	(void)param;
	free(pkt);
}
-
- static int rtp_queue_packet(rtp_queue_t* q, uint16_t seq)
- {
- struct rtp_packet_t* pkt;
- pkt = (struct rtp_packet_t*)malloc(sizeof(*pkt));
- if (pkt)
- {
- memset(pkt, 0, sizeof(*pkt));
- pkt->rtp.seq = seq;
- if (0 == rtp_queue_write(q, pkt))
- free(pkt);
- }
- return 0;
- }
-
- static void rtp_queue_test2(void)
- {
- int i;
- uint16_t seq;
- rtp_queue_t* q;
- struct rtp_packet_t* pkt;
-
- static uint16_t s_seq[1000];
-
- q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
-
- for(i = 0; i < sizeof(s_seq)/sizeof(s_seq[0]); i++)
- s_seq[i] = 45000 + i;
-
- // 45460, 45461, 45462, 45464, 45465, 45466, ...,
- // 45490, 45491, 45492, 45503, 45504, 45505, 45463,
- // 45506, 45507, 45493, 45494, 45495, 45496, 45497,
- // 45498, 45499, 45500, 45501, 45502, 45508, 45509, ...
- memmove(s_seq + 463, s_seq + 464, sizeof(s_seq[0]) * (509 - 464)); // lost 45463
- s_seq[492] = 45503;
- s_seq[493] = 45504;
- s_seq[494] = 45505;
- s_seq[495] = 45463;
- s_seq[496] = 45506;
- s_seq[497] = 45507;
- s_seq[498] = 45493;
- s_seq[499] = 45494;
- s_seq[500] = 45495;
- s_seq[501] = 45496;
- s_seq[502] = 45497;
- s_seq[503] = 45498;
- s_seq[504] = 45499;
- s_seq[505] = 45500;
- s_seq[506] = 45501;
- s_seq[507] = 45502;
- s_seq[508] = 45508;
-
- seq = s_seq[0];
- for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
- {
- rtp_queue_packet(q, s_seq[i]);
- pkt = rtp_queue_read(q);
- if (pkt)
- {
- //printf("%u ", pkt->rtp.seq);
- assert(0 == pkt->rtp.seq - seq++);
- free(pkt);
- }
- }
-
- rtp_queue_destroy(q);
- }
-
- void rtp_queue_test(void)
- {
- int i;
- rtp_queue_t* q;
- struct rtp_packet_t* pkt;
-
- static uint16_t s_seq[] = {
- 836, 837, 859, 860, 822, 823, 824, 825,
- 826, 822, 830, 827, 831, 828, 829, 830,
- 832, 833, 834, 6000, 840, 841, 842, 843,
- 835, 836, 837, 838, 838, 844, 859, 811,
- };
-
- rtp_queue_test2();
-
- q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
-
- for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
- {
- rtp_queue_packet(q, s_seq[i]);
- rtp_queue_dump(q);
- pkt = rtp_queue_read(q);
- if (pkt) free(pkt);
- rtp_queue_dump(q);
- }
-
- rtp_queue_destroy(q);
- }
- #endif
|