Skip to content

Commit

Permalink
Merge pull request #1183 from couchbase/feature/issue_1108_push_many_…
Browse files Browse the repository at this point in the history
…attachments

Fixed #1108 - Too many open files error on push replication
  • Loading branch information
pasin committed Apr 5, 2016
2 parents 0ed3f50 + 38f0f1a commit bcabd31
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 112 deletions.
165 changes: 53 additions & 112 deletions src/main/java/com/couchbase/lite/replicator/PusherInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.couchbase.lite.Status;
import com.couchbase.lite.internal.InterfaceAudience;
import com.couchbase.lite.internal.RevisionInternal;
import com.couchbase.lite.support.BlobContentBody;
import com.couchbase.lite.support.CustomFuture;
import com.couchbase.lite.support.HttpClientFactory;
import com.couchbase.lite.support.RemoteRequest;
Expand All @@ -21,14 +22,12 @@
import com.couchbase.lite.util.Log;
import com.couchbase.lite.util.Utils;
import com.couchbase.org.apache.http.entity.mime.MultipartEntity;
import com.couchbase.org.apache.http.entity.mime.content.InputStreamBody;
import com.couchbase.org.apache.http.entity.mime.content.StringBody;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpResponseException;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand Down Expand Up @@ -516,39 +515,33 @@ public void onCompletion(HttpResponse httpResponse, Object result, Throwable e)
*/
@InterfaceAudience.Private
private boolean uploadMultipartRevision(final RevisionInternal revision) {

// holds inputStream for blob to close after using
final List<InputStream> streamList = new ArrayList<InputStream>();

MultipartEntity multiPart = null;

Map<String, Object> revProps = revision.getProperties();

Map<String, Object> attachments = (Map<String, Object>) revProps.get("_attachments");
for (String attachmentKey : attachments.keySet()) {
Map<String, Object> attachment = (Map<String, Object>) attachments.get(attachmentKey);
if (attachment.containsKey("follows")) {

if (multiPart == null) {

multiPart = new MultipartEntity();

try {
String json = Manager.getObjectMapper().writeValueAsString(revProps);
Charset utf8charset = Charset.forName("UTF-8");
byte[] uncompressed = json.getBytes(utf8charset);
byte[] compressed = null;
byte[] data = uncompressed;
String contentEncoding = null;
if (uncompressed.length > RemoteRequest.MIN_JSON_LENGTH_TO_COMPRESS && canSendCompressedRequests()) {
if (uncompressed.length > RemoteRequest.MIN_JSON_LENGTH_TO_COMPRESS &&
canSendCompressedRequests()) {
compressed = Utils.compressByGzip(uncompressed);
if (compressed.length < uncompressed.length) {
data = compressed;
contentEncoding = "gzip";
}
}
// NOTE: StringBody.contentEncoding default value is null. Setting null value to contentEncoding does not cause any impact.
multiPart.addPart("param1", new StringBody(data, "application/json", utf8charset, contentEncoding));
// NOTE: StringBody.contentEncoding default value is null.
// Setting null value to contentEncoding does not cause any impact.
multiPart.addPart("param1", new StringBody(data, "application/json",
utf8charset, contentEncoding));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
Expand All @@ -557,88 +550,65 @@ private boolean uploadMultipartRevision(final RevisionInternal revision) {
BlobStore blobStore = this.db.getAttachmentStore();
String base64Digest = (String) attachment.get("digest");
BlobKey blobKey = new BlobKey(base64Digest);
InputStream blobStream = blobStore.blobStreamForKey(blobKey);
if (blobStream == null) {
Log.w(Log.TAG_SYNC, "Unable to load the blob stream for blobKey: %s - Skipping upload of multipart revision.", blobKey);
return false;
} else {
streamList.add(blobStream);
String contentType = null;
if (attachment.containsKey("content_type")) {
contentType = (String) attachment.get("content_type");
} else if (attachment.containsKey("type")) {
contentType = (String) attachment.get("type");
} else if (attachment.containsKey("content-type")) {
Log.w(Log.TAG_SYNC, "Found attachment that uses content-type" +
" field name instead of content_type (see couchbase-lite-android" +
" issue #80): %s", attachment);
}

// contentType = null causes Exception from FileBody of apache.
if (contentType == null)
contentType = "application/octet-stream"; // default

// NOTE: Content-Encoding might not be necessary to set. Apache FileBody does not set Content-Encoding.
// FileBody always return null for getContentEncoding(), and Content-Encoding header is not set in multipart
// CBL iOS: https://github.com/couchbase/couchbase-lite-ios/blob/feb7ff5eda1e80bd00e5eb19f1d46c793f7a1951/Source/CBL_Pusher.m#L449-L452
String contentEncoding = null;
if (attachment.containsKey("encoding")) {
contentEncoding = (String) attachment.get("encoding");
}

InputStreamBody inputStreamBody =
new CustomStreamBody(blobStream, contentType,
attachmentKey, contentEncoding);
multiPart.addPart(attachmentKey, inputStreamBody);
String contentType = null;
if (attachment.containsKey("content_type")) {
contentType = (String) attachment.get("content_type");
} else if (attachment.containsKey("type")) {
contentType = (String) attachment.get("type");
} else if (attachment.containsKey("content-type")) {
Log.w(Log.TAG_SYNC, "Found attachment that uses content-type" +
" field name instead of content_type (see couchbase-lite-android" +
" issue #80): %s", attachment);
}
// contentType = null causes Exception from FileBody of apache.
if (contentType == null)
contentType = "application/octet-stream"; // default

// CBL iOS: https://github.com/couchbase/couchbase-lite-ios/blob/feb7ff5eda1e80bd00e5eb19f1d46c793f7a1951/Source/CBL_Pusher.m#L449-L452
String contentEncoding = null;
if (attachment.containsKey("encoding"))
contentEncoding = (String) attachment.get("encoding");

BlobContentBody contentBody = new BlobContentBody(blobStore, blobKey,
contentType, attachmentKey, contentEncoding);
multiPart.addPart(attachmentKey, contentBody);
}
}

if (multiPart == null) {
if (multiPart == null)
return false;
}

final String path = String.format("/%s?new_edits=false", encodeDocumentId(revision.getDocID()));

Log.d(Log.TAG_SYNC, "Uploading multipart request. Revision: %s", revision);

addToChangesCount(1);

CustomFuture future = sendAsyncMultipartRequest("PUT", path, multiPart, new RemoteRequestCompletionBlock() {
@Override
public void onCompletion(HttpResponse httpResponse, Object result, Throwable e) {
try {
if (e != null) {
if (e instanceof HttpResponseException) {
// Server doesn't like multipart, eh? Fall back to JSON.
if (((HttpResponseException) e).getStatusCode() == 415) {
//status 415 = "bad_content_type"
dontSendMultipart = true;
uploadJsonRevision(revision);
}
} else {
Log.e(Log.TAG_SYNC, "Exception uploading multipart request", e);
setError(e);
}
} else {
Log.v(Log.TAG_SYNC, "Uploaded multipart request. Revision: %s", revision);
removePending(revision);
}
} finally {
// close all inputStreams for Blob
for (InputStream stream : streamList) {
final String path = String.format("/%s?new_edits=false", encodeDocumentId(revision.getDocID()));
CustomFuture future = sendAsyncMultipartRequest("PUT", path, multiPart,
new RemoteRequestCompletionBlock() {
@Override
public void onCompletion(HttpResponse httpResponse, Object result, Throwable e) {
try {
stream.close();
} catch (IOException ioe) {
if (e != null) {
if (e instanceof HttpResponseException) {
// Server doesn't like multipart, eh? Fall back to JSON.
if (((HttpResponseException) e).getStatusCode() == 415) {
//status 415 = "bad_content_type"
dontSendMultipart = true;
uploadJsonRevision(revision);
}
} else {
Log.e(Log.TAG_SYNC, "Exception uploading multipart request", e);
setError(e);
}
} else {
Log.v(Log.TAG_SYNC, "Uploaded multipart request. Revision: %s", revision);
removePending(revision);
}
} finally {
addToCompletedChangesCount(1);
}
}
addToCompletedChangesCount(1);
}
}
});
});
future.setQueue(pendingFutures);
pendingFutures.add(future);

return true;
}

Expand Down Expand Up @@ -699,35 +669,6 @@ private static int findCommonAncestor(RevisionInternal rev, List<String> possibl
return generation;
}

// CustomFileBody to support contentEncoding. FileBody returns always null for getContentEncoding()
private static class CustomStreamBody extends InputStreamBody {
private String contentEncoding = null;

public CustomStreamBody(final InputStream in, final String mimeType,
final String filename, String contentEncoding) {
super(in, mimeType, filename);
this.contentEncoding = contentEncoding;
}

@Override
protected void finalize() throws Throwable {
// close inputStream after used.
InputStream stream = getInputStream();
if (stream != null) {
try {
stream.close();
} catch (IOException ioe) {
}
}
super.finalize();
}

@Override
public String getContentEncoding() {
return contentEncoding;
}
}

/**
* Submit revisions into inbox for changes from changesSince()
*/
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/com/couchbase/lite/support/BlobContentBody.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Created by Hideki Itakura on 4/4/16.
* <p/>
* Copyright (c) 2016 Couchbase, Inc All rights reserved.
* <p/>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package com.couchbase.lite.support;

import com.couchbase.lite.BlobKey;
import com.couchbase.lite.BlobStore;
import com.couchbase.lite.util.Log;
import com.couchbase.org.apache.http.entity.mime.MIME;
import com.couchbase.org.apache.http.entity.mime.content.AbstractContentBody;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class BlobContentBody extends AbstractContentBody {

private final BlobStore blobStore;
private final BlobKey blobKey;
private final String filename;
private final long contentLength;
private final String contentEncoding;

public BlobContentBody(final BlobStore blobStore,
final BlobKey blobKey,
final String mimeType,
final String filename,
final long contentLength,
final String contentEncoding) {
super(mimeType);
if (blobStore == null)
throw new IllegalArgumentException("BlobStore may not be null");
if (blobKey == null)
throw new IllegalArgumentException("BlobKey may not be null");
if (contentLength < -1)
throw new IllegalArgumentException("Content length must be >= -1");
this.blobStore = blobStore;
this.blobKey = blobKey;
this.filename = filename;
this.contentLength = contentLength;
this.contentEncoding = contentEncoding;
}

public BlobContentBody(final BlobStore blobStore,
final BlobKey blobKey,
final String mimeType,
final String filename,
final String contentEncoding) {
this(blobStore, blobKey, mimeType, filename, -1L, contentEncoding);
}

public void writeTo(OutputStream out) throws IOException {
if (out == null)
throw new IllegalArgumentException("Output stream may not be null");

InputStream in = blobStore.blobStreamForKey(blobKey);
if (in == null) {
Log.w(Log.TAG_SYNC, "Unable to load the blob stream for blobKey: " + blobKey);
throw new IOException("Unable to load the blob stream for blobKey: " + blobKey);
}
try {
byte[] tmp = new byte[4096];
int l;
while ((l = in.read(tmp)) != -1)
out.write(tmp, 0, l);
out.flush();
} finally {
in.close();
}
}

public String getTransferEncoding() {
return MIME.ENC_BINARY;
}

public String getCharset() {
return null;
}

public long getContentLength() {
return contentLength;
}

public String getFilename() {
return this.filename;
}

public String getContentEncoding() {
return contentEncoding;
}
}

0 comments on commit bcabd31

Please sign in to comment.