PolarSPARC
Bhaskar S | 12/12/2020
Overview
Thus far in this series:
Part 1 covered the basics of gRPC, installation and setup, and the demonstration of the Unary RPC
Part 2 covered the Server Streaming RPC
In this part, we will continue the journey to the next RPC communication pattern - Client Streaming and also show how to deal with error conditions.
Client Streaming RPC
The following diagram illustrates the high-level architecture of the Client Streaming communication pattern:
In the Client Streaming RPC mode, the client sends a sequence (or stream) of requests to the server and the server returns a single response to the client.
For the Client Streaming RPC demonstration, we will implement a fictitious Best Insurance Quote service, where the client sends requests for their preferred 'providers' (along with their age) to the server and the server responds with the 'provider' offering the best quote along with the price.
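Before bringing in gRPC, the shape of this exchange can be sketched with a plain Go channel: many requests flow in, and exactly one response comes back after the stream is closed. This is only an illustration and not gRPC code. The types Request and Response, the function serve, and the flat prices map are all assumptions made for this sketch.

```go
package main

import "fmt"

// Request and Response are local stand-ins for the messages exchanged
// by the service; they are not the protoc-generated types.
type Request struct {
	Provider string
	Age      int32
}

type Response struct {
	Provider string
	Price    float64
}

// prices is a hypothetical flat price list used only for this sketch.
var prices = map[string]float64{"Bob": 1475.0, "Charlie": 1525.0}

// serve drains the request stream and replies exactly once, after the
// client has closed the stream.
func serve(requests <-chan Request) Response {
	var best Response
	for req := range requests {
		price, ok := prices[req.Provider]
		if !ok {
			continue // unknown provider: simply skipped in this sketch
		}
		if best.Provider == "" || price < best.Price {
			best = Response{Provider: req.Provider, Price: price}
		}
	}
	return best
}

func main() {
	requests := make(chan Request)
	done := make(chan Response)

	go func() { done <- serve(requests) }()

	// The client side: send a sequence of requests, then close the stream.
	requests <- Request{Provider: "Bob", Age: 37}
	requests <- Request{Provider: "Charlie", Age: 37}
	close(requests)

	res := <-done
	fmt.Printf("%s %.2f\n", res.Provider, res.Price) // prints: Bob 1475.00
}
```

Closing the channel plays the role that the end-of-stream signal plays in the real RPC: it is the only way the receiving side learns that no more requests are coming.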
We will first demonstrate the Best Quote service using the Go programming language.
In the $GOPATH directory, create the project directory hierarchy by executing the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc
$ mkdir -p clientstream clientstream/quotepb clientstream/server clientstream/client
The following are the contents of the file best_quote.proto located in the directory $GOPATH/src/polarsparc.com/grpc/clientstream/quotepb as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

syntax = "proto3";

package clientstream;

option go_package = "polarsparc.com/grpc/clientstream/quotepb";
option java_multiple_files = true;
option java_package = "com.polarsparc.gcs";

message BestQuoteRequest {
  string provider = 1;
  int32 age = 2;
}

message BestQuoteResponse {
  string provider = 1;
  double price = 2;
}

service BestQuoteService {
  rpc getBestQuote(stream BestQuoteRequest) returns (BestQuoteResponse);
}
The request message is defined as BestQuoteRequest and the response message is defined as BestQuoteResponse. The service interface is defined as BestQuoteService with an RPC method getBestQuote that takes in a sequence (or stream) of BestQuoteRequest objects and returns a BestQuoteResponse object.
To compile the best_quote.proto file, execute the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc/clientstream
$ protoc quotepb/best_quote.proto --go_out=plugins=grpc:$GOPATH/src
On success, this will generate the Go code file called best_quote.pb.go located in the directory $GOPATH/src/polarsparc.com/grpc/clientstream/quotepb.
From the file best_quote.pb.go, we see the BestQuoteServiceServer interface, as shown below, that the server needs to implement:
. . .
type BestQuoteServiceServer interface {
	GetBestQuote(BestQuoteService_GetBestQuoteServer) error
}
. . .
The following are the contents of the file quote_provider.go that simulates an in-memory store for initializing and returning quotes from fictitious providers and is located in the directory $GOPATH/src/polarsparc.com/grpc/clientstream/server as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package main

import (
	"fmt"
	"log"
)

type ProviderQuote struct {
	Provider string
	AgeLow   int32
	AgeHigh  int32
	Price    float64
}

// inRange reports whether the age falls within this quote's age band
func (pq ProviderQuote) inRange(age int32) bool {
	return age >= pq.AgeLow && age <= pq.AgeHigh
}

type QuotesCache map[string][]ProviderQuote

type server struct {
	cache QuotesCache
}

func (s *server) Init() {
	l1 := []ProviderQuote{{Provider: "Alice", AgeLow: 20, AgeHigh: 30, Price: 1000.0},
		{Provider: "Alice", AgeLow: 31, AgeHigh: 45, Price: 1500.0},
		{Provider: "Alice", AgeLow: 46, AgeHigh: 55, Price: 2000.0},
	}
	s.cache["Alice"] = l1

	l2 := []ProviderQuote{{Provider: "Bob", AgeLow: 20, AgeHigh: 30, Price: 1100.0},
		{Provider: "Bob", AgeLow: 31, AgeHigh: 45, Price: 1475.0},
		{Provider: "Bob", AgeLow: 46, AgeHigh: 55, Price: 1950.0},
	}
	s.cache["Bob"] = l2

	l3 := []ProviderQuote{{Provider: "Charlie", AgeLow: 20, AgeHigh: 30, Price: 975.0},
		{Provider: "Charlie", AgeLow: 31, AgeHigh: 45, Price: 1525.0},
		{Provider: "Charlie", AgeLow: 46, AgeHigh: 55, Price: 2050.0},
	}
	s.cache["Charlie"] = l3
}

func (s *server) GetProviderQuote(provider string, age int32) (*ProviderQuote, error) {
	log.Printf("Request for provider: %s, age: %d", provider, age)

	var pq *ProviderQuote

	quotes := s.cache[provider]
	if quotes == nil {
		return nil, fmt.Errorf("Specified provider %s invalid", provider)
	}

	for i := range quotes {
		if quotes[i].inRange(age) {
			pq = &quotes[i] // point at the slice element, not at a loop variable
			break
		}
	}

	if pq == nil {
		return nil, fmt.Errorf("No Quote for the specified provider %s and age %d", provider, age)
	}

	log.Printf("Provider quote for %s and age %d - %.02f\n", provider, age, pq.Price)

	return pq, nil
}
The following are the contents of the file server.go for the Client Streaming RPC server that implements the BestQuoteServiceServer interface and is located in the directory $GOPATH/src/polarsparc.com/grpc/clientstream/server as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"io"
	"log"
	"net"
	"polarsparc.com/grpc/clientstream/quotepb" // [1]
)

func (s *server) GetBestQuote(stream quotepb.BestQuoteService_GetBestQuoteServer) error { // [2]
	var provider string
	var price float64

	for {
		req, err := stream.Recv() // [3]
		if err == nil {
			pq, err := s.GetProviderQuote(req.Provider, req.Age) // [4]
			if err == nil {
				if provider == "" || price > pq.Price { // [5]
					provider = pq.Provider
					price = pq.Price
				}
			} else {
				log.Printf("Encountered an error on the server: %v", err)
				return status.Errorf(codes.InvalidArgument, "%v", err) // [7]
			}
		} else if err == io.EOF {
			// Received all client requests
			log.Printf("===> Best quote Provider: %s, Price: %.03f\n", provider, price)
			return stream.SendAndClose(&quotepb.BestQuoteResponse{ // [6]
				Provider: provider,
				Price:    price,
			})
		} else {
			log.Printf("Encountered an error for BestQuote at localhost:20003: %v\n", err)
			return status.Errorf(codes.FailedPrecondition, "%v", err) // [7]
		}
	}
}

func main() {
	qs := &server{
		cache: QuotesCache{},
	}
	qs.Init()

	log.Println("Ready to start the BestQuote server...")

	lis, err := net.Listen("tcp", "localhost:20003")
	if err != nil {
		log.Fatalf("Failed to create listener on localhost:20003: %v", err)
	}

	srv := grpc.NewServer()

	quotepb.RegisterBestQuoteServiceServer(srv, qs)

	if err = srv.Serve(lis); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}
The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:
[1] :: import the code from the package polarsparc.com/grpc/clientstream/quotepb generated by the protoc compiler
[2] :: when the gRPC server invokes the RPC method GetBestQuote, it automatically passes in a reference to the stream object BestQuoteService_GetBestQuoteServer
[3] :: calling the Recv() method on the stream object returns the next request object. If there are no more requests, the stream returns an error in the form of an io.EOF
[4] :: for each request from the client, invoke the method GetProviderQuote to find the quote for the given provider and age
[5] :: keep track of the best quote (lowest price) across all the providers for the given age
[6] :: when the server has received all the requests from the client and finally sees an io.EOF, send a response back to the client with the provider and price of the best quote
[7] :: leverage the gRPC built-in error model by calling the method status.Errorf(...) to handle error conditions. The status object is composed of a standard status code and a user-defined message description
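The receive loop described in [3] through [6] only needs two methods from the stream object, Recv and SendAndClose, so it can be exercised without a network by handing it a hand-written fake. The sketch below is a minimal illustration under stated assumptions: BestQuoteRequest and BestQuoteResponse are local stand-ins for the generated messages, quoteStream lists only the two methods the loop uses (the generated stream interface also embeds grpc.ServerStream), and fakeStream, quoteFor and bestQuote are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Local stand-ins for the protoc-generated message types.
type BestQuoteRequest struct {
	Provider string
	Age      int32
}

type BestQuoteResponse struct {
	Provider string
	Price    float64
}

// quoteStream lists only the two stream methods the handler loop needs.
type quoteStream interface {
	Recv() (*BestQuoteRequest, error)
	SendAndClose(*BestQuoteResponse) error
}

// fakeStream replays canned requests and records the single response.
type fakeStream struct {
	pending []*BestQuoteRequest
	sent    *BestQuoteResponse
}

func (f *fakeStream) Recv() (*BestQuoteRequest, error) {
	if len(f.pending) == 0 {
		return nil, io.EOF // no more requests: end of stream
	}
	req := f.pending[0]
	f.pending = f.pending[1:]
	return req, nil
}

func (f *fakeStream) SendAndClose(res *BestQuoteResponse) error {
	f.sent = res
	return nil
}

// age37 is a hypothetical price list for a single age band.
var age37 = map[string]float64{"Bob": 1475.0, "Charlie": 1525.0}

func quoteFor(provider string) (float64, error) {
	price, ok := age37[provider]
	if !ok {
		return 0, fmt.Errorf("specified provider %s invalid", provider)
	}
	return price, nil
}

// bestQuote follows the same receive-until-EOF pattern as GetBestQuote.
func bestQuote(stream quoteStream) error {
	var provider string
	var price float64
	for {
		req, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return stream.SendAndClose(&BestQuoteResponse{Provider: provider, Price: price})
		}
		if err != nil {
			return err
		}
		p, err := quoteFor(req.Provider)
		if err != nil {
			return err // abort the RPC without sending a response
		}
		if provider == "" || p < price {
			provider, price = req.Provider, p
		}
	}
}

func main() {
	good := &fakeStream{pending: []*BestQuoteRequest{
		{Provider: "Bob", Age: 37}, {Provider: "Charlie", Age: 37},
	}}
	if err := bestQuote(good); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Printf("%s %.2f\n", good.sent.Provider, good.sent.Price) // prints: Bob 1475.00

	bad := &fakeStream{pending: []*BestQuoteRequest{{Provider: "Dave", Age: 37}}}
	fmt.Println(bestQuote(bad), bad.sent == nil)
}
```

Depending on a narrow interface rather than the full generated type is a design choice made for this sketch; the real handler must keep the generated signature to satisfy BestQuoteServiceServer.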
The following are the contents of the file client.go that implements the Client Streaming RPC client for the BestQuoteServiceServer located in the directory $GOPATH/src/polarsparc.com/grpc/clientstream/client as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/status"
	"log"
	"polarsparc.com/grpc/clientstream/quotepb"
)

func main() {
	log.Println("Ready to start the BestQuote client...")

	conn, err := grpc.Dial("localhost:20003", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to connect to localhost:20003: %v", err)
	}
	defer conn.Close()

	cl := quotepb.NewBestQuoteServiceClient(conn) // [1]

	// Case - 1
	stream, err := cl.GetBestQuote(context.Background()) // [2]
	if err != nil {
		log.Fatalf("[1] Failed to create client stub to localhost:20003: %v", err)
	}
	err = stream.Send(&quotepb.BestQuoteRequest{ // [3]
		Provider: "Bob",
		Age:      37,
	})
	if err != nil {
		log.Fatalf("[1] Failed to send request to localhost:20003: %v", err)
	}
	err = stream.Send(&quotepb.BestQuoteRequest{ // [3]
		Provider: "Charlie",
		Age:      37,
	})
	if err != nil {
		log.Fatalf("[1] Failed to send request to localhost:20003: %v", err)
	}
	res, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("[1] Received an error from BestQuote at localhost:20003: %v", err)
	}
	log.Printf("[1] Best quote from provider %s with price %.02f\n", res.Provider, res.Price)

	// Case - 2 - Error case
	stream, err = cl.GetBestQuote(context.Background()) // [2]
	if err != nil {
		log.Fatalf("[2] Failed to create client stub to localhost:20003: %v", err)
	}
	err = stream.Send(&quotepb.BestQuoteRequest{ // [3]
		Provider: "Alice",
		Age:      48,
	})
	if err != nil {
		log.Fatalf("[2-1] Failed to send request to localhost:20003: %v", err)
	}
	err = stream.Send(&quotepb.BestQuoteRequest{ // [3]
		Provider: "Dave",
		Age:      48,
	})
	if err != nil {
		log.Fatalf("[2-2] Failed to send request to localhost:20003: %v", err)
	}
	res, err = stream.CloseAndRecv() // [4]
	if err != nil {
		st, ok := status.FromError(err) // [5]
		if ok {
			log.Printf("[2] Error - %s\n", st.Message())
		} else {
			log.Fatalf("[2] Unexpected failure from BestQuote at localhost:20003: %v", err)
		}
	}
}
The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:
[1] :: create an instance of the gRPC client stub BestQuoteServiceClient by calling the function NewBestQuoteServiceClient generated by the protoc compiler
[2] :: call the method GetBestQuote to get an instance of the stream object
[3] :: invoke the Send method on the stream object to dispatch a BestQuoteRequest to the server
[4] :: invoke the CloseAndRecv method on the stream object to signal that the client has finished sending requests and wait for the response (or an error) from the server
[5] :: call the method status.FromError passing in the error object to get the gRPC error status details
Open two Terminal windows - one for the server and one for the client.
In the server Terminal, execute the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc/clientstream/server
$ go run server.go quote_provider.go
The following would be the typical output:
2020/12/12 12:13:59 Ready to start the BestQuote server...
In the client Terminal, execute the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc/clientstream/client
$ go run client.go
The following would be the typical output:
2020/12/12 12:14:07 Ready to start the BestQuote client...
2020/12/12 12:14:07 [1] Best quote from provider Bob with price 1475.00
2020/12/12 12:14:07 [2] Error - Specified provider Dave invalid
The following would be the additional output on the Terminal running the server:
2020/12/12 12:14:07 Request for provider: Bob, age: 37
2020/12/12 12:14:07 Provider quote for Bob and age 37 - 1475.00
2020/12/12 12:14:07 Request for provider: Charlie, age: 37
2020/12/12 12:14:07 Provider quote for Charlie and age 37 - 1525.00
2020/12/12 12:14:07 ===> Best quote Provider: Bob, Price: 1475.000
2020/12/12 12:14:07 Request for provider: Alice, age: 48
2020/12/12 12:14:07 Provider quote for Alice and age 48 - 2000.00
2020/12/12 12:14:07 Request for provider: Dave, age: 48
2020/12/12 12:14:07 Encountered an error on the server: Specified provider Dave invalid
Excelente !!! We have successfully demonstrated the Client Streaming gRPC communication style (with error handling) using the Go language.
Next, we will demonstrate the same Best Quote service using the Java programming language.
Copy the file best_quote.proto listed above to the directory $HOME/java/grpc/src/main/proto.
To compile the best_quote.proto file, execute the following commands:
$ cd $HOME/java/grpc
$ mvn compile
On success, this will generate some files in the directory $HOME/java/grpc/target/generated-sources/protobuf/java/com/polarsparc/gcs.
The following are the contents of the file ProviderQuote.java that represents a holder object for storing the provider, the lower and upper age limits, and the price for the age group and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gcs/server as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package com.polarsparc.gcs.server;

public class ProviderQuote {
    private final String provider;
    private final int ageLow;
    private final int ageHigh;
    private final double price;

    public ProviderQuote(String provider, int ageLow, int ageHigh, double price) {
        this.provider = provider;
        this.ageLow = ageLow;
        this.ageHigh = ageHigh;
        this.price = price;
    }

    public boolean inRange(int age) {
        return age >= this.ageLow && age <= this.ageHigh;
    }

    public double getPrice() {
        return price;
    }

    @Override
    public String toString() {
        return "ProviderQuote{" +
                "provider='" + provider + '\'' +
                ", ageLow=" + ageLow +
                ", ageHigh=" + ageHigh +
                ", price=" + price +
                '}';
    }
}
The following are the contents of the file BestQuoteProvider.java that simulates an in-memory store for initializing and returning quotes from fictitious providers and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gcs/server as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package com.polarsparc.gcs.server;

import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BestQuoteProvider {
    private final static Logger LOGGER = Logger.getLogger(BestQuoteProvider.class.getName());

    private final static Map<String, List<ProviderQuote>> quotesTable = new HashMap<>();

    static {
        LOGGER.setLevel(Level.INFO);

        List<ProviderQuote> p1 = Arrays.asList(new ProviderQuote("Alice", 20, 30, 1000.00),
                new ProviderQuote("Alice", 31, 45, 1500.00),
                new ProviderQuote("Alice", 46, 55, 2000.00));
        quotesTable.put("Alice", p1);

        List<ProviderQuote> p2 = Arrays.asList(new ProviderQuote("Bob", 20, 30, 1100.00),
                new ProviderQuote("Bob", 31, 45, 1475.00),
                new ProviderQuote("Bob", 46, 55, 1950.00));
        quotesTable.put("Bob", p2);

        List<ProviderQuote> p3 = Arrays.asList(new ProviderQuote("Charlie", 20, 30, 975.00),
                new ProviderQuote("Charlie", 31, 45, 1525.00),
                new ProviderQuote("Charlie", 46, 55, 2050.00));
        quotesTable.put("Charlie", p3);
    }

    private BestQuoteProvider() {
    }

    public static ProviderQuote getBestQuote(String provider, int age) {
        LOGGER.info(String.format("Request for provider: %s, age: %d", provider, age));

        if (!quotesTable.containsKey(provider)) {
            throw new RuntimeException(String.format("Specified provider %s invalid", provider));
        }

        ProviderQuote quote = null;

        List<ProviderQuote> quotes = quotesTable.get(provider);
        for (ProviderQuote pq : quotes) {
            if (pq.inRange(age)) {
                quote = pq;
                break;
            }
        }

        if (quote == null) {
            throw new RuntimeException(String.format("No Quote for the specified provider %s and age %d",
                    provider, age));
        }

        LOGGER.info(String.format("Quote by provider %s at price %.02f", provider, quote.getPrice()));

        return quote;
    }
}
To receive the sequence of requests from the client, the service method on the server needs to return a request handler object that implements the interface StreamObserver. The following are the contents of the Java program called BestQuoteRequestStreamObserver.java that implements the required interface and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gcs/server as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package com.polarsparc.gcs.server;

import com.polarsparc.gcs.BestQuoteRequest;
import com.polarsparc.gcs.BestQuoteResponse;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

public class BestQuoteRequestStreamObserver implements StreamObserver<BestQuoteRequest> {
    private final StreamObserver<BestQuoteResponse> response;

    private String provider;
    private double price;
    private boolean okay = true; // [1]

    public BestQuoteRequestStreamObserver(StreamObserver<BestQuoteResponse> response) {
        this.response = response;
        this.provider = null;
        this.price = 0.0;
    }

    @Override
    public void onNext(BestQuoteRequest request) { // [2]
        String provider = request.getProvider();
        int age = request.getAge();
        try {
            ProviderQuote pq = BestQuoteProvider.getBestQuote(provider, age);
            if (this.provider == null || this.price > pq.getPrice()) {
                this.provider = provider;
                this.price = pq.getPrice();
            }
        } catch (RuntimeException ex) {
            okay = false; // [3]
            onError(ex); // [3]
        }
    }

    @Override
    public void onError(Throwable ex) { // [4]
        Status status = Status.INVALID_ARGUMENT.withDescription(ex.getMessage());
        response.onError(status.asRuntimeException());
    }

    @Override
    public void onCompleted() { // [5]
        if (okay) {
            BestQuoteResponse quote = BestQuoteResponse.newBuilder()
                    .setProvider(this.provider)
                    .setPrice(this.price)
                    .build();
            response.onNext(quote);
            response.onCompleted();
        }
    }
}
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: a flag that tracks if an error occurred during the processing of the next request from the client
[2] :: method onNext() is invoked when the server receives the next request from the client. We keep track of the provider with the best quote (lowest price) after a lookup from the object BestQuoteProvider
[3] :: if the lookup fails, set the flag to 'false' and invoke the method onError() passing in the exception
[4] :: method onError() is invoked when an error is encountered. Leverage the gRPC built-in error model to indicate the type of error
[5] :: method onCompleted() is invoked when the server is done processing all the requests from the client and is ready to send a response back to the client. Notice that the response is sent only if there was no error (using the flag 'okay')
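To stay in one language for the added sketches, the callback flow described in [1] through [5] is restated below in Go rather than Java. It is a simplified model and not the grpc-java API: the observer type, its lookup function, and the out slice (which stands in for whatever would be sent to the client) are assumptions. It also adds one guard that the Java listing does not have: once a lookup has failed, later requests are ignored, so at most one error is reported.

```go
package main

import (
	"fmt"
)

// observer models the request-side callbacks: onNext for each request and
// onCompleted when the client has finished sending.
type observer struct {
	lookup   func(provider string) (float64, error) // hypothetical quote lookup
	provider string
	price    float64
	okay     bool     // false once a lookup has failed
	out      []string // stand-in for what would be sent to the client
}

func newObserver() *observer {
	// A hypothetical price list for a single age band.
	prices := map[string]float64{"Alice": 2000.0, "Bob": 1950.0}
	return &observer{
		okay: true,
		lookup: func(provider string) (float64, error) {
			price, ok := prices[provider]
			if !ok {
				return 0, fmt.Errorf("specified provider %s invalid", provider)
			}
			return price, nil
		},
	}
}

func (o *observer) onNext(provider string) {
	if !o.okay {
		return // the call has already failed: ignore later requests
	}
	price, err := o.lookup(provider)
	if err != nil {
		o.okay = false
		o.out = append(o.out, "error: "+err.Error())
		return
	}
	if o.provider == "" || price < o.price {
		o.provider, o.price = provider, price
	}
}

func (o *observer) onCompleted() {
	if !o.okay {
		return // an error was already reported: send no response
	}
	o.out = append(o.out, fmt.Sprintf("%s %.2f", o.provider, o.price))
}

func main() {
	o := newObserver()
	for _, p := range []string{"Alice", "Dave", "Bob"} {
		o.onNext(p)
	}
	o.onCompleted()
	fmt.Println(o.out) // a single error entry and no quote
}
```

The flag serves the same purpose as 'okay' in the Java class: the client sees either one response or one error for the call, never both.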
The following are the contents of the Java program called BestQuoteService.java that implements the Client Streaming gRPC service BestQuoteService located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gcs/server as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package com.polarsparc.gcs.server;

import com.polarsparc.gcs.BestQuoteRequest;
import com.polarsparc.gcs.BestQuoteResponse;
import com.polarsparc.gcs.BestQuoteServiceGrpc;
import io.grpc.stub.StreamObserver;

import java.util.logging.Level;
import java.util.logging.Logger;

public class BestQuoteService extends BestQuoteServiceGrpc.BestQuoteServiceImplBase { // [1]
    private final static Logger LOGGER = Logger.getLogger(BestQuoteService.class.getName());

    static {
        LOGGER.setLevel(Level.INFO);
    }

    @Override
    public StreamObserver<BestQuoteRequest> getBestQuote(StreamObserver<BestQuoteResponse> responseObserver) { // [2]
        return new BestQuoteRequestStreamObserver(responseObserver);
    }
}
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: extend the base class BestQuoteServiceGrpc.BestQuoteServiceImplBase generated by the maven protobuf compiler plugin
[2] :: override the service method getBestQuote which takes a single input argument - a StreamObserver object that is used for sending the response back to the client. It returns an instance of the client handler object BestQuoteRequestStreamObserver
The following are the contents of the Java program called BestQuoteServer.java that registers the Client Streaming RPC service BestQuoteService as a gRPC server and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gcs/server as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package com.polarsparc.gcs.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class BestQuoteServer {
    public static void main(String[] args) {
        Server server = ServerBuilder.forPort(20003) // [1]
                .addService(new BestQuoteService()) // [2]
                .build();

        try {
            server.start(); // [3]
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.print("Started the gRPC BestQuoteService on 20003 ...\n");

        try {
            server.awaitTermination(); // [4]
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: create an instance of the gRPC server on the specified port
[2] :: register an instance of the BestQuoteService object (that implements the service method getBestQuote) with the gRPC server
[3] :: start the gRPC server
[4] :: wait till the gRPC server terminates
To receive responses from the server in an asynchronous fashion, a client needs to implement the interface StreamObserver and register it as a callback handler on the client stub. The following are the contents of the Java program called BestQuoteStreamObserver.java that implements the required interface for the asynchronous callback and is located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gcs/client as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package com.polarsparc.gcs.client;

import com.polarsparc.gcs.BestQuoteResponse;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;

public class BestQuoteStreamObserver implements StreamObserver<BestQuoteResponse> {
    private final CountDownLatch latch;

    public BestQuoteStreamObserver(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void onNext(BestQuoteResponse response) { // [1]
        System.out.printf("Best quote from provider %s with price %.02f\n", response.getProvider(),
                response.getPrice());
    }

    @Override
    public void onError(Throwable ex) { // [2]
        Status status = Status.fromThrowable(ex); // [3]
        System.out.printf("Error status: code - %s, description - %s\n", status.getCode(),
                status.getDescription());
        latch.countDown();
    }

    @Override
    public void onCompleted() { // [4]
        System.out.println("Done !!!");
        latch.countDown();
    }
}
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: method onNext() is invoked when there is valid response from the server
[2] :: method onError() is invoked when an error is encountered
[3] :: call the method Status.fromThrowable() with the thrown exception as an input to get the gRPC error status details
[4] :: method onCompleted() is invoked when the server is done sending all the responses
The following are the contents of the Java program called BestQuoteClientTest.java that implements the Client Streaming RPC client for BestQuoteService and is located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gcs/client as shown below:
/*
    @Author: Bhaskar S
    @Blog:   https://www.polarsparc.com
    @Date:   12 Dec 2020
*/

package com.polarsparc.gcs.client;

import com.polarsparc.gcs.BestQuoteRequest;
import com.polarsparc.gcs.BestQuoteServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.concurrent.CountDownLatch;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BestQuoteClientTest {
    private BestQuoteServiceGrpc.BestQuoteServiceStub stub;

    @BeforeAll
    public void setup() {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 20003) // [1]
                .usePlaintext() // [2]
                .build();
        stub = BestQuoteServiceGrpc.newStub(channel); // [3]
    }

    @Test
    public void bestQuoteAsyncTestOne() {
        CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<BestQuoteRequest> requestObserver = stub.getBestQuote(new BestQuoteStreamObserver(latch)); // [4]
        BestQuoteRequest req1 = BestQuoteRequest.newBuilder().setProvider("Bob").setAge(37).build(); // [5]
        BestQuoteRequest req2 = BestQuoteRequest.newBuilder().setProvider("Charlie").setAge(37).build(); // [5]
        requestObserver.onNext(req1); // [6]
        requestObserver.onNext(req2); // [6]
        requestObserver.onCompleted(); // [7]
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void bestQuoteAsyncTestTwo() {
        CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<BestQuoteRequest> requestObserver = stub.getBestQuote(new BestQuoteStreamObserver(latch)); // [4]
        BestQuoteRequest req1 = BestQuoteRequest.newBuilder().setProvider("Alice").setAge(48).build(); // [5]
        BestQuoteRequest req2 = BestQuoteRequest.newBuilder().setProvider("Dave").setAge(48).build(); // [5]
        requestObserver.onNext(req1); // [6]
        requestObserver.onNext(req2); // [6]
        requestObserver.onCompleted(); // [7]
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: create an instance of the object ManagedChannel that represents a virtual gRPC connection to the service endpoint on the specified ip address and port
[2] :: indicate that we are using an unsecured communication channel
[3] :: create an instance of the gRPC asynchronous (non-blocking) client stub BestQuoteServiceStub generated by the protoc compiler
[4] :: invoke the gRPC method getBestQuote using the asynchronous client stub passing in an instance of the callback handler BestQuoteStreamObserver. The service method invocation will return a stream request handler
[5] :: create an instance of the request object BestQuoteRequest
[6] :: invoke the gRPC method onNext to send a request to the server
[7] :: invoke the gRPC method onCompleted to signal to the server the client has finished sending all the requests
Open two Terminal windows - one for the server and one for the client.
In the server Terminal, execute the following commands:
$ cd $HOME/java/grpc
$ mvn exec:java -Dexec.mainClass=com.polarsparc.gcs.server.BestQuoteServer
The following would be the typical output:
Started the gRPC BestQuoteService on 20003 ...
In the client Terminal, execute the following commands:
$ cd $HOME/java/grpc
$ mvn test -Dtest=com.polarsparc.gcs.client.BestQuoteClientTest
The following would be the typical output:
Best quote from provider Bob with price 1475.00
Done !!!
Error status: code - INVALID_ARGUMENT, description - Specified provider Dave invalid
The following would be the additional output on the Terminal running the server:
Dec 12, 2020 12:31:56 PM com.polarsparc.gcs.server.BestQuoteProvider getBestQuote
INFO: Request for provider: Bob, age: 37
Dec 12, 2020 12:31:56 PM com.polarsparc.gcs.server.BestQuoteProvider getBestQuote
INFO: Quote by provider Bob at price 1475.00
Dec 12, 2020 12:31:56 PM com.polarsparc.gcs.server.BestQuoteProvider getBestQuote
INFO: Request for provider: Charlie, age: 37
Dec 12, 2020 12:31:56 PM com.polarsparc.gcs.server.BestQuoteProvider getBestQuote
INFO: Quote by provider Charlie at price 1525.00
Dec 12, 2020 12:31:56 PM com.polarsparc.gcs.server.BestQuoteProvider getBestQuote
INFO: Request for provider: Alice, age: 48
Dec 12, 2020 12:31:56 PM com.polarsparc.gcs.server.BestQuoteProvider getBestQuote
INFO: Quote by provider Alice at price 2000.00
Dec 12, 2020 12:31:56 PM com.polarsparc.gcs.server.BestQuoteProvider getBestQuote
INFO: Request for provider: Dave, age: 48
One could also test the Go server with the Java client, and vice versa.