PolarSPARC |
Bhaskar S | 12/04/2020 |
Overview
In Part 1 of this series, we provided a high-level overview of gRPC, installed the necessary software, setup the environment, and finally demonstrated the Unary RPC communication in both Go and Java.
In this part, we will continue the journey to the next RPC communication pattern - Server Streaming.
Server Streaming RPC
The following diagram illustrates the high-level architecture of Server Streaming communication pattern:
In the Server Streaming RPC mode, the client sends a request to the server and the server responds with sequence (or stream) of messages back to the client.
For the Server Streaming RPC demonstration, we will implement a fictitious Currency Rate service, where the client sends the 'from' currency and the 'to' currency as the request and the server responds with a stream of 'rates' from different 'agents'.
We will first demonstrate the Currency Rate service using the Go programming language.
In the $GOPATH directory, create the project directory hierarchy by executing the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc
$ mkdir -p serverstream serverstream/currencypb serverstream/server serverstream/client
The following are the contents of the file currency.proto located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/currencypb as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ syntax = "proto3"; package serverstream; option go_package = "polarsparc.com/grpc/serverstream/currencypb"; option java_multiple_files = true; option java_package = "com.polarsparc.gss"; message CurrencyRateRequest { string from = 1; string to = 2; } message CurrencyRateResponse { string agent = 1; string from = 2; string to = 3; double rate = 4; } service CurrencyService { rpc getCurrencyRate(CurrencyRateRequest) returns (stream CurrencyRateResponse) {}; }
The request message is defined as CurrencyRateRequest and the response message is defined as CurrencyRateResponse. The service interface is defined as CurrencyService with an RPC method getCurrencyRate that takes in a CurrencyRateRequest as an input and returns a sequence (or stream) of CurrencyRateResponse objects.
To compile the currency.proto file, execute the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc/serverstream
$ protoc currencypb/currency.proto --go_out=plugins=grpc:$GOPATH/src
On success, this will generate the Go code file called currency.pb.go located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/currencypb.
From the file currency.pb.go, we see the CurrencyServiceServer interface, as shown below, that the server needs to implements:
. . . type CurrencyServiceServer interface { GetCurrencyRate(*CurrencyRateRequest, CurrencyService_GetCurrencyRateServer) error } . . .
The following are the contents of the file currency_provider.go that simulates an in-memory store for initializing and returning currency rates from fictitious agents and is located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/server as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package main import ( "fmt" "github.com/pkg/errors" "log" "strings" ) type CurrencyRate struct { Agent string Rate float64 } type RatesCache map[string][]CurrencyRate type server struct { cache RatesCache } func (s *server) Init() { l1 := []CurrencyRate{{Agent: "Alice", Rate: 1.30}, {Agent: "Bob", Rate: 1.302},{Agent: "Dave", Rate: 1.31}} s.cache["USD:CAD"] = l1 l2 := []CurrencyRate{{Agent: "Alice", Rate: 0.85}, {Agent: "Charlie", Rate: 0.84}} s.cache["USD:EUR"] = l2 l3 := []CurrencyRate{{Agent: "Bob", Rate: 0.75}, {Agent: "Charlie", Rate: 0.751},{Agent: "Eve", Rate: 0.74}} s.cache["USD:GBP"] = l3 } func (s *server) GetAgentRates(from string, to string) ([]CurrencyRate, error) { key := strings.ToUpper(from + ":" + to) log.Printf("Currency rate request for key: %s\n", key) rates := s.cache[key] if rates == nil { return nil, errors.New(fmt.Sprintf("No rate for currency from: %s, to: %s", from, to)) } log.Printf("Currency rates for key: %s = %v", key, rates) return rates, nil }
The following are the contents of the file server.go for the Server Streaming RPC server that implements the CurrencyServiceServer interface and is located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/server as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package main import ( "google.golang.org/grpc" "log" "net" "polarsparc.com/grpc/serverstream/currencypb" // [1] "time" ) func (s *server) GetCurrencyRate(req *currencypb.CurrencyRateRequest, stream currencypb.CurrencyService_GetCurrencyRateServer) error { // [2] log.Printf("Received a CurrencyRate request with req: %v\n", req) from := req.From to := req.To rates, err := s.GetAgentRates(from, to) if err == nil { log.Printf("Rates from agents: %v\n", rates) for _, r := range rates { res := ¤cypb.CurrencyRateResponse{Agent: r.Agent, From: from, To: to, Rate: r.Rate} stream.Send(res) // [3] time.Sleep(250 * time.Millisecond) } return nil } return err } func main() { cs := &server{ cache: RatesCache{}, } cs.Init() // [4] log.Println("Ready to start the CurrencyRate server...") lis, err := net.Listen("tcp", "localhost:20002") if err != nil { log.Fatalf("Failed to create listener on localhost:20002") } srv := grpc.NewServer() // [5] currencypb.RegisterCurrencyServiceServer(srv, cs) // [6] if err = srv.Serve(lis); err != nil { log.Fatalf("Failed to start server: %v", err) } }
The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:
[1] :: import the code from the package polarsparc.com/grpc/serverstream/currencypb generated by the protoc compiler
[2] :: associate the service method GetCurrencyRate as receiver method of the custom type server. This method takes two input arguments - a CurrencyRateRequest object and a stream object called CurrencyService_GetCurrencyRateServer
[3] :: send a sequence (or stream) of CurrencyRateResponse objects (one for each of the agents) back to the client that made the request
[4] :: initialize the in-memory currency rates store (with some fictitious entries)
[5] :: create an instance of the gRPC server
[6] :: register an instance of the server object (that implements the service method GetCurrencyRate) with the gRPC server
The following are the contents of the file client.go that implements the Server Streaming RPC client for the CurrencyServiceServer located in the directory $GOPATH/src/polarsparc.com/grpc/serverstream/client as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package main import ( "golang.org/x/net/context" "google.golang.org/grpc" "io" "log" "polarsparc.com/grpc/serverstream/currencypb" ) func main() { log.Println("Ready to start the CurrencyRate client...") conn, err := grpc.Dial("localhost:20002", grpc.WithInsecure()) if err != nil { log.Fatalf("Failed to connect to localhost:20002") } defer conn.Close() cl := currencypb.NewCurrencyServiceClient(conn) // [1] // Success req := ¤cypb.CurrencyRateRequest{From: "usd", To: "eur"} // [2] stream, err := cl.GetCurrencyRate(context.Background(), req) // [3] if err != nil { log.Fatalf("[1] Failed to send CurrencyRate request to localhost:20002") } for { res, err := stream.Recv() // [4] if err == io.EOF { break } if err != nil { log.Fatalf("[1] Received and error from CurrencyRate at localhost:20002: %v", err) } log.Printf("[1] ===> Agent: %s, Rate: %.03f\n", res.Agent, res.Rate) } // Error req2 := ¤cypb.CurrencyRateRequest{From: "usd", To: "jpy"} stream2, err := cl.GetCurrencyRate(context.Background(), req2) if err != nil { log.Fatalf("[2] Failed to send CurrencyRate request to localhost:20002") } for { res, err := stream2.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("[2] Received and error from CurrencyRate at localhost:20002: %v", err) } log.Printf("[2] ===> Agent: %s, Rate: %.03f\n", res.Agent, res.Rate) } }
The following are brief descriptions for some of the Go type(s)/method(s) used in the code above:
[1] :: create an instance of the gRPC client stub NewCurrencyServiceClient generated by the protoc compiler
[2] :: create an instance of the request object CurrencyRateRequest
[3] :: invoke the gRPC method GetCurrencyRate using the client stub. The method invocation returns a stream object
[4] :: invoke the Recv() method on the stream object until the end of the stream (return code of io.EOF)
Open two Terminal windows - one for the server and one for the client.
In the server Terminal, execute the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc/serverstream/server
$ go run server.go currency_provider.go
The following would be the typical output:
2020/12/04 21:03:37 Ready to start the CurrencyRate server...
In the client Terminal, execute the following commands:
$ cd $GOPATH/src/polarsparc.com/grpc/serverstream/client
$ go run client.go
The following would be the typical output:
2020/12/04 21:04:31 Ready to start the CurrencyRate client... 2020/12/04 21:04:31 [1] ===> Agent: Alice, Rate: 0.850 2020/12/04 21:04:31 [1] ===> Agent: Charlie, Rate: 0.840 2020/12/04 21:04:32 [2] Received and error from CurrencyRate at localhost:20002: rpc error: code = Unknown desc = No rate for currency from: usd, to: jpy exit status 1
EXCELLENT !!! We have successfully demonstrated the Server Streaming gRPC communication style using the Go language.
Copy the file currency.proto listed above to the directory $HOME/java/grpc/src/main/proto.
To compile the currency.proto file, execute the following commands:
$ cd $HOME/java/grpc
$ mvn compile
On success, this will generate some files in the directory $HOME/java/grpc/target/generated-sources/protobuf/java/com/polarsparc/gss.
The following diagram illustrates the contents of the directory $HOME/java/grpc/target/generated-sources :
The following are the contents of the file CurrencyRate.java that acts a holder object for storing the fictitious agent and the currency rate they offer and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package com.polarsparc.gss.server; public class CurrencyRate { private final String agent; private final Double rate; public CurrencyRate(String agent, Double rate) { this.agent = agent; this.rate = rate; } public String getAgent() { return this.agent; } public Double getRate() { return this.rate; } @Override public String toString() { return "CurrencyRate{" + "agent='" + agent + '\'' + ", rate=" + rate + '}'; } }
The following are the contents of the file CurrencyRateProvider.java that simulates an in-memory store for initializing and returning currency rates from fictitious agents and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package com.polarsparc.gss.server; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; public class CurrencyRateProvider { private final static Logger LOGGER = Logger.getLogger(CurrencyRateProvider.class.getName()); private final static Map<String, List<CurrencyRate>>> ratesTable = new HashMap<>(); static { LOGGER.setLevel(Level.INFO); ratesTable.put("USD:CAD", Arrays.asList(new CurrencyRate("Alice", 1.30), new CurrencyRate("Bob", 1.302), new CurrencyRate("Dave", 1.31))); ratesTable.put("USD:EUR", Arrays.asList(new CurrencyRate("Alice", 0.85), new CurrencyRate("Charlie", 0.84))); ratesTable.put("USD:GBP", Arrays.asList(new CurrencyRate("Bob", 0.75), new CurrencyRate("Charlie", 0.751), new CurrencyRate("Eve", 0.74))); } private CurrencyRateProvider() { } public static List<CurrencyRate>> getCurrencyRate(String from, String to) { String key = (from + ":" + to).toUpperCase(); LOGGER.info(String.format("Currency rate request for key: %s", key)); if (!ratesTable.containsKey(key)) { throw new RuntimeException(String.format("No rate for currency from: %s, to: %s", from, to)); } List<CurrencyRate>> rates = ratesTable.get(key); LOGGER.info(String.format("Currency rates for key: %s = %s", key, rates)); return rates; } }
The following are the contents of the Java program called CurrencyRateService.java that implements the Server Streaming gRPC service CurrencyService located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package com.polarsparc.gss.server; import com.polarsparc.gss.CurrencyRateRequest; import com.polarsparc.gss.CurrencyRateResponse; import com.polarsparc.gss.CurrencyServiceGrpc; import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; public class CurrencyRateService extends CurrencyServiceGrpc.CurrencyServiceImplBase { // [1] private final static Logger LOGGER = Logger.getLogger(CurrencyRateService.class.getName()); static { LOGGER.setLevel(Level.INFO); } @Override public void getCurrencyRate(CurrencyRateRequest request, StreamObserver<CurrencyRateResponse> responseObserver) { // [2] String from = request.getFrom(); String to = request.getTo(); List<CurrencyRate> rates; try { rates = CurrencyRateProvider.getCurrencyRate(from, to); } catch (RuntimeException ex) { Status status = Status.FAILED_PRECONDITION.withDescription(ex.getMessage()); responseObserver.onError(status.asRuntimeException()); return; } if (rates != null) { LOGGER.info(String.format("Rates from agents: %s", rates)); rates.forEach(r -> { CurrencyRateResponse response = CurrencyRateResponse.newBuilder() .setAgent(r.getAgent()) .setFrom(from) .setTo(to) .setRate(r.getRate()) .build(); responseObserver.onNext(response); // [3] try { Thread.sleep(250); } catch (InterruptedException e) { e.printStackTrace(); } }); responseObserver.onCompleted(); // [4] } } }
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: extend the base class CurrencyServiceGrpc.CurrencyServiceImplBase generated by the maven protobuf compiler plugin
[2] :: override the service method getCurrencyRate which takes two input arguments - a CurrencyRateRequest object and a StreamObserver object. The StreamObserver object is used for sending the CurrencyRateResponse object
[3] :: send a sequence (or stream) of CurrencyRateResponse objects (one for each of the agents) back to the client using the onNext method on the StreamObserver object
[4] :: method onCompleted on the StreamObserver object signals the succesful completion of sending of a stream of responses
The following are the contents of the Java program called CurrencyRateServer.java that registers the Server Streaming RPC service CurrencyRateService as a gRPC server and is located in the directory $HOME/java/grpc/src/main/java/com/polarsparc/gss/server as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package com.polarsparc.gss.server; import io.grpc.Server; import io.grpc.ServerBuilder; import java.io.IOException; public class CurrencyRateServer { public static void main(String[] args) { Server server = ServerBuilder.forPort(20002) // [1] .addService(new CurrencyRateService()) // [2] .build(); try { server.start(); // [3] } catch (IOException e) { e.printStackTrace(); } System.out.print("Started the gRPC CurrencyRateService on 20002 ...\n"); try { server.awaitTermination(); // [4] } catch (InterruptedException e) { e.printStackTrace(); } } }
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: create an instance of the gRPC server on the specified port
[2] :: register an instance of the CurrencyRateService object (that implements the service method getCurrencyRate) with the gRPC server
[3] :: start the gRPC server
[4] :: wait till the gRPC server terminates
To receive responses from the server in an asynchronous fashion, a client needs to implement the interface StreamObserver and register it as a callback handler on the client stub. The following are the contents of the Java program called CurrencyRateStreamObserver.java that implements the required interface for the asynchronous callback and located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gss/client as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package com.polarsparc.gss.client; import com.polarsparc.gss.CurrencyRateResponse; import io.grpc.stub.StreamObserver; import java.util.concurrent.CountDownLatch; public class CurrencyRateStreamObserver implements StreamObserver<CurrencyRateResponse> { private final CountDownLatch latch; public CurrencyRateStreamObserver(CountDownLatch latch) { this.latch = latch; } @Override public void onNext(CurrencyRateResponse response) { // [1] System.out.printf("Agent: %s, Rate: %.03f\n", response.getAgent(), response.getRate()); } @Override public void onError(Throwable ex) { // [2] System.out.println("Exception: " + ex.getMessage()); } @Override public void onCompleted() { // [3] System.out.println("Done !!!"); latch.countDown(); } }
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: method onNext() is invoked when there is valid response from the server
[2] :: method onError() is invoked when an error is encountered
[3] :: method onCompleted() is invoked when the server is done sending all the responses
The following are the contents of the Java program called CurrencyRateClientTest.java that implements the Server Streaming RPC client for CurrencyService located in the directory $HOME/java/grpc/src/test/java/com/polarsparc/gss/client as shown below:
/* @Author: Bhaskar S @Blog: https://www.polarsparc.com @Date: 04 Dec 2020 */ package com.polarsparc.gss.client; import com.polarsparc.gss.CurrencyRateRequest; import com.polarsparc.gss.CurrencyRateResponse; import com.polarsparc.gss.CurrencyServiceGrpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import java.util.Iterator; import java.util.concurrent.CountDownLatch; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CurrencyRateClientTest { private CurrencyServiceGrpc.CurrencyServiceBlockingStub blockingStub; private CurrencyServiceGrpc.CurrencyServiceStub stub; @BeforeAll public void setup() { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 20002) // [1] .usePlaintext() // [2] .build(); this.blockingStub = CurrencyServiceGrpc.newBlockingStub(channel); // [3] this.stub = CurrencyServiceGrpc.newStub(channel); // [4] } @Test public void currencyRateBlockingTestOne() { CurrencyRateRequest request = CurrencyRateRequest.newBuilder() .setFrom("usd") .setTo("eur") .build(); // [5] Iterator<CurrencyRateResponse> response = this.blockingStub.getCurrencyRate(request); // [6] response.forEachRemaining(res -> System.out.printf("Agent: %s, Rate: %.03f\n", res.getAgent(), res.getRate())); } @Test public void currencyRateBlockingTestTwo() { CurrencyRateRequest request = CurrencyRateRequest.newBuilder() .setFrom("eur") .setTo("jpy") .build(); // [5] Iterator<CurrencyRateResponse> response = this.blockingStub.getCurrencyRate(request); // [6] Assertions.assertThrows(io.grpc.StatusRuntimeException.class, () -> response.forEachRemaining(res -> System.out.printf("Agent: %s, Rate: %.03f\n", res.getAgent(), res.getRate()))); } @Test public void currencyRateAsyncTestOne() { CountDownLatch latch = new CountDownLatch(1); CurrencyRateRequest request = CurrencyRateRequest.newBuilder() .setFrom("usd") .setTo("cad") .build(); // [5] this.stub.getCurrencyRate(request, new CurrencyRateStreamObserver(latch)); // [7] try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
The following are brief descriptions for some of the Java class(es)/method(s) used in the code above:
[1] :: create an instance of the object ManagedChannel that represents a virtual gRPC connection to the service endpoint on the specified ip address and port
[2] :: indicate that we are using an unsecured communication channel
[3] :: create an instance of the gRPC synchronous (or blocking) client stub CurrencyServiceBlockingStub generated by the protoc compiler
[4] :: create an instance of the gRPC asynchronous (non-blocking) client stub CurrencyServiceStub generated by the protoc compiler
[5] :: create an instance of the request object CurrencyRateRequest
[6] :: invoke the gRPC method getCurrencyRate using the synchronous client stub
[7] :: invoke the gRPC method getCurrencyRate using the asynchronous client stub, passing in the callback handler to an CurrencyRateStreamObserver object
Open two Terminal windows - one for the server and one for the client.
In the server Terminal, execute the following commands:
$ cd $HOME/java/grpc
$ mvn exec:java -Dexec.mainClass=com.polarsparc.gss.server.CurrencyRateServer
The following would be the typical output:
Started the gRPC CurrencyRateService on 20002 ...
In the client Terminal, execute the following commands:
$ cd $HOME/java/grpc
$ mvn test -Dtest=com.polarsparc.gss.client.CurrencyRateClientTest
The following would be the typical output:
Running com.polarsparc.gss.client.CurrencyRateClientTest Agent: Alice, Rate: 1.300 Agent: Bob, Rate: 1.302 Agent: Dave, Rate: 1.310 Done !!! Agent: Alice, Rate: 0.850 Agent: Charlie, Rate: 0.840 Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.849 s
One could also test with the Go server running and using the Java client and vice versa.
References