JoinStorageSession retries #94

Status: Draft
Wants to merge 1 commit into base: master

@@ -1,7 +1,6 @@
package com.amazonaws.kinesisvideo.demoapp.fragment;

import android.Manifest;
import android.app.Activity;
import android.content.Intent;
import android.content.pm.PackageManager;
import android.os.AsyncTask;
@@ -11,15 +10,13 @@
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.view.inputmethod.InputMethodManager;
import android.widget.ArrayAdapter;
import android.widget.Button;
import android.widget.CheckBox;
import android.widget.CheckedTextView;
import android.widget.EditText;
import android.widget.ListView;
import android.widget.Spinner;
import android.widget.Toast;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
@@ -32,6 +29,7 @@
import com.amazonaws.kinesisvideo.demoapp.R;
import com.amazonaws.kinesisvideo.demoapp.activity.SimpleNavActivity;
import com.amazonaws.kinesisvideo.demoapp.activity.WebRtcActivity;
import com.amazonaws.kinesisvideo.utils.Constants;
import com.amazonaws.regions.Region;
import com.amazonaws.services.kinesisvideo.AWSKinesisVideoClient;
import com.amazonaws.services.kinesisvideo.model.ChannelRole;
@@ -55,7 +53,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class StreamWebRtcConfigurationFragment extends Fragment {
private static final String TAG = StreamWebRtcConfigurationFragment.class.getSimpleName();
Expand All @@ -76,6 +73,8 @@ public class StreamWebRtcConfigurationFragment extends Fragment {

private static final String KEY_SEND_VIDEO = "sendVideo";
public static final String KEY_SEND_AUDIO = "sendAudio";
private static final String KEY_USE_TURN = "useTurn";
public static final String KEY_USE_STUN = "useStun";

private static final String[] WEBRTC_OPTIONS = {
"Send Video",
Expand All @@ -92,7 +91,9 @@ public class StreamWebRtcConfigurationFragment extends Fragment {
private EditText mClientId;
private EditText mRegion;
private Spinner mCameras;
private CheckBox mIngestMedia;
private CheckBox mMediaIngestionFeature;
private CheckBox mUseTurn;
private CheckBox mUseStun;
private final List<ResourceEndpointListItem> mEndpointList = new ArrayList<>();
private final List<IceServer> mIceServerList = new ArrayList<>();
private String mChannelArn = null;
@@ -133,7 +134,9 @@ public void onViewCreated(final View view, Bundle savedInstanceState) {
mChannelName = view.findViewById(R.id.channel_name);
mClientId = view.findViewById(R.id.client_id);
mRegion = view.findViewById(R.id.region);
mIngestMedia = view.findViewById(R.id.ingest_media);
mMediaIngestionFeature = view.findViewById(R.id.ingest_media);
mUseTurn = view.findViewById(R.id.use_turn);
mUseStun = view.findViewById(R.id.use_stun);
setRegionFromCognito();

mOptions = view.findViewById(R.id.webrtc_options);
@@ -195,22 +198,6 @@ public void onClick(final View view) {
}

private void startMasterActivity() {

if (mIngestMedia.isChecked()) {
// Check that the "Send Audio" and "Send Video" boxes are enabled.
final SparseBooleanArray checked = mOptions.getCheckedItemPositions();
for (int i = 0; i < mOptions.getCount(); i++) {
if (!checked.get(i)) {
new AlertDialog.Builder(getActivity())
.setPositiveButton("OK", null)
.setMessage("Audio and video must be sent to ingest media!")
.create()
.show();
return;
}
}
}

if (!updateSignalingChannelInfo(mRegion.getText().toString(),
mChannelName.getText().toString(),
ChannelRole.MASTER)) {
@@ -298,6 +285,9 @@ private Bundle setExtras(boolean isMaster) {
extras.putBoolean(KEY_OF_OPTIONS[i], checked.get(i));
}

extras.putBoolean(KEY_USE_TURN, mUseTurn.isChecked());
extras.putBoolean(KEY_USE_STUN, mUseStun.isChecked());

extras.putBoolean(KEY_CAMERA_FRONT_FACING, mCameras.getSelectedItem().equals("Front Camera"));

return extras;
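For reference, a sketch (not part of this diff) of how the receiving activity (e.g., WebRtcActivity) could read the two new extras; the default values here are assumptions:

// Sketch: reading the new extras in the receiving activity. The literal keys match
// KEY_USE_TURN ("useTurn") and KEY_USE_STUN ("useStun") above; the defaults are assumptions.
final Intent intent = getIntent();
final boolean useTurn = intent.getBooleanExtra("useTurn", true);
final boolean useStun = intent.getBooleanExtra("useStun", true);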
@@ -367,6 +357,13 @@ protected String doInBackground(final Object... objects) {
final String channelName = (String) objects[1];
final ChannelRole role = (ChannelRole) objects[2];

final Region regionObj = Region.getRegion(region);

// Validate region
if (regionObj == null) {
return "The region: " + region + " is invalid or not supported!";
}

// Step 1. Create Kinesis Video Client
final AWSKinesisVideoClient awsKinesisVideoClient;
try {
@@ -407,27 +404,60 @@ protected String doInBackground(final Object... objects) {
// Step 3. If we are ingesting media, we need to check if the Signaling Channel has a Kinesis Video
// Stream configured to write media to. We can call the DescribeMediaStorageConfiguration API
// to determine this.
if (role == ChannelRole.MASTER && mFragment.get().mIngestMedia.isChecked()) {
if (role == ChannelRole.MASTER && mFragment.get().mMediaIngestionFeature.isChecked()) {

if (!Constants.INGESTION_PREVIEW_REGIONS.contains(regionObj)) {
return "The media ingestion feature is not supported in " + regionObj.getName() + ". It is only supported in " + Constants.INGESTION_PREVIEW_REGIONS;
}


try {
final DescribeMediaStorageConfigurationResult describeMediaStorageConfigurationResult = awsKinesisVideoClient.describeMediaStorageConfiguration(
new DescribeMediaStorageConfigurationRequest()
.withChannelARN(mFragment.get().mChannelArn));

if ("ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) {
Log.i(TAG, "Media storage is enabled for this channel.");

// Check that the "Send Audio" and "Send Video" boxes are enabled.
final SparseBooleanArray checked = mFragment.get().mOptions.getCheckedItemPositions();
for (int i = 0; i < mFragment.get().mOptions.getCount(); i++) {
if (!checked.get(i)) {
return "Audio and video must be sent to ingest media!";
Review comment (Contributor): throw exception?
}
}
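One possible shape for the reviewer's "throw exception?" suggestion, as a sketch only (not part of this PR); the helper name and exception type are assumptions:

// Sketch of the reviewer's idea: fail fast with an unchecked exception instead of
// returning an error string. Requires android.util.SparseBooleanArray (already used above).
private static void requireAudioAndVideoChecked(final SparseBooleanArray checked, final int optionCount) {
    for (int i = 0; i < optionCount; i++) {
        if (!checked.get(i)) {
            throw new IllegalStateException("Audio and video must be sent to ingest media!");
        }
    }
}

Note that the existing check runs inside the try block around describeMediaStorageConfiguration, so a thrown exception would currently surface as a describe failure; the caller would need to catch it separately or hoist the check out of the try.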

mFragment.get().mStreamArn = describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStreamARN();
} else {
Log.i(TAG, "Media storage is not enabled for this channel!");
}
} catch (Exception ex) {
return "Describe Media Storage Configuration failed with exception " + ex.getLocalizedMessage();
}
} else if (role == ChannelRole.VIEWER && mFragment.get().mMediaIngestionFeature.isChecked() && Constants.INGESTION_PREVIEW_REGIONS.contains(regionObj)) {
Log.i(TAG, "Checking if the signaling channel is in media service mode.");
try {
final DescribeMediaStorageConfigurationResult describeMediaStorageConfigurationResult = awsKinesisVideoClient.describeMediaStorageConfiguration(
new DescribeMediaStorageConfigurationRequest()
.withChannelARN(mFragment.get().mChannelArn));

if (!"ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) {
Log.e(TAG, "Media storage is not enabled for this channel!");
return "Media Storage is DISABLED for this channel!";
if ("ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) {
return "This signaling channel is configured for WebRTC Ingestion. Regular peer-to-peer connections can no longer occur.";
}
mFragment.get().mStreamArn = describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStreamARN();
} catch (Exception ex) {
return "Describe Media Storage Configuration failed with Exception " + ex.getLocalizedMessage();
return "Describe Media Storage Configuration failed with exception " + ex.getLocalizedMessage();
}
} else {
mFragment.get().mStreamArn = null;
}

final String[] protocols;
if (mFragment.get().mIngestMedia.isChecked()) {
protocols = new String[]{"WSS", "HTTPS", "WEBRTC"};
} else {
if (mFragment.get().mStreamArn == null) {
// Regular WebRTC
Review comment (Contributor): WebRTC P2P without Ingestion

protocols = new String[]{"WSS", "HTTPS"};
} else {
// Media ingestion mode
protocols = new String[]{"WSS", "HTTPS", "WEBRTC"};
}

// Step 4. Use the Kinesis Video Client to call GetSignalingChannelEndpoint.
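The call itself is collapsed in this diff. As a rough sketch of the typical request shape, assuming the request/response model classes from com.amazonaws.services.kinesisvideo.model (the hidden code in this file may differ):

// Sketch only: GetSignalingChannelEndpoint with the protocols array chosen above.
final GetSignalingChannelEndpointResult endpointResult =
        awsKinesisVideoClient.getSignalingChannelEndpoint(
                new GetSignalingChannelEndpointRequest()
                        .withChannelARN(mFragment.get().mChannelArn)
                        .withSingleMasterChannelEndpointConfiguration(
                                new SingleMasterChannelEndpointConfiguration()
                                        .withProtocols(protocols)
                                        .withRole(role.name())));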
@@ -451,25 +481,29 @@ protected String doInBackground(final Object... objects) {
return "Get Signaling Endpoint failed with Exception " + e.getLocalizedMessage();
}

String dataEndpoint = null;
for (ResourceEndpointListItem endpoint : mFragment.get().mEndpointList) {
if (endpoint.getProtocol().equals("HTTPS")) {
dataEndpoint = endpoint.getResourceEndpoint();
if (mFragment.get().mUseTurn.isChecked()) {
String dataEndpoint = null;
for (ResourceEndpointListItem endpoint : mFragment.get().mEndpointList) {
if (endpoint.getProtocol().equals("HTTPS")) {
dataEndpoint = endpoint.getResourceEndpoint();
}
}
}

// Step 5. Construct the Kinesis Video Signaling Client. The HTTPS endpoint from the
// GetSignalingChannelEndpoint response above is used with this client. This
// client is just used for getting ICE servers, not for actual signaling.
// Step 6. Call GetIceServerConfig in order to obtain TURN ICE server info.
// Note: the STUN endpoint will be `stun:stun.kinesisvideo.${region}.amazonaws.com:443`
try {
final AWSKinesisVideoSignalingClient awsKinesisVideoSignalingClient = mFragment.get().getAwsKinesisVideoSignalingClient(region, dataEndpoint);
GetIceServerConfigResult getIceServerConfigResult = awsKinesisVideoSignalingClient.getIceServerConfig(
new GetIceServerConfigRequest().withChannelARN(mFragment.get().mChannelArn).withClientId(role.name()));
mFragment.get().mIceServerList.addAll(getIceServerConfigResult.getIceServerList());
} catch (Exception e) {
return "Get Ice Server Config failed with Exception " + e.getLocalizedMessage();
// Step 5. Construct the Kinesis Video Signaling Client. The HTTPS endpoint from the
// GetSignalingChannelEndpoint response above is used with this client. This
// client is just used for getting ICE servers, not for actual signaling.
// Step 6. Call GetIceServerConfig in order to obtain TURN ICE server info.
// Note: the STUN endpoint will be `stun:stun.kinesisvideo.${region}.amazonaws.com:443`
try {
final AWSKinesisVideoSignalingClient awsKinesisVideoSignalingClient = mFragment.get().getAwsKinesisVideoSignalingClient(region, dataEndpoint);
GetIceServerConfigResult getIceServerConfigResult = awsKinesisVideoSignalingClient.getIceServerConfig(
new GetIceServerConfigRequest().withChannelARN(mFragment.get().mChannelArn).withClientId(role.name()));
mFragment.get().mIceServerList.addAll(getIceServerConfigResult.getIceServerList());
} catch (Exception e) {
return "Get Ice Server Config failed with Exception " + e.getLocalizedMessage();
}
} else {
Log.i(TAG, "Not fetching TURN servers.");
}

return null;
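For context, a sketch (not part of this diff) of how the receiving activity might combine the new useStun/useTurn flags with the fetched ICE servers when building a PeerConnection configuration. PeerConnection comes from the org.webrtc library used by this demo; IceServer is the AWS signaling model type already used by mIceServerList; the helper name and parameters are illustrative.

// Sketch only: build an RTCConfiguration from the useStun/useTurn choices.
// Requires org.webrtc.PeerConnection, java.util.ArrayList, java.util.List.
static PeerConnection.RTCConfiguration buildRtcConfiguration(final boolean useStun,
                                                             final boolean useTurn,
                                                             final String region,
                                                             final List<IceServer> turnServers) {
    final List<PeerConnection.IceServer> servers = new ArrayList<>();
    if (useStun) {
        // STUN endpoint format noted in the Step 6 comment of this diff.
        servers.add(PeerConnection.IceServer
                .builder("stun:stun.kinesisvideo." + region + ".amazonaws.com:443")
                .createIceServer());
    }
    if (useTurn) {
        // Assumes the AWS IceServer model exposes getUris()/getUsername()/getPassword().
        for (final IceServer server : turnServers) {
            servers.add(PeerConnection.IceServer.builder(server.getUris())
                    .setUsername(server.getUsername())
                    .setPassword(server.getPassword())
                    .createIceServer());
        }
    }
    return new PeerConnection.RTCConfiguration(servers);
}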
@@ -38,7 +38,7 @@ public void onMessage(final String message) {
switch (evt.getMessageType().toUpperCase()) {
case "SDP_OFFER":
Log.d(TAG, "Offer received: SenderClientId=" + evt.getSenderClientId());
Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), 0)));
Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), Base64.DEFAULT)));

onSdpOffer(evt);
break;
@@ -49,7 +49,7 @@ public void onMessage(final String message) {
break;
case "ICE_CANDIDATE":
Log.d(TAG, "Ice Candidate received: SenderClientId=" + evt.getSenderClientId());
Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), 0)));
Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), Base64.DEFAULT)));

onIceCandidate(evt);
break;
@@ -28,8 +28,13 @@ public class SignalingServiceWebSocketClient {

public SignalingServiceWebSocketClient(final String uri, final SignalingListener signalingListener,
final ExecutorService executorService) {
this(uri, signalingListener, executorService, 0L);
}

public SignalingServiceWebSocketClient(final String uri, final SignalingListener signalingListener,
final ExecutorService executorService, final long pingIntervalSeconds) {
Log.d(TAG, "Connecting to URI " + uri + " as master");
websocketClient = new WebSocketClient(uri, new ClientManager(), signalingListener, executorService);
websocketClient = new WebSocketClient(uri, new ClientManager(), signalingListener, executorService, pingIntervalSeconds);
Review comment (Contributor): probably a note for keepalive for websocket to continue to offer after x minutes?
this.executorService = executorService;
}
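A minimal usage sketch of the new overload (not from this PR); `signedWssUri` and `listener` are assumed to already exist, and the 60-second interval is only an example:

// Sketch: open the signaling connection and send a websocket ping every 60 seconds.
// Passing 0 (or using the old constructor) keeps the previous behavior of sending no pings.
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final SignalingServiceWebSocketClient client =
        new SignalingServiceWebSocketClient(signedWssUri, listener, executorService, 60L);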

@@ -15,10 +15,14 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.websocket.ClientEndpointConfig;
@@ -40,9 +44,12 @@ class WebSocketClient {

private final ExecutorService executorService;

private final ScheduledExecutorService pingService;

WebSocketClient(final String uri, final ClientManager clientManager,
final SignalingListener signalingListener,
final ExecutorService executorService) {
final ExecutorService executorService,
final long pingIntervalSeconds) {

this.executorService = executorService;
final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create()
@@ -99,6 +106,38 @@ public void onError(final Session session, final Throwable thr) {
});

await().atMost(10, TimeUnit.SECONDS).until(WebSocketClient.this::isOpen);

if (pingIntervalSeconds > 0) {
Review comment (Contributor): nice!
pingService = Executors.newSingleThreadScheduledExecutor();

pingService.scheduleAtFixedRate(() -> {
try {
if (session == null || !session.isOpen()) {
Log.e(TAG, "Unable to send ping. Session has been closed.");
if (!pingService.isShutdown()) {
pingService.shutdown();
}
return;
}
session.getAsyncRemote().sendPing(ByteBuffer.wrap("".getBytes(StandardCharsets.UTF_8)));
Log.i(TAG, "Sent ping! Sending another in " + pingIntervalSeconds + " seconds.");
} catch (final Exception ex) {
Log.e(TAG, "Exception sending ping message", ex);
}
}, 0, pingIntervalSeconds, TimeUnit.SECONDS);

Log.i(TAG, "Sending a ping keep-alive message every " + pingIntervalSeconds + " seconds.");
} else {
pingService = null;
Log.d(TAG, "Not sending any pings.");
}
}

WebSocketClient(final String uri, final ClientManager clientManager,
final SignalingListener signalingListener,
final ExecutorService executorService) {

this(uri, clientManager, signalingListener, executorService, 0L);
}

boolean isOpen() {
@@ -129,14 +168,19 @@ void disconnect() {
return;
}

if (!session.isOpen()) {
Log.w(TAG, "Connection already closed for " + session.getRequestURI());
return;
if (pingService != null && !pingService.isShutdown()) {
pingService.shutdownNow();
}

try {
session.close();
executorService.shutdownNow();
if (!session.isOpen()) {
Log.w(TAG, "Connection already closed for " + session.getRequestURI());
} else {
session.close();
}
if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
Log.i(TAG, "Disconnected from " + session.getRequestURI() + " successfully!");
} catch (final IOException e) {
Log.e(TAG, "Exception closing: " + e.getMessage());