Skip to content

Commit

Permalink
Add RTCP component as submodule after re-factoring (#2027)
Browse files Browse the repository at this point in the history
* Initial Changes

* Update deserialization

* Update serialization

* Add SLI, PLI, FIR parsing

* parse sender report

* Parse receiver report

* Parse remb packet

* Update according to new RTCP code

* Update packet type functions

* Address comments

* Add convertRtcpErrorCodeTest test case

* Update according to rtcp library

* Remove unused variable

* Update function def

* Update formatting

* Add newline at end of the file

* Point to the main branch commit
  • Loading branch information
moninom1 authored Aug 8, 2024
1 parent a4e293d commit ace791c
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 66 deletions.
23 changes: 23 additions & 0 deletions CMake/Dependencies/libkvsrtcp-CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
cmake_minimum_required(VERSION 3.6.3)

project(libkvsrtcp NONE)

include(ExternalProject)
if (BUILD_STATIC_LIBS OR WIN32)
set(LIBKVSRTCP_SHARED_LIBS OFF)
else()
set(LIBKVSRTCP_SHARED_LIBS ON)
endif()

ExternalProject_Add(libkvsrtcp
GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-rtcp.git
GIT_TAG 6c12978af7331bf156ddc2886727d2bc1b8acc96
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/build
CMAKE_ARGS
-DCMAKE_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DBUILD_SHARED_LIBS=${LIBKVSRTCP_SHARED_LIBS}
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
BUILD_ALWAYS TRUE
TEST_COMMAND ""
)
1 change: 1 addition & 0 deletions CMake/Utilities.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function(build_dependency lib_name)
kvssdp
kvsstun
kvsrtp
kvsrtcp
kvssignaling
corejson)
list(FIND supported_libs ${lib_name} index)
Expand Down
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ if(BUILD_DEPENDENCIES)
build_dependency(kvssdp ${BUILD_ARGS})
build_dependency(kvsstun ${BUILD_ARGS})
build_dependency(kvsrtp ${BUILD_ARGS})
build_dependency(kvsrtcp ${BUILD_ARGS})

