C#/.NET gRPC Service with Duplex (Bidirectional) Streaming

One of the main advantage of gRPC over REST API’s is that gRPC supports streaming methods in addition to the traditional request-response style. There are 3 kinds of streaming support in gRPC namely Client Streaming, Server Streaming and Duplex (bi-directional streaming). In this post we talk about Duplex (Bidirectional) Streaming and how to implement it in C#/.NET

What is Duplex Streaming

In Duplex Streaming scenario, both the client and the server sends a sequence messages to each other via separate read and write streams. The call is initiated by the client to the server after which the streams will be available. The streams are independent from each other so the client and the server can read and write to the streams as required by their own applications’ requirements. For example, the server can wait for all the messages from the client before it sends back response, or it could immediately reply and have a “ping-pong”, chat-like, style of communication with the client. Within each stream, the order of messages is guaranteed.

A definition of a duplex streaming method is shown below. Note the use of the keyword stream in both the request and the response.

 rpc ChatNotification(stream NotificationsRequest) returns (stream NotificationsResponse);   

Implementation (Server)

To create a gRPC project follow the steps in Your First gRPC Project. By default, dotnet creates a greet.proto file which contains the service definition of the generated GreeterService. Let’s edit this file and replace its content with the proto file below. Also best to rename the service as well as the proto file. In this sample code, let’s rename the service from Greeter to Notifier and greet.proto to notify.proto

In this example implementation, we will do a “ping-pong” style of communication where the server responds to every request that the client makes.

syntax = "proto3";
import "google/protobuf/timestamp.proto";
option csharp_namespace = "DuplexStreaming";

package notify;


service Notifer {
   rpc ChatNotification(stream NotificationsRequest) returns (stream NotificationsResponse);   
}

message NotificationsRequest {
  string message = 1;
  string to = 2;
  string from = 3;
}

message NotificationsResponse{  
  string message = 1;
  google.protobuf.Timestamp receivedAt = 3;
}

Build the project/solution to ensure that everything is correct. Now, open up NotifierService.cs and add the implementation of the ChatNotifications method.

Note that the 1st parameter of the method is an IAsyncStreamReader<NotificationRequest> and the 2nd parameter is an IServerStreamWriter<NotificationResponse>

using System.ComponentModel;
using System.Diagnostics.Metrics;

using DuplexStreaming;

using Google.Protobuf.WellKnownTypes;

using Grpc.Core;

namespace DuplexStreaming.Services;
public class NotifierService : Notifier.NotifierBase {
    private readonly ILogger<NotifierService> _logger;
    public NotifierService(ILogger<NotifierService> logger) {
        _logger = logger;
    }

    public override async Task ChatNotification(IAsyncStreamReader<NotificationsRequest> requestStream, IServerStreamWriter<NotificationsResponse> responseStream, ServerCallContext context) {

        while (await requestStream.MoveNext()) {
            var request = requestStream.Current;
          
            var now = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow);
            var reply = new NotificationsResponse() {
                Message = $"Hi {request.From}!, You have sent the message \"{request.Message}\" to {request.To}",
                ReceivedAt = now
            };
         
            await responseStream.WriteAsync(reply);
          
        }
    }
}

  • IAsyncStreamReader<T> requestStream
    – The client application writes to this stream to send a message to the server
  • IServerStreamWriter<T> responseStream
    – The server writes to this stream to send a message to the client. In our sample implementation above, the server sends back a response for every message it receives from the client (like a chat app)

On Line 19, we call MoveNext() on the requestStream and in Line 20, we take the latest message from client by accessing the Current property. We then use values from the request to immediately send back a response – which is done in Line 28 by call the WriteAsync method of the responseStream.

Implementation (Client)

using DuplexStreaming;
using Grpc.Net.Client;

// The port number must match the port of the gRPC server.
using var channel = GrpcChannel.ForAddress("http://localhost:5295");
var client = new Notifier.NotifierClient(channel);
using var call = client.ChatNotification();

var responseReaderTask = Task.Run(async Task () =>
{
    while (await call.ResponseStream.MoveNext(CancellationToken.None))
    {
        var note = call.ResponseStream.Current;
        Console.WriteLine($"{note.Message}, received at {note.ReceivedAt}");
    }
});

foreach (var msg in new[] {"Tom", "Jones"})
{
    var request = new NotificationsRequest() { Message = $"Hello {msg}", From = "Mom", To = msg };
    await call.RequestStream.WriteAsync(request);
}

await call.RequestStream.CompleteAsync();
await responseReaderTask;

Console.WriteLine("Press any key to exit...");
Console.ReadKey();
 

On the client side, we need to initiate a call (Line 7) to get hold of the request and response streams. After which we then setup a task to read the server responses (Lines 9 – 16) before writing messages to the request stream (Lines 18 – 22)

Now, run both the client and the server, and you should see something like this…

PS C:\Users\Erik\Source\Repos\grpctutorials\DuplexStreaming\source\DuplexStreamingClient\bin\debug\net8.0> .\DuplexStreamingClient.exe      
Hi Mom!, You have sent the message "Hello Tom" to Tom, received at "2024-01-25T10:07:32.183720200Z"
Hi Mom!, You have sent the message "Hello Jones" to Jones, received at "2024-01-25T10:07:32.183947200Z"
Press any key to exit...

Testing with FintX

You can also the service without needing to write a client application. Let’s fire up FintX (https://github.com/namigop/FintX) to verify that the service is working fine. FintX is an open-source, native, and cross-platform gRPC client. I’m on a Windows, so I have installed the Windows package (MacOS and Linux downloads are also available)

Click on the plus icon to add a client. The value entered in the http address should match the running service

Click Okay and then double-click on the ChatNotifications method to open it in a new tab. Test the method as per the short video below

Others

The full source code of this tutorial can be downloaded from https://github.com/namigop/grpctutorials/tree/main/DuplexStreaming. Happy coding!

Leave a Reply