SoFunction
Updated on 2025-03-06

Example: implementing an event bus in C#

An event bus is a mechanism for passing events within an application or between application components.

It lets components stay decoupled while still communicating, by publishing and subscribing to events. The code in this article implements an event bus in C#: it defines a few interfaces and classes that handle publishing events and subscribing to them.

First, we have two basic constraint interfaces: IEvent and IAsyncEventHandler<TEvent>.

IEvent is an empty marker interface that constrains event types. IAsyncEventHandler<TEvent> is a generic interface that constrains event handlers. It defines the asynchronous method HandleAsync, which handles an event, and the method HandleException, which handles exceptions. Next, we have an IEventBus interface that defines the operations for publishing and subscribing to events.

Of these, the Publish<TEvent> and PublishAsync<TEvent> methods publish events, and the OnSubscribe<TEvent> method subscribes to them. Then we have LocalEventBusManager<TEvent>, a class that implements a local event bus. It implements the ILocalEventBusManager<TEvent> interface and handles local events in a single pipeline. It uses a Channel<TEvent> to store events, exposes Publish and PublishAsync to publish them, and provides AutoHandle to process events automatically.

Overall, the event bus provides a convenient way to achieve loosely coupled communication between components.

By publishing and subscribing to events, components can operate independently without directly relying on each other's implementation details.

This mechanism can improve the maintainability and scalability of the code.

GitHub repository address: /DonPangPang/soda-event-bus

Implement some basic constraints

Start with some constraints: IEvent constrains events, and IAsyncEventHandler<TEvent> where TEvent : IEvent constrains event handlers.

public interface IEvent
{

}

public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{
    Task HandleAsync(IEvent @event);

    void HandleException(IEvent @event, Exception ex);
}
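To make the two constraints concrete, here is a minimal sketch of one event and one handler. OrderCreated and OrderCreatedHandler are hypothetical names that are not part of the repository, and the two interfaces are repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Threading.Tasks;

public interface IEvent { }

public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{
    Task HandleAsync(IEvent @event);
    void HandleException(IEvent @event, Exception ex);
}

// Hypothetical event carrying the data a subscriber needs.
public sealed record OrderCreated(int OrderId) : IEvent;

// Hypothetical handler; it records what it saw so the effect is observable.
public sealed class OrderCreatedHandler : IAsyncEventHandler<OrderCreated>
{
    public int LastOrderId { get; private set; }
    public Exception? LastError { get; private set; }

    public Task HandleAsync(IEvent @event)
    {
        // The interface passes IEvent, so the handler narrows the type itself.
        if (@event is not OrderCreated created)
            throw new ArgumentException("Unexpected event type", nameof(@event));

        LastOrderId = created.OrderId;
        return Task.CompletedTask;
    }

    public void HandleException(IEvent @event, Exception ex) => LastError = ex;
}
```

Because HandleAsync receives an IEvent, the type check inside the handler is what ties it to OrderCreated; the generic parameter mainly serves to look the handler up by event type.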

Next, let's define our IEventBus. What operations does it need? Essentially, publish and subscribe.

public interface IEventBus
{
    void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
    Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;

    void OnSubscribe<TEvent>() where TEvent : IEvent;
}
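One way to get a feel for this interface is a trivial implementation that only records traffic, which can also serve as a test double. This is a sketch under assumptions: RecordingEventBus and Ping are hypothetical names, and the interfaces are repeated so the snippet is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IEvent { }

public interface IEventBus
{
    void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
    Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
    void OnSubscribe<TEvent>() where TEvent : IEvent;
}

// Hypothetical bus that dispatches nothing and only remembers what it was asked to do.
public sealed class RecordingEventBus : IEventBus
{
    public List<IEvent> Published { get; } = new();
    public HashSet<Type> Subscribed { get; } = new();

    public void Publish<TEvent>(TEvent @event) where TEvent : IEvent => Published.Add(@event);

    public Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        Published.Add(@event);
        return Task.CompletedTask;
    }

    // Subscription is keyed by event type only; no handler is passed in.
    public void OnSubscribe<TEvent>() where TEvent : IEvent => Subscribed.Add(typeof(TEvent));
}

// Hypothetical event used to exercise the bus.
public sealed record Ping(string Text) : IEvent;
```

Note that OnSubscribe<TEvent>() takes no handler argument, so a real implementation has to find the handler elsewhere, for example in a service container.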

Implement a local event bus

Local event handling

I plan to implement local event handling in two ways: the first is LocalEventBusManager, which manages local events, and the second is LocalEventBusPool, which pools them.

LocalEventBusManager

LocalEventBusManager processes events in a single pipeline and consumes them centrally.

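using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

public interface ILocalEventBusManager<in TEvent> where TEvent : IEvent
{
    void Publish(TEvent @event);
    Task PublishAsync(TEvent @event);

    void AutoHandle();
}

public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider) : ILocalEventBusManager<TEvent>
    where TEvent : IEvent
{
    readonly IServiceProvider _servicesProvider = serviceProvider;

    // Unbounded channel that queues published events for the single consumer.
    private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();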

    public void Publish(TEvent @event)
    {
        Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
        _eventChannel.Writer.TryWrite(@event);
    }

    private CancellationTokenSource Cts { get; } = new();

    public void Cancel()
    {
        Cts.Cancel();
    }

    public async Task PublishAsync(TEvent @event)
    {
        await _eventChannel.Writer.WriteAsync(@event);
    }

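    // 0 = not started, 1 = started. This flag is an assumption: the original guard
    // condition is missing, so "start only once" is reconstructed with Interlocked.
    private int _started;

    public void AutoHandle()
    {
        // Make sure to start only once
        if (Interlocked.Exchange(ref _started, 1) == 1) return;

        Task.Run(async () =>
        {
            while (!Cts.IsCancellationRequested)
            {
                // Passing the token lets Cancel() stop a read that is waiting for events.
                var @event = await _eventChannel.Reader.ReadAsync(Cts.Token);
                await HandleAsync(@event);
            }
        }, Cts.Token);
    }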

    async Task HandleAsync(TEvent @event)
    {
        var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();

        if (handler is null)
        {
            throw new NullReferenceException($"No handler for event {@event.GetType().Name}");
        }
        try
        {
            await handler.HandleAsync(@event);
        }
        catch (Exception ex)
        {
            // Failures are routed to the handler instead of stopping the consumer loop.
            handler.HandleException(@event, ex);
        }
    }
}
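The single-pipeline idea can be tried in isolation with System.Threading.Channels. The following is a minimal sketch, not the repository's code: SinglePipelineDemo is a hypothetical name, strings stand in for events, and the writer is completed explicitly so the consumer loop can end.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SinglePipelineDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>();
        var handled = new List<string>();

        // One consumer drains the channel, so events are handled in arrival order.
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                handled.Add(item);
            }
        });

        // Synchronous and asynchronous publishing both go through the writer.
        channel.Writer.TryWrite("first");
        await channel.Writer.WriteAsync("second");

        // Completing the writer lets ReadAllAsync finish once the queue is empty.
        channel.Writer.Complete();
        await consumer;

        return handled;
    }
}
```

With a single consumer, a slow handler delays every event queued behind it, which is the trade-off the pooled variant addresses.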

LocalEventBusPool

LocalEventBusPool gives every event type its own pipeline, so events of different types can be processed in parallel.

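using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
    private readonly IServiceProvider _serviceProvider = serviceProvider;

    private class ChannelKey
    {
        public required string Key { get; init; }
        public int Subscribers { get; set; }

        public override bool Equals(object? obj)
        {
            if (obj is ChannelKey key)
            {
                // The comparison mode is an assumption; the original argument is missing.
                return string.Equals(key.Key, Key, StringComparison.Ordinal);
            }

            return false;
        }

        public override int GetHashCode()
        {
            // Hash the same field Equals compares, so lookups do not all share one bucket.
            return StringComparer.Ordinal.GetHashCode(Key);
        }
    }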

    private Channel<IEvent> Rent(string channel)
    {
        return Rent(new ChannelKey { Key = channel });
    }

    private Channel<IEvent> Rent(ChannelKey channelKey)
    {
        // GetOrAdd hands every caller the stored channel, even when two threads race to create it.
        return _channels.GetOrAdd(channelKey, _ => Channel.CreateUnbounded<IEvent>());
    }

    private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();

    private CancellationTokenSource Cts { get; } = new();

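    public void Cancel()
    {
        Cts.Cancel();
        _channels.Clear();
        // Assumed call (the original is missing); TryReset returns false once cancellation was requested.
        Cts.TryReset();
    }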

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
    }

    public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
    }

    public void OnSubscribe<TEvent>() where TEvent : IEvent
    {
        // Reuse the existing key for this event type, or create one on first subscription.
        var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
                         new ChannelKey() { Key = typeof(TEvent).Name };
        channelKey.Subscribers++;

        Task.Run(async () =>
        {
            try
            {
                while (!Cts.IsCancellationRequested)
                {
                    var @event = await ReadAsync(channelKey);

                    var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
                    if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
                    try
                    {
                        await handler.HandleAsync((TEvent)@event);
                    }
                    catch (Exception ex)
                    {
                        handler.HandleException((TEvent)@event, ex);
                    }
                }
            }
            catch (Exception e)
            {
                throw new InvalidOperationException("Error on onSubscribe handler", e);
            }
        }, Cts.Token);
    }

    private async Task<IEvent> ReadAsync(string channel)
    {
        return await Rent(channel).Reader.ReadAsync();
    }

    private async Task<IEvent> ReadAsync(ChannelKey channel)
    {
        return await Rent(channel).Reader.ReadAsync();
    }
}
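The effect of giving each event type its own channel can be seen in a small standalone sketch. PerTypePipelineDemo and the string keys are hypothetical, and integers stand in for events; each key gets an independent consumer that sums whatever arrives on its channel.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PerTypePipelineDemo
{
    public static async Task<Dictionary<string, int>> RunAsync()
    {
        var channels = new ConcurrentDictionary<string, Channel<int>>();

        // Look up the channel for a key, creating it on first use.
        Channel<int> Rent(string key) =>
            channels.GetOrAdd(key, _ => Channel.CreateUnbounded<int>());

        // Each key gets its own consumer that sums what it reads.
        async Task<int> SumAsync(string key)
        {
            var sum = 0;
            await foreach (var value in Rent(key).Reader.ReadAllAsync())
                sum += value;
            return sum;
        }

        var orders = Task.Run(() => SumAsync("OrderCreated"));
        var users = Task.Run(() => SumAsync("UserCreated"));

        Rent("OrderCreated").Writer.TryWrite(1);
        Rent("OrderCreated").Writer.TryWrite(2);
        Rent("UserCreated").Writer.TryWrite(10);

        // Complete every writer so the consumers can drain their queues and finish.
        foreach (var channel in channels.Values) channel.Writer.Complete();

        return new Dictionary<string, int>
        {
            ["OrderCreated"] = await orders,
            ["UserCreated"] = await users,
        };
    }
}
```

Since the two consumers never share a queue, a slow handler for one event type does not hold up the other.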

LocalEventBus

Implement LocalEventBus, which derives from IEventBus through ILocalEventBus so that extra methods can be added later if needed. It handles the pool case and the manager case separately.

public interface ILocalEventBus: IEventBus
{

}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
    private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();

    // NOTE: options.Pool is an assumed property name; the original condition is missing
    // and LocalEventBusOptions is not shown in this article.
    public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            EventBusPool.Publish(@event);
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please register it as a singleton service.");
            manager.Publish(@event);
        }
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            await EventBusPool.PublishAsync(@event);
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please register it as a singleton service.");
            await manager.PublishAsync(@event);
        }
    }

    public void OnSubscribe<TEvent>() where TEvent : IEvent
    {
        if (options.Pool)
        {
            Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
            EventBusPool.OnSubscribe<TEvent>();
        }
        else
        {
            var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
            if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please register it as a singleton service.");
            manager.AutoHandle();
        }
    }
}
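LocalEventBusOptions is referenced above but never shown. A minimal sketch of what it might look like follows; the Pool property name and the RoutingDemo helper are assumptions made for illustration, not the repository's definitions.

```csharp
// Hypothetical options type: a single flag that selects the dispatch strategy.
public sealed class LocalEventBusOptions
{
    // Assumed name. True routes events through LocalEventBusPool,
    // false through LocalEventBusManager<TEvent>.
    public bool Pool { get; set; }
}

public static class RoutingDemo
{
    // Mirrors the if/else in LocalEventBus by naming the strategy the flag selects.
    public static string Describe(LocalEventBusOptions options) =>
        options.Pool ? "LocalEventBusPool" : "LocalEventBusManager<TEvent>";
}
```

Keeping the choice in one options object means callers of IEventBus do not need to know which strategy is active.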

Distributed Event Bus

A distributed event bus can be added as needed. The basic logic is the same, although it may need extras such as an acknowledgment mechanism.
