EventBus is a mechanism for communication of events within an application or across application components.
It allows different components to decouple and communicate by publishing and subscribing events. In the given code snippet, we can see an Event Bus implemented using C#. It defines some interfaces and classes to implement publishing and subscriptions for events.
First, we have two basic constraint interfaces:IEvent
andIAsyncEventHandler<TEvent>
。
IEvent is an empty interface that constrains the types of events.IAsyncEventHandler<TEvent>
is a generic interface that constrains the type of event handler. It defines the asynchronous method HandleAsync that handles events and the method HandleException that handles exceptions. Next, we have an IEventBus interface that defines some operation methods for publishing and subscribing to events.
in,Publish<TEvent>
andPublishAsync<TEvent>
Methods are used to publish events, andOnSubscribe<TEvent>
Methods are used to subscribe to events. Then we see a class that implements the local event busLocalEventBusManager<TEvent>
. It implementsILocalEventBusManager<TEvent>
Interface for handling local events within a single pipeline. It uses aChannel<TEvent>
To store events and provide a method to publish eventsPublish
andPublishAsync
. In addition, it also provides a method to automatically handle eventsAutoHandle
。
OverallEvent Bus
A convenient way to achieve loosely coupled communication between components is provided.
By publishing and subscribing to events, components can operate independently without directly relying on each other's implementation details.
This mechanism can improve the maintainability and scalability of the code.
Github repository address:/DonPangPang/soda-event-bus
Implement some basic constraints
Implement some constraints first, implement themIEvent
Constrain events, implementIAsyncEvnetHandler<TEvent> where TEvent:IEvent
to constrain event handlers.
public interface IEvent { } public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent { Task HandleAsync(IEvent @event); void HandleException(IEvent @event, Exception ex); }
Next, let's specify oursIEventBus
, what are the operation methods? Basically, publish and subscribe.
public interface IEventBus { void Publish<TEvent>(TEvent @event) where TEvent : IEvent; Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent; void OnSubscribe<TEvent>() where TEvent : IEvent; }
Implement a local event bus
Local event handling
I plan to implement the local event processing in two ways, one isLocalEventBusManager
That is, local event management, the second type isLocalEventBusPool
Pooling local events.
LocalEvnetBusManager
LocalEventBusManager
It is mainly processed in a single pipeline and consumed in a centralized manner.
public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent { void Publish(TEvent @event); Task PublishAsync(TEvent @event) ; void AutoHandle(); } public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent> where TEvent: IEvent { readonly IServiceProvider _servicesProvider = serviceProvider; private readonly Channel<TEvent> _eventChannel = <TEvent>(); public void Publish(TEvent @event) { (_eventChannel != null, nameof(_eventChannel) + " != null"); _eventChannel.(@event); } private CancellationTokenSource Cts { get; } = new(); public void Cancel() { (); } public async Task PublishAsync(TEvent @event) { await _eventChannel.(@event); } public void AutoHandle() { // Make sure to start only once if (!) return; (async () => { while (!) { var reader = await _eventChannel.(); await HandleAsync(reader); } }, ); } async Task HandleAsync(TEvent @event) { var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>(); if (handler is null) { throw new NullReferenceException($"No handler for event {@().Name}"); } try { await (@event); } catch (Exception ex) { ( @event, ex); } } }
LocalEventBusPool
LocalEventBusPool
That is, all Events will have a separate pipeline to process, and the parallel capability will be better.
public sealed class LocalEventBusPool(IServiceProvider serviceProvider) { private readonly IServiceProvider _serviceProvider = serviceProvider; private class ChannelKey { public required string Key { get; init; } public int Subscribers { get; set; } public override bool Equals(object? obj) { if (obj is ChannelKey key) { return (, Key, ); } return false; } public override int GetHashCode() { return 0; } } private Channel<IEvent> Rent(string channel) { _channels.TryGetValue(new ChannelKey() { Key = channel }, out var value); if (value != null) return value; value = <IEvent>(); _channels.TryAdd(new ChannelKey() { Key = channel }, value); return value; } private Channel<IEvent> Rent(ChannelKey channelKey) { _channels.TryGetValue(channelKey, out var value); if (value != null) return value; value = <IEvent>(); _channels.TryAdd(channelKey, value); return value; } private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new(); private CancellationTokenSource Cts { get; } = new(); public void Cancel() { (); _channels.Clear(); (); } public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent { await Rent(typeof(TEvent).Name).(@event); } public void Publish<TEvent>(TEvent @event) where TEvent : IEvent { Rent(typeof(TEvent).Name).(@event); } public void OnSubscribe<TEvent>() where TEvent : IEvent { var channelKey = _channels.FirstOrDefault(x => == typeof(TEvent).Name).Key ?? new ChannelKey() { Key = typeof(TEvent).Name }; ++; (async () => { try { while (!) { var @event = await ReadAsync(channelKey); var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>(); if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}"); try { await ((TEvent)@event); } catch (Exception ex) { ((TEvent)@event, ex); } } } catch (Exception e) { throw new InvalidOperationException("Error on onSubscribe handler", e); } }, ); } private async Task<IEvent> ReadAsync(string channel) { return await Rent(channel).(); } private async Task<IEvent> ReadAsync(ChannelKey channel) { return await Rent(channel).(); } }
LocalEventBus
accomplishLocalEventBus
Inherited fromIEventBus
That is, if there is a method that needs to be extended, the pooling and manager situations will be handled separately.
public interface ILocalEventBus: IEventBus { } public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus { private LocalEventBusPool? EventBusPool => <LocalEventBusPool>(); public void Publish<TEvent>(TEvent @event) where TEvent : IEvent { if () { (EventBusPool != null, nameof(EventBusPool) + " != null"); (@event); } else { var manager = <LocalEventBusManager<TEvent>>(); if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it."); (@event); } } public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent { if () { (EventBusPool != null, nameof(EventBusPool) + " != null"); await (@event); } else { var manager = <LocalEventBusManager<TEvent>>(); if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it."); await (@event); } } public void OnSubscribe<TEvent>() where TEvent : IEvent { if () { (EventBusPool != null, nameof(EventBusPool) + " != null"); <TEvent>(); } else { var manager = <LocalEventBusManager<TEvent>>(); if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it."); (); } } }
Distributed Event Bus
It is enough to expand as needed, the basic logic is the same, but a confirmation mechanism may be added, etc.
This is the end of this article about C#’s method examples to implement event bus. For more related C# event bus content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!