Skip to content

Commit

Permalink
Release 1.9.4 (9 July 2019)
Browse files Browse the repository at this point in the history
    License update: KVS SDK is under Apache 2.0 license now.
    Stablization updates in C layer.
    Added internal retry logic to handle error case that SDK can recover, i.e. network unstability.
    Skip over error fragments - SDK will continue skip any invalid fragments are ingested through SDK earlier and continue streaming.
    Automatic CPD (codec private data) extraction from the stream when CPD is part of the first H264 AnnexB frame.
  • Loading branch information
chehefen committed Jul 10, 2019
1 parent 09a1ed2 commit 026f890
Show file tree
Hide file tree
Showing 22 changed files with 659 additions and 142 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
.settings
/eclipse-bin/
.idea
*.key
*.gpg
cm-file
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The Amazon Kinesis Video Streams Producer SDK Java makes it easy to build an on-

* [Developer Guide](https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/producer-sdk-javaapi.html) - For in-depth getting started and usage information.
* [Release Notes](https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-java/releases) - To see the latest features, bug fixes, and changes in the SDK
* [Issues](https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-java/issues) - Report issues and submit pull requests
* [Issues](https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-java/issues) - Report issues and submit pull requests


### Prerequisites
Expand Down Expand Up @@ -132,6 +132,13 @@ export LD_LIBRARY_PATH=/<YOUR_PRODUCER_SDK_CPP_DOWNLOAD>/amazon-kinesis-video-st
This should resolve native library loading issues.

## Release Notes
### Release 1.9.4 (9 July 2019)
* License update: KVS SDK is under Apache 2.0 license now.
* Stablization updates in C layer.
* Added internal retry logic to handle error case that SDK can recover, i.e. network unstability.
* Skip over error fragments - SDK will continue skip any invalid fragments are ingested through SDK earlier and continue streaming.
* Automatic CPD (codec private data) extraction from the stream when CPD is part of the first H264 AnnexB frame.

### Release 1.9.3 (12 March 2019)
* Bug fix to avoid crash due to access to freed native stream object.

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpasyncclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>


</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.amazonaws.kinesisvideo.client;

import java.io.IOException;
import java.net.URI;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import com.amazonaws.kinesisvideo.http.HttpMethodName;
import com.amazonaws.kinesisvideo.http.KinesisVideoApacheHttpAsyncClient;
import com.amazonaws.kinesisvideo.signing.KinesisVideoSigner;

