Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ ApiFuture<Void> beginReconciliation() {
final void sendVia(Consumer<BidiWriteObjectRequest> consumer) {
lock.lock();
try {
if (state == State.TERMINAL_SUCCESS || state == State.TERMINAL_ERROR) {
return;
}
validateCurrentStateIsOneOf(
State.INITIALIZING, State.RUNNING, State.RETRYING, State.TAKEOVER);
BidiWriteObjectRequest prev = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ public void sendClose() {
if (tmp != null) {
tmp.closeSend();
}
if (pendingReconciliation != null) {
pendingReconciliation.cancel(true);
pendingReconciliation = null;
}
retryContext.reset();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
responseObserver.await();
return null;
} catch (Throwable t) {
if (stream != null) {
try {
stream.onError(io.grpc.Status.CANCELLED.withCause(t).asRuntimeException());
} catch (Exception ignored) {
}
}
Comment on lines +254 to +259

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Wrapping the caught Throwable t in io.grpc.Status.CANCELLED.withCause(t).asRuntimeException() obscures the original gRPC status code (such as UNAVAILABLE, PERMISSION_DENIED, etc.) that might be present in t. This can break retry policies or error handling downstream that rely on the specific status code of the failure.

Consider passing the original exception t directly to stream.onError(t) to preserve the original error type and status code.

Suggested change
if (stream != null) {
try {
stream.onError(io.grpc.Status.CANCELLED.withCause(t).asRuntimeException());
} catch (Exception ignored) {
}
}
if (stream != null) {
try {
stream.onError(t);
} catch (Exception ignored) {
}
}

stream = null;
first = true;
t.addSuppressed(new AsyncStorageTaskException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public boolean isOpen() {

@Override
public void close() throws IOException {
if (!open) {
return;
}
open = false;
ApiStreamObserver<WriteObjectRequest> openedStream = openedStream();
if (!finished) {
WriteObjectRequest message = finishMessage();
Expand All @@ -154,7 +158,6 @@ public void close() throws IOException {
throw e;
}
}
open = false;
responseObserver.await();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,18 @@ public void getResultAlwaysReturnsTheSameFuture() {
@Test
public void closingWithoutAppending_throwNoSuchElementException() {
Executor exec = MoreExecutors.newDirectExecutorService();
//noinspection resource
AsyncAppendingQueue<String> q = AsyncAppendingQueue.of(exec, 3, AsyncAppendingQueueTest::agg);

ApiFuture<String> result = q.getResult();
NoSuchElementException nse1 = assertThrows(NoSuchElementException.class, q::close);
NoSuchElementException nse2 =
assertThrows(
NoSuchElementException.class, () -> ApiExceptions.callAndTranslateApiException(result));

assertThat(nse1).hasMessageThat().contains("Never appended to");
assertThat(nse2).hasMessageThat().contains("Never appended to");
try (AsyncAppendingQueue<String> q =
AsyncAppendingQueue.of(exec, 3, AsyncAppendingQueueTest::agg)) {
ApiFuture<String> result = q.getResult();
NoSuchElementException nse1 = assertThrows(NoSuchElementException.class, q::close);
NoSuchElementException nse2 =
assertThrows(
NoSuchElementException.class,
() -> ApiExceptions.callAndTranslateApiException(result));

assertThat(nse1).hasMessageThat().contains("Never appended to");
assertThat(nse2).hasMessageThat().contains("Never appended to");
}
}

@SuppressWarnings("resource")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,81 +41,86 @@ public final class BidiAppendableUnbufferedWritableByteChannelTest {
@Test
public void appendAndFinalizeOnlyPerformedIfAllBytesConsumed() throws IOException {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
ChecksummedTestContent ctc = ChecksummedTestContent.gen(27);
AppendableUploadState state =
BidiUploadState.appendableNew(
BidiUploadTest.appendRequestNew,
GrpcCallContext::createDefault,
16,
SettableApiFuture.create(),
Crc32cValue.zero());
AtomicLong finishWriteOffset = new AtomicLong(-1);
BidiUploadStreamingStream stream =
new BidiUploadStreamingStream(
state,
executor,
BidiUploadTestUtils.adaptOnlySend(
respond ->
request -> {
if (request.getFinishWrite()) {
finishWriteOffset.set(
request.getWriteOffset()
+ request.getChecksummedData().getContent().size());
}
executor.submit(
() -> {
switch ((int) request.getWriteOffset()) {
case 0:
respond.onResponse(BidiUploadTest.resourceWithSize(0));
break;
case 4:
case 8:
// do not ack any bytes until we receive 16, this simulates
// latency on the bytes being ack'd.
break;
case 12:
respond.onResponse(BidiUploadTestUtils.incremental(8));
break;
case 16:
respond.onResponse(BidiUploadTestUtils.incremental(12));
break;
case 20:
respond.onResponse(BidiUploadTestUtils.incremental(16));
break;
case 24:
BidiWriteObjectResponse.Builder b =
BidiUploadTest.resourceFor(ctc).toBuilder();
b.getResourceBuilder()
.setFinalizeTime(
Conversions.grpc()
.timestampCodec
.encode(OffsetDateTime.now()));
respond.onResponse(b.build());
break;
default:
respond.onError(
FakeStorage.unexpectedRequest(request, ImmutableList.of()));
break;
}
});
}),
3,
RetryContext.neverRetry());
ChunkSegmenter chunkSegmenter =
new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 4, 2);
BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 4, 0);
try {
ChecksummedTestContent ctc = ChecksummedTestContent.gen(27);
AppendableUploadState state =
BidiUploadState.appendableNew(
BidiUploadTest.appendRequestNew,
GrpcCallContext::createDefault,
16,
SettableApiFuture.create(),
Crc32cValue.zero());
AtomicLong finishWriteOffset = new AtomicLong(-1);
BidiUploadStreamingStream stream =
new BidiUploadStreamingStream(
state,
executor,
BidiUploadTestUtils.adaptOnlySend(
respond ->
request -> {
if (request.getFinishWrite()) {
finishWriteOffset.set(
request.getWriteOffset()
+ request.getChecksummedData().getContent().size());
}
executor.submit(
() -> {
switch ((int) request.getWriteOffset()) {
case 0:
respond.onResponse(BidiUploadTest.resourceWithSize(0));
break;
case 4:
case 8:
// do not ack any bytes until we receive 16, this simulates
// latency on the bytes being ack'd.
break;
case 12:
respond.onResponse(BidiUploadTestUtils.incremental(8));
break;
case 16:
respond.onResponse(BidiUploadTestUtils.incremental(12));
break;
case 20:
respond.onResponse(BidiUploadTestUtils.incremental(16));
break;
case 24:
BidiWriteObjectResponse.Builder b =
BidiUploadTest.resourceFor(ctc).toBuilder();
b.getResourceBuilder()
.setFinalizeTime(
Conversions.grpc()
.timestampCodec
.encode(OffsetDateTime.now()));
respond.onResponse(b.build());
break;
default:
respond.onError(
FakeStorage.unexpectedRequest(request, ImmutableList.of()));
break;
}
});
}),
3,
RetryContext.neverRetry());
ChunkSegmenter chunkSegmenter =
new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 4, 2);
BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 4, 0);

ByteBuffer buf = ctc.asByteBuffer();
int written1 = channel.write(buf);
// fill up the outbound queue
assertThat(written1).isEqualTo(16);
ByteBuffer buf = ctc.asByteBuffer();
int written1 = channel.write(buf);
// fill up the outbound queue
assertThat(written1).isEqualTo(16);

// asynchronously bytes will be ack'd 4 at a time, eventually there will be enough space in the
// outbound queue to allow writeAndClose to start consuming bytes.
channel.nextWriteShouldFinalize();
int written2 = channel.writeAndClose(buf);
assertThat(written2).isEqualTo(11);
assertThat(finishWriteOffset.get()).isEqualTo(ctc.length());
// asynchronously bytes will be ack'd 4 at a time, eventually there will be enough space in
// the
// outbound queue to allow writeAndClose to start consuming bytes.
channel.nextWriteShouldFinalize();
int written2 = channel.writeAndClose(buf);
assertThat(written2).isEqualTo(11);
assertThat(finishWriteOffset.get()).isEqualTo(ctc.length());
} finally {
executor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,25 @@ public void lazyBufferHandle_afterAllocationOnlyBackingIsReferenced() {
public void lazyBufferHandle_initIsThreadSafe() throws ExecutionException, InterruptedException {
int capacity = 10;
ExecutorService exec = Executors.newFixedThreadPool(2);
AtomicBoolean alloc = new AtomicBoolean(false);
LazyBufferHandle handle =
new LazyBufferHandle(
capacity,
i -> {
alloc.compareAndSet(false, true);
return ByteBuffer.allocate(capacity);
});

Future<ByteBuffer> f1 = exec.submit(handle::get);
Future<ByteBuffer> f2 = exec.submit(handle::get);

assertThat(f1.get()).isSameInstanceAs(f2.get());

assertThat(handle.get().capacity()).isEqualTo(capacity);
try {
AtomicBoolean alloc = new AtomicBoolean(false);
LazyBufferHandle handle =
new LazyBufferHandle(
capacity,
i -> {
alloc.compareAndSet(false, true);
return ByteBuffer.allocate(capacity);
});

Future<ByteBuffer> f1 = exec.submit(handle::get);
Future<ByteBuffer> f2 = exec.submit(handle::get);

assertThat(f1.get()).isSameInstanceAs(f2.get());

assertThat(handle.get().capacity()).isEqualTo(capacity);
} finally {
exec.shutdownNow();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ void illegalStateExceptionIfWrittenLt0_slice_eqBuffer() {
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
IllegalStateException ise =
assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 4).asByteBuffer()));
ise.printStackTrace(System.out);
}

@Example
Expand All @@ -568,7 +567,6 @@ void illegalStateExceptionIfWrittenLt0_slice_gtBuffer() {
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
IllegalStateException ise =
assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 5).asByteBuffer()));
ise.printStackTrace(System.out);
}

@Example
Expand All @@ -587,7 +585,6 @@ void illegalStateExceptionIfWrittenLt0_slice_ltBuffer() {
c.write(all.slice(3, 3).asByteBuffer());
fail("should have errored in previous write call");
});
ise.printStackTrace(System.out);
}

@Example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class FakeHttpServer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeHttpServer.class);

private final URI endpoint;
private final Channel channel;
Expand Down Expand Up @@ -195,7 +198,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws E

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
LOGGER.warn("Exception caught in pipeline", cause);
ctx.close();
}
}
Expand Down
Loading
Loading