New interfaces for implementing Publish-Subscribe in .NET
You know you are a techie at heart if you find yourself getting excited at the introduction of a couple of new interfaces.
.NET 4.0 introduces IObservable<T> (Publisher) and IObserver<T> (Subscriber). Here is why that matters:
- The ubiquitous nature of the Publish Subscribe pattern.
- The kludginess of implementing it with events and delegates. Prior to these interfaces, events and delegates were the way to implement the pattern, and they came with their own set of gotchas.
- Even a WCF Publish Subscribe implementation could not get away from explicitly publishing (using events) and subscribing (using event handlers).
And while these new interfaces will not save you the effort of an actual implementation, they provide a standard template for implementing the publish subscribe pattern in .NET. This article provides:
- A simple, starting implementation of these two interfaces.
- As a second step, to illustrate a real-world usage of the sample implementation, a MagazinePublisher class is defined. This class maintains a list of all magazine subscribers – and notifies them whenever a new issue is available.
This sample application (full source code available below) can be used as a starting point for any publish subscribe application.
The ‘Publisher’ Interface
- In these new interfaces, the Publisher goes by the name Observable.
- Two primary responsibilities of the Publisher are:
- a) Subscription and Unsubscription (Adding and Removing subscribers)
- b) Notification (of all subscribers)
Subscription and Unsubscription
The implementation of IObservable<T> contains a Subscribe method, which returns an instance of the RemoveMe class (which implements the IDisposable interface). A subscriber unsubscribes by calling Dispose() on that returned instance when it is ready.
    // Adds the observer (once) and returns a token whose Dispose() removes it again.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!observers.Contains(observer))
        {
            observers.Add(observer);
        }
        return new RemoveMe(observers, observer);
    }

    // Unsubscription token handed back by Subscribe().
    private class RemoveMe : IDisposable
    {
        private List<IObserver<T>> observers;
        private IObserver<T> observer;

        public RemoveMe(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Removes the observer from the publisher's list; safe to call more than once.
        public void Dispose()
        {
            if (observer != null && observers.Contains(observer))
            {
                observers.Remove(observer);
            }
        }
    }
Notification
Notify can be called to notify all observers that there are changes in the observed object.
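    protected void Notify(T obj)
    {
        // Iterate over a copy: an observer that unsubscribes inside OnNext
        // would otherwise modify the list while it is being enumerated.
        foreach (IObserver<T> observer in observers.ToArray())
        {
            observer.OnNext(obj);
        }
    }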
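The Subscribe, RemoveMe and Notify fragments above can be assembled into one class. The following is a minimal sketch rather than the article's exact source: the class name Publisher<T>, the observers field declaration and the RecordingObserver helper are assumptions, and Notify is made public here only so the demo can call it directly.

```csharp
using System;
using System.Collections.Generic;

// Generic publisher assembled from the fragments above (class name assumed).
public class Publisher<T> : IObservable<T>
{
    // The fragments use this field; its declaration is an assumption.
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!observers.Contains(observer))
        {
            observers.Add(observer);
        }
        return new RemoveMe(observers, observer);
    }

    // Public only for this demo; the article declares it protected.
    public void Notify(T obj)
    {
        // Copy the list so an observer may unsubscribe from inside OnNext.
        foreach (IObserver<T> observer in observers.ToArray())
        {
            observer.OnNext(obj);
        }
    }

    private sealed class RemoveMe : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public RemoveMe(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            observers.Remove(observer); // no-op if already removed
        }
    }
}

// Hypothetical observer that records what it receives.
public class RecordingObserver : IObserver<string>
{
    public readonly List<string> Received = new List<string>();
    public void OnNext(string value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Program
{
    public static void Main()
    {
        var publisher = new Publisher<string>();
        var observer = new RecordingObserver();

        IDisposable subscription = publisher.Subscribe(observer);
        publisher.Notify("first");   // delivered
        subscription.Dispose();      // unsubscribe
        publisher.Notify("second");  // not delivered

        Console.WriteLine(string.Join(", ", observer.Received));
    }
}
```

Only "first" reaches the observer, because the subscription is disposed before the second notification.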
The Subscriber
Now that we have described the two main responsibilities of the Publisher, we are ready to look at a sample Subscriber. The only thing a Subscriber needs to do is implement the IObserver<T> interface, using the Publisher as the type T that it is observing. This is a particularly simple approach, even though it somewhat couples the subscriber to a specific publisher type.
Since the Publisher instance is passed in to the OnNext method, it is easy to retrieve all Publisher specific info in the subscriber. This example shows a subscriber getting notified of a new issue – and printing out the month of the issue.
    /// <summary>
    /// 'Real-World' Subscriber. Notice that it takes the type parameter MagazinePublisher in the IObserver interface
    /// </summary>
    class Subscriber : IObserver<MagazinePublisher>
    {
        public void OnCompleted()
        {
            Console.WriteLine("Unsubscribed");
        }

        public void OnError(Exception error)
        {
            Console.WriteLine("Error");
        }

        public void OnNext(MagazinePublisher mp)
        {
            Console.WriteLine("New Issue: " + mp.NewIssue);
        }
    }
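The Subscriber above implements OnCompleted and OnError, but those methods only run if a publisher calls them; nothing in the runtime does so automatically. The following is a minimal sketch of a publisher that signals both. The IssueFeed class, its Publish, Complete and Fail methods, and the LoggingObserver helper are all hypothetical names and not part of the article's sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical publisher of issue names; not part of the article's sample.
public class IssueFeed : IObservable<string>
{
    private readonly List<IObserver<string>> observers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    public void Publish(string issue)
    {
        foreach (var o in observers.ToArray()) o.OnNext(issue);
    }

    // Hypothetical: tells every observer that no more issues will follow.
    public void Complete()
    {
        foreach (var o in observers.ToArray()) o.OnCompleted();
        observers.Clear();
    }

    // Hypothetical: reports a failure; like OnCompleted, OnError ends the sequence.
    public void Fail(Exception error)
    {
        foreach (var o in observers.ToArray()) o.OnError(error);
        observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action remove;
        public Unsubscriber(Action remove) { this.remove = remove; }
        public void Dispose() { remove(); }
    }
}

// Hypothetical observer that logs every callback it receives.
public class LoggingObserver : IObserver<string>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(string value) { Log.Add("Next: " + value); }
    public void OnError(Exception error) { Log.Add("Error: " + error.Message); }
    public void OnCompleted() { Log.Add("Completed"); }
}

public static class Program
{
    public static void Main()
    {
        var feed = new IssueFeed();
        var observer = new LoggingObserver();

        feed.Subscribe(observer);
        feed.Publish("May issue");
        feed.Complete();             // the publisher, not the runtime, calls OnCompleted
        feed.Publish("June issue");  // nobody is subscribed any more

        Console.WriteLine(string.Join(" | ", observer.Log));
    }
}
```

The observer sees one OnNext followed by OnCompleted, and nothing after that. Calling Fail instead of Complete would deliver OnError in the same way.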
Tying the Subscriber to the Publisher
This is the last step in our sample solution – now that we have both the Subscriber and the Publisher completely defined, we can use them in a simple program.
    static void Main(string[] args)
    {
        MagazinePublisher mp = new MagazinePublisher();
        Subscriber s1 = new Subscriber();
        Subscriber s2 = new Subscriber();
        var removeMe1 = mp.Subscribe(s1);
        var removeMe2 = mp.Subscribe(s2);

        // 'setting' this NewIssue property will call Notify on all the subscribers
        mp.NewIssue = DateTime.Now.ToString("MMMM") + " issue";

        // Cleanup
        removeMe1.Dispose();
        removeMe2.Dispose();
    }
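The program above relies on MagazinePublisher, which is not listed in the article. The following is a minimal sketch of what it might look like, assuming only what the article states: a NewIssue property whose setter notifies all subscribers. Everything else is an assumption, including the IssueLog subscriber, which records issues instead of printing them so the result is easy to check.

```csharp
using System;
using System.Collections.Generic;

// Sketch of the publisher used in Main; only the class name and the
// NewIssue property come from the article, the rest is assumed.
public class MagazinePublisher : IObservable<MagazinePublisher>
{
    private readonly List<IObserver<MagazinePublisher>> observers =
        new List<IObserver<MagazinePublisher>>();
    private string newIssue;

    public string NewIssue
    {
        get { return newIssue; }
        set
        {
            newIssue = value;
            // Copy the list so a subscriber may unsubscribe inside OnNext.
            foreach (IObserver<MagazinePublisher> observer in observers.ToArray())
            {
                observer.OnNext(this);
            }
        }
    }

    public IDisposable Subscribe(IObserver<MagazinePublisher> observer)
    {
        if (!observers.Contains(observer))
        {
            observers.Add(observer);
        }
        return new RemoveMe(observers, observer);
    }

    private sealed class RemoveMe : IDisposable
    {
        private readonly List<IObserver<MagazinePublisher>> observers;
        private readonly IObserver<MagazinePublisher> observer;

        public RemoveMe(List<IObserver<MagazinePublisher>> observers,
                        IObserver<MagazinePublisher> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical subscriber that records issues instead of printing them.
public class IssueLog : IObserver<MagazinePublisher>
{
    public readonly List<string> Issues = new List<string>();
    public void OnNext(MagazinePublisher mp) { Issues.Add(mp.NewIssue); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Program
{
    public static void Main()
    {
        var mp = new MagazinePublisher();
        var log = new IssueLog();

        using (mp.Subscribe(log))
        {
            mp.NewIssue = "May issue";   // recorded
        }
        mp.NewIssue = "June issue";      // not recorded: the using block disposed the subscription

        Console.WriteLine(string.Join(", ", log.Issues));
    }
}
```

Because Subscribe returns an IDisposable, a using block is one way to keep the subscription and its cleanup in the same place.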
Summary
There are several advantages to using these new interfaces in your own publish subscribe solution.
- Consistency, Uniformity – Everyone’s Publish Subscribe will now look the same – a great time-saver for new people brought onto a project.
- Moving away from events and delegates – While events and delegates are great, and can be used to provide the same implementation as the one above, one needs to be extra careful in removing (unsubscribing) from subscribed events. There are also a few other ‘gotchas’ in a typical publish subscribe pattern (that are discussed in detail here).
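One unsubscription pitfall with events can be sketched as follows. This is an illustration only: the EventMagazine class and its IssuePublished event are hypothetical and do not appear in the article's sample. The sketch relies on the fact that, with the compilers in common use, two lambdas that look identical are still two different delegate instances.

```csharp
using System;

// Hypothetical event-based publisher, for comparison only.
public class EventMagazine
{
    public event Action<string> IssuePublished;

    public void Publish(string issue)
    {
        Action<string> handler = IssuePublished; // local copy of the current subscriber list
        if (handler != null) handler(issue);
    }
}

public static class Program
{
    public static int Run()
    {
        var magazine = new EventMagazine();
        int calls = 0;

        // Subscribing with a lambda...
        magazine.IssuePublished += issue => calls++;
        // ...and "unsubscribing" with a look-alike lambda removes nothing,
        // because it is a different delegate instance.
        magazine.IssuePublished -= issue => calls++;
        magazine.Publish("May issue");   // the first handler is still attached

        // Keeping a reference to the delegate makes removal work.
        Action<string> handler = issue => calls++;
        magazine.IssuePublished += handler;
        magazine.IssuePublished -= handler;
        magazine.Publish("June issue");  // only the first handler runs again

        return calls;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

With IObservable<T>, the IDisposable returned by Subscribe plays the role of the saved delegate reference, so the caller always has something concrete to dispose.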
It isn’t every day that one gets excited about the introduction of a couple of new interfaces, but given the ubiquitous applicability of the Publish Subscribe (or Observer) pattern, this is a fairly big deal. The only thing I disliked was the naming of the two interfaces, which leads to confusion. That is why this article continues to refer to them as Publisher (IObservable) and Subscriber (IObserver).
Full Solution
Download .NET 4.0 IObservable Implementation Full Solution
Note – To compile successfully, your target framework must be set to .NET 4.0 (open the project properties and set the target framework). If you haven’t already, I would recommend using Visual Studio 11 (beta) available here.
I’ve looked all over for an article like this. I knew there had to be a better way. Clear and concise.
Thanks
I’m trying to figure out how the “OnCompleted” routine in the Subscriber code gets called. It seems like the removeMe routines should unsubscribe before disposing?
Good question. In this sample, OnCompleted() never gets called. Nothing in the runtime raises it: under the IObserver<T> contract, the publisher calls OnCompleted() on its observers when it has no more notifications to send, and the sample publisher never does. Unsubscribing is a separate step. The example calls Dispose() inside Main() on the object returned by Subscribe() (not on the subscriber itself), and that does not trigger OnCompleted().
I like to keep the Dispose() at the same code layer as the subscription, in this case inside the Main() method.
Also, Subscribe() returns a RemoveMe instance, which contains the Dispose() implementation. When one calls Dispose() on that instance, this implementation runs, removing that subscriber from the list of subscribers.
Hello Anuj,
This is a great article. Is it possible to make this async?
Thanks
So – while I think that is fine in principle, IObservable<T> itself says nothing about threads. In this sample, Notify() calls OnNext() on each subscriber in turn, synchronously, on the thread that set NewIssue. Subscribers do not get their own threads and are not notified in parallel. If you need asynchronous notification, the publisher (or each subscriber) has to hand the work off itself, for example by queuing it as a Task.
So I am not sure that changing the interface to return Task will buy you anything new. Returning IObservable<T> is sufficient, and any asynchrony can live behind it.
Nice article – thank you! The only 2 things I didn’t understand are OnError() and OnCompleted(). Will they be activated automatically, and why do we need them?