During development we often meet scenarios in which the state of one object depends on the state of another, yet the two objects have no direct relationship and should stay loosely coupled. This is especially common in container-based architectures such as the Prism framework or MEF. In such systems the objects usually interact through a Publish and Subscribe pattern. What are the benefits of this kind of interaction? With that question in mind, let's analyze it step by step!
The first step is to define an interface called IEventAggregator, which declares the Subscribe and Unsubscribe methods and two overloads of Publish. Let's take a look at this interface in detail:
using System;

/// <summary>
/// Enables loosely-coupled publication of and subscription to events.
/// </summary>
public interface IEventAggregator
{
    /// <summary>
    /// Gets or sets the default publication thread marshaller.
    /// </summary>
    /// <value>
    /// The default publication thread marshaller.
    /// </value>
    Action<Action> PublicationThreadMarshaller { get; set; }

    /// <summary>
    /// Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
    /// </summary>
    /// <param name = "instance">The instance to subscribe for event publication.</param>
    void Subscribe(object instance);

    /// <summary>
    /// Unsubscribes the instance from all events.
    /// </summary>
    /// <param name = "instance">The instance to unsubscribe.</param>
    void Unsubscribe(object instance);

    /// <summary>
    /// Publishes a message.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <remarks>
    /// Uses the default thread marshaller during publication.
    /// </remarks>
    void Publish(object message);

    /// <summary>
    /// Publishes a message.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
    void Publish(object message, Action<Action> marshal);
}
With the interface in place, the next step is to implement its methods. Let's take a look at the implementation:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

/// <summary>
/// Enables loosely-coupled publication of and subscription to events.
/// </summary>
public class EventAggregator : IEventAggregator
{
    /// <summary>
    /// The default thread marshaller used for publication;
    /// </summary>
    public static Action<Action> DefaultPublicationThreadMarshaller = action => action();

    readonly List<Handler> handlers = new List<Handler>();

    /// <summary>
    /// Initializes a new instance of the <see cref = "EventAggregator" /> class.
    /// </summary>
    public EventAggregator()
    {
        PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
    }

    /// <summary>
    /// Gets or sets the default publication thread marshaller.
    /// </summary>
    /// <value>
    /// The default publication thread marshaller.
    /// </value>
    public Action<Action> PublicationThreadMarshaller { get; set; }

    /// <summary>
    /// Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
    /// </summary>
    /// <param name = "instance">The instance to subscribe for event publication.</param>
    public virtual void Subscribe(object instance)
    {
        lock (handlers)
        {
            // Ignore repeated subscriptions of the same instance.
            if (handlers.Any(x => x.Matches(instance)))
            {
                return;
            }

            handlers.Add(new Handler(instance));
        }
    }

    /// <summary>
    /// Unsubscribes the instance from all events.
    /// </summary>
    /// <param name = "instance">The instance to unsubscribe.</param>
    public virtual void Unsubscribe(object instance)
    {
        lock (handlers)
        {
            var found = handlers.FirstOrDefault(x => x.Matches(instance));

            if (found != null)
            {
                handlers.Remove(found);
            }
        }
    }

    /// <summary>
    /// Publishes a message.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <remarks>
    /// Does not marshal the publication to any special thread by default.
    /// </remarks>
    public virtual void Publish(object message)
    {
        Publish(message, PublicationThreadMarshaller);
    }

    /// <summary>
    /// Publishes a message.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
    public virtual void Publish(object message, Action<Action> marshal)
    {
        // Take a snapshot so handlers run outside the lock.
        Handler[] toNotify;
        lock (handlers)
        {
            toNotify = handlers.ToArray();
        }

        marshal(() =>
        {
            var messageType = message.GetType();

            // Handle returns false only when the subscriber has been garbage collected.
            var dead = toNotify
                .Where(handler => !handler.Handle(messageType, message))
                .ToList();

            if (dead.Any())
            {
                lock (handlers)
                {
                    foreach (var handler in dead)
                    {
                        handlers.Remove(handler);
                    }
                }
            }
        });
    }

    protected class Handler
    {
        readonly WeakReference reference;
        readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();

        public Handler(object handler)
        {
            reference = new WeakReference(handler);

            // Keep only the closed generic IHandle<TMessage> interfaces.
            var interfaces = handler.GetType().GetInterfaces()
                .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);

            foreach (var @interface in interfaces)
            {
                var type = @interface.GetGenericArguments()[0];
                var method = @interface.GetMethod("Handle");
                supportedHandlers[type] = method;
            }
        }

        public bool Matches(object instance)
        {
            return reference.Target == instance;
        }

        public bool Handle(Type messageType, object message)
        {
            var target = reference.Target;
            if (target == null)
                return false;

            foreach (var pair in supportedHandlers)
            {
                if (pair.Key.IsAssignableFrom(messageType))
                {
                    pair.Value.Invoke(target, new[] { message });
                    return true;
                }
            }

            return true;
        }
    }
}
First, the EventAggregator maintains a List<Handler> internally to store the handlers. So what role does the nested class Handler play?
Every time the Subscribe method is executed, the instance argument (of type object) is passed to a new Handler. The Handler constructor first wraps the instance in a WeakReference, then enumerates all interfaces the object implements and keeps those that are closed forms of the generic interface IHandle<TMessage>. For each such interface it uses reflection to obtain the message type (the generic type argument) and the corresponding Handle method, and stores the pair in an internal Dictionary<Type, MethodInfo>. The Handler, now carrying its specific handling methods, is then added to the List<Handler> collection. This is the core of subscribing to a message. Therefore, an object that wants to subscribe to a message must implement the generic interface IHandle<TMessage> and its Handle method. Just as important, it must actually subscribe, typically in its own constructor, by calling Subscribe(this). Let's take a look at the generic interface IHandle<TMessage>:
public interface IHandle {}

/// <summary>
/// Denotes a class which can handle a particular type of message.
/// </summary>
/// <typeparam name = "TMessage">The type of message to handle.</typeparam>
public interface IHandle<TMessage> : IHandle
{
    /// <summary>
    /// Handles the message.
    /// </summary>
    /// <param name = "message">The message.</param>
    void Handle(TMessage message);
}
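To make the reflection step concrete, here is a minimal sketch of how the handled message types can be discovered from a subscriber. The `StatusBar` class and the `HandlerInspector.Discover` helper are hypothetical names introduced only for this illustration; the filtering logic follows the Handler constructor shown earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public interface IHandle { }

public interface IHandle<TMessage> : IHandle
{
    void Handle(TMessage message);
}

// Hypothetical subscriber that handles two message types.
public class StatusBar : IHandle<string>, IHandle<int>
{
    public void Handle(string message) { }
    public void Handle(int message) { }
}

// Hypothetical helper, not part of the article's EventAggregator.
public static class HandlerInspector
{
    // Maps each handled message type to the Handle method of the
    // matching closed IHandle<TMessage> interface.
    public static Dictionary<Type, MethodInfo> Discover(object subscriber)
    {
        return subscriber.GetType().GetInterfaces()
            // The non-generic marker IHandle itself is filtered out by IsGenericType.
            .Where(i => typeof(IHandle).IsAssignableFrom(i) && i.IsGenericType)
            .ToDictionary(
                i => i.GetGenericArguments()[0],
                i => i.GetMethod("Handle"));
    }
}
```

Calling `HandlerInspector.Discover(new StatusBar())` should yield two entries, keyed by `typeof(string)` and `typeof(int)`, which is the same shape of data that each Handler keeps in its dictionary.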
Having covered Subscribe, let's look at Unsubscribe. The idea is very simple: find the Handler that wraps the object in the List<Handler> and remove it. The real focus is the Publish method. Let's look at the code first and then analyze it step by step.
/// <summary>
/// Publishes a message.
/// </summary>
/// <param name = "message">The message instance.</param>
/// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
public virtual void Publish(object message, Action<Action> marshal)
{
    // Take a snapshot so handlers run outside the lock.
    Handler[] toNotify;
    lock (handlers)
    {
        toNotify = handlers.ToArray();
    }

    marshal(() =>
    {
        var messageType = message.GetType();

        // Handle returns false only when the subscriber has been garbage collected.
        var dead = toNotify
            .Where(handler => !handler.Handle(messageType, message))
            .ToList();

        if (dead.Any())
        {
            lock (handlers)
            {
                foreach (var handler in dead)
                {
                    handlers.Remove(handler);
                }
            }
        }
    });
}
When a message of type object is published, some other object must handle it. So how do we find the object that handles the message?
Recall that when an object subscribes, reflection has already recorded the message types it handles, and the resulting Handler sits in the List<Handler>. So we only need to go through this list, find the handlers whose message type matches the published message, and invoke their Handle method. That is exactly what the code does.
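The lookup can be illustrated with a stripped-down dispatcher. This is only a sketch under simplifying assumptions: `MiniDispatcher` keeps strong references, takes no lock, and has no thread marshaller, and the `Notification`, `SaveFailed`, `NotificationLog`, and `DispatchDemo` types are hypothetical. What it keeps from the article's code is the matching rule: a handler for a type receives any message assignable to that type, and only the first matching handler of a subscriber is invoked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public interface IHandle { }
public interface IHandle<TMessage> : IHandle { void Handle(TMessage message); }

// Hypothetical messages: SaveFailed derives from Notification.
public class Notification { public string Text = ""; }
public class SaveFailed : Notification { }

// Hypothetical subscriber that records what it receives.
public class NotificationLog : IHandle<Notification>
{
    public readonly List<string> Entries = new List<string>();
    public void Handle(Notification message) { Entries.Add(message.Text); }
}

// Simplified dispatcher: strong references, no locking, no marshaller.
public class MiniDispatcher
{
    readonly List<object> subscribers = new List<object>();

    public void Subscribe(object instance) { subscribers.Add(instance); }

    public void Publish(object message)
    {
        var messageType = message.GetType();
        foreach (var subscriber in subscribers)
        {
            var handlerInterfaces = subscriber.GetType().GetInterfaces()
                .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>));

            foreach (var handlerInterface in handlerInterfaces)
            {
                var handledType = handlerInterface.GetGenericArguments()[0];
                if (handledType.IsAssignableFrom(messageType))
                {
                    handlerInterface.GetMethod("Handle").Invoke(subscriber, new[] { message });
                    break; // first match only, as in Handler.Handle
                }
            }
        }
    }
}

public static class DispatchDemo
{
    public static List<string> Run()
    {
        var log = new NotificationLog();
        var dispatcher = new MiniDispatcher();
        dispatcher.Subscribe(log);
        dispatcher.Publish(new SaveFailed { Text = "disk full" }); // delivered: SaveFailed is a Notification
        dispatcher.Publish("unrelated");                            // ignored: log has no IHandle<string>
        return log.Entries;
    }
}
```

In this sketch `DispatchDemo.Run()` returns a list containing only "disk full". The string message is silently skipped because no subscriber declares a handler for it.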
Another key point: Handler.Handle returns false only when the weak reference's target has already been garbage collected. Publish collects these dead handlers and removes them from the List<Handler>, because keeping them serves no purpose. A subscriber that is still alive but handles no matching message type returns true and stays subscribed. With this process in place, messages can be passed between two objects that know nothing about each other. To summarize, the key points of this implementation are:
1. Every object that subscribes to messages must implement the common interface IHandle<TMessage> and its Handle method.
2. The EventAggregator must be a single shared instance (or static), so that every subscription and publication goes through the same collection.
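The two points above can be sketched together as follows. Everything here is hypothetical and simplified: `SimpleAggregator` is a small stand-in for the article's EventAggregator (strong references, generic dispatch instead of reflection), and `Messaging`, `UserLoggedIn`, `WelcomeBanner`, `LoginService`, and `SharedInstanceDemo` are names made up for the example. In a container-based application the shared instance would more likely be registered as a singleton in the container than held in a static field.

```csharp
using System;
using System.Collections.Generic;

public interface IHandle<TMessage> { void Handle(TMessage message); }

// Simplified stand-in for the article's EventAggregator.
public class SimpleAggregator
{
    readonly List<object> subscribers = new List<object>();

    public void Subscribe(object instance)
    {
        if (!subscribers.Contains(instance)) subscribers.Add(instance);
    }

    public void Publish<TMessage>(TMessage message)
    {
        // Iterate over a copy so handlers may subscribe while we publish.
        foreach (var subscriber in subscribers.ToArray())
        {
            var handler = subscriber as IHandle<TMessage>;
            if (handler != null) handler.Handle(message);
        }
    }
}

// Hypothetical holder exposing one shared aggregator to the whole application.
public static class Messaging
{
    public static readonly SimpleAggregator Events = new SimpleAggregator();
}

public class UserLoggedIn { public string Name = ""; }

// Subscriber: implements IHandle<T> and registers itself in its constructor.
public class WelcomeBanner : IHandle<UserLoggedIn>
{
    public string Text = "";
    public WelcomeBanner() { Messaging.Events.Subscribe(this); }
    public void Handle(UserLoggedIn message) { Text = "Welcome, " + message.Name; }
}

// Publisher: knows only the shared aggregator, not the banner.
public class LoginService
{
    public void LogIn(string name)
    {
        Messaging.Events.Publish(new UserLoggedIn { Name = name });
    }
}

public static class SharedInstanceDemo
{
    public static string Run()
    {
        var banner = new WelcomeBanner();
        new LoginService().LogIn("Ada");
        return banner.Text;
    }
}
```

Here `SharedInstanceDemo.Run()` returns "Welcome, Ada" even though `LoginService` and `WelcomeBanner` never reference each other. If each class created its own aggregator instead, the message would never arrive.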
Finally, following the usual convention, a complete example accompanies this article for download. The next article will introduce a simple version of the event-based publish and subscribe model.