Good uses for .NET's Reactive Extensions


thefellow3j

Who's heard of Reactive Extensions (Rx) for .NET? It's a framework for streamlining "asynchronous sequences of data." Basically, it lets you treat a .NET event as a sequence of values (an IObservable<T>) that you can query and compose much like an IEnumerable<T>. (Check out Reactive Extensions here: http://msdn.microsoft.com/en-us/data/gg577609)
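To make the "event as a sequence" idea concrete, here is a minimal sketch. It assumes the Rx library (System.Reactive) is referenced; Sensor and EventAsSequenceDemo are made-up names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reactive.Linq;

// Hypothetical class used only for this illustration.
public class Sensor : INotifyPropertyChanged
{
    private int _reading;
    public int Reading
    {
        get { return _reading; }
        set
        {
            if (_reading == value) return;
            _reading = value;
            var handler = PropertyChanged;
            if (handler != null) handler(this, new PropertyChangedEventArgs("Reading"));
        }
    }
    public event PropertyChangedEventHandler PropertyChanged;
}

public static class EventAsSequenceDemo
{
    public static List<int> Run()
    {
        var sensor = new Sensor();
        var seen = new List<int>();

        // Wrap the event in an IObservable, then filter and project it with LINQ operators.
        IObservable<int> readings = Observable
            .FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                h => sensor.PropertyChanged += h,
                h => sensor.PropertyChanged -= h)
            .Where(ep => ep.EventArgs.PropertyName == "Reading")
            .Select(ep => ((Sensor)ep.Sender).Reading)
            .Where(value => value > 10);

        // Disposing the subscription detaches the event handler.
        using (readings.Subscribe(value => seen.Add(value)))
        {
            sensor.Reading = 5;   // filtered out (not greater than 10)
            sensor.Reading = 42;  // recorded
            sensor.Reading = 12;  // recorded
        }
        sensor.Reading = 99;      // ignored: the subscription is already disposed
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

In a plain console app with no SynchronizationContext, the list should end up holding 42 and 12. The point is that the filtering lives in the query, not inside an event handler.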

 

This thread is for ideas on how to use Rx to solve problems we might face in code.

 


 

I came up with an idea involving INotifyPropertyChanging and INotifyPropertyChanged that lets you get "change sets" from objects. If a class changes several properties when you set just one, listening code can subscribe to an IObservable that fires only once, after every PropertyChanging event from the object has been matched by its PropertyChanged event. It's a good way to keep expensive handler logic from running more often than it needs to.

 

Here's an example. I have a class called FilterClass defined like this:

 

using System.ComponentModel;
public class FilterClass : INotifyPropertyChanging, INotifyPropertyChanged
{
   private int _Height;
   public int Height
   {
    get
    {
	    return _Height;
    }
    set
    {
	    if (Equals(_Height, value))
		    return;
	    OnNotifyPropertyChanging("Height");
	    OnNotifyPropertyChanging("Size");
	    _Height = value;
	    OnNotifyPropertyChanged("Height");
	    OnNotifyPropertyChanged("Size");
    }
   }

   private int _Width;
   public int Width
   {
    get
    {
	    return _Width;
    }
    set
    {
	    if (Equals(_Width, value))
		    return;
	    OnNotifyPropertyChanging("Width");
	    OnNotifyPropertyChanging("Size");
	    _Width = value;
	    OnNotifyPropertyChanged("Width");
	    OnNotifyPropertyChanged("Size");
    }
   }

   public long Size
   {
    get { return (long)Height * Width; } // widen first so the int multiplication cannot overflow
   }

   #region INotifyPropertyChanging Members
   public event System.ComponentModel.PropertyChangingEventHandler PropertyChanging;
   protected void OnNotifyPropertyChanging(string prop)
   {
    if (PropertyChanging != null) PropertyChanging(this, new System.ComponentModel.PropertyChangingEventArgs(prop));
   }
   #endregion
   #region INotifyPropertyChanged Members
   public event System.ComponentModel.PropertyChangedEventHandler PropertyChanged;
   protected void OnNotifyPropertyChanged(string prop)
   {
    if (PropertyChanged != null) PropertyChanged(this, new System.ComponentModel.PropertyChangedEventArgs(prop));
   }
   #endregion
}

 

I have a Form that needs to refresh the contents of a grid using the value of the FilterClass.Size property and the FilterClass.Width property. The grid has over 1,000,000 records and it takes at least 5 seconds to process the records for a filter.

 

Usually, to wait for both properties, you would need a PropertyChanged handler and one or two private variables to track which properties have already raised their events. Only then would you make your filtering call.
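For comparison, here is a rough sketch of that manual approach using only the base class library. WidthOnlyFilter and ManualTracker are made-up names for illustration, and the Size formula is a placeholder.

```csharp
using System;
using System.ComponentModel;

// Hypothetical, trimmed-down stand-in for FilterClass: setting Width also changes Size.
public class WidthOnlyFilter : INotifyPropertyChanged
{
    private int _width;
    public int Width
    {
        get { return _width; }
        set
        {
            if (_width == value) return;
            _width = value;
            Raise("Width");
            Raise("Size");
        }
    }
    public long Size { get { return (long)_width * 10; } } // placeholder formula

    public event PropertyChangedEventHandler PropertyChanged;
    private void Raise(string prop)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(prop));
    }
}

// Manual bookkeeping: remember which notifications arrived, refresh once both are in.
public class ManualTracker
{
    private bool _widthChanged;
    private bool _sizeChanged;
    public int RefreshCount { get; private set; }

    public ManualTracker(INotifyPropertyChanged source)
    {
        source.PropertyChanged += OnPropertyChanged;
    }

    private void OnPropertyChanged(object sender, PropertyChangedEventArgs e)
    {
        if (e.PropertyName == "Width") _widthChanged = true;
        if (e.PropertyName == "Size") _sizeChanged = true;
        if (_widthChanged && _sizeChanged)
        {
            _widthChanged = false;
            _sizeChanged = false;
            RefreshCount++; // the expensive grid refresh would go here
        }
    }
}

public static class ManualTrackingDemo
{
    public static void Main()
    {
        var filter = new WidthOnlyFilter();
        var tracker = new ManualTracker(filter);
        filter.Width = 5; // raises Width, then Size: one refresh
        filter.Width = 8; // one more refresh
        Console.WriteLine(tracker.RefreshCount); // prints 2
    }
}
```

This works for two properties, but the flags are easy to get out of step. With the full FilterClass, setting Height raises Size without Width, so the Size flag stays set and the next Width change triggers the refresh before its own Size notification arrives.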

 

Using Rx, you could use this code to build an observable that, once the last PropertyChanged event has fired, produces every property for which a change was reported:

 

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel; // ReadOnlyCollection<T>
using System.ComponentModel;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reflection;
namespace USL.Events
{
   public static class EventExtensions
   {
    public static IObservable<PropertyChangedEventArgs> GetPropertyChanged(this INotifyPropertyChanged source)
    {
	    var evt = Observable.FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => (sender, e) => h(e),
		    h => source.PropertyChanged += h,
		    h => source.PropertyChanged -= h);
	    return evt;
    }

    public static IObservable<PropertyChangingEventArgs> GetPropertyChanging(this INotifyPropertyChanging source)
    {
	    var evt = Observable.FromEvent<PropertyChangingEventHandler, PropertyChangingEventArgs>(h => (sender, e) => h(e),
		    h => source.PropertyChanging += h,
		    h => source.PropertyChanging -= h);
	    return evt;
    }

    public static ReadOnlyCollection<T> ToReadOnlyCollection<T>(this IEnumerable<T> col)
    {
	    if (col == null)
		    throw new ArgumentNullException("col");
	    return new ReadOnlyCollection<T>(col.ToArray());
    }

    public static ReadOnlyDictionary<TKey, TSource> ToReadOnlyDictionary<TSource, TKey>(this IEnumerable<TSource> col, Func<TSource, TKey> generator)
    {
	    if (col == null)
		    throw new ArgumentNullException("col");
	    var valuePairs = col
		    .Select(x => new KeyValuePair<TKey, TSource>(generator(x), x));
	    return new ReadOnlyDictionary<TKey, TSource>(valuePairs);
    }

    #region Change Sets
    public static IObservable<ReadOnlyDictionary<string, ChangeSetItem>> GetChangeSets<T>(this T item)
	    where T : INotifyPropertyChanging, INotifyPropertyChanged
    {
	    var type = typeof(T);

	    var changing = item.GetPropertyChanging().Select(x => new { Notification = ChangeNotification.Changing, Property = x.PropertyName });
	    var changed = item.GetPropertyChanged().Select(x => new { Notification = ChangeNotification.Changed, Property = x.PropertyName });
	    var gate = new object();
	    return Observable.Create<ReadOnlyDictionary<string, ChangeSetItem>>(obs =>
	    {
		    var propInfo = new Dictionary<string, PropertyInfo>();
		    var propIsChanging = new Dictionary<string, bool>();
		    var propValues = new Dictionary<string, List<object>>();
		    var changingSubscription = changing.Subscribe(next =>
		    {
			    lock (gate)
			    {
				    if (!propInfo.ContainsKey(next.Property))
					    propInfo.Add(next.Property, type.GetProperty(next.Property));
				    var info = propInfo[next.Property];
				    propIsChanging[next.Property] = true;
				    if (!propValues.ContainsKey(next.Property))
				    {
					    propValues[next.Property] = new List<object>();
					    propValues[next.Property].Add(info.GetValue(item, null));
				    }
			    }
		    });
		    var changedSubscription = changed.Subscribe(next =>
		    {
			    lock (gate)
			    {
				    if (!propInfo.ContainsKey(next.Property))
					    propInfo.Add(next.Property, type.GetProperty(next.Property));
				    var info = propInfo[next.Property];
				    if (propValues.Count == 0) // Changed arrived with no preceding Changing
				    {
					    var changeSetItem = new ChangeSetItem(next.Property, info.GetValue(item, null));
					    var changeSetItems = new List<ChangeSetItem> { changeSetItem }
						    .ToReadOnlyDictionary(x => x.Property);
					    propInfo.Clear();
					    propIsChanging.Clear();
					    propValues.Clear();
					    obs.OnNext(changeSetItems);
				    }
				    else
				    {
					    propIsChanging[next.Property] = false;
					    if (!propValues.ContainsKey(next.Property))
						    propValues[next.Property] = new List<object>();
					    propValues[next.Property].Add(info.GetValue(item, null));
					    if (propIsChanging.Values.All(isChanging => !isChanging))
					    {
						    var changeSetItems = propValues
							    .Select(prop => new ChangeSetItem(prop.Key, prop.Value.ToReadOnlyCollection()))
							    .ToReadOnlyDictionary(x => x.Property);
						    propInfo.Clear();
						    propIsChanging.Clear();
						    propValues.Clear();
						    obs.OnNext(changeSetItems);
					    }
				    }
			    }
		    });
		    return new CompositeDisposable(changingSubscription, changedSubscription);
	    });
    }
    private enum ChangeNotification
    {
	    Changing,
	    Changed
    }
    #endregion
   }

   public struct ChangeSetItem
   {
    public ChangeSetItem(string property, object newValue)
	    : this()
    {
	    this.Property = property;
	    this.OldValueKnown = false;
	    this.Values = Enumerable.Repeat(newValue, 1).ToReadOnlyCollection();
    }
    public ChangeSetItem(string property, object newValue, IEnumerable<object> oldValues)
	    : this()
    {
	    if (oldValues == null)
		    throw new ArgumentNullException("oldValues");
	    this.Property = property;
	    var values = oldValues.ToReadOnlyCollection();
	    this.OldValueKnown = values.Any();
	    this.Values = values.Concat(new[] { newValue }).ToReadOnlyCollection();
    }
    public ChangeSetItem(string property, IEnumerable<object> values)
	    : this()
    {
	    this.Property = property;
	    this.Values = values.ToReadOnlyCollection();
	    this.OldValueKnown = Values.Count >= 2;
    }
    public string Property { get; private set; }
    public ReadOnlyCollection<object> Values { get; private set; }
    public bool OldValueKnown { get; private set; }
    public object OldestValue
    {
	    get
	    {
		    if (!OldValueKnown)
			    throw new InvalidOperationException("There is no old value.");
		    return Values.First();
	    }
    }
    public object NewestValue
    {
	    get { return Values.Last(); }
    }
   }

   public class ReadOnlyDictionary<TKey, TValue> : IDictionary<TKey, TValue>
   {
    public ReadOnlyDictionary(IEnumerable<KeyValuePair<TKey, TValue>> source)
    {
	    mSource = source.ToDictionary(x => x.Key, x => x.Value);
    }
    private readonly Dictionary<TKey, TValue> mSource;
    public TValue this[TKey key]
    {
	    get { return mSource[key]; }
    }
    public int Count
    {
	    get { return mSource.Count; }
    }
    public bool IsReadOnly
    {
	    get { return true; }
    }
    public ICollection<TKey> Keys
    {
	    get { return mSource.Keys; }
    }
    public ICollection<TValue> Values
    {
	    get { return mSource.Values; }
    }

    public bool Contains(KeyValuePair<TKey, TValue> item)
    {
	    return mSource.Contains(item);
    }
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
    {
	    (mSource as IDictionary<TKey, TValue>).CopyTo(array, arrayIndex);
    }

    public bool ContainsKey(TKey key)
    {
	    return mSource.ContainsKey(key);
    }
    public bool TryGetValue(TKey key, out TValue value)
    {
	    return mSource.TryGetValue(key, out value);
    }

    #region IDictionary<TKey,TValue> Members
    void IDictionary<TKey, TValue>.Add(TKey key, TValue value)
    {
	    throw new NotSupportedException();
    }
    bool IDictionary<TKey, TValue>.Remove(TKey key)
    {
	    throw new NotSupportedException();
    }
    TValue IDictionary<TKey, TValue>.this[TKey key]
    {
	    get
	    {
		    return mSource[key];
	    }
	    set
	    {
		    throw new NotSupportedException();
	    }
    }
    #endregion
    #region ICollection<KeyValuePair<TKey,TValue>> Members
    void ICollection<KeyValuePair<TKey, TValue>>.Add(KeyValuePair<TKey, TValue> item)
    {
	    throw new NotSupportedException();
    }
    void ICollection<KeyValuePair<TKey, TValue>>.Clear()
    {
	    throw new NotSupportedException();
    }
    bool ICollection<KeyValuePair<TKey, TValue>>.Remove(KeyValuePair<TKey, TValue> item)
    {
	    throw new NotSupportedException();
    }
    #endregion
    #region IEnumerable<KeyValuePair<TKey,TValue>> Members
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
	    return mSource.GetEnumerator();
    }
    #endregion
    #region IEnumerable Members
    IEnumerator IEnumerable.GetEnumerator()
    {
	    return mSource.GetEnumerator();
    }
    #endregion
   }
}

 

Then, in the Form, call the extension method to create the IObservable:

 

private FilterClass filter = new FilterClass();
private IObservable<ReadOnlyDictionary<string, ChangeSetItem>> filterChanged;
private IDisposable filterChangedSubscription;
public GridForm()
{
   filter.Height = 75;
   filterChanged = filter.GetChangeSets();
   filterChangedSubscription = filterChanged.Subscribe(next => UpdateRecordsInGrid(filter.Width, filter.Size));
}
private void UpdateRecordsInGrid(int width, long size)
{
   // Do some filtering here.
}

 

And there! Whenever Width is set, the IObservable first sees that two properties (Width and Size) are about to change. Then it sees the first PropertyChanged event, then the second. Once the second one arrives, a single notification is sent to the Form, which runs the filtering operation once instead of twice.
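To see that ordering without pulling in Rx, here is a stripped-down trace of the same gating idea. It is only a sketch: MiniFilter and ChangeSetTraceDemo are made-up names, and it tracks property names only, not the old and new values that GetChangeSets collects.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;

// Hypothetical stand-in for FilterClass: setting Width also changes Size.
public class MiniFilter : INotifyPropertyChanging, INotifyPropertyChanged
{
    private int _width;
    public int Width
    {
        get { return _width; }
        set
        {
            if (_width == value) return;
            RaiseChanging("Width");
            RaiseChanging("Size");
            _width = value;
            RaiseChanged("Width");
            RaiseChanged("Size");
        }
    }
    public long Size { get { return (long)_width * _width; } } // placeholder formula

    public event PropertyChangingEventHandler PropertyChanging;
    public event PropertyChangedEventHandler PropertyChanged;

    private void RaiseChanging(string prop)
    {
        var handler = PropertyChanging;
        if (handler != null) handler(this, new PropertyChangingEventArgs(prop));
    }
    private void RaiseChanged(string prop)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(prop));
    }
}

public static class ChangeSetTraceDemo
{
    public static List<string> Trace()
    {
        var filter = new MiniFilter();
        var pending = new HashSet<string>(); // announced by PropertyChanging, not yet confirmed
        var batch = new List<string>();      // properties confirmed in the current change set
        var log = new List<string>();

        filter.PropertyChanging += (s, e) =>
        {
            log.Add("changing " + e.PropertyName);
            pending.Add(e.PropertyName);
        };
        filter.PropertyChanged += (s, e) =>
        {
            log.Add("changed " + e.PropertyName);
            pending.Remove(e.PropertyName);
            batch.Add(e.PropertyName);
            if (pending.Count == 0) // every announced change has now completed
            {
                log.Add("change set: " + string.Join(", ", batch));
                batch.Clear();
            }
        };

        filter.Width = 3;
        return log;
    }

    public static void Main()
    {
        foreach (var line in Trace()) Console.WriteLine(line);
        // changing Width
        // changing Size
        // changed Width
        // changed Size
        // change set: Width, Size
    }
}
```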

 

This doesn't look like much, but what if the filter contained over 20 different fields, and the grid had to update instantly? That's a lot of code you'd have to write to try and keep up with events.

 


 

So that's my idea. I want to see yours. How can we make use of Rx to make events and asynchrony easier for us?