set(BUILD_ARGS
-DBUILD_STATIC_LIBS=${BUILD_STATIC_LIBS}
Expand Down Expand Up @@ -426,6 +427,7 @@ target_link_libraries(
kvssdp
kvsstun
kvsrtp
kvsrtcp
kvssignaling
corejson
${CMAKE_THREAD_LIBS_INIT}
Expand All @@ -452,6 +454,7 @@ target_link_libraries(
kvssdp
kvsstun
kvsrtp
kvsrtcp
kvssignaling
corejson
PRIVATE kvspicUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ extern "C" {
#define STATUS_RTCP_INPUT_PARTIAL_PACKET STATUS_RTCP_BASE + 0x00000006
#define STATUS_RTCP_INPUT_REMB_TOO_SMALL STATUS_RTCP_BASE + 0x00000007
#define STATUS_RTCP_INPUT_REMB_INVALID STATUS_RTCP_BASE + 0x00000008
#define STATUS_RTCP_UNKNOWN_ERROR STATUS_RTCP_BASE + 0x00000009

/*!@} */

/////////////////////////////////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions src/source/Include_i.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extern "C" {
#include "kvssdp/sdp_data_types.h"
#include "kvsstun/stun_data_types.h"
#include "kvsrtp/rtp_data_types.h"
#include "kvsrtcp/rtcp_data_types.h"

#ifdef KVS_USE_OPENSSL
#include <openssl/bio.h>
Expand Down Expand Up @@ -178,6 +179,7 @@ STATUS generateJSONSafeString(PCHAR, UINT32);
STATUS convertSdpErrorCode(SdpResult_t sdpResult);
STATUS convertStunErrorCode(StunResult_t stunResult);
STATUS convertRtpErrorCode(RtpResult_t rtpResult);
STATUS convertRtcpErrorCode(RtcpResult_t rtcpResult);

#ifdef __cplusplus
}
Expand Down
12 changes: 3 additions & 9 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -733,15 +733,9 @@ STATUS rtcpReportsCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData
// SRTP_MAX_TRAILER_LEN + 4 following the actual rtcp Packet payload
allocSize = packetLen + SRTP_AUTH_TAG_OVERHEAD + SRTP_MAX_TRAILER_LEN + 4;
CHK(NULL != (rawPacket = (PBYTE) MEMALLOC(allocSize)), STATUS_NOT_ENOUGH_MEMORY);
rawPacket[0] = RTCP_PACKET_VERSION_VAL << 6;
rawPacket[RTCP_PACKET_TYPE_OFFSET] = RTCP_PACKET_TYPE_SENDER_REPORT;
putUnalignedInt16BigEndian(rawPacket + RTCP_PACKET_LEN_OFFSET,
(packetLen / RTCP_PACKET_LEN_WORD_SIZE) - 1); // The length of this RTCP packet in 32-bit words minus one
putUnalignedInt32BigEndian(rawPacket + 4, ssrc);
putUnalignedInt64BigEndian(rawPacket + 8, ntpTime);
putUnalignedInt32BigEndian(rawPacket + 16, rtpTime);
putUnalignedInt32BigEndian(rawPacket + 20, packetCount);
putUnalignedInt32BigEndian(rawPacket + 24, octetCount);

CHK_STATUS(setBytesFromRtcpValues_SenderReport(rawPacket, allocSize, ssrc, ntpTime, rtpTime, packetCount, octetCount));

CHK_STATUS(encryptRtcpPacket(pKvsPeerConnection->pSrtpSession, rawPacket, (PINT32) &packetLen));
CHK_STATUS(iceAgentSendPacket(pKvsPeerConnection->pIceAgent, rawPacket, packetLen));
}
Expand Down
180 changes: 155 additions & 25 deletions src/source/PeerConnection/Rtcp.c
Original file line number Diff line number Diff line change
@@ -1,25 +1,79 @@
#define LOG_CLASS "RtcRtcp"

#include "../Include_i.h"
#include "kvsrtcp/rtcp_api.h"

static RtcpPacketType_t getDetailedRtcpPacketType(uint8_t packetType, uint8_t receptionReportCount)
{
RtcpPacketType_t result = RTCP_PACKET_UNKNOWN;

switch (packetType) {
case RTCP_PACKET_TYPE_FIR:
if (receptionReportCount == 0) {
result = RTCP_PACKET_FIR;
}
break;
case RTCP_PACKET_TYPE_SENDER_REPORT:
result = RTCP_PACKET_SENDER_REPORT;
break;
case RTCP_PACKET_TYPE_RECEIVER_REPORT:
result = RTCP_PACKET_RECEIVER_REPORT;
break;
case RTCP_PACKET_TYPE_GENERIC_RTP_FEEDBACK:
if (receptionReportCount == RTCP_FMT_TRANSPORT_SPECIFIC_FEEDBACK_NACK) {
result = RTCP_PACKET_TRANSPORT_FEEDBACK_NACK;
} else if (receptionReportCount == RTCP_FMT_TRANSPORT_SPECIFIC_FEEDBACK_TWCC) {
result = RTCP_PACKET_TRANSPORT_FEEDBACK_TWCC;
}
break;
case RTCP_PACKET_TYPE_PAYLOAD_SPECIFIC_FEEDBACK:
if (receptionReportCount == RTCP_FMT_PAYLOAD_SPECIFIC_FEEDBACK_PLI) {
result = RTCP_PACKET_PAYLOAD_FEEDBACK_PLI;
} else if (receptionReportCount == RTCP_FMT_PAYLOAD_SPECIFIC_FEEDBACK_SLI) {
result = RTCP_PACKET_PAYLOAD_FEEDBACK_SLI;
} else if (receptionReportCount == RTCP_FMT_PAYLOAD_SPECIFIC_FEEDBACK_REMB) {
result = RTCP_PACKET_PAYLOAD_FEEDBACK_REMB;
}
break;
default:
break;
}

return result;
}

// TODO handle FIR packet https://tools.ietf.org/html/rfc2032#section-5.2.1
static STATUS onRtcpFIRPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection)
{
STATUS retStatus = STATUS_SUCCESS;
UINT32 mediaSSRC;
PKvsRtpTransceiver pTransceiver = NULL;
RtcpContext_t ctx;
RtcpResult_t rtcpResult;
RtcpPacket_t rtcpPacket;
RtcpFirPacket_t firPacket;

CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG);
mediaSSRC = getUnalignedInt32BigEndian((pRtcpPacket->payload + (SIZEOF(UINT32))));
if (STATUS_SUCCEEDED(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, mediaSSRC))) {

rtcpResult = Rtcp_Init(&ctx);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

rtcpPacket.header.packetType = getDetailedRtcpPacketType(pRtcpPacket->header.packetType, pRtcpPacket->header.receptionReportCount);
rtcpPacket.header.receptionReportCount = pRtcpPacket->header.receptionReportCount;
rtcpPacket.pPayload = (const PBYTE) pRtcpPacket->payload;
rtcpPacket.payloadLength = (size_t) pRtcpPacket->payloadLength;

rtcpResult = Rtcp_ParseFirPacket(&ctx, &rtcpPacket, &firPacket);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

if (STATUS_SUCCEEDED(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, firPacket.senderSsrc))) {
MUTEX_LOCK(pTransceiver->statsLock);
pTransceiver->outboundStats.firCount++;
MUTEX_UNLOCK(pTransceiver->statsLock);
if (pTransceiver->onPictureLoss != NULL) {
pTransceiver->onPictureLoss(pTransceiver->onPictureLossCustomData);
}
} else {
DLOGW("Received FIR for non existing ssrc: %u", mediaSSRC);
DLOGW("Received FIR for non existing ssrc: %u", firPacket.senderSsrc);
}

