Skip to content

Commit

Permalink
add new publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Nov 27, 2022
1 parent db5742c commit 63d96fc
Showing 1 changed file with 159 additions and 97 deletions.
256 changes: 159 additions & 97 deletions spec/Section 6 -- Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,32 +361,87 @@ The Async Payload Records may be published lazily as requested, with the
internal state of the unpublished stream held by a Publisher Record unique to
the request.

- {subsequentPayloads}: the set of Async Payload Records for this response that
have not yet been published.
- {pending}: the set of Async Payload Records for this response that have not
yet been completed.
- {waiting}: the set of Async Payload Records for this response that have been
completed, but are waiting for a parent to complete.
- {waitingByParent}: an unordered map of uncompleted parent Async Payload
Records to sets of completed child Async Payload Records.
- {pushed}: a weakly held set of Async Payload Records for this response that
have been completed, but are waiting for a parent to complete.
- {current}: the set of Async Payload Records for this response that may be
yielded on the next request.
- {signal}: An asynchronous signal that can be awaited and triggered.

## Create Publisher

CreatePublisher():

- Let {publisherRecord} be a publisher record.
- Initialize {subsequentPayloads} on {publisherRecord} to an empty set.
- Initialize {pending} on {publisherRecord} to an empty set.
- Initialize {waiting} on {publisherRecord} to an empty set.
- Initialize {waitingByParent} on {publisherRecord} to an empty unordered map.
- Initialize {pushed} on {publisherRecord} to an empty set.
- Initialize {current} on {publisherRecord} to an empty set.
- Initialize {signal}.
- Return {publisherRecord}.

## Has Subsequent Payloads

HasSubsequentPayloads(publisherRecord):

- Let {subsequentPayloads} be the corresponding entry on {publisherRecord}.
- Let {size} be the number of payloads within {subsequentPayloads}.
- Let {pending} be the corresponding entry on {publisherRecord}.
- Let {size} be the number of payloads within {pending}.
- If {size} is greater than zero, return {true}.
- Let {waiting} be the corresponding entry on {publisherRecord}.
- Let {size} be the number of payloads within {waiting}.
- If {size} is greater than zero, return {true}.
- Let {current} be the corresponding entry on {publisherRecord}.
- Let {size} be the number of payloads within {current}.
- If {size} is greater than zero, return {true}.
- Return {false}.

## Add Payload

AddPayload(payload, publisherRecord):

- Let {subsequentPayloads} be the corresponding entry on {publisherRecord}.
- Add {payload} to {subsequentPayloads}.
- Let {pending} be the corresponding entry on {publisherRecord}.
- Add {payload} to {pending}.

## Complete Payload

CompletePayload(payload, publisherRecord):

- Let {pending} be the corresponding entry on {publisherRecord}.
- If {payload} is not within {pending}, return.
- Remove {payload} from {pending}.
- Let {parentRecord} be the corresponding entry on {payload}.
- If {parentRecord} is not defined:
- Call PushPayload(payload, publisherRecord).
- Let {signal} be the corresponding entry on {publisherRecord}.
- Trigger {signal}.
- Otherwise:
- Let {waiting} and {waitingByChildren} be the corresponding entries on
{publisherRecord}.
- Add {payload} to {waiting}.
- Let {children} be the set in {waitingByParent} for {parentRecord}; if no
such set exists, create it as an empty set.
- Append {payload} to {children}.

## Push Payload

PushPayload(payload, publisherRecord):

- Let {pushed}, {current}, and {waitingByParent} be the corresponding entries on
{publisherRecord}.
- Add {payload} to {pushed} and {current}.
- Let {children} be the set in {waitingByParent} for {parentRecord}.
- If {children} is not defined, return.
- Let {waiting} be the corresponding entry on {publisherRecord}.
- For each {child} in {children}:
- Call {PushPayload(payload, publisherRecord)}.
- Remove {child} from {waiting}.
- Remove the set in {waitingByParent} for {parentRecord}.

## Yield Subsequent Payloads

Expand All @@ -396,15 +451,13 @@ payloads should be processed.

