Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytepropeller] [compiler] Support non-Any Python types as Any input in workflows #5408

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented May 22, 2024

Tracking issue

#5366

Why are the changes needed?

We want to allow the Any type to accept all kinds of inputs and outputs, not just the types we currently can't handle.

For example, workflow like this.

@task
def foo(a: Any) -> int:
    if type(a) == int:
        return a + 1
    return 0

@workflow
def wf(a: int) -> int:
    return foo(a=a)

Potential Issues

Note: This PR only supports Any in python, if we want to support java, we will need to add more code.

What changes were proposed in this pull request?

  1. Added a function isTypeAny to check if upstreamType or downstreamType in the workflow is Any in Python.
  2. Updated the AreTypesCastable function to use the isTypeAny function. If upstreamType or downstreamType is Any in Python, the function should return true, indicating that the types are castable and the compilation passes.

How was this patch tested?

unit test and remote cluster

from flytekit import task, workflow
from typing import Any

@task
def foo(a: Any) -> int:
    if type(a) == int:
        return a + 1
    return 0

@workflow
def wf(a: int) -> int:
    return foo(a=a)
FROM python:3.9-slim-buster
USER root
WORKDIR /root
ENV PYTHONPATH /root
RUN apt-get update && apt-get install build-essential -y
RUN apt-get install git -y

RUN pip install -U git+https://github.com/flyteorg/flytekit.git@3804a5155a523e2028ece8e7e581f794ebabd788

Setup process

pyflyte run --remote --image localhost:30000/any:0522 any_task.py wf --a 1

Screenshots

image image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

flyteorg/flytekit#2432

Copy link

codecov bot commented May 22, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 61.07%. Comparing base (d04cf66) to head (b9825b3).
Report is 7 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5408      +/-   ##
==========================================
- Coverage   61.10%   61.07%   -0.04%     
==========================================
  Files         793      793              
  Lines       51164    51232      +68     
==========================================
+ Hits        31265    31288      +23     
- Misses      17027    17068      +41     
- Partials     2872     2876       +4     
Flag Coverage Δ
unittests-datacatalog 69.31% <ø> (ø)
unittests-flyteadmin 58.86% <ø> (-0.05%) ⬇️
unittests-flytecopilot 17.79% <ø> (ø)
unittests-flytectl 67.97% <ø> (-0.35%) ⬇️
unittests-flyteidl 79.04% <ø> (-0.26%) ⬇️
unittests-flyteplugins 61.94% <ø> (ø)
unittests-flytepropeller 57.34% <100.00%> (+0.01%) ⬆️
unittests-flytestdlib 65.82% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Future-Outlier Future-Outlier changed the title [flytepropeller] [compiler] Support non-any type as any input in workflow [flytepropeller] [compiler] Support Python non-any type as any input in workflow May 22, 2024
@Future-Outlier Future-Outlier changed the title [flytepropeller] [compiler] Support Python non-any type as any input in workflow [flytepropeller] [compiler] Support Python non-any type as any input and output in workflow May 22, 2024
@Future-Outlier Future-Outlier changed the title [flytepropeller] [compiler] Support Python non-any type as any input and output in workflow [flytepropeller] [compiler] Support non-Any Python types as Any input and output in workflows May 22, 2024
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier Future-Outlier changed the title [flytepropeller] [compiler] Support non-Any Python types as Any input and output in workflows [flytepropeller] [compiler] Support non-Any Python types as Any input in workflows May 24, 2024
@Future-Outlier Future-Outlier changed the title [flytepropeller] [compiler] Support non-Any Python types as Any input in workflows [flytepropeller] [compiler] Support non-Any Python types as Any output in workflows May 24, 2024
@Future-Outlier Future-Outlier changed the title [flytepropeller] [compiler] Support non-Any Python types as Any output in workflows [flytepropeller] [compiler] Support non-Any Python types as Any input in workflows May 24, 2024
@Future-Outlier
Copy link
Member Author

Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier
Copy link
Member Author

Not sure if we should treat Any as enum SimpleType or create a field for Any like

message AnyType {
    LiteralType variant = 1;
}

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@kumare3
Copy link
Contributor

kumare3 commented Jun 5, 2024

Hold on - I am not sure we should do this?
Cc @EngHabu / @fg91 / @cosmicBboy
Can you folks tal

@cosmicBboy
Copy link
Contributor

@kumare3 isn't this necessary for gradual typing that serializes as pickle (at least for Python)?

@kumare3
Copy link
Contributor

kumare3 commented Jun 5, 2024

My concern is - how do we coerce the type into another type? I think once something is any it has to be any all the way through.
If we pickle we cannot move to int. We can move from into to any maybe - that too if we support it

@cosmicBboy
Copy link
Contributor

cosmicBboy commented Jun 5, 2024

how do we coerce the type into another type?

At deserialization time, once we unpickle the object, can't we do int(object)? It'll basically be a runtime error if the unpickled object can't be coerced to the specified type:

from flytekit import task, workflow
from typing import Any

@task
def foo(a: Any) -> Any:
    if type(a) == int:
        return a + 1
    return 0

@task
def bar(a: int) -> int:
    return a * 2

@workflow
def wf(a: int) -> int:
    x = foo(a=a)  # here x is Any
    return bar(a=x)  # unpickle `x` and try to and coerce to `int` at deserialization time

This will work for most cases but not for things like files, directories, very large dataframes (which would be crazy to annotate with Any)

@fg91
Copy link
Member

fg91 commented Jun 5, 2024

Hold on - I am not sure we should do this? Cc @EngHabu / @fg91 / @cosmicBboy Can you folks tal

