Friday, August 28, 2015

Cache asynchronous results

I love working with async/await in C#. Once you get used to it, and know a few of its quirks, it becomes second nature. I’m finding that all my methods become async... As Terry Pratchett used to say: it’s turtles (or in this case, async) all the way down.

So I have nice asynchronous controllers and nice asynchronous data-access methods... and then today I wanted to add caching. To my surprise, there is no async cache. OK, I did find an async cache in the Parallel Extensions. While it certainly lets me cache async responses, it lacks some features I’m used to in a normal caching implementation, such as time-based expiration or LRU (least recently used) eviction.

So I decided to steal (well, copy) the source code from the Parallel Extensions and add some expiration logic to it. Here’s the result; I hope you like it. Oh, and the logging implementation is LibLog.

   1: /// <summary>Caches asynchronously retrieved data.</summary>
   2: [DebuggerDisplay("Count={Count}")]
   3: public class AsyncCache : IEnumerable<AsyncCache.CachedItem>
   4: {
   5:     private static ILog s_log = LogProvider.GetCurrentClassLogger();
   6:     private readonly int _maxItems;
   7:     private readonly Timer _checkTimer;
   8:     private readonly TimeSpan _defaultLifeTime;
   9:  
  10:     /// <summary>The dictionary to store all of the tasks.</summary>
  11:     private readonly ConcurrentDictionary<string, CachedItem> _map;
  12:  
  13:     /// <summary>Initializes the cache.</summary>
  14:     public AsyncCache(int maxItems = 1000, TimeSpan? defaultLifeTime = null, TimeSpan? cacheCheckInterval = null)
  15:     {
  16:         _maxItems = maxItems;
  17:         _defaultLifeTime = defaultLifeTime ?? TimeSpan.FromHours(1);
  18:         _map = new ConcurrentDictionary<string, CachedItem>();
  19:         _checkTimer = new Timer((cacheCheckInterval ?? TimeSpan.FromSeconds(2)).TotalMilliseconds);
  20:         _checkTimer.Elapsed += CheckItemsForExpirations;
  21:         _checkTimer.Start();
  22:     }
  23:  
  24:     internal void CheckItemsForExpirations(object state = null, EventArgs args = null)
  25:     {
  26:         try
  27:         {
  28:             DateTime now = DateTime.UtcNow;
  29:  
  30:             foreach(var item in this.Where(x => x.ExpireAfter <= now).ToList())
  31:             {
  32:                 ExpireValue(item, "removeDate expired");
  33:             }
  34:  
  35:             if(this.Count > _maxItems)
  36:             {
  37:                 var itemsToRemove = this.OrderByDescending(x => x.LastAccessed)
  38:                     .Skip(_maxItems)
  39:                     .ToList();
  40:  
  41:                 foreach(var item in itemsToRemove)
  42:                 {
  43:                     ExpireValue(item, "removeDate expired");
  44:                 }
  45:             }
  46:         }
  47:         catch (Exception ex)
  48:         {
  49:             s_log.WarnException("Failed to clear cache", ex);
  50:         }
  51:  
  52:     }
  53:  
  54:     private void ExpireValue(CachedItem item, string reason)
  55:     {
  56:         CachedItem removedItem;
  57:         _map.TryRemove(item.Key, out removedItem);
  58:         s_log.InfoFormat("Expired item with key {itemKey} and expiration Date {expire} from cache due to {reason}", item.Key, item.ExpireAfter, reason);
  59:     }
  60:  
  61:     /// <summary>Gets the number of items in the cache.</summary>
  62:     public int Count
  63:     {
  64:         get { return _map.Count; }
  65:     }
  66:  
  67:     /// <summary>Gets an enumerator for the contents of the cache.</summary>
  68:     /// <returns>An enumerator for the contents of the cache.</returns>
  69:     public IEnumerator<CachedItem> GetEnumerator()
  70:     {
  71:         return _map.Values.GetEnumerator();
  72:     }
  73:  
  74:     /// <summary>Gets an enumerator for the contents of the cache.</summary>
  75:     /// <returns>An enumerator for the contents of the cache.</returns>
  76:     IEnumerator IEnumerable.GetEnumerator()
  77:     {
  78:         return GetEnumerator();
  79:     }
  80:  
  81:     /// <summary>Gets a Task to retrieve the value for the specified key.</summary>
  82:     /// <param name="key">The key whose value should be retrieved.</param>
  83:     /// <param name="valueFactory">A factory for producing the cache's values.</param>
  84:     /// <param name="expireAfter">When does the cached item expire</param>
  85:     /// <returns>A Task for the value of the specified key.</returns>
  86:     public async Task<TValue> GetOrAdd<TValue>(string key, Func<string, Task<TValue>> valueFactory, 
  87:         DateTime? expireAfter = null)
  88:     {
  89:         if(key == null)
  90:         {
  91:             throw new ArgumentNullException("key");
  92:         }
  93:         if(valueFactory == null)
  94:         {
  95:             throw new ArgumentNullException("valueFactory");
  96:         }
  97:  
  98:         Func<string, Task<TValue>> loggingValuefactory = async (k) =>
  99:         {
 100:             var v = await valueFactory(k);
 101:             s_log.InfoFormat("Created cached value for key {0}", key);
 102:             return v;
 103:         };
 104:  
 105:  
 106:         var cachedItem = _map.GetOrAdd(key, (k) =>
 107:         {
 108:             return new CachedItem(k,
 109:                 async () => await loggingValuefactory(k),
 110:                 expireAfter ?? DateTime.UtcNow.Add(_defaultLifeTime));
 111:         });
 112:  
 113:         var value = (await cachedItem.Value.Value);
 114:  
 115:         cachedItem.LastAccessed = DateTime.UtcNow;
 116:         if (value == null)
 117:         s_log.InfoFormat("Returned cached value for key {key}", key);
 118:  
 119:         if(Equals(value, default(TValue)))
 120:             return default(TValue);
 121:  
 122:         return (TValue) value;
 123:     }
 124:  
 125:     /// <summary>Sets the value for the specified key.</summary>
 126:     /// <param name="key">The key whose value should be set.</param>
 127:     /// <param name="value">The value to which the key should be set.</param>
 128:     public void SetValue(string key, object value)
 129:     {
 130:         SetValue(key, Task.Factory.FromResult(value));
 131:     }
 132:  
 133:     /// <summary>Sets the value for the specified key.</summary>
 134:     /// <param name="key">The key whose value should be set.</param>
 135:     /// <param name="asyncValue">The value to which the key should be set.</param>
 136:     public void SetValue(string key, Task<object> asyncValue, DateTime? expireAfter = null)
 137:     {
 138:         if(key == null)
 139:         {
 140:             throw new ArgumentNullException("key");
 141:         }
 142:         Func<Task<object>> func = async () =>
 143:         {
 144:             var value = await asyncValue;
 145:             s_log.InfoFormat("Created cached value for key {0}", key);
 146:             return value;
 147:         };
 148:         var cachedItem = new CachedItem(key, func, expireAfter ?? DateTime.UtcNow.Add(_defaultLifeTime));
 149:         _map[key] = cachedItem;
 150:     }
 151:  
 152:     /// <summary>Empties the cache.</summary>
 153:     public void Clear()
 154:     {
 155:         _map.Clear();
 156:     }
 157:  
 158:     public class CachedItem
 159:     {
 160:         public CachedItem(string key, Func<Task<object>> value, DateTime expireAfter)
 161:         {
 162:             Key = key;
 163:             Value = new Lazy<Task<object>>(value);
 164:             ExpireAfter = expireAfter;
 165:             LastAccessed = DateTime.UtcNow;
 166:         }
 167:  
 168:         private Lazy<Task<object>> _value;
 169:         public string Key { get; private set; }
 170:  
 171:         public Lazy<Task<object>> Value
 172:         {
 173:             get { return _value; }
 174:             set
 175:             {
 176:                 LastAccessed = DateTime.UtcNow;
 177:                 _value = value;
 178:             }
 179:         }
 180:  
 181:         public DateTime ExpireAfter { get; private set; }
 182:         public DateTime LastAccessed { get; set; }
 183:             
 184:     }
 185: }

 

