Skip to content

Commit

Permalink
Adding autoflush unit tests (sebastienros#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastienros authored May 21, 2020
1 parent 4c5ce86 commit 4231b8c
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 35 deletions.
73 changes: 58 additions & 15 deletions src/YesSql.Core/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,20 @@ public class Session : ISession
private readonly IdentityMap _identityMap = new IdentityMap();
internal readonly List<IIndexCommand> _commands = new List<IIndexCommand>();
private readonly Dictionary<IndexDescriptor, List<MapState>> _maps = new Dictionary<IndexDescriptor, List<MapState>>();

// entities that need to be created in the next flush
private readonly HashSet<object> _saved = new HashSet<object>();

// entities that already exist and need to be updated in the next flush
private readonly HashSet<object> _updated = new HashSet<object>();

// entities that are already saved or updated in a previous flush
private readonly HashSet<object> _tracked = new HashSet<object>();

// ids of entities that are checked for concurrency
private readonly HashSet<int> _concurrent = new HashSet<int>();

// entities that need to be deleted in the next flush
private readonly HashSet<object> _deleted = new HashSet<object>();
protected readonly Dictionary<string, IEnumerable<IndexDescriptor>> _descriptors = new Dictionary<string, IEnumerable<IndexDescriptor>>();
internal readonly Store _store;
Expand Down Expand Up @@ -72,12 +83,15 @@ public void Save(object entity, bool checkConcurrency = false)
{
CheckDisposed();

// already being saved or updated?
// already being saved or updated or tracked?
if (_saved.Contains(entity) || _updated.Contains(entity))
{
return;
}

// remove from tracked entities if explicitly saved
_tracked.Remove(entity);

// is it a new object?
if (_identityMap.TryGetDocumentId(entity, out var id))
{
Expand Down Expand Up @@ -132,7 +146,7 @@ public bool Import(object entity, int id = 0)
CheckDisposed();

// already known?
if (_saved.Contains(entity) || _updated.Contains(entity))
if (_identityMap.HasEntity(entity))
{
return false;
}
Expand Down Expand Up @@ -190,6 +204,7 @@ public void Detach(object entity)

_saved.Remove(entity);
_updated.Remove(entity);
_tracked.Remove(entity);
_deleted.Remove(entity);

if (_identityMap.TryGetDocumentId(entity, out var id))
Expand Down Expand Up @@ -239,7 +254,7 @@ private async Task SaveEntityAsync(object entity)
await MapNew(doc, entity);
}

private async Task UpdateEntityAsync(object entity)
private async Task UpdateEntityAsync(object entity, bool tracked)
{
if (entity == null)
{
Expand Down Expand Up @@ -274,6 +289,15 @@ private async Task UpdateEntityAsync(object entity)
}
}

var newContent = Store.Configuration.ContentSerializer.Serialize(entity);

// if the document has already been updated or saved with this session (auto or intentional flush), ensure it has
// been changed before doing another query
if (tracked && String.Equals(newContent, oldDoc.Content))
{
return;
}

long version = -1;

if (_concurrent.Contains(id))
Expand All @@ -290,12 +314,10 @@ private async Task UpdateEntityAsync(object entity)

await DemandAsync();

oldDoc.Content = Store.Configuration.ContentSerializer.Serialize(entity);
oldDoc.Content = newContent;
oldDoc.Version += 1;

_commands.Add(new UpdateDocumentCommand(oldDoc, Store.Configuration.TablePrefix, version));

_concurrent.Remove(id);
}

private async Task<Document> GetDocumentByIdAsync(int id)
Expand Down Expand Up @@ -577,9 +599,10 @@ private void ReleaseSession()
/// </summary>
private void ReleaseTransaction()
{
_updated.Clear();
_concurrent.Clear();
_saved.Clear();
_updated.Clear();
_tracked.Clear();
_deleted.Clear();
_commands.Clear();
_maps.Clear();
Expand Down Expand Up @@ -639,12 +662,21 @@ public async Task FlushAsync()

try
{
// saving all tracked entities
foreach (var obj in _tracked)
{
if (!_deleted.Contains(obj))
{
await UpdateEntityAsync(obj, true);
}
}

// saving all updated entities
foreach (var obj in _updated)
{
if (!_deleted.Contains(obj))
{
await UpdateEntityAsync(obj);
await UpdateEntityAsync(obj, false);
}
}

Expand Down Expand Up @@ -680,10 +712,22 @@ public async Task FlushAsync()
}
finally
{
_updated.Clear();
_concurrent.Clear();
// Track all saved and updated entities in case they are modified before
// CommitAsync is called
foreach(var saved in _saved)
{
_tracked.Add(saved);
}

foreach (var updated in _updated)
{
_tracked.Add(updated);
}

_saved.Clear();
_updated.Clear();
_deleted.Clear();

_commands.Clear();
_maps.Clear();
_flushing = false;
Expand Down Expand Up @@ -808,9 +852,10 @@ private void CommitTransaction()
internal bool HasWork()
{
return
_saved.Count != 0 ||
_updated.Count != 0 ||
_deleted.Count != 0
_saved.Count +
_updated.Count +
_tracked.Count +
_deleted.Count > 0
;
}

Expand Down Expand Up @@ -1122,8 +1167,6 @@ public async Task<DbTransaction> DemandAsync()
// In the case of shared connections (InMemory) this can throw as the transation
// might already be set by a concurrent thread on the same shared connection.
_transaction = _connection.BeginTransaction(_isolationLevel);

_cancel = false;
}

return _transaction;
Expand Down
126 changes: 106 additions & 20 deletions test/YesSql.Tests/CoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,54 @@ public async Task ChangesAreAutoFlushed()
}
}