YieldSubsequentPayloads(initialResponse, publisherRecord):

- Let {subsequentPayloads} be the corresponding entry on {publisherRecord}.
- Let {initialRecords} be any items in {subsequentPayloads} with a completed
{dataExecution}.
- Let {current} be the corresponding entry on {publisherRecord}.
- Initialize {initialIncremental} to an empty list.
- For each {record} in {initialRecords}:
- Remove {record} from {subsequentPayloads}.
- For each {record} in {current}:
- Remove {record} from {current}.
- If {isCompletedIterator} on {record} is {true}:
- Continue to the next record in {records}.
- Let {payload} be the completed result returned by {dataExecution}.
- Let {payload} be the corresponding entry on {record}.
- Append {payload} to {initialIncremental}.
- If {initialIncremental} is not empty:
- Add an entry to {initialResponse} named `incremental` containing the value
Expand All @@ -416,17 +469,17 @@ YieldSubsequentPayloads(initialResponse, publisherRecord):
- If {record} contains {iterator}:
- Send a termination signal to {iterator}.
- Return.
- Wait for at least one record in {subsequentPayloads} to have a completed
{dataExecution}.
- Let {signal} be the corresponding entry on {publisherRecord}.
- Wait for {signal} to be triggered.
- Reinitialize {signal} on {publisherRecord}.
- Let {subsequentResponse} be an unordered map with an entry {incremental}
initialized to an empty list.
- Let {records} be the items in {subsequentPayloads} with a completed
{dataExecution}.
- For each {record} in {records}:
- Let {current} be the corresponding entry on {publisherRecord}.
- For each {record} in {current}:
- Remove {record} from {subsequentPayloads}.
- If {isCompletedIterator} on {record} is {true}:
- Continue to the next record in {records}.
- Let {payload} be the completed result returned by {dataExecution}.
- Let {payload} be the corresponding entry on {record}.
- Append {payload} to the {incremental} entry on {subsequentResponse}.
- If {subsequentPayloads} is empty:
- Add an entry to {subsequentResponse} named `hasNext` with the value
Expand Down Expand Up @@ -505,20 +558,34 @@ outside of {ExecuteDeferredFragment} or {ExecuteStreamField}.

FilterSubsequentPayloads(publisherRecord, nullPath, currentAsyncRecord):

- Let {subsequentPayloads} be the corresponding entry on {publisherRecord}.
- For each {asyncRecord} in {subsequentPayloads}:
- If {asyncRecord} is the same record as {currentAsyncRecord}:
- Continue to the next record in {subsequentPayloads}.
- Initialize {index} to zero.
- While {index} is less then the length of {nullPath}:
- Initialize {nullPathItem} to the element at {index} in {nullPath}.
- Initialize {asyncRecordPathItem} to the element at {index} in the {path}
of {asyncRecord}.
- If {nullPathItem} is not equivalent to {asyncRecordPathItem}:
- Continue to the next record in {subsequentPayloads}.
- Increment {index} by one.
- Remove {asyncRecord} from {subsequentPayloads}. Optionally, cancel any
incomplete work in the execution of {asyncRecord}.
- Let {pending}, {current}, {waiting}, and {waitingByParent} be the
corresponding entries on {publisherRecord}.
- For each {asyncRecord} in {pending} and {current}:
- If {ShouldKeepPayload(asyncRecord, nullPath, currentAsyncRecord)} is {true}:
- Continue to the next record in {set}.
- Remove {asyncRecord} from {set}. Optionally, cancel any incomplete work in
the execution of {asyncRecord}.
- For each {asyncRecord} in {waiting}:
- If {ShouldKeepPayload(asyncRecord, nullPath, currentAsyncRecord)} is {true}:
- Continue to the next record in {waiting}.
- Remove {asyncRecord} from {waiting}. Optionally, cancel any incomplete work
in the execution of {asyncRecord}.
- Let {parentRecord} be the corresponding entry on {asyncRecord}.
- Let {children} be the set in {waitingByParent} for {parentRecord}.
- Remove {asyncRecord} from {children}.

ShouldKeepPayload(asyncRecord, nullPath, currentAsyncRecord):

