Isolated Storage as Cache using Reactive Extensions

A lot of people are used to write some data to the Isolated Storage to use it as a Cache. I’m doing that as well, it’s easy to write and read to the Isolated Storage. Some people just write text/xml data and want to put the content in a string. I have however a lot of situations where caching of images is important, so my approach doesn’t make use of a string but a MemoryStream instead. So I have a static method that writes a MemoryStream to a file in Isolated Storage.

private const string CacheFolder = "cache";
public static void CacheResource(MemoryStream toCache, string cacheKey)
{
    string targetFile = Path.Combine(CacheFolder, cacheKey);
    using (IsolatedStorageFile isoFile = IsolatedStorageFile.GetUserStoreForApplication())
    {
        if (!isoFile.DirectoryExists(CacheFolder))
            isoFile.CreateDirectory(CacheFolder);
        if (isoFile.FileExists(targetFile))
            isoFile.DeleteFile(targetFile);
        using (IsolatedStorageFileStream outputStream = isoFile.CreateFile(targetFile))
        {
            toCache.WriteTo(outputStream);
        }
    }
}

That’s the easy part writing. I must admit reading isn’t much more complicated, unless you want to read it in a special way, namely using Reactive Extensions (Rx). Reactive Extensions offer a different way to look at asynchronous code, instead of listening to events, it allows to subscribe to things that happen. In addition to this it supports Linq on observable objects. I’m not going to dig deep on Rx, there are other blogs that can do that much better. The mechanism described in this article is intended for use in Windows Phone, but should work in Silverlight as well.

Reactive Extensions (Rx) over WebRequest

So why did I want to do use Rx? It’s simple: I want to have a similar signature and way of usage of code when I access content online (Async required) and when I access content from the cache (Standard only sync access). So how do I access online content?

In this example, I’m downloading online content and setting the MemoryStream as the Source of a BitmapImage and put the BitmapImage as the source of an Image control. It’s a regular Rx way of reacting on an Observable, where I put a lambda to react on OnNext, OnError and OnCompleted. Further more because I’m interacting with the User Interface it’s important to run that code on the Dispatcher. Running code on the dispatcher can be done manually or you can use Rx combined with ObserveOnDispatcher method.

const string url = "http://mark.mymonster.nl/Content/photo.jpg";
IObservable<MemoryStream> observable = ObservableResource.FromInternet(new Uri(url));
observable
    .ObserveOnDispatcher()
    .Subscribe(
        stream =>
        {
            if (stream != null)
            {
                var image = new BitmapImage();
                image.SetSource(stream);
                TargetImage.Source = image;
            }
        },
        exc => Debug.WriteLine(string.Format("Oops: {0}", exc.Message)),
        () => Debug.WriteLine("Completed."));

So how does my ResourceHelper.OnlineResource method look like? I’m using an AnonymousObservable to construct an observable from the Async pattern in a WebRequest. Please don’t tell me I could use Observable.FromAsyncPattern, I’ve used it in the past, but have stopped using it, I’m now writing my own Observable code to be sure I’m in control.

public class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribeAction;

    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribeAction)
    {
        _subscribeAction = subscribeAction;
    }

    #region IObservable<T> Members

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribeAction(observer);
    }

    #endregion
}

This anonymous Observable class is a simple implementation that get’s a lambda passed in which will be executed on Subscribe. So finally we have the FromInternet implementation combined with a operation that reads a stream to a MemoryStream.

public static IObservable<MemoryStream> FromInternet(Uri onlineResource)
{
    return
        new AnonymousObservable<WebResponse>(
            observer =>
                {
                    var httpWebRequest =
                        (HttpWebRequest) WebRequest.Create(onlineResource);
                    httpWebRequest.BeginGetResponse(
                        iar =>
                            {
                                WebResponse response;
                                try
                                {
                                    var requestState = (HttpWebRequest) iar.AsyncState;
                                    response = requestState.EndGetResponse(iar);
                                }
                                catch (Exception exception)
                                {
                                    observer.OnError(exception);
                                    return;
                                }
                                observer.OnNext(response);
                                observer.OnCompleted();
                            }, httpWebRequest);
                    return Disposable.Empty;
                })
            .Select(
                response => response.GetResponseStream().ToMemoryStream());
}

What you can see in the Lambda that get’s passed into the AnonymousObservable for Online Resources, it’s similar to what we would have done in the past, but now it’s calling observer.OnNext, observer.OnError and observer.OnCompleted operations. Further more you can see that I attached a Select operation we are used to in Linq to convert a WebResponse to a MemoryStream using the ToMemoryStream extension method.

public static class StreamExtensions
{
    public static MemoryStream ToMemoryStream(this Stream stream)
    {
        var br = new BinaryReader(stream);
        var outputStream = new MemoryStream();
        var buffer = new byte[1024];
        int cb;
        while ((cb = br.Read(buffer, 0, buffer.Length)) > 0)
        {
            outputStream.Write(buffer, 0, cb);
        }
        outputStream.Position = 0;
        return outputStream;
    }
}

Reactive Extensions (Rx) over Isolated Storage

So far it has all been preparation for the Reactive Extensions over Isolated Storage. Where the implementation is actually really simple, it’s now wrapped in an Anonymous Observable.

public static IObservable<MemoryStream> FromCache(string cacheKey)
{
    return new AnonymousObservable<MemoryStream>(
        observer =>
            {
                string targetFile = Path.Combine(CacheFolder, cacheKey);

                using (IsolatedStorageFile isoFile = IsolatedStorageFile.GetUserStoreForApplication())
                {
                    if (!isoFile.FileExists(targetFile))
                    {
                        observer.OnNext(null);
                    }
                    else
                    {
                        using (IsolatedStorageFileStream inputStream =
                            isoFile.OpenFile(targetFile, FileMode.Open, FileAccess.Read))
                        {
                            observer.OnNext(inputStream.ToMemoryStream());
                        }
                    }
                }
                observer.OnCompleted();
                return Disposable.Empty;
            });
}

This now enables me to get something from cache in the same way as I can get something from the internet. Please mark the “photo.jpg” is a key for accessing the cache, which in the end results in data. If you choose a different caching-strategy you should remind this is a cache key.

IObservable<MemoryStream> observable = ObservableResource.FromCache("photo.jpg");
observable
    .ObserveOnDispatcher()
    .Subscribe(
        stream =>
        {
            if (stream != null)
            {
                var image = new BitmapImage();
                image.SetSource(stream);
                TargetImage.Source = image;
            }
        },
        exc => Debug.WriteLine(string.Format("Oops: {0}", exc.Message)),
        () => Debug.WriteLine("Completed."));

You can find the full source code here, by toggling.

public static class StreamExtensions
{
    public static MemoryStream ToMemoryStream(this Stream stream)
    {
        var br = new BinaryReader(stream);
        var outputStream = new MemoryStream();
        var buffer = new byte[1024];
        int cb;
        while ((cb = br.Read(buffer, 0, buffer.Length)) > 0)
        {
            outputStream.Write(buffer, 0, cb);
        }
        outputStream.Position = 0;
        return outputStream;
    }
}

public static class ObservableResource
{
    private const string CacheFolder = "cache";

    public static void CacheResource(MemoryStream toCache, string cacheKey)
    {
        string targetFile = Path.Combine(CacheFolder, cacheKey);
        using (IsolatedStorageFile isoFile = IsolatedStorageFile.GetUserStoreForApplication())
        {
            if (!isoFile.DirectoryExists(CacheFolder))
                isoFile.CreateDirectory(CacheFolder);
            if (isoFile.FileExists(targetFile))
                isoFile.DeleteFile(targetFile);
            using (IsolatedStorageFileStream outputStream = isoFile.CreateFile(targetFile))
            {
                toCache.WriteTo(outputStream);
            }
        }
    }

    public static IObservable<MemoryStream> FromCache(string cacheKey)
    {
        return new AnonymousObservable<MemoryStream>(
            observer =>
                {
                    string targetFile = Path.Combine(CacheFolder, cacheKey);

                    using (IsolatedStorageFile isoFile = IsolatedStorageFile.GetUserStoreForApplication())
                    {
                        if (!isoFile.FileExists(targetFile))
                        {
                            observer.OnNext(null);
                        }
                        else
                        {
                            using (IsolatedStorageFileStream inputStream =
                                isoFile.OpenFile(targetFile, FileMode.Open, FileAccess.Read))
                            {
                                observer.OnNext(inputStream.ToMemoryStream());
                            }
                        }
                    }
                    observer.OnCompleted();
                    return Disposable.Empty;
                });
    }

    public static IObservable<MemoryStream> FromInternet(Uri onlineResource)
    {
        return
            new AnonymousObservable<WebResponse>(
                observer =>
                    {
                        var httpWebRequest =
                            (HttpWebRequest) WebRequest.Create(onlineResource);
                        httpWebRequest.BeginGetResponse(
                            iar =>
                                {
                                    WebResponse response;
                                    try
                                    {
                                        var requestState = (HttpWebRequest) iar.AsyncState;
                                        response = requestState.EndGetResponse(iar);
                                    }
                                    catch (Exception exception)
                                    {
                                        observer.OnError(exception);
                                        return;
                                    }
                                    observer.OnNext(response);
                                    observer.OnCompleted();
                                }, httpWebRequest);
                        return Disposable.Empty;
                    })
                .Select(
                    response => response.GetResponseStream().ToMemoryStream());
    }
}
Gravatar