Cache as Fallback using Reactive Extensions

In my previous article I wrote about using Isolated Storage with Reactive Extensions. Let’s combine that with accessing online resources through Reactive Extensions.

What I want to achieve is best explained by a picture. Basically, if downloading a resource fails, the code should fall back to the cache.

image

Let’s combine some code from the previous article.

First of all we want to download a resource from the internet, and if that’s successful we want to cache that data.

public static IObservable<MemoryStream> CacheAsFallback(Uri onlineResource, string cacheKey)
{
    IObservable<MemoryStream> observableOnlineResource = FromInternet(onlineResource);
    return observableOnlineResource
        .Select(stream =>
                    {
                        CacheResource(stream, cacheKey);
                        stream.Position = 0;
                        return stream;
                    });
}

Yes, that’s all; it’s easy to combine Reactive Extensions (Rx) with LINQ. The code above makes sure that the downloaded resource is cached before it’s returned to the caller. Please take note of the stream position being reset, so that the caller always reads from the start of the stream.
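To show in isolation why the position reset matters, here is a minimal sketch. It assumes a current .NET runtime with the System.Reactive package, and the names SelectSideEffectSketch, ConsumeAll and WithSideEffect are made up for this illustration. ConsumeAll is a hypothetical stand-in for CacheResource that reads the stream with CopyTo, which leaves the position at the end. As far as I know MemoryStream.WriteTo, which CacheResource uses, copies the whole buffer without moving the position, so in the article's code the reset is mostly a defensive step.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class SelectSideEffectSketch
{
    // Hypothetical stand-in for CacheResource: reads the stream to its end.
    public static byte[] ConsumeAll(MemoryStream stream)
    {
        var copy = new MemoryStream();
        stream.CopyTo(copy); // advances stream.Position to the end of the stream
        return copy.ToArray();
    }

    // Runs a side effect on every stream that passes through, then rewinds it.
    public static IObservable<MemoryStream> WithSideEffect(
        IObservable<MemoryStream> source, Action<byte[]> store)
    {
        return source.Select(stream =>
        {
            store(ConsumeAll(stream));
            stream.Position = 0; // without this the subscriber would start reading at the end
            return stream;
        });
    }
}
```

A subscriber to WithSideEffect(Observable.Return(someStream), bytes => { }) receives the same stream instance with Position back at 0.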

Catch an exception in Reactive Extensions

The next step is seen a little less often in the Rx examples found on the web: catch an exception and continue with a different Observable.

public static IObservable<TSource> Catch<TSource, TException>(
    this IObservable<TSource> source,
    Func<TException, IObservable<TSource>> handler
    )
    where TException : Exception

So Catch is an extension method that accepts a Func<TException, IObservable<TSource>>. In our case the handler has to return an IObservable<MemoryStream>. If you want to catch only specific exception types you can do that as well, but I want to fall back in case of any error. Furthermore, the FromCache Observable produces null when nothing is found. I want that to be translated to an exception.

public static IObservable<MemoryStream> CacheAsFallback(Uri onlineResource, string cacheKey)
{
    IObservable<MemoryStream> observableOnlineResource = FromInternet(onlineResource);
    return observableOnlineResource
        .Select(stream =>
                    {
                        CacheResource(stream, cacheKey);
                        stream.Position = 0;
                        return stream;
                    })
        .Catch<MemoryStream, Exception>(
            exception => FromCache(cacheKey)
                                .Select(cacheStream =>
                                            {
                                                if (cacheStream == null)
                                                    throw new Exception("No cache");
                                                return cacheStream;
                                            }));
}
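The two mechanisms used above, Catch switching to a second Observable and an exception thrown inside Select surfacing as OnError, can be tried without any network or storage. The sketch below assumes the System.Reactive package. CatchSketch, FailingSource and WithFallback are hypothetical names, and strings stand in for the memory streams.

```csharp
using System;
using System.Reactive.Linq;

public static class CatchSketch
{
    // Hypothetical primary source that always fails, standing in for FromInternet.
    public static IObservable<string> FailingSource()
    {
        return Observable.Throw<string>(new InvalidOperationException("offline"));
    }

    // Falls back to 'cached' on any error; a null cached value becomes an error itself.
    public static IObservable<string> WithFallback(IObservable<string> primary, string cached)
    {
        return primary.Catch<string, Exception>(
            exception => Observable.Return(cached)
                .Select(value =>
                {
                    if (value == null)
                        throw new Exception("No cache"); // Rx forwards this to OnError
                    return value;
                }));
    }

    public static void Demo()
    {
        // prints "cached copy"
        WithFallback(FailingSource(), "cached copy")
            .Subscribe(Console.WriteLine, e => Console.WriteLine("Oops: " + e.Message));

        // prints "Oops: No cache"
        WithFallback(FailingSource(), null)
            .Subscribe(Console.WriteLine, e => Console.WriteLine("Oops: " + e.Message));
    }
}
```

The second call shows what a subscriber sees when both the download and the cache come up empty: only the "No cache" error arrives, never a null value.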

Now I can write the code below to get a resource; in case of network connectivity problems it will fall back to the cached value.

const string url = "http://mark.mymonster.nl/Content/photo.jpg";
IObservable<MemoryStream> observable = ObservableResource.CacheAsFallback(new Uri(url), "photo.jpg");
observable
    .ObserveOnDispatcher()
    .Subscribe(
        stream =>
            {
                if (stream != null)
                {
                    var image = new BitmapImage();
                    image.SetSource(stream);
                    TargetImage.Source = image;
                }
            },
        exc => Debug.WriteLine(string.Format("Oops: {0}", exc.Message)),
        () => Debug.WriteLine("Completed."));

Now let’s combine everything we have again. The complete code follows.

public static class StreamExtensions
{
    public static MemoryStream ToMemoryStream(this Stream stream)
    {
        var br = new BinaryReader(stream);
        var outputStream = new MemoryStream();
        var buffer = new byte[1024];
        int cb;
        while ((cb = br.Read(buffer, 0, buffer.Length)) > 0)
        {
            outputStream.Write(buffer, 0, cb);
        }
        outputStream.Position = 0;
        return outputStream;
    }
}

public static class ObservableResource
{
    private const string CacheFolder = "cache";

    public static IObservable<MemoryStream> CacheAsFallback(Uri onlineResource, string cacheKey)
    {
        IObservable<MemoryStream> observableOnlineResource = FromInternet(onlineResource);
        return observableOnlineResource
            .Select(stream =>
                        {
                            CacheResource(stream, cacheKey);
                            stream.Position = 0;
                            return stream;
                        })
            .Catch<MemoryStream, Exception>(
                exception => FromCache(cacheKey)
                                    .Select(cacheStream =>
                                                {
                                                    if (cacheStream == null)
                                                        throw new Exception("No cache");
                                                    return cacheStream;
                                                }));
    }

    public static void CacheResource(MemoryStream toCache, string cacheKey)
    {
        string targetFile = Path.Combine(CacheFolder, cacheKey);
        using (IsolatedStorageFile isoFile = IsolatedStorageFile.GetUserStoreForApplication())
        {
            if (!isoFile.DirectoryExists(CacheFolder))
                isoFile.CreateDirectory(CacheFolder);
            if (isoFile.FileExists(targetFile))
                isoFile.DeleteFile(targetFile);
            using (IsolatedStorageFileStream outputStream = isoFile.CreateFile(targetFile))
            {
                toCache.WriteTo(outputStream);
            }
        }
    }

    public static IObservable<MemoryStream> FromCache(string cacheKey)
    {
        return new AnonymousObservable<MemoryStream>(
            observer =>
                {
                    string targetFile = Path.Combine(CacheFolder, cacheKey);

                    using (IsolatedStorageFile isoFile = IsolatedStorageFile.GetUserStoreForApplication())
                    {
                        if (!isoFile.FileExists(targetFile))
                        {
                            observer.OnNext(null);
                        }
                        else
                        {
                            using (IsolatedStorageFileStream inputStream =
                                isoFile.OpenFile(targetFile, FileMode.Open, FileAccess.Read))
                            {
                                observer.OnNext(inputStream.ToMemoryStream());
                            }
                        }
                    }
                    observer.OnCompleted();
                    return Disposable.Empty;
                });
    }

    public static IObservable<MemoryStream> FromInternet(Uri onlineResource)
    {
        return
            new AnonymousObservable<WebResponse>(
                observer =>
                    {
                        var httpWebRequest =
                            (HttpWebRequest) WebRequest.Create(onlineResource);
                        httpWebRequest.BeginGetResponse(
                            iar =>
                                {
                                    WebResponse response;
                                    try
                                    {
                                        var requestState = (HttpWebRequest) iar.AsyncState;
                                        response = requestState.EndGetResponse(iar);
                                    }
                                    catch (Exception exception)
                                    {
                                        observer.OnError(exception);
                                        return;
                                    }
                                    observer.OnNext(response);
                                    observer.OnCompleted();
                                }, httpWebRequest);
                        return Disposable.Empty;
                    })
                .Select(
                    response => response.GetResponseStream().ToMemoryStream());
    }
}
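As an aside, the code above constructs AnonymousObservable directly. A variation, not what the article's code does, is to go through Observable.Create, which I believe is the more commonly documented entry point in later Rx releases, and to report a missing cache entry with OnError right away instead of emitting null first. The sketch below assumes the System.Reactive package. CreateSketch and Store are hypothetical, and the dictionary is only an in-memory stand-in for isolated storage.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class CreateSketch
{
    // Hypothetical in-memory stand-in for the isolated storage cache folder.
    public static readonly Dictionary<string, byte[]> Store = new Dictionary<string, byte[]>();

    public static IObservable<MemoryStream> FromCache(string cacheKey)
    {
        return Observable.Create<MemoryStream>(observer =>
        {
            byte[] bytes;
            if (Store.TryGetValue(cacheKey, out bytes))
            {
                // Cache hit: emit the stream and complete.
                observer.OnNext(new MemoryStream(bytes));
                observer.OnCompleted();
            }
            else
            {
                // Cache miss: signal an error directly, so no null check is needed downstream.
                observer.OnError(new FileNotFoundException("No cache", cacheKey));
            }
            return Disposable.Empty;
        });
    }
}
```

With this variation the Select that translates null into an exception would no longer be needed inside the Catch handler.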