[Fact]
public async Task AutoflushCanHappenMultipleTimes()
{
_store.RegisterIndexes<ArticleIndexProvider>();

using (var session = _store.CreateSession())
{
var d1 = new Article { PublishedUtc = new DateTime(2011, 11, 1) };
var d2 = new Article { PublishedUtc = new DateTime(2011, 11, 1) };

session.Save(d1);
session.Save(d2);

var articles = await session.Query<Article, ArticlesByDay>(x => x.DayOfYear == 305).ListAsync();

d1.PublishedUtc = new DateTime(2011, 11, 2);

articles = await session.Query<Article, ArticlesByDay>(x => x.DayOfYear == 306).ListAsync();

Assert.Single(articles);
}
}

[Fact]
public async Task ChangesAfterAutoflushAreSaved()
{
_store.RegisterIndexes<ArticleIndexProvider>();

using (var session = _store.CreateSession())
{
var d1 = new Article { PublishedUtc = new DateTime(2011, 11, 1) };
var d2 = new Article { PublishedUtc = new DateTime(2011, 11, 1) };

session.Save(d1);
session.Save(d2);

var articles = await session.Query<Article, ArticlesByDay>(x => x.DayOfYear == 305).ListAsync();

d1.PublishedUtc = new DateTime(2011, 11, 2);
}

using (var session = _store.CreateSession())
{
var articles = await session.Query<Article, ArticlesByDay>(x => x.DayOfYear == 306).ListAsync();
Assert.Single(articles);
}
}

[Fact]
public async Task ShouldOrderOnValueType()
{
Expand Down Expand Up @@ -2295,26 +2343,6 @@ public async Task ShouldIgnoreNonSerializedAttribute()
}
}

[Fact]
public async Task ShouldNotHaveWorkAfterFlush()
{
using (var session = (Session)_store.CreateSession())
{
var circle = new Circle
{
Radius = 10
};

session.Save(circle);

Assert.True(session.HasWork());

await session.FlushAsync();

Assert.False(session.HasWork());
}
}

[Fact]
public async Task ShouldGetTypeById()
{
Expand Down Expand Up @@ -2552,6 +2580,64 @@ public virtual async Task ShouldNotCommitTransaction()
}
}

[Fact]
public virtual async Task ShouldNotCreatDocumentInCanceledSessions()
{
using (var session = _store.CreateSession())
{
var circle = new Circle
{
Radius = 10
};

session.Save(circle);

session.Cancel();

circle.Radius = 20;

await session.Query().For<Circle>().CountAsync();
}

using (var session = _store.CreateSession())
{
Assert.Equal(0, await session.Query().For<Circle>().CountAsync());
}
}

[Fact]
public virtual async Task ShouldNotUpdateDocumentInCanceledSessions()
{
using (var session = _store.CreateSession())
{
var circle = new Circle
{
Radius = 10
};

session.Save(circle);
}

using (var session = _store.CreateSession())
{
session.Cancel();

var circle = await session.Query().For<Circle>().FirstOrDefaultAsync();

circle.Radius = 20;

session.Save(circle);
}

using (var session = _store.CreateSession())
{

var circle = await session.Query().For<Circle>().FirstOrDefaultAsync();

Assert.Equal(10, circle.Radius);
}
}

[Fact]
public async Task ShouldSaveChangesExplicitly()
{
Expand Down

0 comments on commit 4231b8c

Please sign in to comment.