-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IMN-792 Add events service v2 in agreement-platformstate-writer #987
Open
taglioni-r
wants to merge
65
commits into
IMN-790_agreement-platform-state-writer-scaffold
Choose a base branch
from
IMN-792_agreement-platform-state-writer-v2
base: IMN-790_agreement-platform-state-writer-scaffold
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
65 commits
Select commit
Hold shift + click to select a range
ee673ef
Draft
taglioni-r 00d7ab4
Refactor
taglioni-r fccc9d1
Add util function
taglioni-r ac24497
Fix logic
taglioni-r 91a4714
Draft tests
taglioni-r c23daf3
Add test utils
taglioni-r e5b4fd0
Fix sorting
taglioni-r 4eceb10
Improve tests
taglioni-r 9ae27a7
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 0cd01be
Refactor test setup
taglioni-r 9aba0d8
Refactor test split
taglioni-r 84575f2
Fix test
taglioni-r 260f2de
Refactor
taglioni-r 3b5beae
Fix env
taglioni-r 97b39d3
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r d1a6344
Remove comment
taglioni-r 1a0877a
Fix import
taglioni-r 19efc7e
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 1215e29
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 4607d2c
Fix
taglioni-r 99355d9
Fix util
taglioni-r dfce359
Fix
taglioni-r a6aac62
Update tests
taglioni-r 58005a7
Fix test util
taglioni-r 268ddb3
Fix
taglioni-r c41382a
Add comment
taglioni-r 8c20ee6
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 38a77a9
Fix
taglioni-r 6326944
Fix
taglioni-r 22bdaff
Fix GSI name
taglioni-r aeffad7
Fix
taglioni-r 8a7f4ea
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 73b5a7b
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r d14718f
Renaming
taglioni-r dc93027
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r f57b13f
Fix
taglioni-r f6044eb
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r d08535a
Fix
taglioni-r cca2121
Fix
taglioni-r 6ea4bed
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 6f31a3f
Fix
taglioni-r c1e3bc8
Add test
taglioni-r 5f2337b
Add tests
taglioni-r 0e9258b
Refactor
taglioni-r db891b9
Add tests
taglioni-r 132b4b7
Refactor
taglioni-r 9d29955
Refactor
taglioni-r f62d23d
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r cf262b0
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 42be492
Add skip
taglioni-r d9e8c28
Refactor
taglioni-r a5c4fb4
Fix
taglioni-r de68f6a
Update env
taglioni-r a973e8b
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r ec6a35d
Fix tests
taglioni-r cc0e68c
Fix
taglioni-r b676cd6
Remove duplicate test
taglioni-r 6500147
Fix import
taglioni-r a469511
Fix
taglioni-r 1825de9
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 9964cf9
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 9f5b01f
Merge branch 'IMN-790_agreement-platform-state-writer-scaffold' into …
taglioni-r 2d91410
Renaming
taglioni-r a523345
Renaming
taglioni-r 93887a0
Renaming
taglioni-r File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,281 @@ | ||
import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; | ||
import { AgreementEventEnvelopeV2 } from "pagopa-interop-models"; | ||
import { | ||
Agreement, | ||
AgreementEventEnvelopeV2, | ||
AgreementV2, | ||
fromAgreementV2, | ||
genericInternalError, | ||
makeGSIPKConsumerIdEServiceId, | ||
makeGSIPKEServiceIdDescriptorId, | ||
makePlatformStatesAgreementPK, | ||
makePlatformStatesEServiceDescriptorPK, | ||
PlatformStatesAgreementEntry, | ||
PlatformStatesCatalogEntry, | ||
} from "pagopa-interop-models"; | ||
import { match } from "ts-pattern"; | ||
import { | ||
agreementStateToItemState, | ||
deleteAgreementEntry, | ||
isAgreementTheLatest, | ||
readAgreementEntry, | ||
readCatalogEntry, | ||
updateAgreementStateInPlatformStatesEntry, | ||
updateAgreementStateInTokenGenerationStatesTable, | ||
updateAgreementStateInTokenGenerationStatesTablePlusDescriptorInfo, | ||
writeAgreementEntry, | ||
} from "./utils.js"; | ||
|
||
export async function handleMessageV2( | ||
message: AgreementEventEnvelopeV2, | ||
_dynamoDBClient: DynamoDBClient | ||
dynamoDBClient: DynamoDBClient | ||
): Promise<void> { | ||
await match(message) | ||
.with({ type: "AgreementActivated" }, async (msg) => { | ||
const agreement = parseAgreement(msg.data.agreement); | ||
const primaryKey = makePlatformStatesAgreementPK(agreement.id); | ||
|
||
const existingAgreementEntry = await readAgreementEntry( | ||
primaryKey, | ||
dynamoDBClient | ||
); | ||
const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({ | ||
consumerId: agreement.consumerId, | ||
eserviceId: agreement.eserviceId, | ||
}); | ||
|
||
if (existingAgreementEntry) { | ||
if (existingAgreementEntry.version > msg.version) { | ||
// Stops processing if the message is older than the agreement entry | ||
return Promise.resolve(); | ||
} else { | ||
await updateAgreementStateInPlatformStatesEntry( | ||
dynamoDBClient, | ||
primaryKey, | ||
agreementStateToItemState(agreement.state), | ||
msg.version | ||
); | ||
} | ||
} else { | ||
const agreementEntry: PlatformStatesAgreementEntry = { | ||
PK: primaryKey, | ||
state: agreementStateToItemState(agreement.state), | ||
version: msg.version, | ||
updatedAt: new Date().toISOString(), | ||
GSIPK_consumerId_eserviceId, | ||
GSISK_agreementTimestamp: | ||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion | ||
agreement.stamps.activation!.when.toISOString(), | ||
agreementDescriptorId: agreement.descriptorId, | ||
}; | ||
|
||
await writeAgreementEntry(agreementEntry, dynamoDBClient); | ||
} | ||
|
||
const pkCatalogEntry = makePlatformStatesEServiceDescriptorPK({ | ||
eserviceId: agreement.eserviceId, | ||
descriptorId: agreement.descriptorId, | ||
}); | ||
const catalogEntry = await readCatalogEntry( | ||
pkCatalogEntry, | ||
dynamoDBClient | ||
); | ||
|
||
const GSIPK_eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({ | ||
eserviceId: agreement.eserviceId, | ||
descriptorId: agreement.descriptorId, | ||
}); | ||
|
||
// token-generation-states | ||
await updateAgreementStateInTokenGenerationStatesTablePlusDescriptorInfo({ | ||
GSIPK_consumerId_eserviceId, | ||
agreementState: agreement.state, | ||
dynamoDBClient, | ||
GSIPK_eserviceId_descriptorId, | ||
catalogEntry, | ||
}); | ||
}) | ||
.with( | ||
{ type: "AgreementAdded" }, | ||
{ type: "AgreementDeleted" }, | ||
{ type: "DraftAgreementUpdated" }, | ||
{ type: "AgreementSubmitted" }, | ||
{ type: "AgreementActivated" }, | ||
{ type: "AgreementUnsuspendedByProducer" }, | ||
{ type: "AgreementUnsuspendedByConsumer" }, | ||
{ type: "AgreementUnsuspendedByPlatform" }, | ||
{ type: "AgreementArchivedByConsumer" }, | ||
{ type: "AgreementArchivedByUpgrade" }, | ||
{ type: "AgreementUpgraded" }, | ||
{ type: "AgreementSuspendedByProducer" }, | ||
{ type: "AgreementSuspendedByConsumer" }, | ||
{ type: "AgreementSuspendedByPlatform" }, | ||
async (msg) => { | ||
const agreement = parseAgreement(msg.data.agreement); | ||
const primaryKey = makePlatformStatesAgreementPK(agreement.id); | ||
const agreementEntry = await readAgreementEntry( | ||
primaryKey, | ||
dynamoDBClient | ||
); | ||
|
||
if (!agreementEntry || agreementEntry.version > msg.version) { | ||
return Promise.resolve(); | ||
} else { | ||
await updateAgreementStateInPlatformStatesEntry( | ||
dynamoDBClient, | ||
primaryKey, | ||
agreementStateToItemState(agreement.state), | ||
msg.version | ||
); | ||
|
||
const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({ | ||
consumerId: agreement.consumerId, | ||
eserviceId: agreement.eserviceId, | ||
}); | ||
|
||
if ( | ||
await isAgreementTheLatest( | ||
GSIPK_consumerId_eserviceId, | ||
agreement.id, | ||
dynamoDBClient | ||
) | ||
) { | ||
// token-generation-states only if agreement is the latest | ||
|
||
await updateAgreementStateInTokenGenerationStatesTable({ | ||
GSIPK_consumerId_eserviceId, | ||
agreementState: agreement.state, | ||
dynamoDBClient, | ||
}); | ||
} | ||
} | ||
} | ||
) | ||
.with({ type: "AgreementUpgraded" }, async (msg) => { | ||
const agreement = parseAgreement(msg.data.agreement); | ||
const primaryKey = makePlatformStatesAgreementPK(agreement.id); | ||
const agreementEntry = await readAgreementEntry( | ||
primaryKey, | ||
dynamoDBClient | ||
); | ||
|
||
const pkCatalogEntry = makePlatformStatesEServiceDescriptorPK({ | ||
eserviceId: agreement.eserviceId, | ||
descriptorId: agreement.descriptorId, | ||
}); | ||
const catalogEntry = await readCatalogEntry( | ||
pkCatalogEntry, | ||
dynamoDBClient | ||
); | ||
if (!catalogEntry) { | ||
// TODO double-check | ||
throw genericInternalError("Catalog entry not found"); | ||
} | ||
|
||
const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({ | ||
consumerId: agreement.consumerId, | ||
eserviceId: agreement.eserviceId, | ||
}); | ||
|
||
if (agreementEntry) { | ||
if (agreementEntry.version > msg.version) { | ||
return Promise.resolve(); | ||
} else { | ||
await updateAgreementStateInPlatformStatesEntry( | ||
dynamoDBClient, | ||
primaryKey, | ||
agreementStateToItemState(agreement.state), | ||
msg.version | ||
); | ||
} | ||
} else { | ||
const newAgreementEntry: PlatformStatesAgreementEntry = { | ||
PK: primaryKey, | ||
state: agreementStateToItemState(agreement.state), | ||
version: msg.version, | ||
updatedAt: new Date().toISOString(), | ||
GSIPK_consumerId_eserviceId, | ||
GSISK_agreementTimestamp: | ||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion | ||
agreement.stamps.activation!.when.toISOString(), | ||
agreementDescriptorId: agreement.descriptorId, | ||
}; | ||
|
||
await writeAgreementEntry(newAgreementEntry, dynamoDBClient); | ||
} | ||
|
||
const doOperationOnTokenStates = async ( | ||
catalogEntry: PlatformStatesCatalogEntry | ||
): Promise<void> => { | ||
if ( | ||
await isAgreementTheLatest( | ||
GSIPK_consumerId_eserviceId, | ||
agreement.id, | ||
dynamoDBClient | ||
) | ||
) { | ||
// token-generation-states only if agreement is the latest | ||
const GSIPK_eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId( | ||
{ | ||
eserviceId: agreement.eserviceId, | ||
descriptorId: agreement.descriptorId, | ||
} | ||
); | ||
|
||
await updateAgreementStateInTokenGenerationStatesTablePlusDescriptorInfo( | ||
{ | ||
GSIPK_consumerId_eserviceId, | ||
agreementState: agreement.state, | ||
dynamoDBClient, | ||
GSIPK_eserviceId_descriptorId, | ||
catalogEntry, | ||
} | ||
); | ||
} | ||
}; | ||
|
||
await doOperationOnTokenStates(catalogEntry); | ||
|
||
const secondRetrievalCatalogEntry = await readCatalogEntry( | ||
pkCatalogEntry, | ||
dynamoDBClient | ||
); | ||
if (!secondRetrievalCatalogEntry) { | ||
// TODO double-check | ||
throw genericInternalError("Catalog entry not found"); | ||
} | ||
Comment on lines
+234
to
+237
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Double-check. Should this error be raised? |
||
if (secondRetrievalCatalogEntry.state !== catalogEntry.state) { | ||
await doOperationOnTokenStates(secondRetrievalCatalogEntry); | ||
} | ||
}) | ||
.with({ type: "AgreementArchivedByUpgrade" }, async (msg) => { | ||
const agreement = parseAgreement(msg.data.agreement); | ||
const pk = makePlatformStatesAgreementPK(agreement.id); | ||
await deleteAgreementEntry(pk, dynamoDBClient); | ||
}) | ||
.with({ type: "AgreementArchivedByConsumer" }, async (msg) => { | ||
const agreement = parseAgreement(msg.data.agreement); | ||
|
||
const primaryKey = makePlatformStatesAgreementPK(agreement.id); | ||
const GSIPK_consumerId_eserviceId = makeGSIPKConsumerIdEServiceId({ | ||
consumerId: agreement.consumerId, | ||
eserviceId: agreement.eserviceId, | ||
}); | ||
|
||
if ( | ||
await isAgreementTheLatest( | ||
GSIPK_consumerId_eserviceId, | ||
agreement.id, | ||
dynamoDBClient | ||
) | ||
) { | ||
// token-generation-states only if agreement is the latest | ||
|
||
await updateAgreementStateInTokenGenerationStatesTable({ | ||
GSIPK_consumerId_eserviceId, | ||
agreementState: agreement.state, | ||
dynamoDBClient, | ||
}); | ||
} | ||
|
||
await deleteAgreementEntry(primaryKey, dynamoDBClient); | ||
}) | ||
.with( | ||
{ type: "AgreementAdded" }, | ||
{ type: "AgreementDeleted" }, | ||
{ type: "DraftAgreementUpdated" }, | ||
{ type: "AgreementSubmitted" }, | ||
{ type: "AgreementRejected" }, | ||
{ type: "AgreementConsumerDocumentAdded" }, | ||
{ type: "AgreementConsumerDocumentRemoved" }, | ||
|
@@ -31,3 +285,11 @@ export async function handleMessageV2( | |
) | ||
.exhaustive(); | ||
} | ||
|
||
const parseAgreement = (agreementV2: AgreementV2 | undefined): Agreement => { | ||
if (!agreementV2) { | ||
throw genericInternalError(`Agreement not found in message data`); | ||
} | ||
|
||
return fromAgreementV2(agreementV2); | ||
}; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double-check. Should this error be raised?