Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nmt download #164

Merged
merged 3 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ private static IMachineBuilder AddHangfireBuildJobRunner(this IMachineBuilder bu
return builder;
}

/// <summary>
/// Creates the storage options passed to <c>UseMongoStorage</c> when Hangfire is configured.
/// </summary>
/// <returns>
/// Options that use <see cref="MigrateMongoMigrationStrategy"/> with
/// <see cref="CollectionMongoBackupStrategy"/>, enable <c>CheckConnection</c>, and set the queued-jobs
/// strategy to <see cref="CheckQueuedJobsStrategy.TailNotificationsCollection"/>.
/// </returns>
private static MongoStorageOptions GetMongoStorageOptions() =>
    new()
    {
        MigrationOptions = new()
        {
            MigrationStrategy = new MigrateMongoMigrationStrategy(),
            BackupStrategy = new CollectionMongoBackupStrategy()
        },
        CheckConnection = true,
        CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection,
    };

public static IMachineBuilder AddMongoHangfireJobClient(
this IMachineBuilder builder,
string? connectionString = null
Expand All @@ -164,19 +179,7 @@ public static IMachineBuilder AddMongoHangfireJobClient(
c.SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseMongoStorage(
connectionString,
new MongoStorageOptions
{
MigrationOptions = new MongoMigrationOptions
{
MigrationStrategy = new MigrateMongoMigrationStrategy(),
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckConnection = true,
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection,
}
)
.UseMongoStorage(connectionString, GetMongoStorageOptions())
.UseFilter(new AutomaticRetryAttribute { Attempts = 0 })
);
builder.Services.AddHealthChecks().AddCheck<HangfireHealthCheck>(name: "Hangfire");
Expand Down Expand Up @@ -402,6 +405,12 @@ public static IMachineBuilder AddBuildJobService(this IMachineBuilder builder)
return builder;
}

/// <summary>
/// Registers <see cref="ModelCleanupService"/> as a hosted service.
/// </summary>
/// <param name="builder">The machine builder whose service collection receives the registration.</param>
/// <returns>The same <paramref name="builder"/> instance, so calls can be chained.</returns>
public static IMachineBuilder AddModelCleanupService(this IMachineBuilder builder)
{
    var services = builder.Services;
    services.AddHostedService<ModelCleanupService>();
    return builder;
}

private static IMachineBuilder AddBuildJobService(this IMachineBuilder builder, BuildJobOptions options)
{
builder.Services.AddScoped<IBuildJobService, BuildJobService>();
Expand Down
11 changes: 11 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/ModelDownloadUrl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace SIL.Machine.AspNetCore.Models
{
    /// <summary>
    /// Describes a download link for a translation engine's model file.
    /// </summary>
    public class ModelDownloadUrl
    {
        /// <summary>The URL from which the model can be downloaded.</summary>
        /// <remarks>Not initialized here; callers are expected to assign it.</remarks>
        public string Url { get; set; } = default!;

        /// <summary>The build revision of the model that <see cref="Url"/> refers to.</summary>
        public int ModelRevision { get; set; }

        /// <summary>The expiration time associated with <see cref="Url"/>.</summary>
        /// <remarks>
        /// NOTE(review): the name is a misspelling of "ExpiresAt". It is kept unchanged because it is
        /// part of the public surface (and presumably of any serialized form); rename only together
        /// with all consumers.
        /// </remarks>
        public DateTime ExipiresAt { get; set; }
    }
}
1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Models/TranslationEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class TranslationEngine : IEntity
public string EngineId { get; set; } = default!;
public string SourceLanguage { get; set; } = default!;
public string TargetLanguage { get; set; } = default!;
public bool IsModelPersisted { get; set; }
public int BuildRevision { get; set; }
public Build? CurrentBuild { get; set; }
}
12 changes: 3 additions & 9 deletions src/SIL.Machine.AspNetCore/Services/HangfireHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
namespace SIL.Machine.AspNetCore.Services;

public class HangfireHealthCheck : IHealthCheck
public class HangfireHealthCheck(JobStorage jobStorage, IOptions<BackgroundJobServerOptions> options) : IHealthCheck
{
private readonly JobStorage _jobStorage;
private readonly IOptions<BackgroundJobServerOptions> _options;

public HangfireHealthCheck(JobStorage jobStorage, IOptions<BackgroundJobServerOptions> options)
{
_jobStorage = jobStorage;
_options = options;
}
private readonly JobStorage _jobStorage = jobStorage;
private readonly IOptions<BackgroundJobServerOptions> _options = options;

public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
Expand Down
2 changes: 2 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(

Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);

/// <summary>Gets a URL from which the file at <paramref name="path"/> can be downloaded.</summary>
/// <param name="path">The path of the file within this storage.</param>
/// <param name="expiresAt">
/// NOTE(review): presumably the time after which the returned URL stops being valid; confirm against
/// the implementing storage classes.
/// </param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The download URL.</returns>
/// <exception cref="NotSupportedException">
/// Thrown by storage implementations that cannot produce download URLs.
/// </exception>
Task<string> GetDownloadUrlAsync(string path, DateTime expiresAt, CancellationToken cancellationToken = default);

Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default);
}
8 changes: 8 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ public interface ISharedFileService

