The question often arises about how to speed up gRPC. gRPC allows for high performance RPC, but it is not always clear how to achieve this performance. And I decided to try to show my train of thought when optimizing programs.
"-", . . . , , . gRPC . , .
KvClient β "-". , , . , .
KvService β "-". . 10 50 , .
KvRunner β . , , , . Runner 60 , RPC.
kvstore.proto β Protocol Buffers . , . Create, Retrieve, Update Delete ( CRUD). , . REST, .
Protocol Buffers gRPC β , . gRPC. gRPC-, , (stub, ).
, , , , . , RPC. , create:
private void doCreate(KeyValueServiceBlockingStub stub) {
ByteString key = createRandomKey();
try {
CreateResponse res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
if (!res.equals(CreateResponse.getDefaultInstance())) {
throw new RuntimeException("Invalid response");
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
knownKeys.remove(key);
logger.log(Level.INFO, "Key already existed", e);
} else {
throw e;
}
}
}
. , . , , , , , . , , , . , . , - , . .
gRPC API, . gRPC-, . , RPC.
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
@Override
public synchronized void create(
CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
if (store.putIfAbsent(key, value) == null) {
responseObserver.onNext(CreateResponse.getDefaultInstance());
responseObserver.onCompleted();
return;
}
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}
ByteBuffer
. , , synchronized
. Map
.
, . onNext()
responseObserver
. onCompleted()
.
β , . Ubuntu, 12- 32 . :
$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s
real 1m0.927s
user 0m10.688s
sys 0m1.456s
! 16 RPC . , , . , .
- , , . , , . , .
void doClientWork(AtomicBoolean done) {
Random random = new Random();
KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);
while (!done.get()) {
// Pick a random CRUD action to take.
int command = random.nextInt(4);
if (command == 0) {
doCreate(stub);
continue;
}
/* ... */
rpcCount++;
}
}
, RPC. . RPC? , 50 . 20 :
20 = 1000 / (50 / )
16 , . , time
, . simulateWork (sleep). , , RPC.
, (real) (user). , 10 . 16% . , , , , RPC.
. β , . .
gRPC- Java : , ListenableFuture
. . ListenableFuture API β , , . , , , RPC, .
ListenableFuture
. , . , , . , RPC ( ). , RPC , RPC .
. , . . - . doCreate():
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
Futures.addCallback(res, new FutureCallback<CreateResponse>() {
@Override
public void onSuccess(CreateResponse result) {
if (!result.equals(CreateResponse.getDefaultInstance())) {
error.compareAndSet(null, new RuntimeException("Invalid response"));
}
synchronized (knownKeys) {
knownKeys.add(key);
}
}
@Override
public void onFailure(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.ALREADY_EXISTS) {
synchronized (knownKeys) {
knownKeys.remove(key);
}
logger.log(Level.INFO, "Key already existed", t);
} else {
error.compareAndSet(null, t);
}
}
});
}
KeyValueServiceFutureStub
, Future
. gRPC Java ListenableFuture
, Future
. . , RPC . , .
RPC. , RPC.
RPC , , . doCreate()
RPC, , throw. . , , .
, , knownKeys
, RPC , , , . , , . : knownKeys
, . knownKeys
, knownKeys
, RPC, . , . , . , .
, , :
WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
...
?! , ? . . , . , . : "unable to create new native thread" ( ), . , . OOM , Java , . , , .
, ? RPC , . , RPC, . ListenableFuture
.
RPC. RPC . , RPC. , , . RPC ( ), . (Semaphore):
private final Semaphore limiter = new Semaphore(100);
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
throws InterruptedException {
limiter.acquire();
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> {
rpcCount.incrementAndGet();
limiter.release();
}, MoreExecutors.directExecutor());
/* ... */
}
.
:
$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s
real 1m0.923s
user 0m12.772s
sys 0m1.572s
46% RPC , . , 20% . , . . - .
? , 1/4 (create, update delete). 1/4 . RPC :
.25 * 50ms (create)
.25 * 10ms (retrieve)
.25 * 50ms (update)
+.25 * 50ms (delete)
------------
40ms
40 RPC , RPC :
25 = 1000 / (40 / )
, . , . , , .
gRPC-. , . , . .
Java Developer. Basic. , , .