/**
* Async Client which wraps around Apache Http Async client which takes care
* of massaging the request to be made using the apache client
*/
public final class StreamingReadAsyncClient {
private static final String CONTENT_TYPE_HEADER_KEY = "Content-Type";
private URI uri;
private KinesisVideoSigner signer;
private String inputInJson;
private Integer connectionTimeoutInMillis;
private Integer readTimeoutInMillis;
private HttpAsyncResponseConsumer<HttpResponse> httpAsyncResponseConsumer;
private FutureCallback<HttpResponse> futureCallback;
private KinesisVideoApacheHttpAsyncClient asyncClient;

private StreamingReadAsyncClient(final URI uri, final KinesisVideoSigner signer, final String inputInJson, final Integer connectionTimeoutInMillis, final Integer readTimeoutInMillis, final HttpAsyncResponseConsumer<HttpResponse> httpAsyncResponseConsumer, final FutureCallback<HttpResponse> futureCallback) {
this.uri = uri;
this.signer = signer;
this.inputInJson = inputInJson;
this.connectionTimeoutInMillis = connectionTimeoutInMillis;
this.readTimeoutInMillis = readTimeoutInMillis;
this.httpAsyncResponseConsumer = httpAsyncResponseConsumer;
this.futureCallback = futureCallback;
this.asyncClient = getHttpClient();
}

public void execute() {
asyncClient.executeRequest();
}

public void close() throws IOException {
asyncClient.close();
}

private KinesisVideoApacheHttpAsyncClient getHttpClient() {
KinesisVideoApacheHttpAsyncClient.Builder clientBuilder = KinesisVideoApacheHttpAsyncClient.builder().withUri(uri).withContentType(ContentType.APPLICATION_JSON).withMethod(HttpMethodName.POST).withContentInJson(inputInJson).withHeader(CONTENT_TYPE_HEADER_KEY, ContentType.APPLICATION_JSON.getMimeType()).withFutureCallback(futureCallback).withHttpAsyncResponseConsumer(httpAsyncResponseConsumer);
if (connectionTimeoutInMillis != null) {
clientBuilder = clientBuilder.withConnectionTimeoutInMillis(connectionTimeoutInMillis.intValue());
}
if (readTimeoutInMillis != null) {
clientBuilder = clientBuilder.withSocketTimeoutInMillis(readTimeoutInMillis.intValue());
}
final KinesisVideoApacheHttpAsyncClient client = clientBuilder.build();
signer.sign(client);
return client;
}


public static class StreamingReadAsyncClientBuilder {
private URI uri;
private KinesisVideoSigner signer;
private String inputInJson;
private Integer connectionTimeoutInMillis;
private Integer readTimeoutInMillis;
private HttpAsyncResponseConsumer<HttpResponse> httpAsyncResponseConsumer;
private FutureCallback<HttpResponse> futureCallback;

StreamingReadAsyncClientBuilder() {
}

public StreamingReadAsyncClientBuilder uri(final URI uri) {
this.uri = uri;
return this;
}

public StreamingReadAsyncClientBuilder signer(final KinesisVideoSigner signer) {
this.signer = signer;
return this;
}

public StreamingReadAsyncClientBuilder inputInJson(final String inputInJson) {
this.inputInJson = inputInJson;
return this;
}

public StreamingReadAsyncClientBuilder connectionTimeoutInMillis(final Integer connectionTimeoutInMillis) {
this.connectionTimeoutInMillis = connectionTimeoutInMillis;
return this;
}

public StreamingReadAsyncClientBuilder readTimeoutInMillis(final Integer readTimeoutInMillis) {
this.readTimeoutInMillis = readTimeoutInMillis;
return this;
}

public StreamingReadAsyncClientBuilder httpAsyncResponseConsumer(final HttpAsyncResponseConsumer<HttpResponse> httpAsyncResponseConsumer) {
this.httpAsyncResponseConsumer = httpAsyncResponseConsumer;
return this;
}

public StreamingReadAsyncClientBuilder futureCallback(final FutureCallback<HttpResponse> futureCallback) {
this.futureCallback = futureCallback;
return this;
}

public StreamingReadAsyncClient build() {
return new StreamingReadAsyncClient(uri, signer, inputInJson, connectionTimeoutInMillis, readTimeoutInMillis, httpAsyncResponseConsumer, futureCallback);
}

@Override
public String toString() {
return "StreamingReadAsyncClient.StreamingReadAsyncClientBuilder(uri=" + this.uri + ", signer=" + this.signer + ", inputInJson=" + this.inputInJson + ", connectionTimeoutInMillis=" + this.connectionTimeoutInMillis + ", readTimeoutInMillis=" + this.readTimeoutInMillis + ", httpAsyncResponseConsumer=" + this.httpAsyncResponseConsumer + ", futureCallback=" + this.futureCallback + ")";
}
}

public static StreamingReadAsyncClientBuilder builder() {
return new StreamingReadAsyncClientBuilder();
}
}
115 changes: 115 additions & 0 deletions src/main/java/com/amazonaws/kinesisvideo/http/HttpClientBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.amazonaws.kinesisvideo.http;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.http.entity.ContentType;