CleanUp:
Expand All @@ -31,21 +85,46 @@ static STATUS onRtcpFIRPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPe
STATUS onRtcpSLIPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection)
{
STATUS retStatus = STATUS_SUCCESS;
UINT32 mediaSSRC;
UINT32 noSliInfo;
PKvsRtpTransceiver pTransceiver = NULL;
RtcpContext_t ctx;
RtcpResult_t rtcpResult;
RtcpPacket_t rtcpPacket;
RtcpSliPacket_t sliPacket = {0};

CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG);
mediaSSRC = getUnalignedInt32BigEndian((pRtcpPacket->payload + (SIZEOF(UINT32))));
if (STATUS_SUCCEEDED(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, mediaSSRC))) {

rtcpResult = Rtcp_Init(&ctx);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

rtcpPacket.header.packetType = getDetailedRtcpPacketType(pRtcpPacket->header.packetType, pRtcpPacket->header.receptionReportCount);
rtcpPacket.header.receptionReportCount = pRtcpPacket->header.receptionReportCount;
rtcpPacket.pPayload = (const PBYTE) pRtcpPacket->payload;
rtcpPacket.payloadLength = (size_t) pRtcpPacket->payloadLength;

noSliInfo = (UINT32) ((rtcpPacket.payloadLength - SIZEOF(sliPacket.senderSsrc - SIZEOF(sliPacket.mediaSourceSsrc))) / 4);
if (noSliInfo > 0) {
sliPacket.numSliInfos = noSliInfo;
sliPacket.pSliInfos = MEMALLOC(noSliInfo * SIZEOF(UINT32));
}

rtcpResult = Rtcp_ParseSliPacket(&ctx, &rtcpPacket, &sliPacket);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

if (STATUS_SUCCEEDED(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, sliPacket.mediaSourceSsrc))) {
MUTEX_LOCK(pTransceiver->statsLock);
pTransceiver->outboundStats.sliCount++;
MUTEX_UNLOCK(pTransceiver->statsLock);
} else {
DLOGW("Received SLI for non existing ssrc: %u", mediaSSRC);
DLOGW("Received SLI for non existing ssrc: %u", sliPacket.mediaSourceSsrc);
}

CleanUp:

if (sliPacket.pSliInfos != NULL) {
MEMFREE(sliPacket.pSliInfos);
sliPacket.pSliInfos = NULL;
}
return retStatus;
}

Expand All @@ -55,6 +134,10 @@ static STATUS onRtcpSenderReport(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKv
STATUS retStatus = STATUS_SUCCESS;
UINT32 senderSSRC;
PKvsRtpTransceiver pTransceiver = NULL;
RtcpContext_t ctx;
RtcpResult_t rtcpResult;
RtcpPacket_t rtcpPacket;
RtcpSenderReport_t senderReport;

CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG);