I also wrote some tests, using xUnit and FluentAssertions, to prove to myself that it works:


   1: public class AsyncCacheTest
   2: {
   3:     [Fact]
   4:     public async Task CacheCanStoreAsyncValues()
   5:     {
   6:         Func<string, Task<int>> valueFactory = (_) => Task.FromResult(3);
   7:  
   8:         var subject = new AsyncCache();
   9:  
  10:         var result = await subject.GetOrAdd("key", valueFactory);
  11:  
  12:         result.Should().Be(3);
  13:  
  14:     }
  15:  
  16:     [Fact]
  17:     public async Task Cache_is_not_cleared_when_items_are_not_yet_expired()
  18:     {
  19:         int i = 0;
  20:         Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
  21:         var subject = new AsyncCache();
  22:  
  23:         await subject.GetOrAdd("key", valueFactory);
  24:         subject.CheckItemsForExpirations();
  25:         var result = await subject.GetOrAdd("key", valueFactory);
  26:  
  27:         result.Should().Be(1);
  28:     }
  29:  
  30:  
  31:     [Fact]
  32:     public async Task Cache_does_not_store_more_values_than_it_can_support()
  33:     {
  34:         int i = 0;
  35:         int cacheSize = 3;
  36:         Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
  37:         var subject = new AsyncCache(cacheSize);
  38:  
  39:         // Add 4 items (Access them twice to be sure they are cached)
  40:         (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
  41:         (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
  42:         (await subject.GetOrAdd("2", valueFactory)).Should().Be(2);
  43:         (await subject.GetOrAdd("2", valueFactory)).Should().Be(2);
  44:         (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
  45:         (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
  46:         (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
  47:         (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
  48:  
  49:         // trigger scavenging
  50:         subject.CheckItemsForExpirations();
  51:  
  52:         // Cache should be trimmed to the max size
  53:         subject.Count.Should().Be(cacheSize);
  54:  
  55:         // Now check the contents of the cache to be sure the oldest added item is removed first
  56:         (await subject.GetOrAdd("1", valueFactory)).Should().Be(5);
  57:         (await subject.GetOrAdd("2", valueFactory)).Should().BeOneOf(2,6); // Depending on timing of cleanup job, it's either 2 or 6
  58:         (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
  59:         (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
  60:     }
  61:  
  62:     [Fact]
  63:     public async Task Cache_is_LRU()
  64:     {
  65:         int i = 0;
  66:         int cacheSize = 3;
  67:         Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
  68:         var subject = new AsyncCache(cacheSize);
  69:  
  70:         // Add 4 items (Access them twice to be sure they are cached)
  71:         await subject.GetOrAdd("1", valueFactory);
  72:         await Task.Delay(TimeSpan.FromMilliseconds(1));
  73:         await subject.GetOrAdd("2", valueFactory);
  74:         await Task.Delay(TimeSpan.FromMilliseconds(1));
  75:         await subject.GetOrAdd("3", valueFactory);
  76:         await Task.Delay(TimeSpan.FromMilliseconds(1));
  77:  
  78:         // Access first item again.. this should mark it as being accessed
  79:         (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
  80:  
  81:         // Now add one too many which triggers a cleanup
  82:         (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
  83:  
  84:         // Trigger scavenging 
  85:         subject.CheckItemsForExpirations();
  86:  
  87:         // Cache should be trimmed to the max size
  88:         subject.Count.Should().Be(cacheSize);
  89:  
  90:         // Now check the contents of the cache to be sure the one least accesseditem is removed first
  91:         (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
  92:         (await subject.GetOrAdd("2", valueFactory)).Should().Be(5);
  93:         (await subject.GetOrAdd("3", valueFactory)).Should().Be(3);
  94:         (await subject.GetOrAdd("4", valueFactory)).Should().Be(4);
  95:     }
  96:  
  97:     [Fact]
  98:     public async Task Cache_removes_items_after_expiration()
  99:     {
 100:         int i = 0;
 101:         Func<string, Task<int>> valueFactory = (_) => Task.FromResult(Interlocked.Increment(ref i));
 102:         var subject = new AsyncCache(3, cacheCheckInterval: TimeSpan.FromMilliseconds(1));
 103:  
 104:         // Add more than 
 105:         await subject.GetOrAdd("1", valueFactory, DateTime.UtcNow.AddMilliseconds(500));
 106:  
 107:         (await subject.GetOrAdd("1", valueFactory)).Should().Be(1);
 108:  
 109:         await Task.Delay(TimeSpan.FromSeconds(1));
 110:  
 111:         (await subject.GetOrAdd("1", valueFactory)).Should().Be(2);
 112:     }
 113:  
 114:  
 115:     [Fact]
 116:     public async Task CacheDoesNotUpdateAutomatically()
 117:     {
 118:         int i = 3;
 119:         var waitForTaskStart = new TaskCompletionSource<bool>();
 120:         var waitForTaskComplete = new TaskCompletionSource<bool>();
 121:         Func<string, Task<int>> valueFactory = async (_) =>
 122:         {
 123:             Interlocked.Increment(ref i);
 124:             await waitForTaskComplete.Task;
 125:             return Interlocked.Increment(ref i);
 126:         };
 127:  
 128:         var subject = new AsyncCache();
 129:  
 130:         int result = 0;
 131:         var task1 = Task.Run(async () =>
 132:         {
 133:             await waitForTaskStart.Task;
 134:             await subject.GetOrAdd("key", valueFactory);
 135:         });
 136:         var task2 = Task.Run(async () =>
 137:         {
 138:             await waitForTaskStart.Task;
 139:             await subject.GetOrAdd("key", valueFactory);
 140:         });
 141:         var task3 = Task.Run(async () =>
 142:         {
 143:             await waitForTaskStart.Task;
 144:             result = await subject.GetOrAdd("key", valueFactory);
 145:         });
 146:             
 147:         waitForTaskStart.SetResult(true);
 148:         waitForTaskComplete.SetResult(true);
 149:  
 150:         await Task.WhenAll(task1, task2, task3);
 151:  
 152:         result.Should().Be(5);
 153:     }
 154:  
 155: }