public abstract class HttpClientBase implements HttpClient {

private static final String HOST_HEADER = "Host";
private static final int DEFAULT_CONNECTION_TIMEOUT_IN_MILLIS = 10000; //magic number
private static final int DEFAULT_SOCKET_TIMEOUT_IN_MILLIS = 10000; //magic number

protected final BuilderBase<? extends BuilderBase> mBuilder;

public HttpClientBase(final BuilderBase<? extends BuilderBase> builder) {
this.mBuilder = builder;
}

/**
* This method is intended for testing use only.
*
* @param key header key
* @param value header value
*/
public void addHeaderUnsafe(final String key, final String value) {
mBuilder.withHeader(key, value);
}

@Override
public HttpMethodName getMethod() {
return mBuilder.mMethod;
}

@Override
public URI getUri() {
return mBuilder.mUri;
}

@Override
public Map<String, String> getHeaders() {
return mBuilder.mHeaders;
}

@Override
public InputStream getContent() {
return new ByteArrayInputStream(mBuilder.mContentInJson.getBytes(StandardCharsets.US_ASCII));
}

@Override
public void close() throws IOException {
this.closeClient();
}

public abstract void closeClient() throws IOException;

public abstract static class BuilderBase<T> {
protected final Map<String, String> mHeaders;
protected URI mUri;
protected HttpMethodName mMethod;
protected int mConnectionTimeoutInMillis;
protected int mSocketTimeoutInMillis;
protected ContentType mContentType;
protected String mContentInJson;

public abstract T builderType();

public BuilderBase() {
mHeaders = new HashMap<String, String>();
mConnectionTimeoutInMillis = DEFAULT_CONNECTION_TIMEOUT_IN_MILLIS;
mSocketTimeoutInMillis = DEFAULT_SOCKET_TIMEOUT_IN_MILLIS;
}

public T withUri(final URI uri) {
mUri = uri;
mHeaders.put(HOST_HEADER, uri.getHost());
return builderType();
}

public T withMethod(final HttpMethodName method) {
mMethod = method;
return builderType();
}

public T withHeader(final String key, final String value) {
mHeaders.put(key, value);
return builderType();
}

public T withConnectionTimeoutInMillis(final int connectionTimeoutInMillis) {
mConnectionTimeoutInMillis = connectionTimeoutInMillis;
return builderType();
}

public T withSocketTimeoutInMillis(final int socketTimeoutInMillis) {
mSocketTimeoutInMillis = socketTimeoutInMillis;
return builderType();
}

public T withContentType(final ContentType contentType) {
mContentType = contentType;
return builderType();
}

public T withContentInJson(final String contentInJson) {
mContentInJson = contentInJson;
return builderType();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.amazonaws.kinesisvideo.http;

import static com.amazonaws.kinesisvideo.common.preconditions.Preconditions.checkNotNull;

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.ssl.SSLContextBuilder;

/**
* Http Async Client which uses Apache HttpAsyncClient internally to make
* the http request and invoke callbacks when there is data ready to consume.
*/
public final class KinesisVideoApacheHttpAsyncClient extends HttpClientBase {

private final CloseableHttpAsyncClient mHttpClient;

private KinesisVideoApacheHttpAsyncClient(final BuilderBase<Builder> builder) {
super(builder);
this.mHttpClient = buildHttpAsyncClient();
this.mHttpClient.start();
}

public static Builder builder() {
return new Builder();
}

public void executeRequest() {
final HttpPost request = new HttpPost(mBuilder.mUri);
for (Map.Entry<String, String> entry : mBuilder.mHeaders.entrySet()) {
request.addHeader(entry.getKey(), entry.getValue());
}
final HttpEntity entity = new StringEntity(mBuilder.mContentInJson, mBuilder.mContentType);
request.setEntity(entity);
final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(request);
this.mHttpClient.execute(requestProducer, ((Builder) mBuilder).mHttpAsyncResponseConsumer,
((Builder) mBuilder).mFutureCallback);
}

private CloseableHttpAsyncClient buildHttpAsyncClient() {
final SSLContextBuilder builder = new SSLContextBuilder();
try {
builder.loadTrustMaterial(new TrustAllStrategy());
final SSLIOSessionStrategy sslSessionStrategy = new SSLIOSessionStrategy(builder.build(),
new String[] {"TLSv1.2"},
null,
new NoopHostnameVerifier());
return HttpAsyncClientBuilder.create()
.setSSLStrategy(sslSessionStrategy)
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(mBuilder.mConnectionTimeoutInMillis)
.setSocketTimeout(mBuilder.mSocketTimeoutInMillis)
.build())
.build();
} catch (final KeyManagementException e) {
throw new RuntimeException("Exception while building Apache http client", e);
} catch (final NoSuchAlgorithmException e) {
throw new RuntimeException("Exception while building Apache http client", e);
} catch (final KeyStoreException e) {
throw new RuntimeException("Exception while building Apache http client", e);
}
}

public static final class Builder extends BuilderBase<Builder> {

private HttpAsyncResponseConsumer<HttpResponse> mHttpAsyncResponseConsumer;
private FutureCallback<HttpResponse> mFutureCallback;

public Builder withHttpAsyncResponseConsumer(final HttpAsyncResponseConsumer<HttpResponse>
httpAsyncResponseConsumer) {
mHttpAsyncResponseConsumer = httpAsyncResponseConsumer;
return this;
}

public Builder withFutureCallback(final FutureCallback<HttpResponse> futureCallback) {
mFutureCallback = futureCallback;
return this;
}

public KinesisVideoApacheHttpAsyncClient build() {
checkNotNull(mUri);
return new KinesisVideoApacheHttpAsyncClient(this);
}

@Override
public Builder builderType() {
return this;
}
}

@Override
public void closeClient() throws IOException {
this.mHttpClient.close();
}
}
Loading

0 comments on commit 026f890

Please sign in to comment.