Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions java-storage/google-cloud-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,17 @@
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<systemPropertyVariables>
<testbench.keepAlive>true</testbench.keepAlive>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,9 @@ private AppendableUploadState(
@Nullable StorageException onResponse(BidiWriteObjectResponse response) {
lock.lock();
try {
if (state == State.TERMINAL_SUCCESS || state == State.TERMINAL_ERROR) {
return null;
}
validateCurrentStateIsOneOf(State.allNonTerminal);

if (response.hasWriteHandle()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static FakeHttpServer of(HttpRequestHandler server) {
static FakeHttpServer of(HttpRequestHandler server, boolean trailingSlash) {
// based on
// https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/example/src/main/java/io/netty/example/http/helloworld/HttpHelloWorldServer.java
InetSocketAddress address = new InetSocketAddress("localhost", 0);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0);
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Expand Down Expand Up @@ -129,7 +129,7 @@ protected void initChannel(SocketChannel ch) {

InetSocketAddress socketAddress = (InetSocketAddress) channel.localAddress();
String suffix = trailingSlash ? "/" : "";
URI endpoint = URI.create("http://localhost:" + socketAddress.getPort() + suffix);
URI endpoint = URI.create("http://127.0.0.1:" + socketAddress.getPort() + suffix);
HttpStorageOptions httpStorageOptions =
HttpStorageOptions.http()
.setHost(endpoint.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package com.google.cloud.storage;

import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceFactory;
import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor;
import com.google.cloud.storage.it.runner.registry.Registry;
import com.google.storage.v2.StorageClient;
import com.google.storage.v2.StorageGrpc;
import com.google.storage.v2.StorageSettings;
import io.grpc.Server;
Expand All @@ -28,34 +33,97 @@
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FakeServer implements AutoCloseable {

private final Server server;
private final GrpcStorageOptions grpcStorageOptions;
private final ScheduledThreadPoolExecutor executor;

FakeServer(Server server, GrpcStorageOptions grpcStorageOptions) {
FakeServer(Server server, GrpcStorageOptions grpcStorageOptions, ScheduledThreadPoolExecutor executor) {
this.server = server;
this.grpcStorageOptions = grpcStorageOptions;
this.executor = executor;
}

GrpcStorageOptions getGrpcStorageOptions() {
return grpcStorageOptions;
}

StorageSettings storageSettings() throws IOException {
return grpcStorageOptions.getStorageSettings();
StorageSettings settings = grpcStorageOptions.getStorageSettings();
if (executor != null) {
ExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
StorageSettings.Builder builder = settings.toBuilder()
.setBackgroundExecutorProvider(executorProvider);
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
builder.setTransportChannelProvider(
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider())
.toBuilder()
.setExecutorProvider(executorProvider)
.build());
}
return builder.build();
}
return settings;
}

@Override
public void close() throws InterruptedException {
server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
try {
server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
} finally {
if (executor != null) {
executor.shutdownNow();
}
}
}

static void injectIsolatedClient(Storage storage, ScheduledThreadPoolExecutor executor) {
Storage delegate = storage;
if (storage instanceof OtelStorageDecorator) {
try {
java.lang.reflect.Field delegateField = OtelStorageDecorator.class.getDeclaredField("delegate");
delegateField.setAccessible(true);
delegate = (Storage) delegateField.get(storage);
} catch (Exception e) {
throw new RuntimeException("Failed to unwrap OtelStorageDecorator", e);
}
}
if (delegate instanceof GrpcStorageImpl) {
GrpcStorageImpl impl = (GrpcStorageImpl) delegate;
try {
StorageSettings settings = impl.getOptions().getStorageSettings();
ExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
StorageSettings.Builder settingsBuilder = settings.toBuilder()
.setBackgroundExecutorProvider(executorProvider);
if (settingsBuilder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
settingsBuilder.setTransportChannelProvider(
((InstantiatingGrpcChannelProvider) settingsBuilder.getTransportChannelProvider())
.toBuilder()
.setExecutorProvider(executorProvider)
.build());
}
StorageSettings isolatedSettings = settingsBuilder.build();
StorageClient isolatedClient = StorageClient.create(isolatedSettings);

java.lang.reflect.Field clientField = GrpcStorageImpl.class.getDeclaredField("storageClient");
clientField.setAccessible(true);
clientField.set(impl, isolatedClient);
} catch (Exception e) {
throw new RuntimeException("Failed to inject isolated StorageClient", e);
}
}
}

static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
InetSocketAddress address = new InetSocketAddress("localhost", 0);
Server server = NettyServerBuilder.forAddress(address).addService(service).build();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of hardcoding the IPv4 loopback address "127.0.0.1", consider using InetAddress.getLoopbackAddress(). This is more robust as it dynamically resolves to the correct loopback address (IPv4 or IPv6) for the environment without performing a DNS lookup, which avoids the localhost resolution hangs while maintaining compatibility with IPv6-only loopback environments.

Suggested change
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0);
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4);
executor.setKeepAliveTime(10, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
Server server = NettyServerBuilder.forAddress(address).addService(service).executor(executor).build();
server.start();
String endpoint = String.format(Locale.US, "%s:%d", address.getHostString(), server.getPort());
GrpcStorageOptions grpcStorageOptions =
Expand All @@ -67,6 +135,16 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
.setEnableGrpcClientMetrics(false)
.setAttemptDirectPath(false)
.setOpenTelemetry(Registry.getInstance().otelSdk.get().get())
.setServiceFactory(
new ServiceFactory<Storage, StorageOptions>() {
@Override
@SuppressWarnings("deprecation")
public Storage create(StorageOptions opts) {
Storage storage = new GrpcStorageOptions.GrpcStorageFactory().create(opts);
injectIsolatedClient(storage, executor);
return storage;
}
})
// cut most retry settings by half. we're hitting an in process server.
.setRetrySettings(
RetrySettings.newBuilder()
Expand All @@ -80,6 +158,6 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
.setMaxRpcTimeoutDuration(Duration.ofSeconds(25))
.build())
.build();
return new FakeServer(server, grpcStorageOptions);
return new FakeServer(server, grpcStorageOptions, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -835,12 +835,9 @@ static class BidiWriteService extends StorageImplBase {
private final BiConsumer<StreamObserver<BidiWriteObjectResponse>, List<BidiWriteObjectRequest>>
c;

private ImmutableList.Builder<BidiWriteObjectRequest> requests;

BidiWriteService(
BiConsumer<StreamObserver<BidiWriteObjectResponse>, List<BidiWriteObjectRequest>> c) {
this.c = c;
this.requests = new ImmutableList.Builder<>();
}

BidiWriteService(ImmutableMap<List<BidiWriteObjectRequest>, BidiWriteObjectResponse> writes) {
Expand Down Expand Up @@ -880,12 +877,16 @@ private static void logUnexpectedRequest(
.collect(joining),
build.stream().map(StorageV2ProtoUtils::fmtProto).collect(oneLine));
LOGGER.warn(msg);
System.err.println(msg);
}

@Override
public StreamObserver<BidiWriteObjectRequest> bidiWriteObject(
StreamObserver<BidiWriteObjectResponse> obs) {
return new Adapter() {
private ImmutableList.Builder<BidiWriteObjectRequest> requests =
new ImmutableList.Builder<>();

@Override
public void onNext(BidiWriteObjectRequest value) {
requests.add(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public final class ITGapicUnbufferedReadableByteChannelTest {
@Rule public Timeout globalTimeout = Timeout.seconds(45);

private final byte[] bytes = DataGenerator.base64Characters().genBytes(40);
private final ByteString data1 = ByteString.copyFrom(bytes, 0, 10);
private final ByteString data2 = ByteString.copyFrom(bytes, 10, 10);
Expand Down Expand Up @@ -105,7 +109,12 @@ public void readObject(
if (request.equals(req1)) {
responseObserver.onNext(resp1);
responseObserver.onNext(resp2);
responseObserver.onError(apiException(Code.DATA_LOSS));
new Thread(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {}
responseObserver.onError(apiException(Code.DATA_LOSS));
}).start();
} else if (request.equals(req2)) {
responseObserver.onNext(resp3);
responseObserver.onNext(resp4);
Expand Down Expand Up @@ -191,11 +200,14 @@ public void readObject(
ReadObjectRequest request, StreamObserver<ReadObjectResponse> responseObserver) {
int count = invocationCount.getAndIncrement();
if (request.equals(req1)) {
if (count == 0) {
responseObserver.onNext(resp1);
responseObserver.onNext(resp2);
responseObserver.onNext(resp1);
responseObserver.onNext(resp2);
new Thread(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {}
responseObserver.onError(apiException(Code.DATA_LOSS));
}
}).start();
} else if (request.equals(req2)) {
ReadObjectResponse.Builder builder = resp3.toBuilder();
// increment the generation, as if it had been updated between initial read and retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import java.security.SecureRandom;
import java.util.concurrent.ExecutionException;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;

Expand Down Expand Up @@ -96,6 +98,7 @@ public class ITGzipReadableByteChannelTest {
.build();

public static final class Uncompressed {
@Rule public Timeout globalTimeout = Timeout.seconds(45);
private static final StorageGrpc.StorageImplBase fakeStorage =
new StorageGrpc.StorageImplBase() {
@Override
Expand Down Expand Up @@ -172,6 +175,7 @@ public void autoGzipDecompress_false() throws IOException {
}

public static final class Compressed {
@Rule public Timeout globalTimeout = Timeout.seconds(45);

private static final StorageGrpc.StorageImplBase fakeStorage =
new StorageGrpc.StorageImplBase() {
Expand Down Expand Up @@ -317,6 +321,7 @@ public void storage_reader_returnRawInputStream_true() throws Exception {
}

public static final class Behavior {
@Rule public Timeout globalTimeout = Timeout.seconds(45);

@Test
public void properlyTracksEOF() throws IOException, InterruptedException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ public void bidiReadObjectError() throws Exception {
assertThat(actual).hasLength(5);
assertThat(xxd(actual)).isEqualTo(xxd(content3.getBytes()));
}
Thread.sleep(50);
}
}

Expand Down Expand Up @@ -660,7 +661,7 @@ public void onNext(BidiReadObjectRequest request) {

StorageException se =
assertThrows(
StorageException.class, () -> TestUtils.await(future, 5, TimeUnit.SECONDS));
StorageException.class, () -> TestUtils.await(future, 30, TimeUnit.SECONDS));
assertThat(se).hasCauseThat().isInstanceOf(DataLossException.class);
DataLossException dataLossException = (DataLossException) se.getCause();
assertThat(dataLossException).isInstanceOf(UncheckedChecksumMismatchException.class);
Expand Down Expand Up @@ -734,7 +735,7 @@ public void retrySettingsApplicable_objectRangeData_offset_notAligned_gt() throw

StorageException se =
assertThrows(
StorageException.class, () -> TestUtils.await(future, 5, TimeUnit.SECONDS));
StorageException.class, () -> TestUtils.await(future, 30, TimeUnit.SECONDS));
assertThat(se).hasCauseThat().isInstanceOf(OutOfRangeException.class);
String suppressedMessages = TestUtils.messagesToText(se);
assertAll(
Expand All @@ -746,6 +747,7 @@ public void retrySettingsApplicable_objectRangeData_offset_notAligned_gt() throw
.contains("position = 10, readRange.read_offset = 11"),
() -> assertThat(suppressedMessages).contains("Asynchronous task failed"));
}
Thread.sleep(50);
}
}

Expand Down Expand Up @@ -795,6 +797,7 @@ public void validateReadRemovedFromStateWhenFailed() throws Exception {
ObjectReadSessionStreamRead<?> outstandingRead = orsi.state.getOutstandingRead(1L);
assertThat(outstandingRead).isNull();
}
Thread.sleep(50);
}
}

Expand Down Expand Up @@ -938,6 +941,7 @@ public void onNext(BidiReadObjectRequest request) {
assert503(f2),
assert503(f3));
}
Thread.sleep(50);
}
}

Expand Down Expand Up @@ -997,7 +1001,7 @@ public void onNext(BidiReadObjectRequest request) {
bd.readAs(ReadProjectionConfigs.asFutureBytes().withRangeSpec(RangeSpec.of(3, 3)));

// make sure the first read succeeded
byte[] actual = TestUtils.await(f1, 5, TimeUnit.SECONDS);
byte[] actual = TestUtils.await(f1, 30, TimeUnit.SECONDS);

// close the "parent"
bd.close();
Expand Down Expand Up @@ -1775,7 +1779,7 @@ static ThrowingRunnable assert503(ApiFuture<?> f) {
static ThrowingRunnable assertStatusCodeIs(ApiFuture<?> f, int expected) {
return () -> {
StorageException se =
assertThrows(StorageException.class, () -> TestUtils.await(f, 5, TimeUnit.SECONDS));
assertThrows(StorageException.class, () -> TestUtils.await(f, 30, TimeUnit.SECONDS));
assertThat(se.getCode()).isEqualTo(expected);
};
}
Expand Down
Loading
Loading