I'm personally also not convinced we need to support Any, type safety is a good thing and Flyte workflows are a DSL after all that uses a subset of python syntax but doesn't support all of python.
I do, however, feel we should catch when users want to use Any and raise an error mentioning the DSL (like e.g. this one) and explain that workflows are type safe.

@Future-Outlier
Copy link
Member Author

I will list why we need this and how we can do it today.
Thank you all!

@Future-Outlier
Copy link
Member Author

@kumare3 @pingsutw @cosmicBboy @fg91 @EngHabu

Why do we need this PR?

In the upcoming agent, LangChain, we will use Interface(inputs={"input": Any}, outputs={"o0": Any})
for our LangChainTask.
Since LangChain supports many types,
it would be very difficult to write numerous type transformers for all types.

However, our ideal LangChain workflow might look like this:

import os
from typing import Any, Union

from flytekit import workflow
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts.prompt import PromptTemplate
from langchain_openai import ChatOpenAI

api_key = os.environ.get("OPENAI_API_KEY")

model = ChatOpenAI(
    model="gpt-3.5-turbo",
    openai_api_key=api_key,
    openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
)

prompt = PromptTemplate(
    input_variables=["question"],
    template="Question: {question}?",
)

output_parser = StrOutputParser()

@workflow
def wf(input: str) -> Union[str, Any]:
    message = prompt(input=input)
    o0 = model(input=message)
    o1 = output_parser(input=o0)
    return o1

It's necessary for users to input str as the prompt input in a LangChain task.

Reference for Flytekit LangChain task implementation: https://github.com/flyteorg/flytekit/pull/2436/files#diff-76c7b754bdfc3be1caaba940a66376edd7f443185fe492e7ac16a142cf63c70dR54-R55
Reference for LangChain (input schema and output schema): https://python.langchain.com/v0.1/docs/expression_language/interface/#input-schema

How can we support this in Flytekit?

  • For default inputs, we can iterate over all type transformers to check if it's possible to convert the literal to a Python value.
  • For others, we will use metadata to get the python_dotted_path and convert it to the right type we need.
class TypeEngine(typing.Generic[T]):

    @classmethod
    def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type, expected: LiteralType) -> Literal:
        # The `metadata` field is added because of the artifact feature
        try:
            metadata.update({"python_dotted_path": f"{python_type.__module__}.{python_type.__qualname__}"})
            lv.set_metadata(metadata=metadata)
        except AttributeError as e:
            logger.warning(f"Attribute error occurred: {e}")

class FlytePickleTransformer(TypeTransformer[FlytePickle]):

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
        try:
            uri = lv.scalar.blob.uri
            return FlytePickle.from_pickle(uri)
        except Exception as e:
            from pydoc import locate

            metadata = lv.metadata
            if metadata and metadata.get("python_dotted_path"):
                python_dotted_path = metadata.get("python_dotted_path")
                py_type = locate(python_dotted_path)
                if py_type != typing.Any:
                    return TypeEngine.to_python_value(ctx, lv, py_type)
            raise e

PR reference: flyteorg/flytekit#2432

Pros

  1. We can now support non-Any types in a workflow, which is more intuitive.
  2. LangChain agent can benefit from this.

Cons

  1. Slightly breaks type safety.

@davidmirror-ops
Copy link
Contributor

06/06/2024 Contributors meetup notes: while this change is not considered completely necessary, more follow-up discussion to come.

@Future-Outlier
Copy link
Member Author

Future-Outlier commented Jun 7, 2024

Hi, folks.
@kumare3 @cosmicBboy @fg91 @EngHabu
Let me clarify the details of this PR and some discussion I've finished with @pingsutw.

  1. We only want to support Non-Any to Any in workflow bindings.

for example:
(O) int -> Any
(X) Any -> int

So the example @cosmicboys created should fail.
Cause this is int -> Any

@task
def foo(a: Any) -> Any:
    if type(a) == int:
        return a + 1
    return 0

@task
def bar(a: int) -> int:
    return a * 2

@workflow
def wf(a: int) -> int:
    x = foo(a=a)  # here x is Any
    return bar(a=x)  # here should show compile error
  1. How can we support it in flytekit?

We can use the field metadata in our literal value.
(Use str(python_type) to convert type to str, and use eval to convert str back to type.
reference: https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/core/literals.proto#L95-L114

For example:
TypeEngine

def to_literal:
    # lv is LiteralValue
    lv.metadata.update({"python_type": str(python_type)})

FlytePickleTransformer

def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
    metadata = lv.metadata
    try:
        uri = lv.scalar.blob.uri
            if lv.scalar.blob.metadata.type.format == self.PYTHON_PICKLE_FORMAT:
                return FlytePickle.from_pickle(uri)
            elif lv.scalar.blob.metadata.type.dimensionality == BlobType.BlobDimensionality.MULTIPART:
                return TypeEngine.to_python_value(ctx, lv, FlyteDirectory)
            elif lv.scalar.blob.metadata.type.dimensionality == BlobType.BlobDimensionality.SINGLE:
                return TypeEngine.to_python_value(ctx, lv, FlyteFile)

    except Exception as e:
        metadata = lv.metadata
        if metadata and metadata.get("python_type"):
            python_type = metadata.get("python_type")
            py_type = eval(python_type)              # turn 'list[int]' to list[int]
        if py_type != typing.Any:
            return TypeEngine.to_python_value(ctx, lv, py_type)

    raise e


def get_literal_type(self, t: Type[T]) -> LiteralType:
    lt = LiteralType(
            blob=_core_types.BlobType(
                format=self.PYTHON_PICKLE_FORMAT, dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE
            )
        )
    lt.metadata = {"python_class_name": str(t)}
    lt.metadata = {"isAny": str(t == typing.Any)} # check downstream type is any or not in propeller's compiler 

    return lt

Does this implementation look good to you all?
Please take a look, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants