Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,7 @@ public long getResponseContentLength() {
return length;

if (getResponse().getBody().isRead()) {
try {
return getResponse().getBody().getLength();
} catch (IOException e) {
log.error("", e);
}
return getResponse().getBody().getLength();
}

return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,7 @@ public void bodyRequested(AbstractBody body) {
}

public void bodyComplete(AbstractBody body) {
try {
snapInternal(exc, flow, getBody(body));
} catch (IOException e) {
throw new RuntimeException(e);
}
snapInternal(exc, flow, getBody(body));
}
}
}
64 changes: 40 additions & 24 deletions core/src/main/java/com/predic8/membrane/core/http/AbstractBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
* Streams do not have to be read completely. Accessing the body from multiple
* threads is illegal. Using a Body Stream after the Body as been accessed by
* someone else (using streams or not) is illegal.
* <p>
* Public instance methods must not throw {@link IOException}s. Throw an
* unchecked {@link ReadingBodyException} or {@link WritingBodyException} instead.
* (This is enforced by the BodyDoesntThrowIOExceptionTest .)
*/
public abstract class AbstractBody {
private static final Logger log = LoggerFactory.getLogger(AbstractBody.class.getName());
Expand All @@ -53,7 +57,7 @@ public abstract class AbstractBody {
protected final List<MessageObserver> observers = new ArrayList<>(1);
private boolean wasStreamed = false;

public void read() throws IOException {
public void read() {
if (read)
return;

Expand All @@ -64,11 +68,15 @@ public void read() throws IOException {
observer.bodyRequested(this);

chunks.clear();
readLocal();
try {
readLocal();
} catch (IOException e) {
throw new ReadingBodyException(e);
}
markAsRead();
}

public void discard() throws IOException {
public void discard() {
read();
}

Expand Down Expand Up @@ -105,7 +113,7 @@ protected void markAsRead() {
* {@link #getContent()}. If you do not need the body as one single byte[],
* you should therefore use {@link #getContentAsStream()} instead.
*/
public byte[] getContent() throws IOException {
public byte[] getContent() {
if (wasStreamed)
throw new IllegalStateException("Cannot read body after it was streamed.");
read();
Expand All @@ -117,25 +125,29 @@ public byte[] getContent() throws IOException {
return content;
}

public InputStream getContentAsStream() throws IOException {
public InputStream getContentAsStream() {
if (wasStreamed)
throw new IllegalStateException("Cannot read body after it was streamed.");
read();
return new BodyInputStream(chunks);
}

public void write(AbstractBodyTransferrer out, boolean retainCopy) throws IOException {
if (!read && !retainCopy) {
if (wasStreamed)
log.warn("Streaming the body twice will not work.");
for (MessageObserver observer : observers)
observer.bodyRequested(this);
wasStreamed = true;
writeStreamed(out);
return;
public void write(AbstractBodyTransferrer out, boolean retainCopy) {
try {
if (!read && !retainCopy) {
if (wasStreamed)
log.warn("Streaming the body twice will not work.");
for (MessageObserver observer : observers)
observer.bodyRequested(this);
wasStreamed = true;
writeStreamed(out);
return;
}

writeAlreadyRead(out);
} catch (IOException e) {
throw new WritingBodyException(e);
}

writeAlreadyRead(out);
}

protected abstract void writeAlreadyRead(AbstractBodyTransferrer out) throws IOException;
Expand All @@ -145,15 +157,15 @@ public void write(AbstractBodyTransferrer out, boolean retainCopy) throws IOExce
/**
* Is called when there are no observers that need to read the body. Streams the body without reading it
*/
protected abstract void writeStreamed(AbstractBodyTransferrer out) throws IOException;
protected abstract void writeStreamed(AbstractBodyTransferrer out);

/**
* Warning: Calling this method will trigger reading the body from the client, disabling "streaming".
* Use {@link #isRead()} to determine wether the body already has been read, if necessary.
*
* @return the length of the return value of {@link #getContent()}
*/
public int getLength() throws IOException {
public int getLength() {
read();

int length = 0;
Expand Down Expand Up @@ -182,10 +194,14 @@ public int getLength() throws IOException {
* 0
* </pre>
*/
public byte[] getRaw() throws IOException {
public byte[] getRaw() {
read();
return getRawLocal();
}
try {
return getRawLocal();
} catch (IOException e) {
throw new ReadingBodyException(e);
}
}

protected abstract byte[] getRawLocal() throws IOException;

Expand All @@ -204,9 +220,9 @@ public String toString() {
}
try {
return new String(getRaw(), UTF_8);
} catch (IOException e) {
log.error("", e);
return "Error in body: " + e;
} catch (ReadingBodyException e) {
log.error(e.getMessage());
return "Error in body: " + e.getMessage();
}
}

Expand Down
38 changes: 28 additions & 10 deletions core/src/main/java/com/predic8/membrane/core/http/Body.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void readLocal() throws IOException {
}
}

public void discard() throws IOException {
public void discard() {
if (read)
return;
if (wasStreamed())
Expand All @@ -90,8 +90,12 @@ public void discard() throws IOException {
for (MessageObserver observer : observers)
observer.bodyRequested(this);

skipBodyContent();
}
try {
skipBodyContent();
} catch (IOException e) {
throw new ReadingBodyException(e);
}
}

private void skipBodyContent() throws IOException {
byte[] buffer = null;
Expand Down Expand Up @@ -148,25 +152,39 @@ protected void writeNotRead(AbstractBodyTransferrer out) throws IOException {
}

@Override
protected void writeStreamed(AbstractBodyTransferrer out) throws IOException {
protected void writeStreamed(AbstractBodyTransferrer out) {
byte[] buffer = new byte[BUFFER_SIZE];

long totalLength = 0;
int length;
chunks.clear();
while ((this.length > totalLength || this.length == -1) && (length = inputStream.read(buffer)) > 0) {
totalLength += length;
while (true) {
try {
if (!((this.length > totalLength || this.length == -1) && (length = inputStream.read(buffer)) > 0))
break;
} catch (IOException e) {
throw new ReadingBodyException(e);
}
totalLength += length;
streamedLength += length;
out.write(buffer, 0, length);
try {
out.write(buffer, 0, length);
} catch (IOException e) {
throw new WritingBodyException(e);
}
for (MessageObserver observer : observers)
observer.bodyChunk(buffer, 0, length);
}
out.finish(null);
markAsRead();
try {
out.finish(null);
} catch (IOException e) {
throw new WritingBodyException(e);
}
markAsRead();
}

@Override
public int getLength() throws IOException {
public int getLength() {
if (wasStreamed())
return (int)streamedLength;
return super.getLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void bodyChunk(Chunk chunk) {
storedSize += chunk.getLength();
}

public AbstractBody getBody(AbstractBody body) throws IOException {
public AbstractBody getBody(AbstractBody body) {
if (!body.wasStreamed()) {
return body;
}
Expand Down
74 changes: 49 additions & 25 deletions core/src/main/java/com/predic8/membrane/core/http/ChunkedBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,25 @@ public static int readChunkSize(InputStream in) throws IOException {
}

@Override
public void read() throws IOException {
if (bodyObserved && !bodyComplete)
ByteUtil.readStream(getContentAsStream());
bodyObserved = true;
super.read();
public void read() {
try {
if (bodyObserved && !bodyComplete)
ByteUtil.readStream(getContentAsStream());
bodyObserved = true;
super.read();
} catch (IOException e) {
throw new ReadingBodyException(e);
}
}

@Override
public void write(AbstractBodyTransferrer out, boolean retainCopy) throws IOException {
if (bodyObserved && !bodyComplete)
ByteUtil.readStream(getContentAsStream());
public void write(AbstractBodyTransferrer out, boolean retainCopy) {
try {
if (bodyObserved && !bodyComplete)
ByteUtil.readStream(getContentAsStream());
} catch (IOException e) {
throw new ReadingBodyException(e);
}
super.write(out, retainCopy);
}

Expand All @@ -140,7 +148,7 @@ protected void readLocal() throws IOException {
}

@Override
public void discard() throws IOException {
public void discard() {
if (read)
return;
if (wasStreamed())
Expand All @@ -149,8 +157,12 @@ public void discard() throws IOException {
for (MessageObserver observer : observers)
observer.bodyRequested(this);

readChunksAndDrop(inputStream);
trailer = readTrailer(inputStream);
try {
readChunksAndDrop(inputStream);
trailer = readTrailer(inputStream);
} catch (IOException e) {
throw new ReadingBodyException(e);
}
markAsRead();
}

Expand Down Expand Up @@ -212,22 +224,34 @@ protected void writeNotRead(AbstractBodyTransferrer out) throws IOException {
}

@Override
protected void writeStreamed(AbstractBodyTransferrer out) throws IOException {
protected void writeStreamed(AbstractBodyTransferrer out) {
log.debug("writeStreamed");
int chunkSize;
while ((chunkSize = readChunkSize(inputStream)) > 0) {
Chunk chunk = new Chunk(readByteArray(inputStream, chunkSize));
out.write(chunk);
for (MessageObserver observer : observers)
observer.bodyChunk(chunk);
//noinspection ResultOfMethodCallIgnored
inputStream.read(); // CR
//noinspection ResultOfMethodCallIgnored
inputStream.read(); // LF
lengthStreamed += chunkSize;
try {
while ((chunkSize = readChunkSize(inputStream)) > 0) {
Chunk chunk = new Chunk(readByteArray(inputStream, chunkSize));
try {
out.write(chunk);
} catch (IOException e) {
throw new WritingBodyException(e);
}
for (MessageObserver observer : observers)
observer.bodyChunk(chunk);
//noinspection ResultOfMethodCallIgnored
inputStream.read(); // CR
//noinspection ResultOfMethodCallIgnored
inputStream.read(); // LF
lengthStreamed += chunkSize;
}
trailer = readTrailer(inputStream);
} catch (IOException e) { // note that we only want to catch the IOExceptions associated to *reading* the body
throw new ReadingBodyException(e);
}
try {
out.finish(trailer);
} catch (IOException e) {
throw new WritingBodyException(e);
}
trailer = readTrailer(inputStream);
out.finish(trailer);
markAsRead();
}

Expand Down Expand Up @@ -281,7 +305,7 @@ protected void writeAlreadyRead(AbstractBodyTransferrer out) throws IOException
}

@Override
public int getLength() throws IOException {
public int getLength() {
if (wasStreamed())
return (int) lengthStreamed;
return super.getLength();
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/java/com/predic8/membrane/core/http/EmptyBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected void writeNotRead(AbstractBodyTransferrer out) throws IOException {
}

@Override
protected void writeStreamed(AbstractBodyTransferrer out) throws IOException {
protected void writeStreamed(AbstractBodyTransferrer out) {
//ignore
}

Expand All @@ -54,8 +54,12 @@ protected byte[] getRawLocal() throws IOException {
}

@Override
public void write(AbstractBodyTransferrer out, boolean retainCopy) throws IOException {
out.finish(null);
public void write(AbstractBodyTransferrer out, boolean retainCopy) {
try {
out.finish(null);
} catch (IOException e) {
throw new WritingBodyException(e);
}
markAsRead();
}

Expand Down
Loading