Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FATE2.0读取外部数据源 #5669

Open
FancyXun opened this issue Jul 17, 2024 · 8 comments
Open

FATE2.0读取外部数据源 #5669

FancyXun opened this issue Jul 17, 2024 · 8 comments
Assignees

Comments

@FancyXun
Copy link

请教个问题,目前官方给的例子都是基于内置的数据进行训练预测的,在实际生产中,我们会从数据库,比如Mysql读取数据,请问fate2.0支持从外部读取数据吗?我看有个table bind,但不知道具体用法,这个怎么把外部的数据库信息给到fate呢

@sagewe
Copy link
Contributor

sagewe commented Jul 18, 2024

目前FATE端不支持直接从mysql中读取数据,原因之一是不同的计算引擎支持的存储数据格式不一样,比如spark支持文件跟hdfs,eggroll支持的是自己的格式(底层是lmdb)。我们在之前的版本中有采取转换的思路,这个版本可能还没有适配?
@zhihuiwan

@sagewe
Copy link
Contributor

sagewe commented Jul 18, 2024

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

@FancyXun
Copy link
Author

目前FATE端不支持直接从mysql中读取数据,原因之一是不同的计算引擎支持的存储数据格式不一样,比如spark支持文件跟hdfs,eggroll支持的是自己的格式(底层是lmdb)。我们在之前的版本中有采取转换的思路,这个版本可能还没有适配? @zhihuiwan

感谢回复,我看现在官方默认upload 数据的时候,里面配置对应的数据路径(csv文件),但是这个路径必须存在在fate flow里面,这样Job里面会出现tranfromer这些,我理解就是你说的转成imdb格式。

@FancyXun
Copy link
Author

我理解原始数据最终都是要转成fate能读取的数据格式lmdb,现在都是直接从内置的fate flow 里面读取csv文件进行转换,我的述求就是如何读取外部的数据库,进行转换也可。我理解直接1.x版本是可以 table bind 一个外部数据源,比如mysql这样。@sagewe

@sagewe
Copy link
Contributor

sagewe commented Jul 18, 2024

我理解原始数据最终都是要转成fate能读取的数据格式lmdb,现在都是直接从内置的fate flow 里面读取csv文件进行转换,我的述求就是如何读取外部的数据库,进行转换也可。我理解直接1.x版本是可以 table bind 一个外部数据源,比如mysql这样。@sagewe

是的,这个后续版本会有支持,是我们推进容器化支持的一部分

@FancyXun
Copy link
Author

FancyXun commented Jul 18, 2024

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

@sagewe
Copy link
Contributor

sagewe commented Jul 19, 2024

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

从这里改可能更简单:

class TableReader(_ArtifactTypeReader):
def read(self):
self.artifact.consumed()
return self.ctx.computing.load(
uri=self.artifact.uri,
schema=self.artifact.metadata.metadata.get("schema", {}),
options=self.artifact.metadata.metadata.get("options", None),
)

class TableReader(_ArtifactTypeReader):
    def read(self):
        self.artifact.consumed()
        if self.artifact.uri.scheme == "mysql":
            from sqlalchemy import create_engine
            import copy

            database, table = self.artifact.uri.path_splits()
            database_uri = copy.deepcopy(self.artifact.uri)
            database_uri.path.replace(f"/{table}", "")
            engine = create_engine(database_uri.to_string(), echo=True)
            with engine.connect() as con:

                rs = con.execute(f'SELECT * FROM {table}')
                def get_data():
                    for row in rs:
                        # TODO: process row
                        yield ...

                table = self.ctx.computing.parallelize(
                    data=get_data(),
                    partition=16,
                )
                table.schema = self.artifact.metadata.metadata.get("schema", {})
                return table

        return self.ctx.computing.load(
            uri=self.artifact.uri,
            schema=self.artifact.metadata.metadata.get("schema", {}),
            options=self.artifact.metadata.metadata.get("options", None),
        )

你可能会碰到的问题:

  1. flow是否能传递mysql uri进来?这个需要 @zhihuiwan 来给你相应的指导
  2. 读mysql是单线程的,数据量大了可能有点慢

在这个位置实现的利弊

  • 好处是对引擎透明
  • 缺点是如果底层引擎有更好的实现无法发挥,但是可以根据未来需要通过简单的接口重构克服

@sagewe sagewe self-assigned this Jul 19, 2024
@FancyXun
Copy link
Author

FancyXun commented Jul 22, 2024

理想情况下如果底层的引擎直接支持是最简单的,只需要在https://github.com/FederatedAI/FATE/blob/master/python/fate/arch/computing/backends/eggroll/_csession.py#L64-L90 中插入新的uri支持

因为近期有这个需求,可能等不到你们升级了,那如果我想要支持,是不是按照你说的说法得在这里修改成本最小,读取外部数据源?

从这里改可能更简单:

class TableReader(_ArtifactTypeReader):
def read(self):
self.artifact.consumed()
return self.ctx.computing.load(
uri=self.artifact.uri,
schema=self.artifact.metadata.metadata.get("schema", {}),
options=self.artifact.metadata.metadata.get("options", None),
)

class TableReader(_ArtifactTypeReader):
    def read(self):
        self.artifact.consumed()
        if self.artifact.uri.scheme == "mysql":
            from sqlalchemy import create_engine
            import copy

            database, table = self.artifact.uri.path_splits()
            database_uri = copy.deepcopy(self.artifact.uri)
            database_uri.path.replace(f"/{table}", "")
            engine = create_engine(database_uri.to_string(), echo=True)
            with engine.connect() as con:

                rs = con.execute(f'SELECT * FROM {table}')
                def get_data():
                    for row in rs:
                        # TODO: process row
                        yield ...

                table = self.ctx.computing.parallelize(
                    data=get_data(),
                    partition=16,
                )
                table.schema = self.artifact.metadata.metadata.get("schema", {})
                return table

        return self.ctx.computing.load(
            uri=self.artifact.uri,
            schema=self.artifact.metadata.metadata.get("schema", {}),
            options=self.artifact.metadata.metadata.get("options", None),
        )

你可能会碰到的问题:

  1. flow是否能传递mysql uri进来?这个需要 @zhihuiwan 来给你相应的指导
  2. 读mysql是单线程的,数据量大了可能有点慢

在这个位置实现的利弊

  • 好处是对引擎透明
  • 缺点是如果底层引擎有更好的实现无法发挥,但是可以根据未来需要通过简单的接口重构克服

非常感谢你提供的思路,这个我后续可以看看如何实现,目前使用了一个比较简单快捷的办法FederatedAI/FATE-Flow#574

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants