Wednesday, January 13, 2016

Owin Appfuncs and Midfuncs

I received some questions about OWIN appfuncs and midfuncs, because they are not the easiest thing to wrap your head around.

They have the following (rather ugly) definition:

using AppFunc = System.Func<System.Collections.Generic.IDictionary<string, object>, System.Threading.Tasks.Task>;
using MidFunc = System.Func<System.Func<System.Collections.Generic.IDictionary<string, object>, System.Threading.Tasks.Task>, System.Func<System.Collections.Generic.IDictionary<string, object>, System.Threading.Tasks.Task>>;

Wow.. that’s easy to understand right? So what’s going on here?

What’s an appfunc?

An appfunc is basically a function that, when invoked with an HTTP request, implements (part of) a web application and produces the HTTP response. It can be anything ranging from a function that only sets an HTTP status code to a complete web application.

A slightly more readable way to think of an appfunc is:

delegate Task AppFunc(IDictionary<string, object> environment);

It’s a function that takes a dictionary as its only parameter and, because it executes asynchronously, returns a Task. That dictionary (the "environment") contains keys that describe both the incoming request and the outgoing response, such as owin.RequestPath, owin.ResponseStatusCode, owin.ResponseBody, etc.
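To make the environment less abstract, here is a minimal sketch of an appfunc that reads the request path and writes the status code and body. It uses key names from the OWIN 1.0 specification (owin.RequestPath, owin.ResponseStatusCode, owin.ResponseBody, owin.ResponseHeaders); the EnvironmentDemo class, the /hello path and the hand-built dictionary are hypothetical and only for illustration, since normally the host server fills in the environment for you.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class EnvironmentDemo
{
    // An appfunc built only from BCL types: it reads request keys and writes response keys.
    public static async Task PathEcho(IDictionary<string, object> env)
    {
        var path = (string)env["owin.RequestPath"];
        var body = (Stream)env["owin.ResponseBody"];

        if (path != "/hello")
        {
            // The smallest possible response: only a status code, no body.
            env["owin.ResponseStatusCode"] = 404;
            return;
        }

        env["owin.ResponseStatusCode"] = 200;
        byte[] bytes = Encoding.UTF8.GetBytes("Hello from " + path);
        await body.WriteAsync(bytes, 0, bytes.Length);
    }

    // Hypothetical helper: fakes the minimal environment a host server would normally create.
    public static Dictionary<string, object> CreateEnvironment(string path)
    {
        return new Dictionary<string, object>
        {
            { "owin.RequestPath", path },
            { "owin.ResponseBody", new MemoryStream() },
            { "owin.ResponseHeaders", new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase) }
        };
    }
}
```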

Why do you need a midfunc then?

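OWIN is designed to let you plug components together into a pipeline. A midfunc is what makes that possible: it takes the next appfunc in the pipeline and returns a new appfunc that wraps it, which lets you compose functions (and components) together.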

 

A (simplified) example.

public delegate void SimplifiedAppFunc(string url, ref string response);

/// <summary>
/// Simplified example of an appfunc that returns hello world only if you go to the url helloworld.html
/// </summary>
/// <param name="url">The requested url.</param>
/// <param name="response">Set to the hello world document when the url matches; otherwise left untouched.</param>
public static void HelloWorld(string url, ref string response)
{
    if (url == "helloworld.html")
    {
        response = "<Hello> <World/> </Hello>";
    }
}

This code is a simplified version of an appfunc. When invoked with the right request (helloworld.html), it sets the response to a hello world document; for any other URL it leaves the response untouched.

Now if you want to implement a component, for example request logging, you could do the following:

/// <summary>
/// Static (non-flexible) example of composing functions together to build an application.
///
/// Here LogRequest is limited to calling ONLY the HelloWorld function.
/// </summary>
/// <param name="url">The requested url.</param>
/// <param name="response">The response produced by HelloWorld.</param>
public static void LogRequest(string url, ref string response)
{
    // Logger stands in for whatever logging library you use.
    Logger.Log("Received request to ", url);

    HelloWorld(url, ref response);

    Logger.Log("Completed request to ", url, "Response was ", response);
}

public static void InvokeStaticAppFunc()
{
    SimplifiedAppFunc appfunc = LogRequest;

    string response = null;
    appfunc("helloworld.html", ref response);

    Console.WriteLine("static appfunc response: " + response);
}

 

When you call this mini ‘web application’, it logs every request, and if the request happens to be for helloworld.html, it returns the hello world page.

But this is hardly a flexible approach, because LogRequest now calls the HelloWorld application directly and cannot wrap anything else. To make this component flexible, you’ll need a midfunc:

public delegate SimplifiedAppFunc SimplifiedMidFunc(SimplifiedAppFunc appfunc);

public static SimplifiedAppFunc LogRequest(SimplifiedAppFunc next)
{
    return (string url, ref string response) =>
    {
        Logger.Log("Received request to ", url);

        next(url, ref response);

        Logger.Log("Completed request to ", url, "Response was ", response);
    };
}

 

Now the LogRequest function takes an appfunc (which is a mini web application) and extends it with its own functionality (in this case request logging). It doesn’t execute that appfunc itself; instead it returns a new function that does. Yes, this is functional programming in action.

Note how you can execute logic both before and after next() is called. That means you can inspect or modify the request before passing it on, and inspect or act on the response afterwards. A midfunc can even decide not to call next() at all and answer the request itself.
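The before/after idea, plus short-circuiting, can be sketched with the simplified delegate from above. BlockAdminAndShout and the admin/ prefix are hypothetical names made up for this illustration: the midfunc answers admin/ requests itself without ever calling next, and otherwise lets the inner appfunc run and then upper-cases whatever it produced.

```csharp
using System;

public delegate void SimplifiedAppFunc(string url, ref string response);

public static class MidFuncExamples
{
    // Hypothetical midfunc: short-circuits one URL prefix, otherwise post-processes the inner response.
    public static SimplifiedAppFunc BlockAdminAndShout(SimplifiedAppFunc next)
    {
        return (string url, ref string response) =>
        {
            // Before next(): inspect the request; returning here means next is never called.
            if (url.StartsWith("admin/", StringComparison.Ordinal))
            {
                response = "403 Forbidden";
                return;
            }

            next(url, ref response);

            // After next(): act on whatever the inner appfunc produced (null if it did nothing).
            if (response != null)
            {
                response = response.ToUpperInvariant();
            }
        };
    }

    // Same HelloWorld appfunc as in the text, repeated so this block compiles on its own.
    public static void HelloWorld(string url, ref string response)
    {
        if (url == "helloworld.html")
        {
            response = "<Hello> <World/> </Hello>";
        }
    }
}
```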

Now you have to combine these together when executing:

public static void InvokeComposedAppFunc()
{
    string response = null;

    SimplifiedAppFunc application = HelloWorld;

    // Call the LogRequest midfunc, injecting the HelloWorld application as "next".
    // The result is a new appfunc that logs and then calls HelloWorld.
    SimplifiedAppFunc appfunc = LogRequest(application);

    appfunc("helloworld.html", ref response);

    Console.WriteLine("composed appfunc response: " + response);
}

When you invoke the LogRequest midfunc with another appfunc (HelloWorld) as a parameter, you get back a new appfunc. That appfunc is again a mini web application that, when you invoke it, does logging AND returns hello world.
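Because a midfunc is just a function from appfunc to appfunc, a whole pipeline is nothing more than applying several midfuncs in turn. Here is a minimal sketch of that idea using the simplified delegates; Pipeline.Build and the Tag helper are hypothetical names for illustration, not part of OWIN. The list is applied back to front so that the first midfunc in the list becomes the outermost layer, i.e. the first one to see the request.

```csharp
using System;
using System.Collections.Generic;

public delegate void SimplifiedAppFunc(string url, ref string response);
public delegate SimplifiedAppFunc SimplifiedMidFunc(SimplifiedAppFunc next);

public static class Pipeline
{
    // Wraps the application in the midfuncs; midfuncs[0] ends up as the outermost layer.
    public static SimplifiedAppFunc Build(IList<SimplifiedMidFunc> midfuncs, SimplifiedAppFunc application)
    {
        SimplifiedAppFunc current = application;

        // Walk backwards: the last midfunc wraps the application first.
        for (int i = midfuncs.Count - 1; i >= 0; i--)
        {
            current = midfuncs[i](current);
        }

        return current;
    }

    // Hypothetical midfunc factory: runs next, then wraps its response in name(...).
    public static SimplifiedMidFunc Tag(string name)
    {
        return next => (string url, ref string response) =>
        {
            next(url, ref response);
            response = name + "(" + response + ")";
        };
    }
}
```

With Tag("outer") and Tag("inner") around an application that sets the response to "app", the composed appfunc produces outer(inner(app)).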

Now back to OWIN appfuncs and midfuncs

Now if you think about it like this, OWIN isn’t as scary anymore:

public static Task HelloWorld(IDictionary<string, object> environment)
{
    string responseText = "Hello World";
    byte[] responseBytes = Encoding.UTF8.GetBytes(responseText);

    // See http://owin.org/spec/owin-1.0.0.html for standard environment keys.
    Stream responseStream = (Stream)environment["owin.ResponseBody"];
    IDictionary<string, string[]> responseHeaders =
        (IDictionary<string, string[]>)environment["owin.ResponseHeaders"];

    responseHeaders["Content-Length"] = new string[] { responseBytes.Length.ToString(CultureInfo.InvariantCulture) };
    responseHeaders["Content-Type"] = new string[] { "text/plain" };

    return responseStream.WriteAsync(responseBytes, 0, responseBytes.Length);
}

public static AppFunc LogRequest(AppFunc next)
{
    return async (env) =>
    {
        Logger.Log("received request", env["owin.RequestPath"]);
        await next(env);

        // owin.ResponseStatusCode is optional; when it is absent, the status is 200.
        object statusCode;
        if (!env.TryGetValue("owin.ResponseStatusCode", out statusCode))
        {
            statusCode = 200;
        }
        Logger.Log("completed request", env["owin.RequestPath"], statusCode);
    };
}

public static async Task InvokeOwin()
{
    AppFunc application = HelloWorld;
    MidFunc logRequest = LogRequest;

    AppFunc composedApp = logRequest(application);

    // Normally the host server (e.g. Katana) builds this dictionary; here we fake a minimal one.
    var environment = new Dictionary<string, object>
    {
        { "owin.RequestPath", "/helloworld" },
        { "owin.ResponseBody", new MemoryStream() },
        { "owin.ResponseHeaders", new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase) }
    };
    await composedApp(environment);
}

 

But why not an interface?

OWIN components can be built so that they depend ONLY on the .NET Framework (Func, IDictionary, Task) and nothing else. That maximizes the pluggability of OWIN components. If the contract were an interface, it would have to live in some shared OWIN assembly, and if your component depended on version 1.0 of that assembly and mine on version 2.0, they couldn’t work together anymore.

But it looks so ugly.

That’s where Katana’s AppBuilder and OwinContext come to the rescue. You can use these classes (as an internal implementation detail of your component) to make working with appfuncs and the environment dictionary a bit less scary.

There are plenty of examples of how this helps; here is one:

using System.Threading.Tasks;
using Microsoft.Owin;
using Owin;

// Note: By default all requests go through this OWIN pipeline. Alternatively you can turn this off
// by adding an appSetting owin:AutomaticAppStartup with value "false".
// With this turned off you can still have OWIN apps listening on specific routes by adding routes
// in the global.asax file using the MapOwinPath or MapOwinRoute extensions on RouteTable.Routes.
public class Startup
{
    // Invoked once at startup to configure your application.
    public void Configuration(IAppBuilder app)
    {
        app.Run(Invoke);
    }

    // Invoked once per request.
    public Task Invoke(IOwinContext context)
    {
        context.Response.ContentType = "text/plain";
        return context.Response.WriteAsync("Hello World");
    }
}
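To see why a typed wrapper makes the environment less scary, here is a hypothetical minimal MiniContext class in the spirit of OwinContext. It is not Katana’s implementation, only a sketch under the assumption that such a wrapper can be a thin, stateless view over the same dictionary: your component gets nice properties internally, while the outside world still only sees a plain appfunc.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical wrapper in the spirit of Katana's OwinContext; NOT its actual implementation.
public sealed class MiniContext
{
    private readonly IDictionary<string, object> _env;

    public MiniContext(IDictionary<string, object> env)
    {
        _env = env ?? throw new ArgumentNullException(nameof(env));
    }

    // Typed view over the "owin.RequestPath" key.
    public string RequestPath => (string)_env["owin.RequestPath"];

    // OWIN treats a missing "owin.ResponseStatusCode" as 200.
    public int StatusCode
    {
        get => _env.TryGetValue("owin.ResponseStatusCode", out var value) ? (int)value : 200;
        set => _env["owin.ResponseStatusCode"] = value;
    }

    // Typed view over the "owin.ResponseBody" key.
    public Stream ResponseBody => (Stream)_env["owin.ResponseBody"];

    // The wrapper keeps no state of its own; everything lives in the shared dictionary.
    public IDictionary<string, object> Environment => _env;
}
```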

Hope this helps you understand appfuncs and midfuncs a bit better.

Happy funcing (without a k of course)

Friday, August 28, 2015

Cache asynchronous results

I love working with async / await in C#. Once you get used to it, and know a few of its quirks, it becomes second nature. I’m finding that all my methods become async... As Terry Pratchett used to say: it’s turtles (or in this case async) all the way down.

So I have nice asynchronous controllers and nice asynchronous methods for data access. And today I wanted to add caching. To my surprise, there is no built-in async cache. OK, I did find an async cache in the Parallel Extensions. While it certainly allows me to cache async responses, it lacks features I’m used to in a normal caching implementation, such as time-based expiration or LRU eviction.

So I decided to steal (ahem, copy) the source code from the Parallel Extensions and add some expiration logic to it. Here’s the result; I hope you like it. Oh, and the logging implementation is LibLog.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Timers;
// ILog and LogProvider come from LibLog, a source-only package; add a using for its
// generated namespace (typically <YourRootNamespace>.Logging).

/// <summary>Caches asynchronously retrieved data.</summary>
[DebuggerDisplay("Count={Count}")]
public class AsyncCache : IEnumerable<AsyncCache.CachedItem>, IDisposable
{
    private static readonly ILog s_log = LogProvider.GetCurrentClassLogger();
    private readonly int _maxItems;
    private readonly Timer _checkTimer;
    private readonly TimeSpan _defaultLifeTime;

    /// <summary>The dictionary to store all of the tasks.</summary>
    private readonly ConcurrentDictionary<string, CachedItem> _map;

    /// <summary>Initializes the cache.</summary>
    /// <param name="maxItems">Maximum number of items kept after each scavenging run.</param>
    /// <param name="defaultLifeTime">Lifetime of an item without an explicit expiration (default 1 hour).</param>
    /// <param name="cacheCheckInterval">How often the cache is scavenged (default 2 seconds).</param>
    public AsyncCache(int maxItems = 1000, TimeSpan? defaultLifeTime = null, TimeSpan? cacheCheckInterval = null)
    {
        _maxItems = maxItems;
        _defaultLifeTime = defaultLifeTime ?? TimeSpan.FromHours(1);
        _map = new ConcurrentDictionary<string, CachedItem>();
        _checkTimer = new Timer((cacheCheckInterval ?? TimeSpan.FromSeconds(2)).TotalMilliseconds);
        _checkTimer.Elapsed += CheckItemsForExpirations;
        _checkTimer.Start();
    }

    internal void CheckItemsForExpirations(object state = null, EventArgs args = null)
    {
        try
        {
            DateTime now = DateTime.UtcNow;

            // First remove everything whose expiration date has passed.
            foreach (var item in this.Where(x => x.ExpireAfter <= now).ToList())
            {
                ExpireValue(item, "removeDate expired");
            }

            // If the cache is still too big, evict the least recently accessed items (LRU).
            if (this.Count > _maxItems)
            {
                var itemsToRemove = this.OrderByDescending(x => x.LastAccessed)
                    .Skip(_maxItems)
                    .ToList();

                foreach (var item in itemsToRemove)
                {
                    ExpireValue(item, "maximum number of items exceeded");
                }
            }
        }
        catch (Exception ex)
        {
            s_log.WarnException("Failed to clear cache", ex);
        }
    }

    private void ExpireValue(CachedItem item, string reason)
    {
        CachedItem removedItem;
        _map.TryRemove(item.Key, out removedItem);
        s_log.InfoFormat("Expired item with key {itemKey} and expiration Date {expire} from cache due to {reason}", item.Key, item.ExpireAfter, reason);
    }

    /// <summary>Gets the number of items in the cache.</summary>
    public int Count
    {
        get { return _map.Count; }
    }

    /// <summary>Gets an enumerator for the contents of the cache.</summary>
    /// <returns>An enumerator for the contents of the cache.</returns>
    public IEnumerator<CachedItem> GetEnumerator()
    {
        return _map.Values.GetEnumerator();
    }

    /// <summary>Gets an enumerator for the contents of the cache.</summary>
    /// <returns>An enumerator for the contents of the cache.</returns>
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    /// <summary>Gets a Task to retrieve the value for the specified key.</summary>
    /// <param name="key">The key whose value should be retrieved.</param>
    /// <param name="valueFactory">A factory for producing the cache's values.</param>
    /// <param name="expireAfter">When the cached item expires (UTC).</param>
    /// <returns>A Task for the value of the specified key.</returns>
    public async Task<TValue> GetOrAdd<TValue>(string key, Func<string, Task<TValue>> valueFactory,
        DateTime? expireAfter = null)
    {
        if (key == null)
        {
            throw new ArgumentNullException("key");
        }
        if (valueFactory == null)
        {
            throw new ArgumentNullException("valueFactory");
        }

        Func<string, Task<TValue>> loggingValuefactory = async (k) =>
        {
            var v = await valueFactory(k);
            s_log.InfoFormat("Created cached value for key {0}", key);
            return v;
        };

        // The CachedItem wraps the factory in a Lazy, so even if several callers race here,
        // the value factory itself runs at most once per stored item.
        var cachedItem = _map.GetOrAdd(key, (k) => new CachedItem(k,
            async () => await loggingValuefactory(k),
            expireAfter ?? DateTime.UtcNow.Add(_defaultLifeTime)));

        var value = await cachedItem.Value.Value;

        cachedItem.LastAccessed = DateTime.UtcNow;
        s_log.InfoFormat("Returned cached value for key {key}", key);

        // A null object cannot be unboxed to a value type, so map it to default(TValue).
        if (value == null)
        {
            return default(TValue);
        }

        return (TValue)value;
    }

    /// <summary>Sets the value for the specified key.</summary>
    /// <param name="key">The key whose value should be set.</param>
    /// <param name="value">The value to which the key should be set.</param>
    public void SetValue(string key, object value)
    {
        SetValue(key, Task.FromResult(value));
    }

    /// <summary>Sets the value for the specified key.</summary>
    /// <param name="key">The key whose value should be set.</param>
    /// <param name="asyncValue">The value to which the key should be set.</param>
    /// <param name="expireAfter">When the cached item expires (UTC).</param>
    public void SetValue(string key, Task<object> asyncValue, DateTime? expireAfter = null)
    {
        if (key == null)
        {
            throw new ArgumentNullException("key");
        }
        Func<Task<object>> func = async () =>
        {
            var value = await asyncValue;
            s_log.InfoFormat("Created cached value for key {0}", key);
            return value;
        };
        var cachedItem = new CachedItem(key, func, expireAfter ?? DateTime.UtcNow.Add(_defaultLifeTime));
        _map[key] = cachedItem;
    }

    /// <summary>Empties the cache.</summary>
    public void Clear()
    {
        _map.Clear();
    }

    /// <summary>Stops the background expiration timer.</summary>
    public void Dispose()
    {
        _checkTimer.Dispose();
    }

    public class CachedItem
    {
        public CachedItem(string key, Func<Task<object>> value, DateTime expireAfter)
        {
            Key = key;
            Value = new Lazy<Task<object>>(value);
            ExpireAfter = expireAfter;
            LastAccessed = DateTime.UtcNow;
        }

        private Lazy<Task<object>> _value;
        public string Key { get; private set; }

        public Lazy<Task<object>> Value
        {
            get { return _value; }
            set
            {
                LastAccessed = DateTime.UtcNow;
                _value = value;
            }
        }

        public DateTime ExpireAfter { get; private set; }
        public DateTime LastAccessed { get; set; }
    }
}
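The heart of the listing above is the ConcurrentDictionary plus Lazy&lt;Task&gt; combination. Here is a minimal sketch of just that idea, without expiration or eviction; TinyAsyncCache is a hypothetical name for illustration. ConcurrentDictionary.GetOrAdd may run its factory lambda more than once under contention, but that lambda only creates a cheap Lazy; only the Lazy that ends up stored is ever evaluated, so the real value factory starts at most once per key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical minimal sketch of the core idea behind AsyncCache (no expiration, no LRU).
public sealed class TinyAsyncCache<TValue>
{
    private readonly ConcurrentDictionary<string, Lazy<Task<TValue>>> _map =
        new ConcurrentDictionary<string, Lazy<Task<TValue>>>();

    public Task<TValue> GetOrAdd(string key, Func<string, Task<TValue>> valueFactory)
    {
        // Under contention this lambda may run several times, but each run only allocates a Lazy.
        // GetOrAdd always returns the Lazy that is actually stored in the dictionary.
        Lazy<Task<TValue>> lazy = _map.GetOrAdd(key, k => new Lazy<Task<TValue>>(() => valueFactory(k)));

        // Lazy<T> defaults to LazyThreadSafetyMode.ExecutionAndPublication, so valueFactory
        // runs once and every caller awaits the very same Task.
        return lazy.Value;
    }
}
```

A design consequence worth knowing: because the Task itself is cached, a factory that fails leaves a faulted Task in the dictionary, and every caller for that key sees the same exception. The AsyncCache above behaves the same way until the item expires or is evicted.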

 

I did write some tests to prove to myself that it works, using xUnit and FluentAssertions:


using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;

public class AsyncCacheTest
{
    [Fact]
    public async Task CacheCanStoreAsyncValues()
    {
        Func<string, Task<int>> valueFactory = (_) => Task.FromResult(3);

        var subject = new AsyncCache();

        var result = await subject.GetOrAdd("key", valueFactory);

        result.Should().Be(3);
    }

    [Fact]
    public async Task Cache_is_not_cleared_when_items_are_not_yet_expired()
    {
        int i = 0;
        Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
        var subject = new AsyncCache();

        await subject.GetOrAdd("key", valueFactory);
        subject.CheckItemsForExpirations();
        var result = await subject.GetOrAdd("key", valueFactory);

        result.Should().Be(1);
    }

    [Fact]
    public async Task Cache_does_not_store_more_values_than_it_can_support()
    {
        int i = 0;
        int cacheSize = 3;
        Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
        var subject = new AsyncCache(cacheSize);

        // Add 4 items (access them twice to be sure they are cached)
        (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
        (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
        (await subject.GetOrAdd("2", valueFactory)).Should().Be(2);
        (await subject.GetOrAdd("2", valueFactory)).Should().Be(2);
        (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
        (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
        (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
        (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);

        // Trigger scavenging
        subject.CheckItemsForExpirations();

        // Cache should be trimmed to the max size
        subject.Count.Should().Be(cacheSize);

        // Now check the contents of the cache to be sure the oldest added item is removed first
        (await subject.GetOrAdd("1", valueFactory)).Should().Be(5);
        (await subject.GetOrAdd("2", valueFactory)).Should().BeOneOf(2, 6); // Depending on timing of cleanup job, it's either 2 or 6
        (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
        (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
    }

    [Fact]
    public async Task Cache_is_LRU()
    {
        int i = 0;
        int cacheSize = 3;
        Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
        var subject = new AsyncCache(cacheSize);

        // Add 3 items, pausing briefly so each gets a distinct LastAccessed timestamp
        await subject.GetOrAdd("1", valueFactory);
        await Task.Delay(TimeSpan.FromMilliseconds(1));
        await subject.GetOrAdd("2", valueFactory);
        await Task.Delay(TimeSpan.FromMilliseconds(1));
        await subject.GetOrAdd("3", valueFactory);
        await Task.Delay(TimeSpan.FromMilliseconds(1));

        // Access the first item again; this should mark it as recently accessed
        (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);

        // Now add one too many; the next scavenging run has to evict one item
        (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);

        // Trigger scavenging
        subject.CheckItemsForExpirations();

        // Cache should be trimmed to the max size
        subject.Count.Should().Be(cacheSize);

        // Now check the contents of the cache to be sure the least recently accessed item is removed first
        (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
        (await subject.GetOrAdd("2", valueFactory)).Should().Be(5);
        (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
        (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
    }

    [Fact]
    public async Task Cache_removes_items_after_expiration()
    {
        int i = 0;
        Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
        var subject = new AsyncCache(3, cacheCheckInterval: TimeSpan.FromMilliseconds(1));

        // Add an item that expires 500 ms from now
        await subject.GetOrAdd("1", valueFactory, DateTime.UtcNow.AddMilliseconds(500));

        (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);

        await Task.Delay(TimeSpan.FromSeconds(1));

        (await subject.GetOrAdd("1", valueFactory)).Should().Be(2);
    }

    [Fact]
    public async Task CacheDoesNotUpdateAutomatically()
    {
        // Three concurrent callers for the same key must share a single valueFactory run:
        // the factory increments i twice, so i goes from 3 to 5 exactly once.
        int i = 3;
        var waitForTaskStart = new TaskCompletionSource<bool>();
        var waitForTaskComplete = new TaskCompletionSource<bool>();
        Func<string, Task<int>> valueFactory = async (_) =>
        {
            Interlocked.Increment(ref i);
            await waitForTaskComplete.Task;
            return Interlocked.Increment(ref i);
        };

        var subject = new AsyncCache();

        int result = 0;
        var task1 = Task.Run(async () =>
        {
            await waitForTaskStart.Task;
            await subject.GetOrAdd("key", valueFactory);
        });
        var task2 = Task.Run(async () =>
        {
            await waitForTaskStart.Task;
            await subject.GetOrAdd("key", valueFactory);
        });
        var task3 = Task.Run(async () =>
        {
            await waitForTaskStart.Task;
            result = await subject.GetOrAdd("key", valueFactory);
        });

        waitForTaskStart.SetResult(true);
        waitForTaskComplete.SetResult(true);

        await Task.WhenAll(task1, task2, task3);

        result.Should().Be(5);
    }
}