- If {asyncRecord} is the same record as {currentAsyncRecord}:
- Return {true}.
- Initialize {index} to zero.
- While {index} is less then the length of {nullPath}:
- Initialize {nullPathItem} to the element at {index} in {nullPath}.
- Initialize {asyncRecordPathItem} to the element at {index} in the {path} of
{asyncRecord}.
- If {nullPathItem} is not equivalent to {asyncRecordPathItem}:
- Return {true}.
- Increment {index} by one. Return {false}.

For example, assume the field `alwaysThrows` is a `Non-Null` type that always
raises a field error:
Expand Down Expand Up @@ -797,51 +864,49 @@ DoesFragmentTypeApply(objectType, fragmentType):
An Async Payload Record is either a Deferred Fragment Record or a Stream Record.
All Async Payload Records are structures containing:

- {parentRecord}: The generating parent Async Payload Record, not defined if
this Async Payload Record is spawned by the initial result.
- {label}: value derived from the corresponding `@defer` or `@stream` directive.
- {path}: a list of field names and indices from root to the location of the
corresponding `@defer` or `@stream` directive.
- {iterator}: The underlying iterator if created from a `@stream` directive.
- {isCompletedIterator}: a boolean indicating the payload record was generated
from an iterator that has completed.
- {errors}: a list of field errors encountered during execution.
- {dataExecution}: A result that can notify when the corresponding execution has
completed.
- {payload}: An unordered map containing the formatted payload.

#### Execute Deferred Fragment

ExecuteDeferredFragment(label, objectType, objectValue, groupedFieldSet, path,
variableValues, parentRecord, publisherRecord):

- Let {deferRecord} be an async payload record created from {label} and {path}.
- Initialize {errors} on {deferRecord} to an empty list.
- Let {dataExecution} be the asynchronous future value of:
- Let {payload} be an unordered map.
- Initialize {resultMap} to an empty ordered map.
- For each {groupedFieldSet} as {responseKey} and {fields}:
- Let {fieldName} be the name of the first entry in {fields}. Note: This
value is unaffected if an alias is used.
- Let {fieldType} be the return type defined for the field {fieldName} of
{objectType}.
- If {fieldType} is defined:
- Let {responseValue} be {ExecuteField(objectType, objectValue, fieldType,
fields, variableValues, path, publisherRecord, asyncRecord)}.
- Set {responseValue} as the value for {responseKey} in {resultMap}.
- Append any encountered field errors to {errors}.
- If {parentRecord} is defined:
- Wait for the result of {dataExecution} on {parentRecord}.
- If {errors} is not empty:
- Add an entry to {payload} named `errors` with the value {errors}.
- If a field error was raised, causing a {null} to be propagated to
{responseValue}:
- Add an entry to {payload} named `data` with the value {null}.
- Otherwise:
- Add an entry to {payload} named `data` with the value {resultMap}.
- If {label} is defined:
- Add an entry to {payload} named `label` with the value {label}.
- Add an entry to {payload} named `path` with the value {path}.
- Return {payload}.
- Set {dataExecution} on {deferredFragmentRecord}.
- Let {deferRecord} be an async payload record created from {parentRecord},
{label}, and {path}.
- Call {AddPayload(deferRecord, publisherRecord)}.
- Initialize {errors} on {deferRecord} to an empty list.
- Initialize {resultMap} to an empty ordered map.
- For each {groupedFieldSet} as {responseKey} and {fields}:
- Let {fieldName} be the name of the first entry in {fields}. Note: This value
is unaffected if an alias is used.
- Let {fieldType} be the return type defined for the field {fieldName} of
{objectType}.
- If {fieldType} is defined:
- Let {responseValue} be {ExecuteField(objectType, objectValue, fieldType,
fields, variableValues, path, publisherRecord, asyncRecord)}.
- Set {responseValue} as the value for {responseKey} in {resultMap}.
- Append any encountered field errors to {errors}.
- If {errors} is not empty:
- Add an entry to {payload} named `errors` with the value {errors}.
- If a field error was raised, causing a {null} to be propagated to
{responseValue}:
- Add an entry to {payload} named `data` with the value {null}.
- Otherwise:
- Add an entry to {payload} named `data` with the value {resultMap}.
- If {label} is defined:
- Add an entry to {payload} named `label` with the value {label}.
- Add an entry to {payload} named `path` with the value {path}.
- Set {payload} on {deferRecord}.
- Call {CompletePayload(payload, publisherRecord)}.

