Table Recovery deletes newly assigned keys #171

Closed
PJ-Schulz opened this issue Jul 12, 2021 · 1 comment · Fixed by #174

@PJ-Schulz

Hello

I have found a bug in how a table is restored from its changelog topic.
I have a use case where I want to remove a key from the table and reassign it later. For example, a ship arriving in a port in Germany is added to the table. After it has left Germany, it is removed from the table. The ship then sails to Australia and back to Germany. When it arrives again, it has to be added again.

When I remove the ship from my table, a record with a null value is written for its key to the changelog topic. During recovery, the app checks for null values and adds those keys to a set of keys to delete. The deletions are applied only after all other records in the batch, so even if a new value is assigned to the key later, the key is still in that set and the newly assigned value is removed from the table after recovery.

Steps to reproduce

The following table changelog can be used as an example:

table_changelog = [
    (1, "Foo"),
    (2, "Bar"),
    (2, None),
    (2, "Baz")
]

Expected behavior

The table should look like this:
table = {1: "Foo", 2: "Baz"}

Actual behavior

Instead, the table looks like this:
table = {1: "Foo"}
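
The behavior can be reproduced without Faust. The following is a minimal sketch that mimics the update-then-delete logic described in this issue using a plain dict; the function name restore_with_deferred_deletes and the tuple-based changelog are illustrative assumptions, not part of the Faust API.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Set, Tuple

Change = Tuple[Any, Optional[str]]


def restore_with_deferred_deletes(changelog: Iterable[Change]) -> Dict[Any, str]:
    """Hypothetical stand-in for the recovery logic described in this issue."""
    data: Dict[Any, str] = {}
    to_delete: Set[Any] = set()

    def live_entries() -> Iterator[Tuple[Any, str]]:
        for key, value in changelog:
            if value is None:
                # A null value marks the key for deletion and is skipped.
                to_delete.add(key)
                continue
            yield key, value

    # All non-null records are applied first ...
    data.update(live_entries())
    # ... and only then are the marked keys removed, regardless of order.
    for key in to_delete:
        data.pop(key, None)
    return data


table_changelog = [(1, "Foo"), (2, "Bar"), (2, None), (2, "Baz")]
result = restore_with_deferred_deletes(table_changelog)
print(result)  # {1: 'Foo'}
```

Key 2 is lost because the deletion recorded for (2, None) is applied after (2, "Baz") has already been written.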

Source Code

The code responsible for this bug is in faust/stores/memory.py, lines 18 to 49:

def apply_changelog_batch(
    self,
    batch: Iterable[EventT],
    to_key: Callable[[Any], Any],
    to_value: Callable[[Any], Any],
) -> None:
    """Apply batch of changelog events to in-memory table."""
    # default store does not do serialization, so we need
    # to convert these raw json serialized keys to proper structures
    # (E.g. regenerate tuples in WindowedKeys etc).
    to_delete: Set[Any] = set()
    delete_key = self.data.pop
    self.data.update(
        self._create_batch_iterator(to_delete.add, to_key, to_value, batch)
    )
    for key in to_delete:
        delete_key(key, None)

def _create_batch_iterator(
    self,
    mark_as_delete: Callable[[Any], None],
    to_key: Callable[[Any], Any],
    to_value: Callable[[Any], Any],
    batch: Iterable[EventT],
) -> Iterable[Tuple[Any, Any]]:
    for event in batch:
        key = to_key(event.key)
        # to delete keys in the table we set the raw value to None
        if event.message.value is None:
            mark_as_delete(key)
            continue
        yield key, to_value(event.value)

Proposal for a solution

In my opinion, keys should not be collected for deletion during iteration and removed afterwards. It would be better to skip key-value pairs whose value is null while iterating. That way, if the key is assigned again later, the new value is kept.
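
One possible way to get this result is sketched below. It is an assumption for illustration only and not necessarily the change merged for this issue: a key is un-marked as soon as a later non-null value for it is seen, so the outcome matches applying the records strictly in order. The name restore_in_order is hypothetical, and the sketch uses a plain dict rather than the Faust store.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Set, Tuple

Change = Tuple[Any, Optional[str]]


def restore_in_order(changelog: Iterable[Change]) -> Dict[Any, str]:
    """Hypothetical recovery that respects insert > delete > insert."""
    data: Dict[Any, str] = {}
    to_delete: Set[Any] = set()

    def live_entries() -> Iterator[Tuple[Any, str]]:
        for key, value in changelog:
            if value is None:
                to_delete.add(key)
                continue
            # A later assignment cancels an earlier pending deletion.
            to_delete.discard(key)
            yield key, value

    data.update(live_entries())
    # Only keys whose last record was a null value are still marked here.
    for key in to_delete:
        data.pop(key, None)
    return data


table_changelog = [(1, "Foo"), (2, "Bar"), (2, None), (2, "Baz")]
fixed = restore_in_order(table_changelog)
print(fixed)  # {1: 'Foo', 2: 'Baz'}
```

A key whose last record is a null value is still removed, so plain deletions keep working.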

Versions

  • Python 3.7.9
  • Faust 0.6.9
@patkivikram

Can you open a PR with your proposed change?

PJ-Schulz added a commit to PJ-Schulz/faust that referenced this issue Jul 14, 2021
patkivikram pushed a commit that referenced this issue Jul 23, 2021
* fix for reassign Table Keys #171

* change comment, because line was too long for linter (< 88 characters)

* add unittest to check "insert > delete > insert" for a key in table