Uri GetResolvedUri(string path);

/// <summary>Gets a URL from which the shared file at <paramref name="path"/> can be downloaded.</summary>
/// <param name="path">The path of the shared file.</param>
/// <param name="expiresAt">
/// NOTE(review): presumably the time after which the returned URL stops being valid; confirm against
/// the implementation.
/// </param>
/// <returns>The download URL.</returns>
/// <remarks>
/// NOTE(review): unlike the other asynchronous members of this interface, this method takes no
/// <see cref="CancellationToken"/>; consider adding one together with all implementers.
/// </remarks>
Task<string> GetDownloadUrlAsync(string path, DateTime expiresAt);

/// <summary>Lists the shared files found under <paramref name="path"/>.</summary>
/// <param name="path">The directory path to list.</param>
/// <param name="recurse">
/// NOTE(review): presumably whether files in nested directories are included; confirm against the
/// implementation.
/// </param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The paths of the files that were found.</returns>
Task<IReadOnlyCollection<string>> ListFilesAsync(
string path,
bool recurse = false,
CancellationToken cancellationToken = default
);

Task<Stream> OpenReadAsync(string path, CancellationToken cancellationToken = default);

Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ public interface ITranslationEngineService
{
TranslationEngineType Type { get; }

Task CreateAsync(
Task<TranslationEngine> CreateAsync(
string engineId,
string? engineName,
string sourceLanguage,
string targetLanguage,
bool? isModelPersisted = null,
CancellationToken cancellationToken = default
);
Task DeleteAsync(string engineId, CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -40,6 +41,8 @@ Task StartBuildAsync(

Task CancelBuildAsync(string engineId, CancellationToken cancellationToken = default);

/// <summary>Gets a download URL for the model of the engine identified by <paramref name="engineId"/>.</summary>
/// <param name="engineId">The id of the translation engine.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The download URL together with the model revision and the expiration time.</returns>
/// <remarks>
/// NOTE(review): the NMT implementation throws <see cref="NotSupportedException"/> when the engine's
/// model is not persisted, <see cref="InvalidOperationException"/> when the engine does not exist or
/// has not been built, and <see cref="FileNotFoundException"/> when the model file is missing; other
/// implementations may differ.
/// </remarks>
Task<ModelDownloadUrl> GetModelDownloadUrlAsync(string engineId, CancellationToken cancellationToken = default);

Task<int> GetQueueSizeAsync(CancellationToken cancellationToken = default);

bool IsLanguageNativeToModel(string language, out string internalCode);
Expand Down
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ public Task<IReadOnlyCollection<string>> ListFilesAsync(
);
}

/// <summary>
/// Download URLs are not available for this storage; the call always fails.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown, synchronously.</exception>
public Task<string> GetDownloadUrlAsync(
    string path,
    DateTime expiresAt,
    CancellationToken cancellationToken = default
) => throw new NotSupportedException();

public Task<Stream> OpenReadAsync(string path, CancellationToken cancellationToken = default)
{
if (!_memoryStreams.TryGetValue(Normalize(path), out Entry? ret))
Expand Down
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/LocalStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ public Task<IReadOnlyCollection<string>> ListFilesAsync(
);
}

/// <summary>
/// Download URLs are not available for this storage; the call always fails.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown, synchronously.</exception>
public Task<string> GetDownloadUrlAsync(
    string path,
    DateTime expiresAt,
    CancellationToken cancellationToken = default
) => throw new NotSupportedException();

public Task<Stream> OpenReadAsync(string path, CancellationToken cancellationToken = default)
{
Uri pathUri = new(_basePath, Normalize(path));
Expand Down
61 changes: 61 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/ModelCleanupService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
namespace SIL.Machine.AspNetCore.Services;

/// <summary>
/// Recurrent task that deletes model files from shared storage when they do not correspond to the
/// current (or next) build revision of any translation engine in the repository.
/// </summary>
public class ModelCleanupService(
    IServiceProvider services,
    ISharedFileService sharedFileService,
    IRepository<TranslationEngine> engines,
    ILogger<ModelCleanupService> logger
) : RecurrentTask("Model Cleanup Service", services, RefreshPeriod, logger)
{
    // Period handed to the RecurrentTask base class.
    private static readonly TimeSpan RefreshPeriod = TimeSpan.FromDays(1);

    private readonly ISharedFileService _sharedFileService = sharedFileService;
    private readonly ILogger<ModelCleanupService> _logger = logger;
    private readonly IRepository<TranslationEngine> _engines = engines;

    /// <summary>Runs one cleanup pass.</summary>
    /// <param name="scope">The service scope supplied by the base class; not used by this task.</param>
    /// <param name="cancellationToken">Token used to cancel the pass.</param>
    protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
    {
        await CheckModelsAsync(cancellationToken);
    }

    /// <summary>
    /// Lists the files in the model directory and deletes every file whose path is not an expected
    /// model path for one of the engines in the repository.
    /// </summary>
    private async Task CheckModelsAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Running model cleanup job");
        IReadOnlyCollection<string> paths = await _sharedFileService.ListFilesAsync(
            NmtEngineService.ModelDirectory,
            cancellationToken: cancellationToken
        );

        // Get all engines from the database and collect the model paths that must be kept.
        IReadOnlyList<TranslationEngine> allEngines = await _engines.GetAllAsync(
            cancellationToken: cancellationToken
        );
        HashSet<string> filenameFilter = [];
        foreach (TranslationEngine engine in allEngines)
        {
            filenameFilter.Add(NmtEngineService.GetModelPath(engine.EngineId, engine.BuildRevision));
            // If there is a currently running build that creates and pushes a new file, but the
            // database has not updated yet, don't delete the new file.
            filenameFilter.Add(NmtEngineService.GetModelPath(engine.EngineId, engine.BuildRevision + 1));
        }

        foreach (string path in paths)
        {
            if (filenameFilter.Contains(path))
                continue;

            await DeleteFileAsync(
                path,
                "file in S3 bucket not found in database. It may be an old rev, etc.",
                cancellationToken
            );
        }
    }

    /// <summary>Logs the reason for the deletion and deletes the file from shared storage.</summary>
    /// <param name="path">The path of the file to delete.</param>
    /// <param name="message">The reason for the deletion, included in the log entry.</param>
    /// <param name="cancellationToken">Token used to cancel the deletion.</param>
    private async Task DeleteFileAsync(string path, string message, CancellationToken cancellationToken = default)
    {
        // This may delete a file while it is being downloaded, but the chance is rare
        // enough and the solution easy enough (just download again) to just live with it.
        _logger.LogInformation("Deleting old model file {filename}: {message}", path, message);
        await _sharedFileService.DeleteAsync(path, cancellationToken);
    }
}
31 changes: 13 additions & 18 deletions src/SIL.Machine.AspNetCore/Services/NmtClearMLBuildJobFactory.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
namespace SIL.Machine.AspNetCore.Services;

public class NmtClearMLBuildJobFactory : IClearMLBuildJobFactory
public class NmtClearMLBuildJobFactory(
ISharedFileService sharedFileService,
ILanguageTagService languageTagService,
IRepository<TranslationEngine> engines,
IOptionsMonitor<ClearMLOptions> options
) : IClearMLBuildJobFactory
{
private readonly ISharedFileService _sharedFileService;
private readonly ILanguageTagService _languageTagService;
private readonly IRepository<TranslationEngine> _engines;
private readonly IOptionsMonitor<ClearMLOptions> _options;

public NmtClearMLBuildJobFactory(
ISharedFileService sharedFileService,
ILanguageTagService languageTagService,
IRepository<TranslationEngine> engines,
IOptionsMonitor<ClearMLOptions> options
)
{
_sharedFileService = sharedFileService;
_languageTagService = languageTagService;
_engines = engines;
_options = options;
}
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly ILanguageTagService _languageTagService = languageTagService;
private readonly IRepository<TranslationEngine> _engines = engines;
private readonly IOptionsMonitor<ClearMLOptions> _options = options;

public TranslationEngineType EngineType => TranslationEngineType.Nmt;

Expand Down Expand Up @@ -52,6 +44,9 @@ public async Task<string> CreateJobScriptAsync(
+ $" 'shared_file_uri': '{baseUri}',\n"
+ $" 'shared_file_folder': '{folder}',\n"
+ (buildOptions is not null ? $" 'build_options': '''{buildOptions}''',\n" : "")
// buildRevision + 1 because the build revision is incremented after the build job
// is finished successfully but the file should be saved with the new revision number
+ (engine.IsModelPersisted ? $" 'save_model': '{engineId}_{engine.BuildRevision + 1}',\n" : $"")
+ $" 'clearml': True\n"
+ "}\n"
+ "run(args)\n";
Expand Down
75 changes: 62 additions & 13 deletions src/SIL.Machine.AspNetCore/Services/NmtEngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,56 @@ public class NmtEngineService(
IRepository<TranslationEngine> engines,
IBuildJobService buildJobService,
ILanguageTagService languageTagService,
ClearMLMonitorService clearMLMonitorService
ClearMLMonitorService clearMLMonitorService,
ISharedFileService sharedFileService
) : ITranslationEngineService
{
private readonly IDistributedReaderWriterLockFactory _lockFactory = lockFactory;
private readonly IPlatformService _platformService = platformService;
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly IRepository<TranslationEngine> _engines = engines;
private readonly IBuildJobService _buildJobService = buildJobService;
private readonly ILanguageTagService _languageTagService = languageTagService;
private readonly ClearMLMonitorService _clearMLMonitorService = clearMLMonitorService;
private readonly ILanguageTagService _languageTagService = languageTagService;
private readonly ISharedFileService _sharedFileService = sharedFileService;

public const string ModelDirectory = "models/";

/// <summary>
/// Builds the path of the model archive for an engine at a given build revision.
/// </summary>
/// <param name="engineId">The id of the translation engine.</param>
/// <param name="buildRevision">The build revision of the model.</param>
/// <returns>
/// <see cref="ModelDirectory"/> followed by <c>{engineId}_{buildRevision}.tar.gz</c>.
/// </returns>
public static string GetModelPath(string engineId, int buildRevision) =>
    ModelDirectory + engineId + "_" + buildRevision + ".tar.gz";

public TranslationEngineType Type => TranslationEngineType.Nmt;

public async Task CreateAsync(
private const int MinutesToExpire = 60;

public async Task<TranslationEngine> CreateAsync(
string engineId,
string? engineName,
string sourceLanguage,
string targetLanguage,
bool? isModelPersisted = null,
CancellationToken cancellationToken = default
)
{
await _dataAccessContext.BeginTransactionAsync(cancellationToken);
await _engines.InsertAsync(
new TranslationEngine
{
EngineId = engineId,
SourceLanguage = sourceLanguage,
TargetLanguage = targetLanguage
},
cancellationToken
);
var translationEngine = new TranslationEngine
{
EngineId = engineId,
SourceLanguage = sourceLanguage,
TargetLanguage = targetLanguage,
IsModelPersisted = isModelPersisted ?? false // models are not persisted if not specified
};
await _engines.InsertAsync(translationEngine, cancellationToken);
await _buildJobService.CreateEngineAsync(
new[] { BuildJobType.Cpu, BuildJobType.Gpu },
[BuildJobType.Cpu, BuildJobType.Gpu],
engineId,
engineName,
cancellationToken
);
await _dataAccessContext.CommitTransactionAsync(CancellationToken.None);
return translationEngine;
}

public async Task DeleteAsync(string engineId, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -109,6 +121,35 @@ public async Task CancelBuildAsync(string engineId, CancellationToken cancellati
}
}

/// <summary>
/// Creates a download URL for the model file of the engine's current build revision.
/// </summary>
/// <param name="engineId">The id of the translation engine.</param>
/// <param name="cancellationToken">Token used to cancel the engine lookup and the existence check.</param>
/// <returns>
/// The URL, the build revision it refers to, and an expiration time of
/// <c>MinutesToExpire</c> minutes after the current UTC time.
/// </returns>
/// <exception cref="NotSupportedException">The engine's model is not persisted.</exception>
/// <exception cref="InvalidOperationException">The engine has a build revision of 0.</exception>
/// <exception cref="FileNotFoundException">The model file does not exist in shared storage.</exception>
public async Task<ModelDownloadUrl> GetModelDownloadUrlAsync(
    string engineId,
    CancellationToken cancellationToken = default
)
{
    TranslationEngine engine = await GetEngineAsync(engineId, cancellationToken);
    if (!engine.IsModelPersisted)
    {
        throw new NotSupportedException(
            "The model cannot be downloaded. "
                + "To enable downloading the model, recreate the engine with IsModelPersisted property to true."
        );
    }

    int revision = engine.BuildRevision;
    if (revision == 0)
    {
        throw new InvalidOperationException("The engine has not been built yet.");
    }

    string modelPath = GetModelPath(engineId, revision);
    if (!await _sharedFileService.ExistsAsync(modelPath, cancellationToken))
    {
        throw new FileNotFoundException(
            $"The model should exist to be downloaded but is not there for BuildRevision {revision}."
        );
    }

    // The same timestamp is used for the URL request and for the value reported to the caller.
    DateTime expiration = DateTime.UtcNow.AddMinutes(MinutesToExpire);
    string url = await _sharedFileService.GetDownloadUrlAsync(modelPath, expiration);
    return new ModelDownloadUrl
    {
        Url = url,
        ModelRevision = revision,
        ExipiresAt = expiration
    };
}

public Task<IReadOnlyList<TranslationResult>> TranslateAsync(
string engineId,
int n,
Expand Down Expand Up @@ -159,4 +200,12 @@ private async Task<bool> CancelBuildJobAsync(string engineId, CancellationToken
await _platformService.BuildCanceledAsync(buildId, CancellationToken.None);
return buildId is not null;
}

/// <summary>Looks up the engine with the given id in the engine repository.</summary>
/// <param name="engineId">The id of the translation engine.</param>
/// <param name="cancellationToken">Token used to cancel the lookup.</param>
/// <returns>The matching engine.</returns>
/// <exception cref="InvalidOperationException">No engine with the given id exists.</exception>
private async Task<TranslationEngine> GetEngineAsync(string engineId, CancellationToken cancellationToken)
{
    TranslationEngine? found = await _engines.GetAsync(e => e.EngineId == engineId, cancellationToken);
    return found ?? throw new InvalidOperationException($"The engine {engineId} does not exist.");
}
}
Loading
Loading