-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JoinStorageSession retries #94
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
package com.amazonaws.kinesisvideo.demoapp.fragment; | ||
|
||
import android.Manifest; | ||
import android.app.Activity; | ||
import android.content.Intent; | ||
import android.content.pm.PackageManager; | ||
import android.os.AsyncTask; | ||
|
@@ -11,15 +10,13 @@ | |
import android.view.LayoutInflater; | ||
import android.view.View; | ||
import android.view.ViewGroup; | ||
import android.view.inputmethod.InputMethodManager; | ||
import android.widget.ArrayAdapter; | ||
import android.widget.Button; | ||
import android.widget.CheckBox; | ||
import android.widget.CheckedTextView; | ||
import android.widget.EditText; | ||
import android.widget.ListView; | ||
import android.widget.Spinner; | ||
import android.widget.Toast; | ||
|
||
import androidx.annotation.NonNull; | ||
import androidx.annotation.Nullable; | ||
|
@@ -32,6 +29,7 @@ | |
import com.amazonaws.kinesisvideo.demoapp.R; | ||
import com.amazonaws.kinesisvideo.demoapp.activity.SimpleNavActivity; | ||
import com.amazonaws.kinesisvideo.demoapp.activity.WebRtcActivity; | ||
import com.amazonaws.kinesisvideo.utils.Constants; | ||
import com.amazonaws.regions.Region; | ||
import com.amazonaws.services.kinesisvideo.AWSKinesisVideoClient; | ||
import com.amazonaws.services.kinesisvideo.model.ChannelRole; | ||
|
@@ -55,7 +53,6 @@ | |
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
public class StreamWebRtcConfigurationFragment extends Fragment { | ||
private static final String TAG = StreamWebRtcConfigurationFragment.class.getSimpleName(); | ||
|
@@ -76,6 +73,8 @@ public class StreamWebRtcConfigurationFragment extends Fragment { | |
|
||
private static final String KEY_SEND_VIDEO = "sendVideo"; | ||
public static final String KEY_SEND_AUDIO = "sendAudio"; | ||
private static final String KEY_USE_TURN = "useTurn"; | ||
public static final String KEY_USE_STUN = "useStun"; | ||
|
||
private static final String[] WEBRTC_OPTIONS = { | ||
"Send Video", | ||
|
@@ -92,7 +91,9 @@ public class StreamWebRtcConfigurationFragment extends Fragment { | |
private EditText mClientId; | ||
private EditText mRegion; | ||
private Spinner mCameras; | ||
private CheckBox mIngestMedia; | ||
private CheckBox mMediaIngestionFeature; | ||
private CheckBox mUseTurn; | ||
private CheckBox mUseStun; | ||
private final List<ResourceEndpointListItem> mEndpointList = new ArrayList<>(); | ||
private final List<IceServer> mIceServerList = new ArrayList<>(); | ||
private String mChannelArn = null; | ||
|
@@ -133,7 +134,9 @@ public void onViewCreated(final View view, Bundle savedInstanceState) { | |
mChannelName = view.findViewById(R.id.channel_name); | ||
mClientId = view.findViewById(R.id.client_id); | ||
mRegion = view.findViewById(R.id.region); | ||
mIngestMedia = view.findViewById(R.id.ingest_media); | ||
mMediaIngestionFeature = view.findViewById(R.id.ingest_media); | ||
mUseTurn = view.findViewById(R.id.use_turn); | ||
mUseStun = view.findViewById(R.id.use_stun); | ||
setRegionFromCognito(); | ||
|
||
mOptions = view.findViewById(R.id.webrtc_options); | ||
|
@@ -195,22 +198,6 @@ public void onClick(final View view) { | |
} | ||
|
||
private void startMasterActivity() { | ||
|
||
if (mIngestMedia.isChecked()) { | ||
// Check that the "Send Audio" and "Send Video" boxes are enabled. | ||
final SparseBooleanArray checked = mOptions.getCheckedItemPositions(); | ||
for (int i = 0; i < mOptions.getCount(); i++) { | ||
if (!checked.get(i)) { | ||
new AlertDialog.Builder(getActivity()) | ||
.setPositiveButton("OK", null) | ||
.setMessage("Audio and video must be sent to ingest media!") | ||
.create() | ||
.show(); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
if (!updateSignalingChannelInfo(mRegion.getText().toString(), | ||
mChannelName.getText().toString(), | ||
ChannelRole.MASTER)) { | ||
|
@@ -298,6 +285,9 @@ private Bundle setExtras(boolean isMaster) { | |
extras.putBoolean(KEY_OF_OPTIONS[i], checked.get(i)); | ||
} | ||
|
||
extras.putBoolean(KEY_USE_TURN, mUseTurn.isChecked()); | ||
extras.putBoolean(KEY_USE_STUN, mUseStun.isChecked()); | ||
|
||
extras.putBoolean(KEY_CAMERA_FRONT_FACING, mCameras.getSelectedItem().equals("Front Camera")); | ||
|
||
return extras; | ||
|
@@ -367,6 +357,13 @@ protected String doInBackground(final Object... objects) { | |
final String channelName = (String) objects[1]; | ||
final ChannelRole role = (ChannelRole) objects[2]; | ||
|
||
final Region regionObj = Region.getRegion(region); | ||
|
||
// Validate region | ||
if (regionObj == null) { | ||
return "The region: " + region + " is invalid or not supported!"; | ||
} | ||
|
||
// Step 1. Create Kinesis Video Client | ||
final AWSKinesisVideoClient awsKinesisVideoClient; | ||
try { | ||
|
@@ -407,27 +404,60 @@ protected String doInBackground(final Object... objects) { | |
// Step 3. If we are ingesting media, we need to check if the Signaling Channel has a Kinesis Video | ||
// Stream configured to write media to. We can call the DescribeMediaStorageConfiguration API | ||
// to determine this. | ||
if (role == ChannelRole.MASTER && mFragment.get().mIngestMedia.isChecked()) { | ||
if (role == ChannelRole.MASTER && mFragment.get().mMediaIngestionFeature.isChecked()) { | ||
|
||
if (!Constants.INGESTION_PREVIEW_REGIONS.contains(regionObj)) { | ||
return "The media ingestion feature is not supported in " + regionObj.getName() + ". It is only supported in " + Constants.INGESTION_PREVIEW_REGIONS; | ||
} | ||
|
||
|
||
try { | ||
final DescribeMediaStorageConfigurationResult describeMediaStorageConfigurationResult = awsKinesisVideoClient.describeMediaStorageConfiguration( | ||
new DescribeMediaStorageConfigurationRequest() | ||
.withChannelARN(mFragment.get().mChannelArn)); | ||
|
||
if ("ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) { | ||
Log.i(TAG, "Media storage is enabled for this channel."); | ||
|
||
// Check that the "Send Audio" and "Send Video" boxes are enabled. | ||
final SparseBooleanArray checked = mFragment.get().mOptions.getCheckedItemPositions(); | ||
for (int i = 0; i < mFragment.get().mOptions.getCount(); i++) { | ||
if (!checked.get(i)) { | ||
return "Audio and video must be sent to ingest media!"; | ||
} | ||
} | ||
|
||
mFragment.get().mStreamArn = describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStreamARN(); | ||
} else { | ||
Log.i(TAG, "Media storage is not enabled for this channel!"); | ||
} | ||
} catch (Exception ex) { | ||
return "Describe Media Storage Configuration failed with exception " + ex.getLocalizedMessage(); | ||
} | ||
} else if (role == ChannelRole.VIEWER && mFragment.get().mMediaIngestionFeature.isChecked() && Constants.INGESTION_PREVIEW_REGIONS.contains(regionObj)) { | ||
Log.i(TAG, "Checking if the signaling channel is in media service mode."); | ||
try { | ||
final DescribeMediaStorageConfigurationResult describeMediaStorageConfigurationResult = awsKinesisVideoClient.describeMediaStorageConfiguration( | ||
new DescribeMediaStorageConfigurationRequest() | ||
.withChannelARN(mFragment.get().mChannelArn)); | ||
|
||
if (!"ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) { | ||
Log.e(TAG, "Media storage is not enabled for this channel!"); | ||
return "Media Storage is DISABLED for this channel!"; | ||
if ("ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) { | ||
return "This signaling channel is configured for WebRTC Ingestion. Regular peer-to-peer connections can no longer occur."; | ||
} | ||
mFragment.get().mStreamArn = describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStreamARN(); | ||
} catch (Exception ex) { | ||
return "Describe Media Storage Configuration failed with Exception " + ex.getLocalizedMessage(); | ||
return "Describe Media Storage Configuration failed with exception " + ex.getLocalizedMessage(); | ||
} | ||
} else { | ||
mFragment.get().mStreamArn = null; | ||
} | ||
|
||
final String[] protocols; | ||
if (mFragment.get().mIngestMedia.isChecked()) { | ||
protocols = new String[]{"WSS", "HTTPS", "WEBRTC"}; | ||
} else { | ||
if (mFragment.get().mStreamArn == null) { | ||
// Regular WebRTC | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WebRTC P2P without Ingestion |
||
protocols = new String[]{"WSS", "HTTPS"}; | ||
} else { | ||
// Media ingestion mode | ||
protocols = new String[]{"WSS", "HTTPS", "WEBRTC"}; | ||
} | ||
|
||
// Step 4. Use the Kinesis Video Client to call GetSignalingChannelEndpoint. | ||
|
@@ -451,25 +481,29 @@ protected String doInBackground(final Object... objects) { | |
return "Get Signaling Endpoint failed with Exception " + e.getLocalizedMessage(); | ||
} | ||
|
||
String dataEndpoint = null; | ||
for (ResourceEndpointListItem endpoint : mFragment.get().mEndpointList) { | ||
if (endpoint.getProtocol().equals("HTTPS")) { | ||
dataEndpoint = endpoint.getResourceEndpoint(); | ||
if (mFragment.get().mUseTurn.isChecked()) { | ||
String dataEndpoint = null; | ||
for (ResourceEndpointListItem endpoint : mFragment.get().mEndpointList) { | ||
if (endpoint.getProtocol().equals("HTTPS")) { | ||
dataEndpoint = endpoint.getResourceEndpoint(); | ||
} | ||
} | ||
} | ||
|
||
// Step 5. Construct the Kinesis Video Signaling Client. The HTTPS endpoint from the | ||
// GetSignalingChannelEndpoint response above is used with this client. This | ||
// client is just used for getting ICE servers, not for actual signaling. | ||
// Step 6. Call GetIceServerConfig in order to obtain TURN ICE server info. | ||
// Note: the STUN endpoint will be `stun:stun.kinesisvideo.${region}.amazonaws.com:443` | ||
try { | ||
final AWSKinesisVideoSignalingClient awsKinesisVideoSignalingClient = mFragment.get().getAwsKinesisVideoSignalingClient(region, dataEndpoint); | ||
GetIceServerConfigResult getIceServerConfigResult = awsKinesisVideoSignalingClient.getIceServerConfig( | ||
new GetIceServerConfigRequest().withChannelARN(mFragment.get().mChannelArn).withClientId(role.name())); | ||
mFragment.get().mIceServerList.addAll(getIceServerConfigResult.getIceServerList()); | ||
} catch (Exception e) { | ||
return "Get Ice Server Config failed with Exception " + e.getLocalizedMessage(); | ||
// Step 5. Construct the Kinesis Video Signaling Client. The HTTPS endpoint from the | ||
// GetSignalingChannelEndpoint response above is used with this client. This | ||
// client is just used for getting ICE servers, not for actual signaling. | ||
// Step 6. Call GetIceServerConfig in order to obtain TURN ICE server info. | ||
// Note: the STUN endpoint will be `stun:stun.kinesisvideo.${region}.amazonaws.com:443` | ||
try { | ||
final AWSKinesisVideoSignalingClient awsKinesisVideoSignalingClient = mFragment.get().getAwsKinesisVideoSignalingClient(region, dataEndpoint); | ||
GetIceServerConfigResult getIceServerConfigResult = awsKinesisVideoSignalingClient.getIceServerConfig( | ||
new GetIceServerConfigRequest().withChannelARN(mFragment.get().mChannelArn).withClientId(role.name())); | ||
mFragment.get().mIceServerList.addAll(getIceServerConfigResult.getIceServerList()); | ||
} catch (Exception e) { | ||
return "Get Ice Server Config failed with Exception " + e.getLocalizedMessage(); | ||
} | ||
} else { | ||
Log.i(TAG, "Not fetching TURN servers."); | ||
} | ||
|
||
return null; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,8 +28,13 @@ public class SignalingServiceWebSocketClient { | |
|
||
public SignalingServiceWebSocketClient(final String uri, final SignalingListener signalingListener, | ||
final ExecutorService executorService) { | ||
this(uri, signalingListener, executorService, 0L); | ||
} | ||
|
||
public SignalingServiceWebSocketClient(final String uri, final SignalingListener signalingListener, | ||
final ExecutorService executorService, final long pingIntervalSeconds) { | ||
Log.d(TAG, "Connecting to URI " + uri + " as master"); | ||
websocketClient = new WebSocketClient(uri, new ClientManager(), signalingListener, executorService); | ||
websocketClient = new WebSocketClient(uri, new ClientManager(), signalingListener, executorService, pingIntervalSeconds); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably a note for keepalive for websocket to continue to offer after x minutes? |
||
this.executorService = executorService; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,10 +15,14 @@ | |
import java.io.IOException; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import javax.websocket.ClientEndpointConfig; | ||
|
@@ -40,9 +44,12 @@ class WebSocketClient { | |
|
||
private final ExecutorService executorService; | ||
|
||
private final ScheduledExecutorService pingService; | ||
|
||
WebSocketClient(final String uri, final ClientManager clientManager, | ||
final SignalingListener signalingListener, | ||
final ExecutorService executorService) { | ||
final ExecutorService executorService, | ||
final long pingIntervalSeconds) { | ||
|
||
this.executorService = executorService; | ||
final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create() | ||
|
@@ -99,6 +106,38 @@ public void onError(final Session session, final Throwable thr) { | |
}); | ||
|
||
await().atMost(10, TimeUnit.SECONDS).until(WebSocketClient.this::isOpen); | ||
|
||
if (pingIntervalSeconds > 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
pingService = Executors.newSingleThreadScheduledExecutor(); | ||
|
||
pingService.scheduleAtFixedRate(() -> { | ||
try { | ||
if (session == null || !session.isOpen()) { | ||
Log.e(TAG, "Unable to send ping. Session has been closed."); | ||
if (!pingService.isShutdown()) { | ||
pingService.shutdown(); | ||
} | ||
return; | ||
} | ||
session.getAsyncRemote().sendPing(ByteBuffer.wrap("".getBytes(StandardCharsets.UTF_8))); | ||
Log.i(TAG, "Sent ping! Sending another in " + pingIntervalSeconds + " seconds."); | ||
} catch (final Exception ex) { | ||
Log.e(TAG, "Exception sending ping message", ex); | ||
} | ||
}, 0, pingIntervalSeconds, TimeUnit.SECONDS); | ||
|
||
Log.i(TAG, "Sending a ping keep-alive message every " + pingIntervalSeconds + " seconds."); | ||
} else { | ||
pingService = null; | ||
Log.d(TAG, "Not sending any pings."); | ||
} | ||
} | ||
|
||
WebSocketClient(final String uri, final ClientManager clientManager, | ||
final SignalingListener signalingListener, | ||
final ExecutorService executorService) { | ||
|
||
this(uri, clientManager, signalingListener, executorService, 0L); | ||
} | ||
|
||
boolean isOpen() { | ||
|
@@ -129,14 +168,19 @@ void disconnect() { | |
return; | ||
} | ||
|
||
if (!session.isOpen()) { | ||
Log.w(TAG, "Connection already closed for " + session.getRequestURI()); | ||
return; | ||
if (pingService != null && !pingService.isShutdown()) { | ||
pingService.shutdownNow(); | ||
} | ||
|
||
try { | ||
session.close(); | ||
executorService.shutdownNow(); | ||
if (!session.isOpen()) { | ||
Log.w(TAG, "Connection already closed for " + session.getRequestURI()); | ||
} else { | ||
session.close(); | ||
} | ||
if (!executorService.isShutdown()) { | ||
executorService.shutdownNow(); | ||
} | ||
Log.i(TAG, "Disconnected from " + session.getRequestURI() + " successfully!"); | ||
} catch (final IOException e) { | ||
Log.e(TAG, "Exception closing: " + e.getMessage()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw exception?