Expand All @@ -63,12 +146,23 @@ static STATUS onRtcpSenderReport(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKv
return STATUS_SUCCESS;
}

senderSSRC = getUnalignedInt32BigEndian(pRtcpPacket->payload);
rtcpResult = Rtcp_Init(&ctx);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

rtcpPacket.pPayload = (const PBYTE) pRtcpPacket->payload;
rtcpPacket.payloadLength = (size_t) pRtcpPacket->payloadLength;
rtcpPacket.header.packetType = getDetailedRtcpPacketType(pRtcpPacket->header.packetType, pRtcpPacket->header.receptionReportCount);
rtcpPacket.header.receptionReportCount = pRtcpPacket->header.receptionReportCount;

rtcpResult = Rtcp_ParseSenderReport(&ctx, &rtcpPacket, &senderReport);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

senderSSRC = senderReport.senderSsrc;
if (STATUS_SUCCEEDED(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, senderSSRC))) {
UINT64 ntpTime = getUnalignedInt64BigEndian(pRtcpPacket->payload + 4);
UINT32 rtpTs = getUnalignedInt32BigEndian(pRtcpPacket->payload + 12);
UINT32 packetCnt = getUnalignedInt32BigEndian(pRtcpPacket->payload + 16);
UINT32 octetCnt = getUnalignedInt32BigEndian(pRtcpPacket->payload + 20);
UINT64 ntpTime = senderReport.senderInfo.ntpTime;
UINT32 rtpTs = senderReport.senderInfo.rtpTime;
UINT32 packetCnt = senderReport.senderInfo.packetCount;
UINT32 octetCnt = senderReport.senderInfo.octetCount;
DLOGV("RTCP_PACKET_TYPE_SENDER_REPORT %d %" PRIu64 " rtpTs: %u %u pkts %u bytes", senderSSRC, ntpTime, rtpTs, packetCnt, octetCnt);
} else {
DLOGW("Received sender report for non existing ssrc: %u", senderSSRC);
Expand All @@ -86,6 +180,10 @@ static STATUS onRtcpReceiverReport(PRtcpPacket pRtcpPacket, PKvsPeerConnection p
DOUBLE fractionLost;
UINT32 rttPropDelayMsec = 0, rttPropDelay, delaySinceLastSR, lastSR, interarrivalJitter, extHiSeqNumReceived, cumulativeLost, senderSSRC, ssrc1;
UINT64 currentTimeNTP = convertTimestampToNTP(GETTIME());
RtcpContext_t ctx;
RtcpResult_t rtcpResult;
RtcpPacket_t rtcpPacket;
RtcpReceiverReport_t receiverReport = {0};

UNUSED_PARAM(rttPropDelayMsec);
UNUSED_PARAM(rttPropDelay);
Expand All @@ -103,19 +201,34 @@ static STATUS onRtcpReceiverReport(PRtcpPacket pRtcpPacket, PKvsPeerConnection p
return STATUS_SUCCESS;
}

senderSSRC = getUnalignedInt32BigEndian(pRtcpPacket->payload);
ssrc1 = getUnalignedInt32BigEndian(pRtcpPacket->payload + 4);
rtcpResult = Rtcp_Init(&ctx);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

rtcpPacket.pPayload = (const PBYTE) pRtcpPacket->payload;
rtcpPacket.payloadLength = (size_t) pRtcpPacket->payloadLength;
rtcpPacket.header.packetType = getDetailedRtcpPacketType(pRtcpPacket->header.packetType, pRtcpPacket->header.receptionReportCount);
rtcpPacket.header.receptionReportCount = pRtcpPacket->header.receptionReportCount;
receiverReport.numReceptionReports = pRtcpPacket->header.receptionReportCount;
if (receiverReport.numReceptionReports > 0) {
receiverReport.pReceptionReports = (RtcpReceptionReport_t*) MEMALLOC(rtcpPacket.header.receptionReportCount * sizeof(RtcpReceptionReport_t));
}

rtcpResult = Rtcp_ParseReceiverReport(&ctx, &rtcpPacket, &receiverReport);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

senderSSRC = receiverReport.senderSsrc;
ssrc1 = receiverReport.pReceptionReports->sourceSsrc;

if (STATUS_FAILED(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, ssrc1))) {
DLOGW("Received receiver report for non existing ssrc: %u", ssrc1);
return STATUS_SUCCESS; // not really an error ?
}
fractionLost = pRtcpPacket->payload[8] / 255.0;
cumulativeLost = ((UINT32) getUnalignedInt32BigEndian(pRtcpPacket->payload + 8)) & 0x00ffffffu;
extHiSeqNumReceived = getUnalignedInt32BigEndian(pRtcpPacket->payload + 12);
interarrivalJitter = getUnalignedInt32BigEndian(pRtcpPacket->payload + 16);
lastSR = getUnalignedInt32BigEndian(pRtcpPacket->payload + 20);
delaySinceLastSR = getUnalignedInt32BigEndian(pRtcpPacket->payload + 24);
fractionLost = receiverReport.pReceptionReports->fractionLost / 255.0;
cumulativeLost = receiverReport.pReceptionReports->cumulativePacketsLost;
extHiSeqNumReceived = receiverReport.pReceptionReports->extendedHighestSeqNumReceived;
interarrivalJitter = receiverReport.pReceptionReports->interArrivalJitter;
lastSR = receiverReport.pReceptionReports->lastSR;
delaySinceLastSR = receiverReport.pReceptionReports->delaySinceLastSR;

DLOGS("RTCP_PACKET_TYPE_RECEIVER_REPORT %u %u loss: %u %u seq: %u jit: %u lsr: %u dlsr: %u", senderSSRC, ssrc1, fractionLost, cumulativeLost,
extHiSeqNumReceived, interarrivalJitter, lastSR, delaySinceLastSR);
Expand Down Expand Up @@ -143,6 +256,10 @@ static STATUS onRtcpReceiverReport(PRtcpPacket pRtcpPacket, PKvsPeerConnection p

CleanUp:

if (receiverReport.pReceptionReports != NULL) {
MEMFREE(receiverReport.pReceptionReports);
receiverReport.pReceptionReports = NULL;
}
return retStatus;
}

Expand Down Expand Up @@ -496,14 +613,27 @@ STATUS onRtcpRembPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConn
STATUS onRtcpPLIPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection)
{
STATUS retStatus = STATUS_SUCCESS;
UINT32 mediaSSRC;
PKvsRtpTransceiver pTransceiver = NULL;
RtcpContext_t ctx;
RtcpResult_t rtcpResult;
RtcpPacket_t rtcpPacket;
RtcpPliPacket_t pliPacket;

CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG);
mediaSSRC = getUnalignedInt32BigEndian((pRtcpPacket->payload + (SIZEOF(UINT32))));

