Skip to content

Commit

Permalink
BPK session close fix (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
volodymyr-bondarenko85 authored Jan 15, 2024
1 parent 2ea4ae2 commit ccad728
Showing 1 changed file with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import com.kaltura.playkit.PlayerEvent;
import com.kaltura.tvplayer.PKMediaEntryInterceptor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import tv.broadpeak.smartlib.SmartLib;
Expand All @@ -31,11 +33,21 @@ public class BroadpeakPlugin extends PKPlugin implements PKMediaEntryInterceptor
private static final PKLog log = PKLog.get("BroadpeakPlugin");

private MessageBus messageBus;
private final Map<String, StreamingSession> sessionsMap = new HashMap<>();
private final Map<String, StreamingSessionInfo> sessionsMap = new HashMap<>();
private Player player;
private BroadpeakConfig config;
private Context context;

static class StreamingSessionInfo {
private final StreamingSession session;
private boolean updateMediaReceived;

public StreamingSessionInfo(StreamingSession session) {
this.session = session;
this.updateMediaReceived = false;
}
}

public static final Factory factory = new Factory() {
@Override
public String getName() {
Expand Down Expand Up @@ -93,8 +105,9 @@ protected void onLoad(final Player player, Object config, final MessageBus messa

this.messageBus.addListener(this, PlayerEvent.stopped, event -> {
log.d("PlayerEvent stopped: calling stopStreamingSession");
PlayerEvent.Stopped stoppedEvent = (PlayerEvent.Stopped)event;
// Stop the session in case of Playback stop
stopCurrentStreamingSession();
stopStreamingSession(stoppedEvent.mediaSourceUrl);
});
}

Expand Down Expand Up @@ -123,6 +136,16 @@ private void addGeneralConfig(BroadpeakConfig bpConfig) {
@Override
protected void onUpdateMedia(PKMediaConfig mediaConfig) {
log.d("Start onUpdateMedia");
if (mediaConfig != null
&& mediaConfig.getMediaEntry() != null
&& mediaConfig.getMediaEntry().getSources() != null
&& mediaConfig.getMediaEntry().getSources().get(0) != null) {
String currentSession = mediaConfig.getMediaEntry().getSources().get(0).getUrl();
onUpdateMediaReceivedForStreamingSession(currentSession);
cleanupRunningStreamingSessions(currentSession);
} else {
log.d("Empty mediaConfig, sessions cleanup skipped");
}
}

@Override
Expand Down Expand Up @@ -192,11 +215,42 @@ private void stopCurrentStreamingSession() {
private void stopStreamingSession(String sessionKey) {
log.d("stopStreamingSession called with sessionKey=[" + sessionKey + "]");
if (sessionKey != null && sessionsMap.containsKey(sessionKey)) {
StreamingSession session = sessionsMap.get(sessionKey);
if (session != null) {
session.stopStreamingSession();
StreamingSessionInfo sessionInfo = sessionsMap.get(sessionKey);
if (sessionInfo != null) {
if (sessionInfo.updateMediaReceived) {
sessionInfo.session.stopStreamingSession();
sessionsMap.remove(sessionKey);
} else {
log.d("Session not stopped, updateMediaReceived false");
}
} else {
log.d("Session not stopped, sessionInfo is null");
}
sessionsMap.remove(sessionKey);
} else {
log.d("sessionMap entry not found");
}
log.d("Finalizing stopStreamingSession call, number of active sessions = " + sessionsMap.size());
}

private void onUpdateMediaReceivedForStreamingSession(String currentSession) {
log.d("cleanupStreamingSessions called with currentSession=[" + currentSession + "]");
if (currentSession != null && sessionsMap.containsKey(currentSession)) {
StreamingSessionInfo sessionInfo = sessionsMap.get(currentSession);
if (sessionInfo != null) {
sessionInfo.updateMediaReceived = true;
}
}
}

private void cleanupRunningStreamingSessions(String currentSession) {
List<String> cleanedUpSessions = new ArrayList<>();
for (Map.Entry<String, StreamingSessionInfo> sessionEntry : sessionsMap.entrySet()) {
if (!sessionEntry.getKey().equals(currentSession)) {
cleanedUpSessions.add(sessionEntry.getKey());
}
}
for (String sessionKey : cleanedUpSessions) {
stopStreamingSession(sessionKey);
}
}

Expand Down Expand Up @@ -228,7 +282,6 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
sendBroadpeakErrorEvent(errorCode, errorMessage);
return;
}
sessionsMap.put(source.getUrl(), session);

addSessionConfig(session);
session.attachPlayer(player, messageBus);
Expand All @@ -240,13 +293,14 @@ public void apply(PKMediaEntry mediaEntry, PKMediaEntryInterceptor.Listener list
// Replace the URL
log.d("Apply New Entry URL " + mediaEntry.getName() + " - " + mediaEntry.getId() + " url: " + result.getURL());
source.setUrl(result.getURL());
sessionsMap.put(source.getUrl(), new StreamingSessionInfo(session));
} else {
// Stop the session in case of error
if (result != null) {
errorCode = result.getErrorCode();
errorMessage = result.getErrorMessage();
}
stopStreamingSession(source.getUrl());
session.stopStreamingSession();
// send event to MessageBus
sendBroadpeakErrorEvent(errorCode, errorMessage);
}
Expand Down

0 comments on commit ccad728

Please sign in to comment.