So, you want to optimize gRPC. Part 1

A common question about gRPC is how to make it fast. The gRPC library gives you access to high-performance RPCs, but it is not always clear how to achieve that performance. So I decided to show my train of thought when optimizing a program.





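Consider a basic "key-value" service that is used by several other programs. It has to be safe for concurrent access in case several updates arrive at the same time. It has to scale up to use the available hardware. And, finally, it has to be fast. gRPC is a good fit for this kind of service. Let's look at the best way to implement it.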





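For this post I wrote an example client and server in Java. The program is split into three main classes and a protobuf file that describes the API: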





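  • KvClient: a simulated user of the "key-value" system. It randomly creates, retrieves, updates, and deletes keys and values. The sizes of the keys and values are also chosen at random.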





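  • KvService: the implementation of the "key-value" service. The gRPC server installs it to handle client requests. To simulate storing data on disk, it sleeps briefly while handling each request: reads are delayed by 10 milliseconds and writes by 50 milliseconds, so that the example behaves more like a persistent database.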





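  • KvRunner: orchestrates the interaction between the client and the server. It is the main entry point: it starts both the client and the server in one process and waits for the client to finish its work. The runner works for 60 seconds and then reports how many RPCs were completed.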





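  • kvstore.proto: the Protocol Buffers definition of the service. It describes exactly what clients can expect from it. For simplicity, the operations are Create, Retrieve, Update, and Delete (commonly known as CRUD), and they work on keys and values made of arbitrary bytes. They resemble REST to some extent, but we reserve the right to diverge and add more complex operations later.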





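Protocol Buffers are not required to use gRPC, but they are a very convenient way to define service interfaces and to generate client and server code. The generated code is the glue between the application logic and the gRPC library. The generated code that a gRPC client calls is known as the stub.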





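Now that we know what the program should do, we can look at how it performs. As mentioned above, the client issues random RPCs. For example, here is the code that sends a create request: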





private void doCreate(KeyValueServiceBlockingStub stub) {
  ByteString key = createRandomKey();
  try {
    CreateResponse res = stub.create(
        CreateRequest.newBuilder()
            .setKey(key)
            .setValue(randomBytes(MEAN_VALUE_SIZE))
            .build());
    if (!res.equals(CreateResponse.getDefaultInstance())) {
      throw new RuntimeException("Invalid response");
    }
  } catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
      knownKeys.remove(key);
      logger.log(Level.INFO, "Key already existed", e);
    } else {
      throw e;
    }
  }
}

      
      



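A random key is created along with a random value. The request is sent to the server, and the client waits for the response. When the response arrives, the code checks that it is what was expected and throws an exception if it is not. Although the keys are chosen at random, they must be unique, so the client keeps track of the keys it has created in order not to create the same key twice. It is still possible that another client has already created a given key; in that case the code logs the event and moves on. Any other error is rethrown.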





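This code uses the blocking gRPC API, which sends a request and waits for the response. It is the simplest kind of gRPC stub, but it blocks the calling thread while the call runs. From the client's point of view, this means that at most one RPC can be in progress at any time.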





On the server side, the request is received by the service handler:





private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
  ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);
  if (store.putIfAbsent(key, value) == null) {
    responseObserver.onNext(CreateResponse.getDefaultInstance());
    responseObserver.onCompleted();
    return;
  }
  responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}

      
      



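The service extracts the key and the value from the request as ByteBuffer objects. The method is declared synchronized, so it holds the lock on the service itself and concurrent requests cannot corrupt the storage. After simulating the disk access of a write, it stores the pair in the Map of keys to values.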
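The choice of ByteBuffer as the map key matters because ByteBuffer compares by content, while a raw byte[] compares by identity. The following is a minimal sketch of the same "synchronized putIfAbsent" idea using only the standard library; the class ContentKeyedStore and its methods are hypothetical names for illustration and are not part of the article's KvService.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the service's storage; not the article's KvService. */
final class ContentKeyedStore {
  private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

  /** Returns true if the key was absent and is now stored, false if it already existed. */
  synchronized boolean create(byte[] key, byte[] value) {
    // ByteBuffer.equals and hashCode look at the remaining bytes, so two buffers
    // with equal content are the same map key. A byte[] key would not behave this way.
    ByteBuffer k = ByteBuffer.wrap(key.clone()).asReadOnlyBuffer();
    ByteBuffer v = ByteBuffer.wrap(value.clone()).asReadOnlyBuffer();
    return store.putIfAbsent(k, v) == null;
  }

  synchronized int size() {
    return store.size();
  }

  public static void main(String[] args) {
    ContentKeyedStore s = new ContentKeyedStore();
    byte[] a = "k1".getBytes(StandardCharsets.UTF_8);
    byte[] b = "k1".getBytes(StandardCharsets.UTF_8); // distinct array, same content
    System.out.println(s.create(a, new byte[] {1})); // prints "true"
    System.out.println(s.create(b, new byte[] {2})); // prints "false"
  }
}
```

Because every call takes the same lock, two concurrent create calls for the same key cannot both succeed, which is the property the handler above relies on.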





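Unlike the client code, the service handler is non-blocking: it does not return a value the way an ordinary function call would. Instead, it calls onNext() on the responseObserver to send the response back to the client. To signal that no more messages will follow, it calls onCompleted().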





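Since the code is safe and correct, let's see how it performs. I measured on my Ubuntu machine with a 12-core processor and 32 GB of memory. Let's build and run the code: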





$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s

real	1m0.927s
user	0m10.688s
sys	0m1.456s
      
      



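Ouch! On such a powerful machine the program manages only about 16 RPCs per second. It barely used the CPU, and we do not know how much memory it used. We need to find out why it is so slow.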





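Before changing anything, let's understand what the program is doing. When optimizing, you need to know where the code spends its time in order to know what can be optimized. At this early stage we do not need profiling tools yet; reasoning about the program is enough.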





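The client starts and issues RPCs serially for about a minute. On each iteration it randomly decides which operation to perform: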





void doClientWork(AtomicBoolean done) {
  Random random = new Random();
  KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);

  while (!done.get()) {
    // Pick a random CRUD action to take.
    int command = random.nextInt(4);
    if (command == 0) {
      doCreate(stub);
      continue;
    }
    /* ... */
    rpcCount++;
  }
}

      
      



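This means that at most one RPC can be active at any time, and each RPC has to wait for the previous one to complete. How long does an RPC take? Judging by the server code, most operations perform a write, which takes about 50 milliseconds. At peak efficiency this code can therefore perform at most about 20 operations per second: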





20 queries = 1000 ms / (50 ms / query)





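Our code manages about 16 queries per second, so the estimate looks about right. We can check this assumption against the output of the time command used to run the code. While handling a request, the server goes to sleep in the simulateWork method. This implies that the program should be mostly idle while it waits for RPCs to complete.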





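We can confirm this by comparing the real and user times in the output above. The wall-clock time was about 1 minute, while the CPU time was about 10 seconds. My powerful multi-core CPU was busy only about 16% of the time. So if we could get the program to do more work during that time, we should be able to complete more RPCs.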
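The utilization estimate is a simple ratio of CPU time to wall-clock time. The sketch below works it through with the rounded figures from the run (about 10 s of user time in about 60 s of real time); the class and method names are made up for illustration, and dividing by the number of cores is an extra step added here to show how little of the whole machine was in use.

```java
final class CpuUtilization {
  /** Share of the available CPU time that was actually used, from 0 to 1. */
  static double utilization(double cpuSeconds, double wallSeconds, int cores) {
    return cpuSeconds / (wallSeconds * cores);
  }

  public static void main(String[] args) {
    // Rounded figures from the run: about 10 s of CPU time in about 60 s of wall-clock time.
    System.out.println(utilization(10, 60, 1));  // share of a single core
    System.out.println(utilization(10, 60, 12)); // share of all 12 cores
  }
}
```

With these inputs the program used roughly one sixth of a single core, and a far smaller share of the 12 cores available.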





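Now we can state clearly what we think the problem is and propose a solution. One way to speed up a program is to make sure the CPU does not sit idle. To do that, we issue work concurrently.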





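gRPC Java has three kinds of stubs: blocking, non-blocking, and ListenableFuture. We have already seen the blocking stub in the client and the non-blocking interface in the server. The ListenableFuture API is a compromise between the two, offering both blocking and non-blocking behavior. As long as we do not block a thread waiting for work to finish, we can start new RPCs without waiting for the earlier ones to complete.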





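To test the hypothesis, let's change the client code to use ListenableFuture. This means we have to think more carefully about concurrency. For example, when tracking known keys on the client, we must read, modify, and write them safely. We must also make sure that when an error occurs we stop issuing new RPCs (proper error handling will be covered in a later post). Finally, the count of completed RPCs must be updated in a thread-safe way, because the RPC is issued on one thread and the update may happen on another.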
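The core of the idea, several requests outstanding at once with completion handled in callbacks, can be sketched without gRPC. The example below uses the JDK's CompletableFuture as a stand-in for ListenableFuture, and FakeAsyncStub is a hypothetical in-memory class invented for this illustration; none of it is gRPC API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory stand-in for an asynchronous stub; this is not a gRPC API. */
final class FakeAsyncStub {
  private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

  /** Starts a "request" and returns at once with a future for its response. */
  synchronized CompletableFuture<String> create(String key) {
    CompletableFuture<String> response = new CompletableFuture<>();
    pending.add(response);
    return response;
  }

  /** Number of requests that have been started but not yet answered. */
  synchronized int inFlight() {
    return pending.size();
  }

  /** Simulates the server answering the oldest outstanding request. */
  void answerOldest() {
    CompletableFuture<String> response;
    synchronized (this) {
      response = pending.poll();
    }
    if (response != null) {
      response.complete("ok"); // runs the registered callbacks on this thread
    }
  }

  public static void main(String[] args) {
    FakeAsyncStub stub = new FakeAsyncStub();
    AtomicInteger completed = new AtomicInteger(); // thread-safe completion counter
    for (int i = 0; i < 5; i++) {
      // No waiting here: the callback fires later, when the response arrives.
      stub.create("key-" + i).thenRun(completed::incrementAndGet);
    }
    System.out.println(stub.inFlight() + " in flight, " + completed.get() + " completed");
    while (stub.inFlight() > 0) {
      stub.answerOldest();
    }
    System.out.println(stub.inFlight() + " in flight, " + completed.get() + " completed");
  }
}
```

The first line printed is "5 in flight, 0 completed" and the second is "0 in flight, 5 completed": all five requests were started before any of them finished, which a blocking stub cannot do from a single thread.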





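All these changes make the code more complex. That is a trade-off you will have to weigh when optimizing. In general, simplicity is at odds with optimization, and Java is not known for brevity. Even so, the code below is still readable, and the flow of the function still runs roughly from top to bottom. Here is the revised doCreate():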





private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
  Futures.addCallback(res, new FutureCallback<CreateResponse>() {
    @Override
    public void onSuccess(CreateResponse result) {
      if (!result.equals(CreateResponse.getDefaultInstance())) {
        error.compareAndSet(null, new RuntimeException("Invalid response"));
      }
      synchronized (knownKeys) {
        knownKeys.add(key);
      }
    }

    @Override
    public void onFailure(Throwable t) {
      Status status = Status.fromThrowable(t);
      if (status.getCode() == Code.ALREADY_EXISTS) {
        synchronized (knownKeys) {
          knownKeys.remove(key);
        }
        logger.log(Level.INFO, "Key already existed", t);
      } else {
        error.compareAndSet(null, t);
      }
    }
  }, MoreExecutors.directExecutor()); // recent Guava versions require an explicit executor
}

      
      



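The stub is now a KeyValueServiceFutureStub, which returns a Future when called instead of the response itself. gRPC Java uses an extension of it called ListenableFuture, which lets you add a callback that runs when the Future completes. In this program we care less about the response itself than about whether the RPC succeeded. With that in mind, the code mostly checks for errors rather than processing the response.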





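The first change is in how completed RPCs are counted. Instead of incrementing the counter in the main loop, we increment it when the RPC completes.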





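Next, for every RPC we create a new callback object that handles both success and failure. Because doCreate() will already have returned by the time the RPC callback runs, we need a way to propagate errors other than throw. Instead, we try to update a reference atomically. The main loop periodically checks whether an error has occurred and stops if there is a problem.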
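The "first error wins" behavior of compareAndSet(null, ...) is easy to check in isolation. Here is a small sketch, again using the JDK's CompletableFuture in place of ListenableFuture; FirstErrorCapture and watch are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class FirstErrorCapture {
  /** Records the failure of an asynchronous call; only the first failure is kept. */
  static void watch(CompletableFuture<?> call, AtomicReference<Throwable> error) {
    call.whenComplete((result, failure) -> {
      if (failure != null) {
        error.compareAndSet(null, failure); // no-op if an earlier error is already stored
      }
    });
  }

  public static void main(String[] args) {
    AtomicReference<Throwable> error = new AtomicReference<>();
    CompletableFuture<String> first = new CompletableFuture<>();
    CompletableFuture<String> second = new CompletableFuture<>();
    watch(first, error);
    watch(second, error);
    first.completeExceptionally(new IllegalStateException("first"));
    second.completeExceptionally(new IllegalStateException("second"));
    // A main loop would poll error.get() between requests and stop once it is non-null.
    System.out.println(error.get().getMessage()); // prints "first"
  }
}
```

Keeping only the first failure is a deliberate simplification: later errors are often consequences of the first one, and the loop only needs to know that it should stop.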





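Finally, the code is careful to add a key to knownKeys only once the RPC has actually completed, and to remove it only when the call is known to have failed. We synchronize on the variable so that two threads do not conflict. Note that although access to knownKeys is thread-safe, race conditions remain: one thread may read a key from knownKeys, a second thread may then delete that key from knownKeys, and the first thread may still issue an RPC with it. Synchronizing on the keys only guarantees consistency, not correctness. Fixing this properly is outside the scope of this post, so we just log the event and move on. You will see a few such log messages if you run the program.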





If you build and run this program, you will find that it does not work:





WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:714)
	...
      
      



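What?! Why would I show you code that fails? Because in real life a change often does not work on the first try. In this case the program ran out of memory, and strange things start to happen when a program runs out of memory. The root cause is often hard to find, and misleading clues abound. Here the error message says "unable to create new native thread", even though our code did not create any new threads. Experience helps more than debugging with problems like this. Having debugged many OOMs, I know that Java reports the allocation that was the last straw: the program started using far more memory overall, and the final allocation that failed just happened to be a thread creation.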





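So what happened? Nothing pushed back against starting new RPCs. In the blocking version, a new RPC could not start until the previous one had completed. That was slow, but it also prevented us from creating a huge number of RPCs for which there was not enough memory. We have to account for this in the ListenableFuture version.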





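To solve this, we can impose our own limit on the number of active RPCs. Before starting a new RPC, we try to acquire a permit. If we get one, the RPC can start; if not, we wait until one becomes available. When an RPC completes (successfully or not), we return the permit. A Semaphore does exactly this: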





private final Semaphore limiter = new Semaphore(100);

private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
    throws InterruptedException {
  limiter.acquire();
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  res.addListener(() ->  {
    rpcCount.incrementAndGet();
    limiter.release();
  }, MoreExecutors.directExecutor());
  /* ... */
}

      
      



Now the code runs successfully and does not run out of memory.
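To see that a semaphore really does cap the number of outstanding calls, the pattern can be exercised without gRPC. The sketch below is a standalone approximation: a thread pool with short sleeps stands in for the network and server, BoundedInFlight is a hypothetical name, and acquireUninterruptibly() is used instead of acquire() only to keep the example free of checked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedInFlight {
  /** Runs {@code total} simulated calls with at most {@code limit} in flight; returns the peak seen. */
  static int run(int total, int limit) {
    Semaphore limiter = new Semaphore(limit);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(16); // stands in for network and server
    try {
      for (int i = 0; i < total; i++) {
        limiter.acquireUninterruptibly(); // the issuing loop waits here once the limit is reached
        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture
            .runAsync(() -> sleepQuietly(2), pool)
            .whenComplete((result, failure) -> {
              inFlight.decrementAndGet();
              limiter.release(); // return the permit on success or failure
            });
      }
      limiter.acquireUninterruptibly(limit); // wait until every outstanding call has finished
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("peak in flight: " + run(200, 10));
  }
}
```

The peak never exceeds the limit because a call is counted only after its permit is acquired and is uncounted before the permit is released. Choosing the limit is a trade-off between memory use and how busy the server can be kept.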





Building and running the code again looks much better:





$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s

real	1m0.923s
user	0m12.772s
sys	0m1.572s
      
      



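The code now completes 46% more RPCs per second than before. It also used about 20% more CPU. Our hypothesis turned out to be correct, and the fix worked. All of this was done without changing the server, and we were able to measure it without any special profilers or tracers.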
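Both percentages can be checked directly from the two runs shown in the logs (16.55 and 24.283 RPCs/s, and 10.688 s and 12.772 s of user time). The helper below is a throwaway sketch with a made-up class name.

```java
import java.util.Locale;

final class RunComparison {
  /** Relative change from {@code before} to {@code after}, in percent. */
  static double percentIncrease(double before, double after) {
    return (after - before) / before * 100.0;
  }

  public static void main(String[] args) {
    // Throughput reported by the two runs, in RPCs per second.
    System.out.println(String.format(Locale.ROOT, "%.1f", percentIncrease(16.55, 24.283))); // prints "46.7"
    // User CPU time reported by the time command, in seconds.
    System.out.println(String.format(Locale.ROOT, "%.1f", percentIncrease(10.688, 12.772))); // prints "19.5"
  }
}
```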





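Do the numbers make sense? We expect the mutating RPCs (create, update, and delete) to be issued with a probability of about 1/4 each. Reads are also issued 1/4 of the time, but they do not take as long. The mean RPC time should be roughly the weighted average: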





  .25 * 50ms (create)
  .25 * 10ms (retrieve)
  .25 * 50ms (update)
 +.25 * 50ms (delete)
------------
        40ms
      
      



With an average of 40 ms per RPC, we would expect the number of RPCs per second to be:





25 queries = 1000 ms / (40 ms / query)





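That is approximately what we see with the new code. The server still handles requests serially, so there is more work to do. But for now, our optimizations seem to have worked.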
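The estimate can be reproduced in a few lines. This sketch assumes, as the calculation does, that the four operations are equally likely and that the server handles one request at a time; the class and method names are invented for illustration.

```java
final class ExpectedThroughput {
  /** Mean latency in milliseconds when every listed operation is equally likely. */
  static double meanLatencyMillis(double... latenciesMillis) {
    double sum = 0;
    for (double latency : latenciesMillis) {
      sum += latency;
    }
    return sum / latenciesMillis.length;
  }

  /** Requests per second when requests are handled strictly one after another. */
  static double serialRps(double meanLatencyMillis) {
    return 1000.0 / meanLatencyMillis;
  }

  public static void main(String[] args) {
    double mean = meanLatencyMillis(50, 10, 50, 50); // create, retrieve, update, delete
    System.out.println(mean);            // prints "40.0"
    System.out.println(serialRps(mean)); // prints "25.0"
  }
}
```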





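There are many opportunities to optimize gRPC code. To take advantage of them, you need to understand what your code does and what it is supposed to do. Always measure before and after your changes, and let the measurements guide your optimizations.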













