Skip to content

Commit

Permalink
improve code quality for Sql Server Broker provider
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgerlag committed Apr 22, 2018
1 parent 3d6a281 commit fcc8d9e
Show file tree
Hide file tree
Showing 24 changed files with 320 additions and 543 deletions.
10 changes: 7 additions & 3 deletions src/WorkflowCore/Models/WorkflowOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using System;
using WorkflowCore.Interface;
using WorkflowCore.Services;

Expand All @@ -11,10 +12,13 @@ public class WorkflowOptions
internal Func<IServiceProvider, IDistributedLockProvider> LockFactory;
internal TimeSpan PollInterval;
internal TimeSpan IdleTime;
internal TimeSpan ErrorRetryInterval;
internal TimeSpan ErrorRetryInterval;

public WorkflowOptions()
public IServiceCollection Services { get; private set; }

public WorkflowOptions(IServiceCollection services)
{
Services = services;
PollInterval = TimeSpan.FromSeconds(10);
IdleTime = TimeSpan.FromMilliseconds(100);
ErrorRetryInterval = TimeSpan.FromSeconds(60);
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static void AddWorkflow(this IServiceCollection services, Action<Workflow
if (services.Any(x => x.ServiceType == typeof(WorkflowOptions)))
throw new InvalidOperationException("Workflow services already registered");

var options = new WorkflowOptions();
var options = new WorkflowOptions(services);
setupAction?.Invoke(options);
services.AddTransient<IPersistenceProvider>(options.PersistanceFactory);
services.AddSingleton<IQueueProvider>(options.QueueFactory);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#region using

using System;
using System.Linq;
using WorkflowCore.Interface;
using WorkflowCore.QueueProviders.SqlServer.Models;

#endregion

namespace WorkflowCore.QueueProviders.SqlServer.Interfaces
{
public interface IQueueConfigProvider
{
QueueConfig GetByQueue(QueueType queue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#region using

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.SqlClient;
using System.Linq;

#endregion

namespace WorkflowCore.QueueProviders.SqlServer.Interfaces
{
public interface ISqlCommandExecutor
{
TResult ExecuteScalar<TResult>(IDbConnection cn, IDbTransaction tx, string cmdtext, params DbParameter[] parameters);
int ExecuteCommand(IDbConnection cn, IDbTransaction tx, string cmdtext, params DbParameter[] parameters);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.QueueProviders.SqlServer.Interfaces
{
public interface ISqlServerQueueProviderMigrator
{
void MigrateDb();
void CreateDb();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.QueueProviders.SqlServer.Models
{
public class QueueConfig
{
public QueueConfig(string msgType, string initiatorService, string targetService, string contractName, string queueName)
{
MsgType = msgType;
InitiatorService = initiatorService;
TargetService = targetService;
ContractName = contractName;
QueueName = queueName;
}

public string MsgType { get; }
public string InitiatorService { get; }
public string TargetService { get; }
public string ContractName { get; }
public string QueueName { get; }
}
}
4 changes: 2 additions & 2 deletions src/providers/WorkflowCore.QueueProviders.SqlServer/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SQL Server Service Broker queue provider for Workflow Core

Provides distributed worker support on [Workflow Core](../../../README.md) using [SQL Server Service Broker](https://docs.microsoft.com/en-us/sql/database-engine/configure-windows/sql-server-service-broker).
Provides distributed worker support on [Workflow Core](../../../README.md) using [SQL Server Service Broker](https://docs.microsoft.com/en-us/sql/database-engine/configure-windows/sql-server-service-broker).

This makes it possible to have a cluster of nodes processing your workflows, along with a distributed lock manager.

Expand All @@ -17,7 +17,7 @@ PM> Install-Package WorkflowCore.QueueProviders.SqlServer -Pre
Use the .UseSqlServerQueue extension method when building your service provider.

```C#
services.AddWorkflow(x => x.UseSqlServerQueue(sp => new SqlServerQueueProvider(connectionString, workflowHostName, canMigrateDB));
services.AddWorkflow(x => x.UseSqlServerBroker(sp => new SqlServerQueueProvider(connectionString, workflowHostName, canMigrateDB));

```

Original file line number Diff line number Diff line change
@@ -1,49 +1,43 @@
#region using

using System;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

using WorkflowCore.Models;
using WorkflowCore.QueueProviders.SqlServer;
using WorkflowCore.QueueProviders.SqlServer.Interfaces;
using WorkflowCore.QueueProviders.SqlServer.Services;

#endregion

namespace Microsoft.Extensions.DependencyInjection
{
public static class ServiceCollectionExtensions
{
{
/// <summary>
/// Use SQL Server as a queue provider
/// </summary>
/// <param name="options"></param>
/// <param name="opt"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static WorkflowOptions UseSqlServerBroker(this WorkflowOptions options, SqlServerQueueProviderOption opt)
public static WorkflowOptions UseSqlServerBroker(this WorkflowOptions options, string connectionString, bool canCreateDb, bool canMigrateDb)
{
options.UseQueueProvider(sp =>
options.Services.AddTransient<IQueueConfigProvider, QueueConfigProvider>();
options.Services.AddTransient<ISqlCommandExecutor, SqlCommandExecutor>();
options.Services.AddTransient<ISqlServerQueueProviderMigrator, SqlServerQueueProviderMigrator>();

var sqlOptions = new SqlServerQueueProviderOptions()
{
var names = sp.GetService<IBrokerNamesProvider>()
?? new BrokerNamesProvider(opt.WorkflowHostName);
var sqlCommandExecutor = sp.GetService<ISqlCommandExecutor>()
?? new SqlCommandExecutor();
var migrator = sp.GetService<ISqlServerQueueProviderMigrator>()
?? new SqlServerQueueProviderMigrator(opt.ConnectionString, names, sqlCommandExecutor);
ConnectionString = connectionString,
CanCreateDb = canCreateDb,
CanMigrateDb = canMigrateDb
};

return new SqlServerQueueProvider(opt, names, migrator, sqlCommandExecutor);
options.UseQueueProvider(sp =>
{
return new SqlServerQueueProvider(sqlOptions, sp.GetService<IQueueConfigProvider>(), sp.GetService<ISqlServerQueueProviderMigrator>(), sp.GetService<ISqlCommandExecutor>());
});
return options;
}

/// <summary>
/// Use SQL Server as a queue provider (use 'default' as workflowHostName)
/// </summary>
/// <param name="options"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static WorkflowOptions UseSqlServerBroker(this WorkflowOptions options, string connectionString)
{
UseSqlServerBroker(options, new SqlServerQueueProviderOption {ConnectionString = connectionString});
return options;
}
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#region using

using System;
using System.Linq;
using WorkflowCore.Interface;
using WorkflowCore.QueueProviders.SqlServer.Interfaces;
using WorkflowCore.QueueProviders.SqlServer.Models;

#endregion

namespace WorkflowCore.QueueProviders.SqlServer.Services
{

/// <summary>
/// Build names for SSSB objects
/// </summary>
/// <remarks>
/// Message type and contract are global, service name and queue different for every workflow host
/// </remarks>
public class QueueConfigProvider : IQueueConfigProvider
{
private readonly QueueConfig _workflowQueueConfig;
private readonly QueueConfig _eventQueueConfig;

public QueueConfigProvider()
{
var workflowMessageType = "//workflow-core/workflow";
var eventMessageType = "//workflow-core/event";

var eventContractName = "//workflow-core/eventContract";
var workflowContractName = "//workflow-core/workflowContract";

var initiatorEventServiceName = $"//workflow-core/initiatorEventService";
var targetEventServiceName = $"//workflow-core/targetEventService";

var initiatorWorkflowServiceName = $"//workflow-core/initiatorWorkflowService";
var targetWorkflowServiceName = $"//workflow-core/targetWorkflowService";

var eventQueueName = $"//workflow-core/eventQueue";
var workflowQueueName = $"//workflow-core/workflowQueue";

_workflowQueueConfig = new QueueConfig(workflowMessageType, initiatorWorkflowServiceName, targetWorkflowServiceName, workflowContractName, eventQueueName);
_eventQueueConfig = new QueueConfig(eventMessageType, initiatorEventServiceName, targetEventServiceName, eventContractName, workflowQueueName);
}

public QueueConfig GetByQueue(QueueType queue)
{
switch (queue)
{
case QueueType.Workflow:
return _workflowQueueConfig;
case QueueType.Event:
return _eventQueueConfig;
default:
throw new ArgumentOutOfRangeException(nameof(queue), queue, null);
}
}
}
}
Loading

0 comments on commit fcc8d9e

Please sign in to comment.