Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IMN-792 Add events service v2 in agreement-platformstate-writer #987

Open
wants to merge 65 commits into
base: IMN-790_agreement-platform-state-writer-scaffold
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
ee673ef
Draft
taglioni-r Sep 18, 2024
00d7ab4
Refactor
taglioni-r Sep 19, 2024
fccc9d1
Add util function
taglioni-r Sep 19, 2024
ac24497
Fix logic
taglioni-r Sep 20, 2024
91a4714
Draft tests
taglioni-r Sep 20, 2024
c23daf3
Add test utils
taglioni-r Sep 23, 2024
e5b4fd0
Fix sorting
taglioni-r Sep 23, 2024
4eceb10
Improve tests
taglioni-r Sep 23, 2024
9ae27a7
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Sep 25, 2024
0cd01be
Refactor test setup
taglioni-r Sep 25, 2024
9aba0d8
Refactor test split
taglioni-r Sep 25, 2024
84575f2
Fix test
taglioni-r Sep 25, 2024
260f2de
Refactor
taglioni-r Sep 25, 2024
3b5beae
Fix env
taglioni-r Sep 25, 2024
97b39d3
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Sep 25, 2024
d1a6344
Remove comment
taglioni-r Sep 25, 2024
1a0877a
Fix import
taglioni-r Sep 25, 2024
19efc7e
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Sep 25, 2024
1215e29
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Sep 25, 2024
4607d2c
Fix
taglioni-r Sep 25, 2024
99355d9
Fix util
taglioni-r Sep 25, 2024
dfce359
Fix
taglioni-r Sep 25, 2024
a6aac62
Update tests
taglioni-r Sep 25, 2024
58005a7
Fix test util
taglioni-r Sep 25, 2024
268ddb3
Fix
taglioni-r Sep 25, 2024
c41382a
Add comment
taglioni-r Sep 25, 2024
8c20ee6
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Sep 30, 2024
38a77a9
Fix
taglioni-r Sep 30, 2024
6326944
Fix
taglioni-r Sep 30, 2024
22bdaff
Fix GSI name
taglioni-r Sep 30, 2024
aeffad7
Fix
taglioni-r Sep 30, 2024
8a7f4ea
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 1, 2024
73b5a7b
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 1, 2024
d14718f
Renaming
taglioni-r Oct 1, 2024
dc93027
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 1, 2024
f57b13f
Fix
taglioni-r Oct 1, 2024
f6044eb
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 1, 2024
d08535a
Fix
taglioni-r Oct 1, 2024
cca2121
Fix
taglioni-r Oct 1, 2024
6ea4bed
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 1, 2024
6f31a3f
Fix
taglioni-r Oct 1, 2024
c1e3bc8
Add test
taglioni-r Oct 2, 2024
5f2337b
Add tests
taglioni-r Oct 2, 2024
0e9258b
Refactor
taglioni-r Oct 2, 2024
db891b9
Add tests
taglioni-r Oct 2, 2024
132b4b7
Refactor
taglioni-r Oct 2, 2024
9d29955
Refactor
taglioni-r Oct 3, 2024
f62d23d
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 3, 2024
cf262b0
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 4, 2024
42be492
Add skip
taglioni-r Oct 7, 2024
d9e8c28
Refactor
taglioni-r Oct 7, 2024
a5c4fb4
Fix
taglioni-r Oct 8, 2024
de68f6a
Update env
taglioni-r Oct 8, 2024
a973e8b
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 8, 2024
ec6a35d
Fix tests
taglioni-r Oct 8, 2024
cc0e68c
Fix
taglioni-r Oct 8, 2024
b676cd6
Remove duplicate test
taglioni-r Oct 8, 2024
6500147
Fix import
taglioni-r Oct 8, 2024
a469511
Fix
taglioni-r Oct 8, 2024
1825de9
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 8, 2024
9964cf9
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 8, 2024
9f5b01f
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r Oct 8, 2024
2d91410
Renaming
taglioni-r Oct 9, 2024
a523345
Renaming
taglioni-r Oct 9, 2024
93887a0
Renaming
taglioni-r Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 272 additions & 10 deletions packages/agreement-platformstate-writer/src/consumerServiceV2.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,281 @@
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { AgreementEventEnvelopeV2 } from "pagopa-interop-models";
import {
Agreement,
AgreementEventEnvelopeV2,
AgreementV2,
fromAgreementV2,
genericInternalError,
makeGSIPKConsumerIdEServiceId,
makeGSIPKEServiceIdDescriptorId,
makePlatformStatesAgreementPK,
makePlatformStatesEServiceDescriptorPK,
PlatformStatesAgreementEntry,
PlatformStatesCatalogEntry,
} from "pagopa-interop-models";
import { match } from "ts-pattern";
import {
agreementStateToItemState,
deleteAgreementEntry,
isAgreementTheLatest,
readAgreementEntry,
readCatalogEntry,
updateAgreementStateInPlatformStatesEntry,
updateAgreementStateInTokenGenerationStatesTable,
updateAgreementStateInTokenGenerationStatesTablePlusDescriptorInfo,
writeAgreementEntry,
} from "./utils.js";