CHK_STATUS_ERR(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, mediaSSRC), STATUS_RTCP_INPUT_SSRC_INVALID,
"Received PLI for non existing ssrc: %u", mediaSSRC);
rtcpResult = Rtcp_Init(&ctx);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

rtcpPacket.header.packetType = getDetailedRtcpPacketType(pRtcpPacket->header.packetType, pRtcpPacket->header.receptionReportCount);
rtcpPacket.header.receptionReportCount = pRtcpPacket->header.receptionReportCount;
rtcpPacket.pPayload = (const PBYTE) pRtcpPacket->payload;
rtcpPacket.payloadLength = (size_t) pRtcpPacket->payloadLength;

rtcpResult = Rtcp_ParsePliPacket(&ctx, &rtcpPacket, &pliPacket);
CHK(rtcpResult == RTP_RESULT_OK, convertRtcpErrorCode(rtcpResult));

CHK_STATUS_ERR(findTransceiverBySsrc(pKvsPeerConnection, &pTransceiver, pliPacket.mediaSourceSsrc), STATUS_RTCP_INPUT_SSRC_INVALID,
"Received PLI for non existing ssrc: %u", pliPacket.mediaSourceSsrc);

MUTEX_LOCK(pTransceiver->statsLock);
pTransceiver->outboundStats.pliCount++;
Expand Down
Loading

0 comments on commit ace791c

Please sign in to comment.