Skip to content

Commit

Permalink
Nmt download (#164)
Browse files Browse the repository at this point in the history
* Presigned URL code
cleaning script
IsModelPersisted nullable
Return IsModelPersistedState to Serval
Check for modelRevision + 1 in cleanup and just delete without keeping internal states.

* Reviewer comments

* update to most recent api
  • Loading branch information
johnml1135 authored Feb 9, 2024
1 parent 32a8471 commit 525950f
Show file tree
Hide file tree
Showing 24 changed files with 437 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ private static IMachineBuilder AddHangfireBuildJobRunner(this IMachineBuilder bu
return builder;
}

private static MongoStorageOptions GetMongoStorageOptions()
{
var mongoStorageOptions = new MongoStorageOptions
{
MigrationOptions = new MongoMigrationOptions
{
MigrationStrategy = new MigrateMongoMigrationStrategy(),
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckConnection = true,
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection,
};
return mongoStorageOptions;
}

public static IMachineBuilder AddMongoHangfireJobClient(
this IMachineBuilder builder,
string? connectionString = null
Expand All @@ -164,19 +179,7 @@ public static IMachineBuilder AddMongoHangfireJobClient(
c.SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseMongoStorage(
connectionString,
new MongoStorageOptions
{
MigrationOptions = new MongoMigrationOptions
{
MigrationStrategy = new MigrateMongoMigrationStrategy(),
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckConnection = true,
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection,
}
)
.UseMongoStorage(connectionString, GetMongoStorageOptions())
.UseFilter(new AutomaticRetryAttribute { Attempts = 0 })
);
builder.Services.AddHealthChecks().AddCheck<HangfireHealthCheck>(name: "Hangfire");
Expand Down Expand Up @@ -402,6 +405,12 @@ public static IMachineBuilder AddBuildJobService(this IMachineBuilder builder)
return builder;
}

public static IMachineBuilder AddModelCleanupService(this IMachineBuilder builder)
{
builder.Services.AddHostedService<ModelCleanupService>();
return builder;
}

private static IMachineBuilder AddBuildJobService(this IMachineBuilder builder, BuildJobOptions options)
{
builder.Services.AddScoped<IBuildJobService, BuildJobService>();
Expand Down
11 changes: 11 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/ModelDownloadUrl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace SIL.Machine.AspNetCore.Models
{
public class ModelDownloadUrl
{
public string Url { get; set; } = default!;
public int ModelRevision { get; set; } = default!;
public DateTime ExipiresAt { get; set; } = default!;
}
}
1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Models/TranslationEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class TranslationEngine : IEntity
public string EngineId { get; set; } = default!;
public string SourceLanguage { get; set; } = default!;
public string TargetLanguage { get; set; } = default!;
public bool IsModelPersisted { get; set; }
public int BuildRevision { get; set; }
public Build? CurrentBuild { get; set; }
}
2 changes: 1 addition & 1 deletion src/SIL.Machine.AspNetCore/SIL.Machine.AspNetCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="6.0.16" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="6.0.14" />
<PackageReference Include="Python.Included" Version="3.11.4" />
<PackageReference Include="Serval.Grpc" Version="0.15.0" Condition="!Exists('..\..\..\serval\src\Serval.Grpc\Serval.Grpc.csproj')" />
<PackageReference Include="Serval.Grpc" Version="0.16.0" Condition="!Exists('..\..\..\serval\src\Serval.Grpc\Serval.Grpc.csproj')" />
<PackageReference Include="SIL.DataAccess" Version="0.5.2" Condition="!Exists('..\..\..\serval\src\SIL.DataAccess\SIL.DataAccess.csproj')" />
<PackageReference Include="SIL.WritingSystems" Version="12.0.1" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
Expand Down
12 changes: 3 additions & 9 deletions src/SIL.Machine.AspNetCore/Services/HangfireHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
namespace SIL.Machine.AspNetCore.Services;

public class HangfireHealthCheck : IHealthCheck
public class HangfireHealthCheck(JobStorage jobStorage, IOptions<BackgroundJobServerOptions> options) : IHealthCheck
{
private readonly JobStorage _jobStorage;
private readonly IOptions<BackgroundJobServerOptions> _options;

public HangfireHealthCheck(JobStorage jobStorage, IOptions<BackgroundJobServerOptions> options)
{
_jobStorage = jobStorage;
_options = options;
}
private readonly JobStorage _jobStorage = jobStorage;
private readonly IOptions<BackgroundJobServerOptions> _options = options;

public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
Expand Down
2 changes: 2 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(

Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);

Task<string> GetDownloadUrlAsync(string path, DateTime expiresAt, CancellationToken cancellationToken = default);

Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default);
}
8 changes: 8 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ public interface ISharedFileService

Uri GetResolvedUri(string path);

Task<string> GetDownloadUrlAsync(string path, DateTime expiresAt);

Task<IReadOnlyCollection<string>> ListFilesAsync(
string path,
bool recurse = false,
CancellationToken cancellationToken = default
);

Task<Stream> OpenReadAsync(string path, CancellationToken cancellationToken = default);

Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ public interface ITranslationEngineService
{
TranslationEngineType Type { get; }

Task CreateAsync(
Task<TranslationEngine> CreateAsync(
string engineId,
string? engineName,
string sourceLanguage,
string targetLanguage,
bool? isModelPersisted = null,
CancellationToken cancellationToken = default
);
Task DeleteAsync(string engineId, CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -40,6 +41,8 @@ Task StartBuildAsync(

Task CancelBuildAsync(string engineId, CancellationToken cancellationToken = default);

Task<ModelDownloadUrl> GetModelDownloadUrlAsync(string engineId, CancellationToken cancellationToken = default);

Task<int> GetQueueSizeAsync(CancellationToken cancellationToken = default);

bool IsLanguageNativeToModel(string language, out string internalCode);
Expand Down
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ public Task<IReadOnlyCollection<string>> ListFilesAsync(
);
}

public Task<string> GetDownloadUrlAsync(
string path,
DateTime expiresAt,
CancellationToken cancellationToken = default
)
{
throw new NotSupportedException();
}

public Task<Stream> OpenReadAsync(string path, CancellationToken cancellationToken = default)
{
if (!_memoryStreams.TryGetValue(Normalize(path), out Entry? ret))
Expand Down
9 changes: 9 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/LocalStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ public Task<IReadOnlyCollection<string>> ListFilesAsync(
);
}

public Task<string> GetDownloadUrlAsync(
string path,
DateTime expiresAt,
CancellationToken cancellationToken = default
)
{
throw new NotSupportedException();
}

public Task<Stream> OpenReadAsync(string path, CancellationToken cancellationToken = default)
{
Uri pathUri = new(_basePath, Normalize(path));
Expand Down
59 changes: 59 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/ModelCleanupService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace SIL.Machine.AspNetCore.Services;

public class ModelCleanupService(
IServiceProvider services,
ISharedFileService sharedFileService,
IRepository<TranslationEngine> engines,
ILogger<ModelCleanupService> logger
) : RecurrentTask("Model Cleanup Service", services, RefreshPeriod, logger)
{
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly ILogger<ModelCleanupService> _logger = logger;
private readonly IRepository<TranslationEngine> _engines = engines;
private static readonly TimeSpan RefreshPeriod = TimeSpan.FromDays(1);

protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken)
{
await CheckModelsAsync(cancellationToken);
}

private async Task CheckModelsAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Running model cleanup job");
IReadOnlyCollection<string> paths = await _sharedFileService.ListFilesAsync(
NmtEngineService.ModelDirectory,
cancellationToken: cancellationToken
);
// Get all engine ids from the database
IReadOnlyList<TranslationEngine>? allEngines = await _engines.GetAllAsync(cancellationToken: cancellationToken);
IEnumerable<string> validFilenames = allEngines.Select(e =>
NmtEngineService.GetModelPath(e.EngineId, e.BuildRevision)
);
// If there is a currently running build that creates and pushes a new file, but the database has not
// updated yet, don't delete the new file.
IEnumerable<string> validFilenamesForNextBuild = allEngines.Select(e =>
NmtEngineService.GetModelPath(e.EngineId, e.BuildRevision + 1)
);
HashSet<string> filenameFilter = validFilenames.Concat(validFilenamesForNextBuild).ToHashSet();

foreach (string path in paths)
{
if (!filenameFilter.Contains(path))
{
await DeleteFileAsync(
path,
$"file in S3 bucket not found in database. It may be an old rev, etc.",
cancellationToken
);
}
}
}

private async Task DeleteFileAsync(string path, string message, CancellationToken cancellationToken = default)
{
// This may delete a file while it is being downloaded, but the chance is rare
// enough and the solution easy enough (just download again) to just live with it.
_logger.LogInformation("Deleting old model file {filename}: {message}", path, message);
await _sharedFileService.DeleteAsync(path, cancellationToken);
}
}
31 changes: 13 additions & 18 deletions src/SIL.Machine.AspNetCore/Services/NmtClearMLBuildJobFactory.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
namespace SIL.Machine.AspNetCore.Services;

public class NmtClearMLBuildJobFactory : IClearMLBuildJobFactory
public class NmtClearMLBuildJobFactory(
ISharedFileService sharedFileService,
ILanguageTagService languageTagService,
IRepository<TranslationEngine> engines,
IOptionsMonitor<ClearMLOptions> options
) : IClearMLBuildJobFactory
{
private readonly ISharedFileService _sharedFileService;
private readonly ILanguageTagService _languageTagService;
private readonly IRepository<TranslationEngine> _engines;
private readonly IOptionsMonitor<ClearMLOptions> _options;

public NmtClearMLBuildJobFactory(
ISharedFileService sharedFileService,
ILanguageTagService languageTagService,
IRepository<TranslationEngine> engines,
IOptionsMonitor<ClearMLOptions> options
)
{
_sharedFileService = sharedFileService;
_languageTagService = languageTagService;
_engines = engines;
_options = options;
}
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly ILanguageTagService _languageTagService = languageTagService;
private readonly IRepository<TranslationEngine> _engines = engines;
private readonly IOptionsMonitor<ClearMLOptions> _options = options;

public TranslationEngineType EngineType => TranslationEngineType.Nmt;

Expand Down Expand Up @@ -52,6 +44,9 @@ public async Task<string> CreateJobScriptAsync(
+ $" 'shared_file_uri': '{baseUri}',\n"
+ $" 'shared_file_folder': '{folder}',\n"
+ (buildOptions is not null ? $" 'build_options': '''{buildOptions}''',\n" : "")
// buildRevision + 1 because the build revision is incremented after the build job
// is finished successfully but the file should be saved with the new revision number
+ (engine.IsModelPersisted ? $" 'save_model': '{engineId}_{engine.BuildRevision + 1}',\n" : $"")
+ $" 'clearml': True\n"
+ "}\n"
+ "run(args)\n";
Expand Down
Loading

0 comments on commit 525950f

Please sign in to comment.