export async function handleMessageV2(
message: AgreementEventEnvelopeV2,
_dynamoDBClient: DynamoDBClient
dynamoDBClient: DynamoDBClient
): Promise<void> {
await match(message)
.with({ type: "AgreementActivated" }, async (msg) => {
const agreement = parseAgreement(msg.data.agreement);
const primaryKey = makePlatformStatesAgreementPK(agreement.id);

const existingAgreementEntry = await readAgreementEntry(
primaryKey,
dynamoDBClient
);
const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({
consumerId: agreement.consumerId,
eserviceId: agreement.eserviceId,
});

if (existingAgreementEntry) {
if (existingAgreementEntry.version > msg.version) {
// Stops processing if the message is older than the agreement entry
return Promise.resolve();
} else {
await updateAgreementStateInPlatformStatesEntry(
dynamoDBClient,
primaryKey,
agreementStateToItemState(agreement.state),
msg.version
);
}
} else {
const agreementEntry: PlatformStatesAgreementEntry = {
PK: primaryKey,
state: agreementStateToItemState(agreement.state),
version: msg.version,
updatedAt: new Date().toISOString(),
GSIPK_consumerId_eserviceId,
GSISK_agreementTimestamp:
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
agreement.stamps.activation!.when.toISOString(),
agreementDescriptorId: agreement.descriptorId,
};

await writeAgreementEntry(agreementEntry, dynamoDBClient);
}

const pkCatalogEntry = makePlatformStatesEServiceDescriptorPK({
eserviceId: agreement.eserviceId,
descriptorId: agreement.descriptorId,
});
const catalogEntry = await readCatalogEntry(
pkCatalogEntry,
dynamoDBClient
);

const GSIPK_eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({
eserviceId: agreement.eserviceId,
descriptorId: agreement.descriptorId,
});

// token-generation-states
await updateAgreementStateInTokenGenerationStatesTablePlusDescriptorInfo({
GSIPK_consumerId_eserviceId,
agreementState: agreement.state,
dynamoDBClient,
GSIPK_eserviceId_descriptorId,
catalogEntry,
});
})
.with(
{ type: "AgreementAdded" },
{ type: "AgreementDeleted" },
{ type: "DraftAgreementUpdated" },
{ type: "AgreementSubmitted" },
{ type: "AgreementActivated" },
{ type: "AgreementUnsuspendedByProducer" },
{ type: "AgreementUnsuspendedByConsumer" },
{ type: "AgreementUnsuspendedByPlatform" },
{ type: "AgreementArchivedByConsumer" },
{ type: "AgreementArchivedByUpgrade" },
{ type: "AgreementUpgraded" },
{ type: "AgreementSuspendedByProducer" },
{ type: "AgreementSuspendedByConsumer" },
{ type: "AgreementSuspendedByPlatform" },
async (msg) => {
const agreement = parseAgreement(msg.data.agreement);
const primaryKey = makePlatformStatesAgreementPK(agreement.id);
const agreementEntry = await readAgreementEntry(
primaryKey,
dynamoDBClient
);

if (!agreementEntry || agreementEntry.version > msg.version) {
return Promise.resolve();
} else {
await updateAgreementStateInPlatformStatesEntry(
dynamoDBClient,
primaryKey,
agreementStateToItemState(agreement.state),
msg.version
);

const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({
consumerId: agreement.consumerId,
eserviceId: agreement.eserviceId,
});

if (
await isAgreementTheLatest(
GSIPK_consumerId_eserviceId,
agreement.id,
dynamoDBClient
)
) {
// token-generation-states only if agreement is the latest

await updateAgreementStateInTokenGenerationStatesTable({
GSIPK_consumerId_eserviceId,
agreementState: agreement.state,
dynamoDBClient,
});
}
}
}
)
.with({ type: "AgreementUpgraded" }, async (msg) => {
const agreement = parseAgreement(msg.data.agreement);
const primaryKey = makePlatformStatesAgreementPK(agreement.id);
const agreementEntry = await readAgreementEntry(
primaryKey,
dynamoDBClient
);

const pkCatalogEntry = makePlatformStatesEServiceDescriptorPK({
eserviceId: agreement.eserviceId,
descriptorId: agreement.descriptorId,
});
const catalogEntry = await readCatalogEntry(
pkCatalogEntry,
dynamoDBClient
);
if (!catalogEntry) {
// TODO double-check
throw genericInternalError("Catalog entry not found");
}
Comment on lines +161 to +164
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-check. Should this error be raised?


const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({
consumerId: agreement.consumerId,
eserviceId: agreement.eserviceId,
});

if (agreementEntry) {
if (agreementEntry.version > msg.version) {
return Promise.resolve();
} else {
await updateAgreementStateInPlatformStatesEntry(
dynamoDBClient,
primaryKey,
agreementStateToItemState(agreement.state),
msg.version
);
}
} else {
const newAgreementEntry: PlatformStatesAgreementEntry = {
PK: primaryKey,
state: agreementStateToItemState(agreement.state),
version: msg.version,
updatedAt: new Date().toISOString(),
GSIPK_consumerId_eserviceId,
GSISK_agreementTimestamp:
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
agreement.stamps.activation!.when.toISOString(),
agreementDescriptorId: agreement.descriptorId,
};

await writeAgreementEntry(newAgreementEntry, dynamoDBClient);
}

const doOperationOnTokenStates = async (
catalogEntry: PlatformStatesCatalogEntry
): Promise<void> => {
if (
await isAgreementTheLatest(
GSIPK_consumerId_eserviceId,
agreement.id,
dynamoDBClient
)
) {
// token-generation-states only if agreement is the latest
const GSIPK_eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId(
{
eserviceId: agreement.eserviceId,
descriptorId: agreement.descriptorId,
}
);

await updateAgreementStateInTokenGenerationStatesTablePlusDescriptorInfo(
{
GSIPK_consumerId_eserviceId,
agreementState: agreement.state,
dynamoDBClient,
GSIPK_eserviceId_descriptorId,
catalogEntry,
}
);
}
};

await doOperationOnTokenStates(catalogEntry);

const secondRetrievalCatalogEntry = await readCatalogEntry(
pkCatalogEntry,
dynamoDBClient
);
if (!secondRetrievalCatalogEntry) {
// TODO double-check
throw genericInternalError("Catalog entry not found");
}
Comment on lines +234 to +237
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-check. Should this error be raised?

if (secondRetrievalCatalogEntry.state !== catalogEntry.state) {
await doOperationOnTokenStates(secondRetrievalCatalogEntry);
}
})
.with({ type: "AgreementArchivedByUpgrade" }, async (msg) => {
const agreement = parseAgreement(msg.data.agreement);
const pk = makePlatformStatesAgreementPK(agreement.id);
await deleteAgreementEntry(pk, dynamoDBClient);
})
.with({ type: "AgreementArchivedByConsumer" }, async (msg) => {
const agreement = parseAgreement(msg.data.agreement);

const primaryKey = makePlatformStatesAgreementPK(agreement.id);
const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({
consumerId: agreement.consumerId,
eserviceId: agreement.eserviceId,
});

if (
await isAgreementTheLatest(
GSIPK_consumerId_eserviceId,
agreement.id,
dynamoDBClient
)
) {
// token-generation-states only if agreement is the latest

await updateAgreementStateInTokenGenerationStatesTable({
GSIPK_consumerId_eserviceId,
agreementState: agreement.state,
dynamoDBClient,
});
}

await deleteAgreementEntry(primaryKey, dynamoDBClient);
})
.with(
{ type: "AgreementAdded" },
{ type: "AgreementDeleted" },
{ type: "DraftAgreementUpdated" },
{ type: "AgreementSubmitted" },
{ type: "AgreementRejected" },
{ type: "AgreementConsumerDocumentAdded" },
{ type: "AgreementConsumerDocumentRemoved" },
Expand All @@ -31,3 +285,11 @@ export async function handleMessageV2(
)
.exhaustive();
}

const parseAgreement = (agreementV2: AgreementV2 | undefined): Agreement => {
if (!agreementV2) {
throw genericInternalError(`Agreement not found in message data`);
}

return fromAgreementV2(agreementV2);
};
Loading