Home Using threading channels
Post
Cancel

Using threading channels

Idea behind…

In one of my tasks I had a case where I had to use the same Azure ServiceBus message from one subscription in multiple Web API endpoints. One of these endpoints serves data on publish/subscribe basis and has to execute publish operation as soon as something new becomes available.

Solution

In order to utilize a single subscription in ServiceBus, a single reader was created for reading these messages that come on irregular basis. In order to serve incoming messages to the publish/subscribe endpoint and potentially any other listeners, some sort of distribution mechanism had to be put in place. For this task I chose Microsoft.Threading.Channels - a thread safe producer/consumer library. In short, System.Threading.Channels allows creation of Channel<T> property, which has ChannelWriter<TWrite> Writer and ChannelReader<TRead> Reader. In other words, Writer is used to write messages to Channel and Reader is used to access them. Channel is an object that stores written messages and allows reading them in “queue” like fashion.

Implementation of System.Threading.Channels logic is quite straightforward. First, we have to define Channel itself, it can be Bound and Unbound. Bound channel has a specific capacity, which you have to set when initializing the channel. Unbound channels however do not have this capacity limitation. In my case I chose Bound channel, as my APIs require only the latest messages.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ExampleDataModelChannel
{
    private readonly Channel<ExampleDto> _channel;

    public ExampleDataModelChannel()
    {
        _channel = Channel.CreateBounded<ExampleDto>(
            new BoundedChannelOptions(10)
            {
                AllowSynchronousContinuations = false,
                SingleReader = false,
                SingleWriter = false,
                FullMode = BoundedChannelFullMode.DropOldest
            });
    }

    public ChannelReader<ExampleDto> Reader => _channel.Reader;

    public bool Publish(ExampleDto data)
    {
        return _channel.Writer.TryWrite(data);
    }
}

Given code allows me to write to Channel<T> using Publish whenever I instantiate ExampleDataModelChannel class.

_exampleDataModelChannel.Publish(model);

and read utilizing Reader, as in example below.

In this scenario, whenever a ServiceBus reader gets a new message it gets distributed to Channel<T>. To read the message, ChannelReader<TRead> Reader has to be utilized. So in order to notify subscribers from pub/sub API, a message would have to be served to them as soon as it becomes available in Channel<T>. Since logic that handles distributing messages to subscribed clients in my case runs as a separate BackgroundService, I’m able to read given Channel<T> and execute publish logic only when there is something to serve subscribed clients.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var model in _dataChannel.Reader.ReadAllAsync(stoppingToken))
        {
            try
            {
                //do magic here
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Something broke");
            }
        }
    }

Summary

In my particular case, I was able to utilize Microsoft.Threading.Channels for distributing ServiceBus messages in producer/consumer fashion across applications, as soon as ServiceBus reader receives them. Therefore not only allowing various reading scenarios for a single ServiceBus subscription message receiver, but also separating responsibility and logic outside the ServiceBus reader.

More info on threading channels here: https://learn.microsoft.com/en-us/dotnet/core/extensions/channels

Thank you for reading.

This post is licensed under CC BY 4.0 by the author.