## Executing Fields

Expand Down Expand Up @@ -968,43 +1033,40 @@ yielded items satisfies `initialCount` specified on the `@stream` directive.
ExecuteStreamField(label, iterator, index, fields, innerType, path,
streamRecord, variableValues, publisherRecord):

- Let {streamRecord} be an async payload record created from {label}, {path},
and {iterator}.
- Let {streamRecord} be an async payload record created from {parentRecord},
{label}, {path}, and {iterator}.
- Call {AddPayload(streamRecord, publisherRecord)}.
- Initialize {errors} on {streamRecord} to an empty list.
- Let {itemPath} be {path} with {index} appended.
- Let {dataExecution} be the asynchronous future value of:
- Wait for the next item from {iterator}.
- If an item is not retrieved because {iterator} has completed:
- Set {isCompletedIterator} to {true} on {streamRecord}.
- Return {null}.
- Let {payload} be an unordered map.
- If an item is not retrieved because of an error:
- Append the encountered error to {errors}.
- Wait for the next item from {iterator}.
- If an item is not retrieved because {iterator} has completed:
- Set {isCompletedIterator} to {true} on {streamRecord}.
- Return {null}.
- Let {payload} be an unordered map.
- If an item is not retrieved because of an error:
- Append the encountered error to {errors}.
- Add an entry to {payload} named `items` with the value {null}.
- Otherwise:
- Let {item} be the item retrieved from {iterator}.
- Let {data} be the result of calling {CompleteValue(innerType, fields, item,
variableValues, itemPath, publisherRecord, parentRecord)}.
- Append any encountered field errors to {errors}.
- Increment {index}.
- Call {ExecuteStreamField(label, iterator, index, fields, innerType, path,
streamRecord, variableValues, publisherRecord)}.
- If a field error was raised, causing a {null} to be propagated to {data},
and {innerType} is a Non-Nullable type:
- Add an entry to {payload} named `items` with the value {null}.
- Otherwise:
- Let {item} be the item retrieved from {iterator}.
- Let {data} be the result of calling {CompleteValue(innerType, fields,
item, variableValues, itemPath, publisherRecord, parentRecord)}.
- Append any encountered field errors to {errors}.
- Increment {index}.
- Call {ExecuteStreamField(label, iterator, index, fields, innerType, path,
streamRecord, variableValues, publisherRecord)}.
- If a field error was raised, causing a {null} to be propagated to {data},
and {innerType} is a Non-Nullable type:
- Add an entry to {payload} named `items` with the value {null}.
- Otherwise:
- Add an entry to {payload} named `items` with a list containing the value
{data}.
- If {errors} is not empty:
- Add an entry to {payload} named `errors` with the value {errors}.
- If {label} is defined:
- Add an entry to {payload} named `label` with the value {label}.
- Add an entry to {payload} named `path` with the value {itemPath}.
- If {parentRecord} is defined:
- Wait for the result of {dataExecution} on {parentRecord}.
- Return {payload}.
- Set {dataExecution} on {streamRecord}.
- Call {AddPayload(streamRecord, publisherRecord)}.
- Add an entry to {payload} named `items` with a list containing the value
{data}.
- If {errors} is not empty:
- Add an entry to {payload} named `errors` with the value {errors}.
- If {label} is defined:
- Add an entry to {payload} named `label` with the value {label}.
- Add an entry to {payload} named `path` with the value {itemPath}.
- Set {payload} on {streamRecord}.
- Call {CompletePayload(streamRecord, publisherRecord)}.

CompleteValue(fieldType, fields, result, variableValues, path, publisherRecord,
asyncRecord):
Expand Down

0 comments on commit 63d96fc

Please sign in to comment.