diff --git a/.gitmodules b/.gitmodules index d90eae78ca..3e981458c8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,12 +1,12 @@ [submodule "fateboard"] path = fateboard url = https://github.com/FederatedAI/FATE-Board.git - branch = v1.11.1 + branch = v1.11.2 [submodule "eggroll"] path = eggroll url = https://github.com/WeBankFinTech/eggroll.git - branch = v2.5.1 + branch = v2.5.2 [submodule "fateflow"] path = fateflow url = https://github.com/FederatedAI/FATE-Flow.git - branch = v1.11.1 + branch = v1.11.2 diff --git a/README.md b/README.md index 805a2b158b..baa3995af7 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Deploying FATE to multiple nodes to achieve scalability, reliability and managea - [EggRoll](https://github.com/WeBankFinTech/eggroll): A simple high-performance computing framework for (federated) machine learning. - [AnsibleFATE](https://github.com/FederatedAI/AnsibleFATE): A tool to optimize and automate the configuration and deployment operations via Ansible. - [FATE-Builder](https://github.com/FederatedAI/FATE-Builder): A tool to build package and docker image for FATE and KubeFATE. -- [FATE-LLM](https://github.com/FederatedAI/FATE-LLM/blob/main/README.md) +- [FATE-LLM](https://github.com/FederatedAI/FATE-LLM/blob/main/README.md): A framework to support federated learning for large language models (LLMs). ## Documentation ### FATE Design diff --git a/RELEASE.md b/RELEASE.md index 63f61ac848..fd0b24b247 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,3 +1,10 @@ +## Release 1.11.3 +### Major Features and Improvements +> FederatedML +* FedAVGTrainer update code structure: support OffsiteTuningTrainer +* FedAVGTrainer update log format: report batch progress instead of batch index + + ## Release 1.11.2 ### Major Features and Improvements > FederatedML diff --git a/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md b/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md index bf63423ce1..10c0cf5245 100644 --- a/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md +++ b/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md @@ -183,7 +183,7 @@ wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/${version}/ scp *.tar.gz app@192.168.0.1:/data/projects/install scp *.tar.gz app@192.168.0.2:/data/projects/install ``` -注意: 当前文档需要部署的FATE version>=1.7.0,${version}替换为如1.11.2,不带v字符 +注意: 当前文档需要部署的FATE version>=1.7.0,${version}替换为如1.11.3,不带v字符 ### 5.2 操作系统参数检查 **在目标服务器(192.168.0.1 192.168.0.2 192.168.0.3)app用户下执行** diff --git a/deploy/standalone-deploy/README.md b/deploy/standalone-deploy/README.md index 217af0274e..0a389d31cb 100644 --- a/deploy/standalone-deploy/README.md +++ b/deploy/standalone-deploy/README.md @@ -41,7 +41,7 @@ export version={FATE version for this deployment} example: ```bash -export version=1.11.2 +export version=1.11.3 ``` ### 2.2 Pulling mirrors diff --git a/deploy/standalone-deploy/README.zh.md b/deploy/standalone-deploy/README.zh.md index e1ffdbcbb0..de1da9b58d 100644 --- a/deploy/standalone-deploy/README.zh.md +++ b/deploy/standalone-deploy/README.zh.md @@ -35,13 +35,13 @@ 设置部署所需环境变量(注意, 通过以下方式设置的环境变量仅在当前终端会话有效, 若打开新的终端会话, 如重新登录或者新窗口, 请重新设置) ```bash -export version={本次部署的FATE版本号, 如1.11.2} +export version={本次部署的FATE版本号, 如1.11.3} ``` 样例: ```bash -export version=1.11.2 +export version=1.11.3 ``` ### 2.2 拉取镜像 diff --git a/doc/federatedml_component/README.md b/doc/federatedml_component/README.md index be3ca4c6bb..0b1a8c561b 100644 --- 
a/doc/federatedml_component/README.md +++ b/doc/federatedml_component/README.md @@ -40,7 +40,6 @@ provide: | [Homo-LR](logistic_regression.md) | HomoLR | Build homo logistic regression model through multiple parties. | Table, values are instances. | Table, values are instances. | | Logistic Regression Model, consists of model-meta and model-param. | | [Homo-NN](homo_nn.md) | HomoNN | Build homo neural network model through multiple parties. | Table, values are instances. | Table, values are instances. | | Neural Network Model, consists of model-meta and model-param. | | [Hetero Secure Boosting](ensemble.md) | HeteroSecureBoost | Build hetero secure boosting model through multiple parties | Table, values are instances. | Table, values are instances. | | SecureBoost Model, consists of model-meta and model-param. | -| [Hetero Fast Secure Boosting](ensemble.md) | HeteroFastSecureBoost | Build hetero secure boosting model through multiple parties in layered/mix manners. | Table, values are instances. | Table, values are instances. | | FastSecureBoost Model, consists of model-meta and model-param. | | [Evaluation](evaluation.md) | Evaluation | Output the model evaluation metrics for user. | Table(s), values are instances. | | | | | [Hetero Pearson](correlation.md) | HeteroPearson | Calculate hetero correlation of features from different parties. | Table, values are instances. | | | | | [Hetero-NN](hetero_nn.md) | HeteroNN | Build hetero neural network model. | Table, values are instances. | Table, values are instances. | | Hetero Neural Network Model, consists of model-meta and model-param. | diff --git a/doc/federatedml_component/README.zh.md b/doc/federatedml_component/README.zh.md index c70df233e0..0114b9a5e5 100644 --- a/doc/federatedml_component/README.zh.md +++ b/doc/federatedml_component/README.zh.md @@ -30,7 +30,6 @@ Federatedml模块包括许多常见机器学习算法联邦化实现。所有模 | [Homo-LR](logistic_regression.md) | HomoLR | 通过多方构建横向逻辑回归模块。 | Table, 值为Instance | | | Logistic回归模型,由模型本身和模型参数组成 | | [Homo-NN](homo_nn.md) | HomoNN | 通过多方构建横向神经网络模块。 | Table, 值为Instance | | | 神经网络模型,由模型本身和模型参数组成 | | [Hetero Secure Boosting](ensemble.md) | HeteroSecureBoost | 通过多方构建纵向Secure Boost模块。 | Table,值为Instance | | | SecureBoost模型,由模型本身和模型参数组成 | -| [Hetero Fast Secure Boosting](ensemble.md) | HeteroFastSecureBoost | 使用分层/混合模式快速构建树模型 | Table,值为Instance | Table,值为Instance | | FastSecureBoost模型 | | [Evaluation](evaluation.md) | Evaluation | 为用户输出模型评估指标。 | Table(s), 值为Instance | | | | | [Hetero Pearson](correlation.md) | HeteroPearson | 计算来自不同方的特征的Pearson相关系数。 | Table, 值为Instance | | | | | [Hetero-NN](hetero_nn.md) | HeteroNN | 构建纵向神经网络模块。 | Table, 值为Instance | | | 纵向神经网络模型 | diff --git a/eggroll b/eggroll index 28cda38c27..ccdddf2c10 160000 --- a/eggroll +++ b/eggroll @@ -1 +1 @@ -Subproject commit 28cda38c27fe1e94387ba3dbf7e212fc033920fb +Subproject commit ccdddf2c108b9b3e556f2688dc41140a599f71e7 diff --git a/fate.env b/fate.env index 6c7f6c1df1..0c312f1a3b 100644 --- a/fate.env +++ b/fate.env @@ -1,7 +1,7 @@ -FATE=1.11.2 -FATEFlow=1.11.1 -FATEBoard=1.11.1 -EGGROLL=2.5.1 +FATE=1.11.3 +FATEFlow=1.11.2 +FATEBoard=1.11.2 +EGGROLL=2.5.2 CENTOS=7.2 UBUNTU=16.04 PYTHON=3.8 diff --git a/fateboard b/fateboard index cc9d36360c..ce465d745a 160000 --- a/fateboard +++ b/fateboard @@ -1 +1 @@ -Subproject commit cc9d36360cc1bd9904bcde72973db201eb1b4a9c +Subproject commit ce465d745a461d61cbfbed67e586b80bf88ccada diff --git a/fateflow b/fateflow index 0c7ea37724..c8167883fb 160000 --- a/fateflow +++ b/fateflow @@ -1 +1 @@ -Subproject commit 
0c7ea37724e7d6990ad5b0c1d9d92d008ef7a37f +Subproject commit c8167883fbfc69afdcfedbdded2f400f8f7b289c diff --git a/python/fate_client/pipeline/component/homo_nn.py b/python/fate_client/pipeline/component/homo_nn.py index c6b68adf6a..9f821b6566 100644 --- a/python/fate_client/pipeline/component/homo_nn.py +++ b/python/fate_client/pipeline/component/homo_nn.py @@ -44,7 +44,8 @@ 'loss': None, 'optimizer': None, 'nn_define': None, - 'ds_config': None + 'ds_config': None, + 'server_init': False } except Exception as e: print(e) @@ -65,7 +66,10 @@ class HomoNN(FateComponent): torch_seed, global random seed loss, loss function from fate_torch optimizer, optimizer from fate_torch + ds_config, config for deepspeed model, a fate torch sequential defining the model structure + server_init, whether to initialize the model, loss and optimizer on server, if configs are provided, they will be used. In + current version this option is specially designed for offsite-tuning """ @extract_explicit_parameter @@ -82,7 +86,9 @@ def __init__(self, loss=None, optimizer: OptimizerType = None, ds_config: dict = None, - model: Sequential = None, **kwargs): + model: Sequential = None, + server_init: bool = False, + **kwargs): explicit_parameters = copy.deepcopy(DEFAULT_PARAM_DICT) if 'name' not in kwargs["explict_parameters"]: @@ -94,8 +100,15 @@ def __init__(self, self.input = Input(self.name, data_type="multi") self.output = Output(self.name, data_type='single') self._module_name = "HomoNN" - self._updated = {'trainer': False, 'dataset': False, - 'torch_seed': False, 'loss': False, 'optimizer': False, 'model': False} + self._updated = { + 'trainer': False, + 'dataset': False, + 'torch_seed': False, + 'loss': False, + 'optimizer': False, + 'model': False, + 'ds_config': False, + 'server_init': False} self._set_param(kwargs["explict_parameters"]) self._check_parameters() diff --git a/python/fate_client/pipeline/component/nn/backend/torch/cust.py b/python/fate_client/pipeline/component/nn/backend/torch/cust.py index 4eba0c54c6..de60736081 100644 --- a/python/fate_client/pipeline/component/nn/backend/torch/cust.py +++ b/python/fate_client/pipeline/component/nn/backend/torch/cust.py @@ -3,9 +3,12 @@ from pipeline.component.nn.backend.torch.base import FateTorchLayer, FateTorchLoss import difflib +ML_PATH = 'federatedml.nn' +LLM_PATH = "fate_llm" -MODEL_PATH = None -LOSS_PATH = None +LLM_MODEL_PATH = '{}.model_zoo'.format(LLM_PATH) +MODEL_PATH = '{}.model_zoo'.format(ML_PATH) +LOSS_PATH = '{}.loss'.format(ML_PATH) def str_simi(str_a, str_b): @@ -45,9 +48,14 @@ class CustModel(FateTorchLayer, nn.Module): def __init__(self, module_name, class_name, **kwargs): super(CustModel, self).__init__() - assert isinstance(module_name, str), 'name must be a str, specify the module in the model_zoo' - assert isinstance(class_name, str), 'class name must be a str, specify the class in the module' - self.param_dict = {'module_name': module_name, 'class_name': class_name, 'param': kwargs} + assert isinstance( + module_name, str), 'name must be a str, specify the module in the model_zoo' + assert isinstance( + class_name, str), 'class name must be a str, specify the class in the module' + self.param_dict = { + 'module_name': module_name, + 'class_name': class_name, + 'param': kwargs} self._model = None def init_model(self): @@ -62,11 +70,18 @@ def forward(self, x): def get_pytorch_model(self, module_path=None): if module_path is None: - return get_class( - self.param_dict['module_name'], - self.param_dict['class_name'], - 
self.param_dict['param'], - MODEL_PATH) + try: + return get_class( + self.param_dict['module_name'], + self.param_dict['class_name'], + self.param_dict['param'], + MODEL_PATH) + except BaseException: + return get_class( + self.param_dict['module_name'], + self.param_dict['class_name'], + self.param_dict['param'], + LLM_MODEL_PATH) else: return get_class( self.param_dict['module_name'], diff --git a/python/fate_client/pyproject.toml b/python/fate_client/pyproject.toml index 2affdd8f40..899f3445d6 100644 --- a/python/fate_client/pyproject.toml +++ b/python/fate_client/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fate_client" -version = "1.11.2" +version = "1.11.3" description = "Clients for FATE, including flow_client and pipeline" authors = ["FederatedAI "] license = "Apache-2.0" diff --git a/python/fate_client/setup.py b/python/fate_client/setup.py index 77e20c3eb2..3473774490 100644 --- a/python/fate_client/setup.py +++ b/python/fate_client/setup.py @@ -48,7 +48,7 @@ setup_kwargs = { "name": "fate-client", - "version": "1.11.2", + "version": "1.11.3", "description": "Clients for FATE, including flow_client and pipeline", "long_description": "FATE Client\n===========\n\nTools for interacting with FATE.\n\nquick start\n-----------\n\n1. (optional) create virtual env\n\n .. code-block:: bash\n\n python -m venv venv\n source venv/bin/activate\n\n\n2. install FATE Client\n\n .. code-block:: bash\n\n pip install fate-client\n\n\nPipeline\n========\n\nA high-level python API that allows user to design, start,\nand query FATE jobs in a sequential manner. For more information,\nplease refer to this `guide <./pipeline/README.rst>`__\n\nInitial Configuration\n---------------------\n\n1. Configure server information\n\n .. code-block:: bash\n\n # configure values in pipeline/config.yaml\n # use real ip address to configure pipeline\n pipeline init --ip 127.0.0.1 --port 9380 --log-directory ./logs\n\n\nFATE Flow Command Line Interface (CLI) v2\n=========================================\n\nA command line interface providing series of commands for user to design, start,\nand query FATE jobs. For more information, please refer to this `guide <./flow_client/README.rst>`__\n\nInitial Configuration\n---------------------\n\n1. Configure server information\n\n .. code-block:: bash\n\n # configure values in conf/service_conf.yaml\n flow init -c /data/projects/fate/conf/service_conf.yaml\n # use real ip address to initialize cli\n flow init --ip 127.0.0.1 --port 9380\n\n", "author": "FederatedAI", diff --git a/python/fate_test/pyproject.toml b/python/fate_test/pyproject.toml index 040ccef0d6..f858cfb1c7 100644 --- a/python/fate_test/pyproject.toml +++ b/python/fate_test/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fate_test" -version = "1.11.2" +version = "1.11.3" description = "test tools for FATE" authors = ["FederatedAI "] license = "Apache-2.0" diff --git a/python/fate_test/setup.py b/python/fate_test/setup.py index 3341c656ad..6d23171c07 100644 --- a/python/fate_test/setup.py +++ b/python/fate_test/setup.py @@ -23,7 +23,7 @@ setup_kwargs = { "name": "fate-test", - "version": "1.11.2", + "version": "1.11.3", "description": "test tools for FATE", "long_description": 'FATE Test\n=========\n\nA collection of useful tools to running FATE\'s test.\n\n.. image:: images/tutorial.gif\n :align: center\n :alt: tutorial\n\nquick start\n-----------\n\n1. (optional) create virtual env\n\n .. code-block:: bash\n\n python -m venv venv\n source venv/bin/activate\n pip install -U pip\n\n\n2. 
install fate_test\n\n .. code-block:: bash\n\n pip install fate_test\n fate_test --help\n\n\n3. edit default fate_test_config.yaml\n\n .. code-block:: bash\n\n # edit priority config file with system default editor\n # filling some field according to comments\n fate_test config edit\n\n4. configure FATE-Pipeline and FATE-Flow Commandline server setting\n\n.. code-block:: bash\n\n # configure FATE-Pipeline server setting\n pipeline init --port 9380 --ip 127.0.0.1\n # configure FATE-Flow Commandline server setting\n flow init --port 9380 --ip 127.0.0.1\n\n5. run some fate_test suite\n\n .. code-block:: bash\n\n fate_test suite -i \n\n\n6. run some fate_test benchmark\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i \n\n7. useful logs or exception will be saved to logs dir with namespace shown in last step\n\ndevelop install\n---------------\nIt is more convenient to use the editable mode during development: replace step 2 with flowing steps\n\n.. code-block:: bash\n\n pip install -e ${FATE}/python/fate_client && pip install -e ${FATE}/python/fate_test\n\n\n\ncommand types\n-------------\n\n- suite: used for running testsuites, collection of FATE jobs\n\n .. code-block:: bash\n\n fate_test suite -i \n\n\n- benchmark-quality used for comparing modeling quality between FATE and other machine learning systems\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i \n\n\n\nconfiguration by examples\n--------------------------\n\n1. no need ssh tunnel:\n\n - 9999, service: service_a\n - 10000, service: service_b\n\n and both service_a, service_b can be requested directly:\n\n .. code-block:: yaml\n\n work_mode: 1 # 0 for standalone, 1 for cluster\n data_base_dir: \n parties:\n guest: [10000]\n host: [9999, 10000]\n arbiter: [9999]\n services:\n - flow_services:\n - {address: service_a, parties: [9999]}\n - {address: service_b, parties: [10000]}\n\n2. need ssh tunnel:\n\n - 9999, service: service_a\n - 10000, service: service_b\n\n service_a, can be requested directly while service_b don\'t,\n but you can request service_b in other node, say B:\n\n .. code-block:: yaml\n\n work_mode: 0 # 0 for standalone, 1 for cluster\n data_base_dir: \n parties:\n guest: [10000]\n host: [9999, 10000]\n arbiter: [9999]\n services:\n - flow_services:\n - {address: service_a, parties: [9999]}\n - flow_services:\n - {address: service_b, parties: [10000]}\n ssh_tunnel: # optional\n enable: true\n ssh_address: :\n ssh_username: \n ssh_password: # optional\n ssh_priv_key: "~/.ssh/id_rsa"\n\n\nTestsuite\n---------\n\nTestsuite is used for running a collection of jobs in sequence. Data used for jobs could be uploaded before jobs are\nsubmitted, and are cleaned when jobs finished. This tool is useful for FATE\'s release test.\n\ncommand options\n~~~~~~~~~~~~~~~\n\n.. code-block:: bash\n\n fate_test suite --help\n\n1. include:\n\n .. code-block:: bash\n\n fate_test suite -i \n\n will run testsuites in *path1*\n\n2. exclude:\n\n .. code-block:: bash\n\n fate_test suite -i -e -e ...\n\n will run testsuites in *path1* but not in *path2* and *path3*\n\n3. glob:\n\n .. code-block:: bash\n\n fate_test suite -i -g "hetero*"\n\n will run testsuites in sub directory start with *hetero* of *path1*\n\n4. replace:\n\n .. code-block:: bash\n\n fate_test suite -i -r \'{"maxIter": 5}\'\n\n will find all key-value pair with key "maxIter" in `data conf` or `conf` or `dsl` and replace the value with 5\n\n\n5. skip-data:\n\n .. 
code-block:: bash\n\n fate_test suite -i --skip-data\n\n will run testsuites in *path1* without uploading data specified in *benchmark.json*.\n\n\n6. yes:\n\n .. code-block:: bash\n\n fate_test suite -i --yes\n\n will run testsuites in *path1* directly, skipping double check\n\n7. skip-dsl-jobs:\n\n .. code-block:: bash\n\n fate_test suite -i --skip-dsl-jobs\n\n will run testsuites in *path1* but skip all *tasks* in testsuites. It\'s would be useful when only pipeline tasks needed.\n\n8. skip-pipeline-jobs:\n\n .. code-block:: bash\n\n fate_test suite -i --skip-pipeline-jobs\n\n will run testsuites in *path1* but skip all *pipeline tasks* in testsuites. It\'s would be useful when only dsl tasks needed.\n\n\nBenchmark Quality\n------------------\n\nBenchmark-quality is used for comparing modeling quality between FATE\nand other machine learning systems. Benchmark produces a metrics comparison\nsummary for each benchmark job group.\n\n.. code-block:: bash\n\n fate_test benchmark-quality -i examples/benchmark_quality/hetero_linear_regression\n\n.. code-block:: bash\n\n +-------+--------------------------------------------------------------+\n | Data | Name |\n +-------+--------------------------------------------------------------+\n | train | {\'guest\': \'motor_hetero_guest\', \'host\': \'motor_hetero_host\'} |\n | test | {\'guest\': \'motor_hetero_guest\', \'host\': \'motor_hetero_host\'} |\n +-------+--------------------------------------------------------------+\n +------------------------------------+--------------------+--------------------+-------------------------+---------------------+\n | Model Name | explained_variance | r2_score | root_mean_squared_error | mean_squared_error |\n +------------------------------------+--------------------+--------------------+-------------------------+---------------------+\n | local-linear_regression-regression | 0.9035168452250094 | 0.9035070863155368 | 0.31340413289880553 | 0.09822215051805216 |\n | FATE-linear_regression-regression | 0.903146386539082 | 0.9031411831961411 | 0.3139977881119483 | 0.09859461093919596 |\n +------------------------------------+--------------------+--------------------+-------------------------+---------------------+\n +-------------------------+-----------+\n | Metric | All Match |\n +-------------------------+-----------+\n | explained_variance | True |\n | r2_score | True |\n | root_mean_squared_error | True |\n | mean_squared_error | True |\n +-------------------------+-----------+\n\ncommand options\n~~~~~~~~~~~~~~~\n\nuse the following command to show help message\n\n.. code-block:: bash\n\n fate_test benchmark-quality --help\n\n1. include:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i \n\n will run benchmark testsuites in *path1*\n\n2. exclude:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i -e -e ...\n\n will run benchmark testsuites in *path1* but not in *path2* and *path3*\n\n3. glob:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i -g "hetero*"\n\n will run benchmark testsuites in sub directory start with *hetero* of *path1*\n\n4. tol:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i -t 1e-3\n\n will run benchmark testsuites in *path1* with absolute tolerance of difference between metrics set to 0.001.\n If absolute difference between metrics is smaller than *tol*, then metrics are considered\n almost equal. Check benchmark testsuite `writing guide <#benchmark-testsuite>`_ on setting alternative tolerance.\n\n5. skip-data:\n\n .. 
code-block:: bash\n\n fate_test benchmark-quality -i --skip-data\n\n will run benchmark testsuites in *path1* without uploading data specified in *benchmark.json*.\n\n\n6. yes:\n\n .. code-block:: bash\n\n fate_test benchmark-quality -i --yes\n\n will run benchmark testsuites in *path1* directly, skipping double check\n\n\nbenchmark testsuite\n~~~~~~~~~~~~~~~~~~~\n\nConfiguration of jobs should be specified in a benchmark testsuite whose file name ends\nwith "\\*benchmark.json". For benchmark testsuite example,\nplease refer `here <../../examples/benchmark_quality>`_.\n\nA benchmark testsuite includes the following elements:\n\n- data: list of local data to be uploaded before running FATE jobs\n\n - file: path to original data file to be uploaded, should be relative to testsuite or FATE installation path\n - head: whether file includes header\n - partition: number of partition for data storage\n - table_name: table name in storage\n - namespace: table namespace in storage\n - role: which role to upload the data, as specified in fate_test.config;\n naming format is: "{role_type}_{role_index}", index starts at 0\n\n .. code-block:: json\n\n "data": [\n {\n "file": "examples/data/motor_hetero_host.csv",\n "head": 1,\n "partition": 8,\n "table_name": "motor_hetero_host",\n "namespace": "experiment",\n "role": "host_0"\n }\n ]\n\n- job group: each group includes arbitrary number of jobs with paths to corresponding script and configuration\n\n - job: name of job to be run, must be unique within each group list\n\n - script: path to `testing script <#testing-script>`_, should be relative to testsuite\n - conf: path to job configuration file for script, should be relative to testsuite\n\n .. code-block:: json\n\n "local": {\n "script": "./local-linr.py",\n "conf": "./linr_config.yaml"\n }\n\n - compare_setting: additional setting for quality metrics comparison, currently only takes ``relative_tol``\n\n If metrics *a* and *b* satisfy *abs(a-b) <= max(relative_tol \\* max(abs(a), abs(b)), absolute_tol)*\n (from `math module `_),\n they are considered almost equal. In the below example, metrics from "local" and "FATE" jobs are\n considered almost equal if their relative difference is smaller than\n *0.05 \\* max(abs(local_metric), abs(pipeline_metric)*.\n\n .. code-block:: json\n\n "linear_regression-regression": {\n "local": {\n "script": "./local-linr.py",\n "conf": "./linr_config.yaml"\n },\n "FATE": {\n "script": "./fate-linr.py",\n "conf": "./linr_config.yaml"\n },\n "compare_setting": {\n "relative_tol": 0.01\n }\n }\n\n\ntesting script\n~~~~~~~~~~~~~~\n\nAll job scripts need to have ``Main`` function as an entry point for executing jobs; scripts should\nreturn two dictionaries: first with data information key-value pairs: {data_type}: {data_name_dictionary};\nthe second contains {metric_name}: {metric_value} key-value pairs for metric comparison.\n\nBy default, the final data summary shows the output from the job named "FATE"; if no such job exists,\ndata information returned by the first job is shown. For clear presentation, we suggest that user follow\nthis general `guideline <../../examples/data/README.md#data-set-naming-rule>`_ for data set naming. 
In the case of multi-host\ntask, consider numbering host as such:\n\n::\n\n {\'guest\': \'default_credit_homo_guest\',\n \'host_1\': \'default_credit_homo_host_1\',\n \'host_2\': \'default_credit_homo_host_2\'}\n\nReturned quality metrics of the same key are to be compared.\nNote that only **real-value** metrics can be compared.\n\n- FATE script: ``Main`` always has three inputs:\n\n - config: job configuration, `JobConfig <../fate_client/pipeline/utils/tools.py#L64>`_ object loaded from "fate_test_config.yaml"\n - param: job parameter setting, dictionary loaded from "conf" file specified in benchmark testsuite\n - namespace: namespace suffix, user-given *namespace* or generated timestamp string when using *namespace-mangling*\n\n- non-FATE script: ``Main`` always has one input:\n\n - param: job parameter setting, dictionary loaded from "conf" file specified in benchmark testsuite\n\n\ndata\n----\n\n`Data` sub-command is used for upload or delete dataset in suite\'s.\n\ncommand options\n~~~~~~~~~~~~~~~\n\n.. code-block:: bash\n\n fate_test data --help\n\n1. include:\n\n .. code-block:: bash\n\n fate_test data [upload|delete] -i \n\n will upload/delete dataset in testsuites in *path1*\n\n2. exclude:\n\n .. code-block:: bash\n\n fate_test data [upload|delete] -i -e -e ...\n\n will upload/delete dataset in testsuites in *path1* but not in *path2* and *path3*\n\n3. glob:\n\n .. code-block:: bash\n\n fate_test data [upload|delete] -i -g "hetero*"\n\n will upload/delete dataset in testsuites in sub directory start with *hetero* of *path1*\n\n\nfull command options\n---------------------\n\n.. click:: fate_test.scripts.cli:cli\n :prog: fate_test\n :show-nested:\n', "author": "FederatedAI", diff --git a/python/federatedml/framework/homo/aggregator/aggregator_base.py b/python/federatedml/framework/homo/aggregator/aggregator_base.py index d00f8d52f0..d46e96f195 100644 --- a/python/federatedml/framework/homo/aggregator/aggregator_base.py +++ b/python/federatedml/framework/homo/aggregator/aggregator_base.py @@ -1,4 +1,5 @@ from federatedml.framework.homo.blocks import ServerCommunicator, ClientCommunicator +from federatedml.util import consts class AutoSuffix(object): @@ -19,7 +20,10 @@ def __call__(self): class AggregatorBaseClient(object): - def __init__(self, communicate_match_suffix: str = None): + def __init__( + self, communicate_match_suffix: str = None, server=( + consts.ARBITER,), clients=( + consts.GUEST, consts.HOST)): """Base class of client aggregator Parameters @@ -28,7 +32,7 @@ def __init__(self, communicate_match_suffix: str = None): To make sure that client and server can communicate correctly, the server-side and client-side aggregators need to have the same suffix """ - self.communicator = ClientCommunicator(prefix=communicate_match_suffix) + self.communicator = ClientCommunicator(prefix=communicate_match_suffix, server=server, clients=clients) self.suffix = {} def _get_suffix(self, var_name, user_suffix=tuple()): @@ -52,7 +56,7 @@ def get(self, suffix): class AggregatorBaseServer(object): - def __init__(self, communicate_match_suffix=None): + def __init__(self, communicate_match_suffix=None, server=(consts.ARBITER,), clients=(consts.GUEST, consts.HOST)): """Base class of server aggregator Parameters @@ -61,7 +65,7 @@ def __init__(self, communicate_match_suffix=None): To make sure that client and server can communicate correctly, the server-side and client-side aggregators need to have the same suffix """ - self.communicator = ServerCommunicator(prefix=communicate_match_suffix) + 
self.communicator = ServerCommunicator(prefix=communicate_match_suffix, server=server, clients=clients) self.suffix = {} def _get_suffix(self, var_name, user_suffix=tuple()): diff --git a/python/federatedml/framework/homo/aggregator/secure_aggregator.py b/python/federatedml/framework/homo/aggregator/secure_aggregator.py index ce6a7ec545..0ebe7f5180 100644 --- a/python/federatedml/framework/homo/aggregator/secure_aggregator.py +++ b/python/federatedml/framework/homo/aggregator/secure_aggregator.py @@ -14,11 +14,22 @@ class SecureAggregatorClient(AggregatorBaseClient): - def __init__(self, secure_aggregate=True, aggregate_type='weighted_mean', aggregate_weight=1.0, - communicate_match_suffix=None): + def __init__( + self, + secure_aggregate=True, + aggregate_type='weighted_mean', + aggregate_weight=1.0, + communicate_match_suffix=None, + server=( + consts.ARBITER, + ), + clients=( + consts.GUEST, + consts.HOST), + lm_aggregate=False): super(SecureAggregatorClient, self).__init__( - communicate_match_suffix=communicate_match_suffix) + communicate_match_suffix=communicate_match_suffix, clients=clients, server=server) self.secure_aggregate = secure_aggregate self.suffix = { "local_loss": AutoSuffix("local_loss"), @@ -31,7 +42,10 @@ def __init__(self, secure_aggregate=True, aggregate_type='weighted_mean', aggreg # init secure aggregate random padding: if self.secure_aggregate: self._random_padding_cipher: PadsCipher = RandomPaddingCipherClient( - trans_var=RandomPaddingCipherTransVar(prefix=communicate_match_suffix)).create_cipher() + trans_var=RandomPaddingCipherTransVar( + prefix=communicate_match_suffix, + clients=clients, + server=server)).create_cipher() LOGGER.info('initialize secure aggregator done') # compute weight @@ -50,9 +64,20 @@ def __init__(self, secure_aggregate=True, aggregate_type='weighted_mean', aggreg self._weight = 1 self._set_table_amplify_factor = False + self._lm_aggregate = lm_aggregate LOGGER.debug('aggregate compute weight is {}'.format(self._weight)) + def _handle_table_data(self, model): + + model = model.mapValues(lambda x: x * self._weight) + if self.secure_aggregate: + if not self._set_table_amplify_factor: + self._random_padding_cipher.set_amplify_factor( + consts.SECURE_AGG_AMPLIFY_FACTOR) + model = self._random_padding_cipher.encrypt_table(model) + return model + def _process_model(self, model): to_agg = None @@ -70,19 +95,21 @@ def _process_model(self, model): # is FATE distrubed Table elif is_table(model): - model = model.mapValues(lambda x: x * self._weight) - - if self.secure_aggregate: - if not self._set_table_amplify_factor: - self._random_padding_cipher.set_amplify_factor( - consts.SECURE_AGG_AMPLIFY_FACTOR) - model = self._random_padding_cipher.encrypt_table(model) - return model + return self._handle_table_data(model) if isinstance(model, t.nn.Module): - parameters = list(model.parameters()) - tmp_list = [[np.array(p.cpu().detach().tolist()) for p in parameters if p.requires_grad]] - LOGGER.debug('Aggregate trainable parameters: {}/{}'.format(len(tmp_list[0]), len(parameters))) + if self._lm_aggregate: + # if model is large, cannot send large object, need to break into key/value + # and aggregate them in the format of distributed table + from fate_arch.session import computing_session as session + parameters = list(model.named_parameters()) + tmp_list = [(k, v.cpu().detach().numpy()) for k, v in parameters if v.requires_grad] + table = session.parallelize(tmp_list, include_key=True, partition=4) + return self._handle_table_data(table) + else: + 
parameters = list(model.parameters()) + tmp_list = [[np.array(p.cpu().detach().tolist()) for p in parameters if p.requires_grad]] + LOGGER.debug('Aggregate trainable parameters: {}/{}'.format(len(tmp_list[0]), len(parameters))) elif isinstance(model, t.optim.Optimizer): tmp_list = [[np.array(p.cpu().detach().tolist()) for p in group["params"]] for group in model.param_groups] @@ -112,7 +139,15 @@ def _recover_model(self, model, agg_model): elif isinstance(model, Weights): return agg_model elif is_table(agg_model): - return agg_model + if self._lm_aggregate and isinstance(model, t.nn.Module): + # recover weights from table + parameters = dict(agg_model.collect()) + for k, v in model.named_parameters(): + if k in parameters and v.requires_grad: + v.data.copy_(t.Tensor(parameters[k])) + return model + else: + return agg_model else: if self.secure_aggregate: agg_model = [[np_weight.unboxed for np_weight in arr_list] @@ -186,9 +221,20 @@ def loss_aggregation(self, loss, suffix=tuple()): class SecureAggregatorServer(AggregatorBaseServer): - def __init__(self, secure_aggregate=True, communicate_match_suffix=None): + def __init__( + self, + secure_aggregate=True, + communicate_match_suffix=None, + server=( + consts.ARBITER, + ), + clients=( + consts.GUEST, + consts.HOST) + ): + super(SecureAggregatorServer, self).__init__( - communicate_match_suffix=communicate_match_suffix) + communicate_match_suffix=communicate_match_suffix, clients=clients, server=server) self.suffix = { "local_loss": AutoSuffix("local_loss"), "agg_loss": AutoSuffix("agg_loss"), @@ -199,7 +245,7 @@ def __init__(self, secure_aggregate=True, communicate_match_suffix=None): self.secure_aggregate = secure_aggregate if self.secure_aggregate: RandomPaddingCipherServer(trans_var=RandomPaddingCipherTransVar( - prefix=communicate_match_suffix)).exchange_secret_keys() + prefix=communicate_match_suffix, clients=clients, server=server)).exchange_secret_keys() LOGGER.info('initialize secure aggregator done') agg_weights = self.collect(suffix=('agg_weight', )) diff --git a/python/federatedml/framework/homo/blocks.py b/python/federatedml/framework/homo/blocks.py index 7d2d730e4c..cb8cae608c 100644 --- a/python/federatedml/framework/homo/blocks.py +++ b/python/federatedml/framework/homo/blocks.py @@ -52,16 +52,20 @@ def server_parties(self): class CommunicatorTransVar(HomoTransferBase): - def __init__(self, server=(consts.ARBITER,), clients=(consts.GUEST, consts.HOST), prefix=None): + def __init__(self, server=(consts.ARBITER,), clients=(consts.GUEST, consts.HOST), prefix=None, disable_gc=False): super().__init__(server=server, clients=clients, prefix=prefix) - self.client_to_server = self.create_client_to_server_variable(name="client_to_server") - self.server_to_client = self.create_server_to_client_variable(name="server_to_client") + if not disable_gc: + self.client_to_server = self.create_client_to_server_variable(name="client_to_server") + self.server_to_client = self.create_server_to_client_variable(name="server_to_client") + else: + self.client_to_server = self.create_client_to_server_variable(name="client_to_server").disable_auto_clean() + self.server_to_client = self.create_server_to_client_variable(name="server_to_client").disable_auto_clean() class ServerCommunicator(object): - def __init__(self, prefix=None): - self.trans_var = CommunicatorTransVar(prefix=prefix) + def __init__(self, prefix=None, server=(consts.ARBITER,), clients=(consts.GUEST, consts.HOST)): + self.trans_var = CommunicatorTransVar(prefix=prefix, server=server, 
clients=clients) self._client_parties = self.trans_var.client_parties def get_parties(self, party_idx): @@ -85,8 +89,8 @@ def broadcast_obj(self, obj, suffix=tuple(), party_idx=-1): class ClientCommunicator(object): - def __init__(self, prefix=None): - trans_var = CommunicatorTransVar(prefix=prefix) + def __init__(self, prefix=None, server=(consts.ARBITER,), clients=(consts.GUEST, consts.HOST)): + trans_var = CommunicatorTransVar(prefix=prefix, server=server, clients=clients) self.trans_var = trans_var self._server_parties = trans_var.server_parties diff --git a/python/federatedml/nn/backend/torch/cust_model.py b/python/federatedml/nn/backend/torch/cust_model.py deleted file mode 100644 index e9fff34839..0000000000 --- a/python/federatedml/nn/backend/torch/cust_model.py +++ /dev/null @@ -1,55 +0,0 @@ -import importlib - -from torch import nn - -from federatedml.nn.backend.torch.base import FateTorchLayer -from federatedml.nn.backend.utils.common import ML_PATH - -PATH = '{}.model_zoo'.format(ML_PATH) - - -class CustModel(FateTorchLayer, nn.Module): - - def __init__(self, module_name, class_name, **kwargs): - super(CustModel, self).__init__() - assert isinstance( - module_name, str), 'name must be a str, specify the module in the model_zoo' - assert isinstance( - class_name, str), 'class name must be a str, specify the class in the module' - self.param_dict = { - 'module_name': module_name, - 'class_name': class_name, - 'param': kwargs} - self._model = None - - def init_model(self): - if self._model is None: - self._model = self.get_pytorch_model() - - def forward(self, x): - if self._model is None: - raise ValueError('model not init, call init_model() function') - return self._model(x) - - def get_pytorch_model(self): - - module_name: str = self.param_dict['module_name'] - class_name = self.param_dict['class_name'] - module_param: dict = self.param_dict['param'] - if module_name.endswith('.py'): - module_name = module_name.replace('.py', '') - nn_modules = importlib.import_module('{}.{}'.format(PATH, module_name)) - try: - for k, v in nn_modules.__dict__.items(): - if isinstance(v, type): - if issubclass( - v, nn.Module) and v is not nn.Module and v.__name__ == class_name: - return v(**module_param) - raise ValueError( - 'Did not find any class in {}.py that is pytorch nn.Module and named {}'. 
format( - module_name, class_name)) - except ValueError as e: - raise e - - def __repr__(self): - return 'CustModel({})'.format(str(self.param_dict)) diff --git a/python/federatedml/nn/homo/_init.py b/python/federatedml/nn/homo/_init.py new file mode 100644 index 0000000000..59824990a2 --- /dev/null +++ b/python/federatedml/nn/homo/_init.py @@ -0,0 +1,149 @@ +import json +import torch +import inspect +from federatedml.nn.homo.trainer.trainer_base import get_trainer_class, TrainerBase +from federatedml.util import LOGGER +from federatedml.nn.backend.torch import serialization as s +from federatedml.nn.backend.torch.base import FateTorchOptimizer +from federatedml.nn.backend.utils.common import recover_model_bytes +from federatedml.nn.backend.utils import deepspeed_util + + +def init( + trainer, + trainer_param, + nn_define, + config_optimizer, + config_loss, + torch_seed, + model_loaded_flag, + loaded_model, + ds_config): + + warm_start_iter = None + + if ds_config: + deepspeed_util.init_deepspeed_env(ds_config) + + # load trainer class + if trainer is None: + raise ValueError( + 'Trainer is not specified, please specify your trainer') + + trainer_class = get_trainer_class(trainer) + LOGGER.info('trainer class is {}'.format(trainer_class)) + + # recover model from model config / or recover from saved model param + loaded_model_dict = None + + # if has model protobuf, load model config from protobuf + load_opt_state_dict = False + + if model_loaded_flag: + param, meta = get_homo_param_meta(loaded_model) + LOGGER.info('save path is {}'.format(param.local_save_path)) + if param.local_save_path == '': + LOGGER.info('Load model from model protobuf') + warm_start_iter = param.epoch_idx + if param is None or meta is None: + raise ValueError( + 'model protobuf is None, make sure' + 'that your trainer calls export_model() function to save models') + + if meta.nn_define[0] is None: + raise ValueError( + 'nn_define is None, model protobuf has no nn-define, make sure' + 'that your trainer calls export_model() function to save models') + + nn_define = json.loads(meta.nn_define[0]) + loss = json.loads(meta.loss_func_define[0]) + optimizer = json.loads(meta.optimizer_define[0]) + loaded_model_dict = recover_model_bytes(param.model_bytes) + extra_data = recover_model_bytes(param.extra_data_bytes) + + else: + LOGGER.info('Load model from local save path') + save_dict = torch.load(open(param.local_save_path, 'rb')) + warm_start_iter = save_dict['epoch_idx'] + nn_define = save_dict['model_define'] + loss = save_dict['loss_define'] + optimizer = save_dict['optimizer_define'] + loaded_model_dict = save_dict + extra_data = save_dict['extra_data'] + + if config_optimizer is not None and optimizer != config_optimizer: + LOGGER.info('optimizer updated') + else: + config_optimizer = optimizer + load_opt_state_dict = True + + if config_loss is not None and config_loss != loss: + LOGGER.info('loss updated') + else: + config_loss = loss + else: + extra_data = {} + + # check key param + if nn_define is None: + raise ValueError( + 'Model structure is not defined, nn_define is None, please check your param') + + # get model from nn define + model = s.recover_sequential_from_dict(nn_define) + if loaded_model_dict: + model.load_state_dict(loaded_model_dict['model']) + LOGGER.info('load model state dict from check point') + + LOGGER.info('model structure is {}'.format(model)) + # init optimizer + if config_optimizer is not None and not ds_config: + optimizer_: FateTorchOptimizer = s.recover_optimizer_from_dict( + 
config_optimizer) + # pass model parameters to optimizer + optimizer = optimizer_.to_torch_instance(model.parameters()) + if load_opt_state_dict: + LOGGER.info('load optimizer state dict') + optimizer.load_state_dict(loaded_model_dict['optimizer']) + LOGGER.info('optimizer is {}'.format(optimizer)) + else: + optimizer = None + LOGGER.info('optimizer is not specified') + + # init loss + if config_loss is not None: + loss_fn = s.recover_loss_fn_from_dict(config_loss) + LOGGER.info('loss function is {}'.format(loss_fn)) + else: + loss_fn = None + LOGGER.info('loss function is not specified') + + # init trainer + trainer_inst: TrainerBase = trainer_class(**trainer_param) + LOGGER.info('trainer class is {}'.format(trainer_class)) + + trainer_train_args = inspect.getfullargspec(trainer_inst.train).args + args_format = [ + 'self', + 'train_set', + 'validate_set', + 'optimizer', + 'loss', + 'extra_data' + ] + if len(trainer_train_args) < 6: + raise ValueError( + 'Train function of trainer should take 6 arguments :{}, but current trainer.train ' + 'only takes {} arguments: {}'.format( + args_format, len(trainer_train_args), trainer_train_args)) + + trainer_inst.set_nn_config(nn_define, config_optimizer, config_loss) + trainer_inst.fed_mode = True + + if ds_config: + model, optimizer = deepspeed_util.deepspeed_init(model, ds_config) + trainer_inst.enable_deepspeed(ds_config=ds_config, is_zero_3=deepspeed_util.is_zero3(ds_config)) + if deepspeed_util.is_zero3(ds_config): + model.train() + + return trainer_inst, model, optimizer, loss_fn, extra_data, config_optimizer, config_loss, warm_start_iter diff --git a/python/federatedml/nn/homo/client.py b/python/federatedml/nn/homo/client.py index f705d145d6..c2d7d83fcb 100644 --- a/python/federatedml/nn/homo/client.py +++ b/python/federatedml/nn/homo/client.py @@ -23,6 +23,7 @@ from federatedml.nn.backend.utils.data import add_match_id from federatedml.protobuf.generated.homo_nn_model_param_pb2 import HomoNNParam as HomoNNParamPB from federatedml.protobuf.generated.homo_nn_model_meta_pb2 import HomoNNMeta as HomoNNMetaPB +from federatedml.nn.homo._init import init class NNModelExporter(ExporterBase): @@ -85,6 +86,12 @@ def export_model_dict( return get_homo_model_dict(param, meta) +def default_client_post_process(trainer): + model = trainer.get_cached_model() + summary = trainer.get_summary() + return model, summary + + class HomoNNClient(ModelBase): def __init__(self): @@ -136,136 +143,6 @@ def _init_model(self, param: HomoNNParam): self.optimizer = param.optimizer self.ds_config = param.ds_config - def init(self): - - # set random seed - global_seed(self.torch_seed) - - if self.ds_config: - deepspeed_util.init_deepspeed_env(self.ds_config) - - # load trainer class - if self.trainer is None: - raise ValueError( - 'Trainer is not specified, please specify your trainer') - - trainer_class = get_trainer_class(self.trainer) - LOGGER.info('trainer class is {}'.format(trainer_class)) - - # recover model from model config / or recover from saved model param - loaded_model_dict = None - - # if has model protobuf, load model config from protobuf - load_opt_state_dict = False - if self.model_loaded: - - param, meta = get_homo_param_meta(self.model) - LOGGER.info('save path is {}'.format(param.local_save_path)) - if param.local_save_path == '': - LOGGER.info('Load model from model protobuf') - self.warm_start_iter = param.epoch_idx - if param is None or meta is None: - raise ValueError( - 'model protobuf is None, make sure' - 'that your trainer calls export_model() 
function to save models') - - if meta.nn_define[0] is None: - raise ValueError( - 'nn_define is None, model protobuf has no nn-define, make sure' - 'that your trainer calls export_model() function to save models') - - self.nn_define = json.loads(meta.nn_define[0]) - loss = json.loads(meta.loss_func_define[0]) - optimizer = json.loads(meta.optimizer_define[0]) - loaded_model_dict = recover_model_bytes(param.model_bytes) - extra_data = recover_model_bytes(param.extra_data_bytes) - else: - LOGGER.info('Load model from local save path') - save_dict = torch.load(open(param.local_save_path, 'rb')) - self.warm_start_iter = save_dict['epoch_idx'] - self.nn_define = save_dict['model_define'] - loss = save_dict['loss_define'] - optimizer = save_dict['optimizer_define'] - loaded_model_dict = save_dict - extra_data = save_dict['extra_data'] - - if self.optimizer is not None and optimizer != self.optimizer: - LOGGER.info('optimizer updated') - else: - self.optimizer = optimizer - load_opt_state_dict = True - - if self.loss is not None and self.loss != loss: - LOGGER.info('loss updated') - else: - self.loss = loss - else: - extra_data = {} - - # check key param - if self.nn_define is None: - raise ValueError( - 'Model structure is not defined, nn_define is None, please check your param') - - # get model from nn define - model = s.recover_sequential_from_dict(self.nn_define) - if loaded_model_dict: - model.load_state_dict(loaded_model_dict['model']) - LOGGER.info('load model state dict from check point') - - LOGGER.info('model structure is {}'.format(model)) - # init optimizer - if self.optimizer is not None and not self.ds_config: - optimizer_: FateTorchOptimizer = s.recover_optimizer_from_dict( - self.optimizer) - # pass model parameters to optimizer - optimizer = optimizer_.to_torch_instance(model.parameters()) - if load_opt_state_dict: - LOGGER.info('load optimizer state dict') - optimizer.load_state_dict(loaded_model_dict['optimizer']) - LOGGER.info('optimizer is {}'.format(optimizer)) - else: - optimizer = None - LOGGER.info('optimizer is not specified') - - # init loss - if self.loss is not None: - loss_fn = s.recover_loss_fn_from_dict(self.loss) - LOGGER.info('loss function is {}'.format(loss_fn)) - else: - loss_fn = None - LOGGER.info('loss function is not specified') - - # init trainer - trainer_inst: TrainerBase = trainer_class(**self.trainer_param) - LOGGER.info('trainer class is {}'.format(trainer_class)) - - trainer_train_args = inspect.getfullargspec(trainer_inst.train).args - args_format = [ - 'self', - 'train_set', - 'validate_set', - 'optimizer', - 'loss', - 'extra_data' - ] - if len(trainer_train_args) < 6: - raise ValueError( - 'Train function of trainer should take 6 arguments :{}, but current trainer.train ' - 'only takes {} arguments: {}'.format( - args_format, len(trainer_train_args), trainer_train_args)) - - trainer_inst.set_nn_config(self.nn_define, self.optimizer, self.loss) - trainer_inst.fed_mode = True - - if self.ds_config: - model, optimizer = deepspeed_util.deepspeed_init(model, self.ds_config) - trainer_inst.enable_deepspeed(is_zero_3=deepspeed_util.is_zero3(self.ds_config)) - if deepspeed_util.is_zero3(self.ds_config): - model.train() - - return trainer_inst, model, optimizer, loss_fn, extra_data - def fit(self, train_input, validate_input=None): LOGGER.debug('train input is {}'.format(train_input)) @@ -294,10 +171,22 @@ def fit(self, train_input, validate_input=None): # set random seed global_seed(self.torch_seed) - self.trainer_inst, model, optimizer, loss_fn, 
extra_data = self.init() + # init + self.trainer_inst, model, optimizer, loss_fn, extra_data, self.optimizer, self.loss, self.warm_start_iter = init( + trainer=self.trainer, trainer_param=self.trainer_param, nn_define=self.nn_define, + config_optimizer=self.optimizer, config_loss=self.loss, torch_seed=self.torch_seed, model_loaded_flag=self.model_loaded, + loaded_model=self.model, ds_config=self.ds_config + ) + + # prepare to train self.trainer_inst.set_model(model) self.trainer_inst.set_tracker(self.tracker) self.trainer_inst.set_model_exporter(self.exporter) + party_id_list = [self.component_properties.guest_partyid] + if self.component_properties.host_party_idlist is not None: + for i in self.component_properties.host_party_idlist: + party_id_list.append(i) + self.trainer_inst.set_party_id_list(party_id_list) # load dataset class dataset_inst = load_dataset( @@ -339,8 +228,8 @@ def fit(self, train_input, validate_input=None): ) # training is done, get exported model - self.model = self.trainer_inst.get_cached_model() - self.set_summary(self.trainer_inst.get_summary()) + self.model, summary = default_client_post_process(self.trainer_inst) + self.set_summary(summary) def predict(self, cpn_input): diff --git a/python/federatedml/nn/homo/server.py b/python/federatedml/nn/homo/server.py index 106ab0757f..27fb226e32 100644 --- a/python/federatedml/nn/homo/server.py +++ b/python/federatedml/nn/homo/server.py @@ -6,6 +6,9 @@ from federatedml.nn.homo.client import NNModelExporter from federatedml.callbacks.model_checkpoint import ModelCheckpoint from federatedml.nn.backend.utils.common import get_homo_param_meta, recover_model_bytes +from federatedml.nn.homo._init import init +from federatedml.util import consts +from federatedml.nn.backend.utils.common import global_seed class HomoNNServer(ModelBase): @@ -13,7 +16,7 @@ class HomoNNServer(ModelBase): def __init__(self): super(HomoNNServer, self).__init__() self.model_param = HomoNNParam() - self.trainer = None + self.trainer = consts.FEDAVG_TRAINER self.trainer_param = None # arbiter side models @@ -24,13 +27,25 @@ def __init__(self): self.exporter = NNModelExporter() self.extra_data = {} # warm start + self.model_loaded = False self.warm_start_iter = None + # server init: if arbiter needs to load model, loss, optimizer from config + self.server_init = False + + self.dataset_module = None + self.dataset = None + self.dataset_param = {} + self.torch_seed = None + self.loss = None + self.optimizer = None + self.nn_define = None + self.ds_config = None def export_model(self): if self.model is None: LOGGER.debug('export an empty model') - return self.exporter.export_model_dict() # return an exporter + return self.exporter.export_model_dict() # return an empty model return self.model @@ -43,13 +58,24 @@ def load_model(self, model_dict): # load extra data self.extra_data = recover_model_bytes(param.extra_data_bytes) self.warm_start_iter = param.epoch_idx + self.model_loaded = True def _init_model(self, param: HomoNNParam()): + train_param = param.trainer.to_dict() + dataset_param = param.dataset.to_dict() self.trainer = train_param['trainer_name'] + self.dataset = dataset_param['dataset_name'] self.trainer_param = train_param['param'] + self.torch_seed = param.torch_seed + self.nn_define = param.nn_define + self.loss = param.loss + self.optimizer = param.optimizer + self.ds_config = param.ds_config + LOGGER.debug('trainer and trainer param {} {}'.format( self.trainer, self.trainer_param)) + self.server_init = param.server_init def fit(self, 
data_instance=None, validate_data=None): @@ -63,17 +89,37 @@ def fit(self, data_instance=None, validate_data=None): if self.component_properties.is_warm_start: self.callback_warm_start_init_iter(self.warm_start_iter) - # initialize trainer - trainer_class = get_trainer_class(self.trainer) - LOGGER.info('trainer class is {}'.format(trainer_class)) - # init trainer - trainer_inst = trainer_class(**self.trainer_param) + if self.server_init: + LOGGER.info('server try to load model, loss, optimizer from config') + # init + global_seed(self.torch_seed) + + trainer_inst, model, optimizer, loss_fn, extra_data, optimizer, loss, self.warm_start_iter = init( + trainer=self.trainer, trainer_param=self.trainer_param, nn_define=self.nn_define, + config_optimizer=self.optimizer, config_loss=self.loss, torch_seed=self.torch_seed, model_loaded_flag=self.model_loaded, + loaded_model=self.model, ds_config=self.ds_config + ) + trainer_inst.set_model(model) + + else: + # initialize trainer only + trainer_class = get_trainer_class(self.trainer) + trainer_inst = trainer_class(**self.trainer_param) + LOGGER.info('trainer class is {}'.format(trainer_class)) + # set tracker for fateboard callback trainer_inst.set_tracker(self.tracker) # set exporter trainer_inst.set_model_exporter(self.exporter) + # set party info + party_id_list = [self.component_properties.guest_partyid] + if self.component_properties.host_party_idlist is not None: + for i in self.component_properties.host_party_idlist: + party_id_list.append(i) + trainer_inst.set_party_id_list(party_id_list) # set chceckpoint trainer_inst.set_checkpoint(ModelCheckpoint(self, save_freq=1)) + # run trainer server procedure trainer_inst.server_aggregate_procedure(self.extra_data) diff --git a/python/federatedml/nn/homo/trainer/fedavg_trainer.py b/python/federatedml/nn/homo/trainer/fedavg_trainer.py index 9b2597f17f..28754378fd 100644 --- a/python/federatedml/nn/homo/trainer/fedavg_trainer.py +++ b/python/federatedml/nn/homo/trainer/fedavg_trainer.py @@ -150,6 +150,11 @@ def __init__(self, epochs=10, batch_size=512, # training parameter self.check_trainer_param( [self.tol], ['tol'], self.is_float, '{} is not a float') + # federation + self.client_agg = None + self.server_agg = None + self.aggregate_round = None + def _init_aggregator(self, train_set): # compute round to aggregate cur_agg_round = 0 @@ -183,6 +188,7 @@ def set_model(self, model: t.nn.Module): if self.cuda is not None: self.model = self.model.cuda(self.cuda_main_device) if self.data_parallel: + LOGGER.info('device ids are {}'.format(self.cuda)) self.parallel_model = DataParallel(model, device_ids=self.cuda, output_device=self.cuda_main_device) def _select_model(self): @@ -191,7 +197,7 @@ def _select_model(self): else: return self.model - def train_an_epoch(self, epoch_idx, model, train_set, optimizer, loss): + def train_an_epoch(self, epoch_idx, model, train_set, optimizer, loss_func): epoch_loss = 0.0 batch_idx = 0 @@ -202,6 +208,9 @@ def train_an_epoch(self, epoch_idx, model, train_set, optimizer, loss): dl = self.data_loader + total_batch_len = len(dl) + LOGGER.info('total batch len is {}'.format(total_batch_len)) + if not self.fed_mode: to_iterate = tqdm.tqdm(dl) else: @@ -210,19 +219,10 @@ def train_an_epoch(self, epoch_idx, model, train_set, optimizer, loss): batch_label = None for _batch_iter in to_iterate: _batch_iter = self._decode(_batch_iter) - if isinstance(_batch_iter, list): + if isinstance(_batch_iter, list) or isinstance(_batch_iter, tuple): batch_data, batch_label = _batch_iter else: 
batch_data = _batch_iter - """ - if self.task_type in [consts.CAUSAL_LM, consts.SEQ_2_SEQ_LM]: - batch_data = _batch_iter - else: - batch_data, batch_label = _batch_iter - - batch_data = self._decode(batch_data) - batch_label = self._decode(batch_label) - """ if self.cuda is not None or self._enable_deepspeed: device = self.cuda_main_device if self.cuda_main_device is not None else self.model.device @@ -237,17 +237,19 @@ def train_an_epoch(self, epoch_idx, model, train_set, optimizer, loss): pred = model(batch_data) - if not loss and hasattr(pred, "loss"): - batch_loss = pred.loss - - elif loss is not None: + if not loss_func and hasattr(pred, "loss"): + if isinstance(model, DataParallel): + batch_loss = pred.loss.mean() + else: + batch_loss = pred.loss + elif loss_func is not None: if batch_label is None: raise ValueError( "When loss is set, please provide label to calculate loss" ) if not isinstance(pred, torch.Tensor) and hasattr(pred, "logits"): pred = pred.logits - batch_loss = loss(pred, batch_label) + batch_loss = loss_func(pred, batch_label) else: raise ValueError( 'FedAVGTrainer requires a loss function, but got None, please specify loss function in the' @@ -274,15 +276,89 @@ def train_an_epoch(self, epoch_idx, model, train_set, optimizer, loss): epoch_loss += batch_loss_np batch_idx += 1 - # LOGGER.info(f"finish epoch={epoch_idx}, batch={batch_idx}") - - if self.fed_mode: - LOGGER.debug( - 'epoch {} batch {} finished'.format(epoch_idx, batch_idx)) - + if self.fed_mode: + if total_batch_len > 100: + if batch_idx % (total_batch_len // 100) == 0: + percentage = (batch_idx / total_batch_len) * 100 + LOGGER.debug(f"Training progress of epoch {epoch_idx}: {percentage:.1f}%") + else: + LOGGER.debug("Training epoch {}:batch {}".format(epoch_idx, batch_idx)) epoch_loss = epoch_loss / len(train_set) return epoch_loss + def on_loop_begin_client(self, **kwargs): + pass + + def on_loop_end_client(self, **kwargs): + pass + + def on_loop_begin_server(self, **kwargs): + pass + + def on_loop_end_server(self, **kwargs): + pass + + def _client_sends_data(self, epoch_idx, epoch_loss, cur_agg_round): + need_stop = False + if self.client_agg is not None or distributed_util.is_distributed(): + if not (self.aggregate_every_n_epoch is not None and (epoch_idx + 1) % self.aggregate_every_n_epoch != 0): + + # model averaging, only aggregate trainable param + if self._deepspeed_zero_3: + deepspeed_util.gather_model(self.model) + + if not distributed_util.is_distributed() or distributed_util.is_rank_0(): + self.model = self.client_agg.model_aggregation(self.model) + if distributed_util.is_distributed() and distributed_util.get_num_workers() > 1: + self._share_model() + else: + self._share_model() + + # agg loss and get converge status + if not distributed_util.is_distributed() or distributed_util.is_rank_0(): + converge_status = self.client_agg.loss_aggregation(epoch_loss) + cur_agg_round += 1 + if distributed_util.is_distributed() and distributed_util.get_num_workers() > 1: + self._sync_converge_status(converge_status) + else: + converge_status = self._sync_converge_status() + + if not distributed_util.is_distributed() or distributed_util.is_rank_0(): + LOGGER.info( + 'model averaging finished, aggregate round {}/{}'.format( + cur_agg_round, self.aggregate_round)) + + if converge_status: + LOGGER.info('early stop triggered, stop training') + need_stop = True + + return need_stop + + def _server_aggregates_data(self, epoch_idx, check_converge, converge_func): + + need_stop = False + if not 
(self.aggregate_every_n_epoch is not None and (epoch_idx + 1) % self.aggregate_every_n_epoch != 0): + + # model aggregate + self.server_agg.model_aggregation() + + # loss aggregate + agg_loss, converge_status = self.server_agg.loss_aggregation( + check_converge=check_converge, converge_func=converge_func) + self.callback_loss(agg_loss, epoch_idx) + + # save check point process + if self.save_freq is not None and ((epoch_idx + 1) % self.save_freq == 0): + self.checkpoint(epoch_idx=epoch_idx) + LOGGER.info('save checkpoint : epoch {}'.format(epoch_idx)) + + # check stop condition + if converge_status: + LOGGER.debug('stop triggered, stop aggregation') + need_stop = True + + return need_stop + def train( self, train_set: Dataset, @@ -293,15 +369,18 @@ def train( if optimizer is None: raise ValueError( - 'FedAVGTrainer requires an optimizer, but got None, please specify optimizer in the ' + 'An optimizer is required, but got None, please specify optimizer in the ' 'job configuration') + self._optimizer = optimizer + self._loss_fn = loss + if self.batch_size > len(train_set) or self.batch_size == -1: self.batch_size = len(train_set) # compute round to aggregate cur_agg_round = 0 - client_agg, aggregate_round = self._init_aggregator(train_set) + self.client_agg, self.aggregate_round = self._init_aggregator(train_set) # running var cur_epoch = 0 @@ -309,51 +388,24 @@ def train( need_stop = False evaluation_summary = {} - self._get_train_data_loader(train_set) + self.data_loader = self._get_train_data_loader(train_set) + + self.on_loop_begin_client() + # training process for i in range(self.epochs): cur_epoch = i LOGGER.info('epoch is {}'.format(i)) model = self._select_model() - epoch_loss = self.train_an_epoch(i, model, train_set, optimizer, loss) + epoch_loss = self.train_an_epoch(i, model, train_set, self._optimizer, self._loss_fn) if not distributed_util.is_distributed() or distributed_util.is_rank_0(): self.callback_loss(epoch_loss, i) loss_history.append(float(epoch_loss)) - LOGGER.info('epoch loss is {}'.format(epoch_loss)) # federation process, if running local mode, cancel federation - if client_agg is not None or distributed_util.is_distributed(): - if not (self.aggregate_every_n_epoch is not None and (i + 1) % self.aggregate_every_n_epoch != 0): - - # model averaging, only aggregate trainable param - if self._deepspeed_zero_3: - deepspeed_util.gather_model(self.model) - - if not distributed_util.is_distributed() or distributed_util.is_rank_0(): - self.model = client_agg.model_aggregation(self.model) - if distributed_util.is_distributed() and distributed_util.get_num_workers() > 1: - self._share_model() - else: - self._share_model() - - # agg loss and get converge status - if not distributed_util.is_distributed() or distributed_util.is_rank_0(): - converge_status = client_agg.loss_aggregation(epoch_loss) - cur_agg_round += 1 - if distributed_util.is_distributed() and distributed_util.get_num_workers() > 1: - self._sync_converge_status(converge_status) - else: - converge_status = self._sync_converge_status() - - if not distributed_util.is_distributed() or distributed_util.is_rank_0(): - LOGGER.info( - 'model averaging finished, aggregate round {}/{}'.format( - cur_agg_round, aggregate_round)) - - if converge_status: - LOGGER.info('early stop triggered, stop training') - need_stop = True + need_stop = self._client_sends_data(i, epoch_loss, cur_agg_round) + cur_agg_round += 1 # validation process if self.validation_freq and ((i + 1) % self.validation_freq == 0): @@ -386,10 +438,10 @@ def 
train( if self.save_to_local_dir: self.local_checkpoint( - self.model, i, optimizer, converge_status=need_stop, loss_history=loss_history) + self.model, i, self._optimizer, converge_status=need_stop, loss_history=loss_history) else: self.checkpoint( - self.model, i, optimizer, converge_status=need_stop, loss_history=loss_history) + self.model, i, self._optimizer, converge_status=need_stop, loss_history=loss_history) LOGGER.info('save checkpoint : epoch {}'.format(i)) # if meet stop condition then stop @@ -400,14 +452,21 @@ def train( if self._deepspeed_zero_3: deepspeed_util.gather_model(self.model) + self.on_loop_end_client() + if not distributed_util.is_distributed() or distributed_util.is_rank_0(): best_epoch = int(np.array(loss_history).argmin()) if self.save_to_local_dir: - self.local_save(model=self.model, optimizer=optimizer, epoch_idx=cur_epoch, loss_history=loss_history, - converge_status=need_stop, best_epoch=best_epoch) + self.local_save( + model=self.model, + optimizer=self._optimizer, + epoch_idx=cur_epoch, + loss_history=loss_history, + converge_status=need_stop, + best_epoch=best_epoch) else: - self.save(model=self.model, optimizer=optimizer, epoch_idx=cur_epoch, loss_history=loss_history, + self.save(model=self.model, optimizer=self._optimizer, epoch_idx=cur_epoch, loss_history=loss_history, converge_status=need_stop, best_epoch=best_epoch) best_epoch = int(np.array(loss_history).argmin()) @@ -490,31 +549,23 @@ def server_aggregate_procedure(self, extra_data={}): 'check early stop, converge func is {}'.format(converge_func)) LOGGER.info('server running aggregate procedure') - server_agg = SecureAggServer(self.secure_aggregate, communicate_match_suffix=self.comm_suffix) + self.server_agg = SecureAggServer(self.secure_aggregate, communicate_match_suffix=self.comm_suffix) + self.on_loop_begin_server() # aggregate and broadcast models for i in range(self.epochs): - if not (self.aggregate_every_n_epoch is not None and (i + 1) % self.aggregate_every_n_epoch != 0): - - # model aggregate - server_agg.model_aggregation() - - # loss aggregate - agg_loss, converge_status = server_agg.loss_aggregation( - check_converge=check_converge, converge_func=converge_func) - self.callback_loss(agg_loss, i) - # save check point process - if self.save_freq is not None and ((i + 1) % self.save_freq == 0): - self.checkpoint(epoch_idx=i) - LOGGER.info('save checkpoint : epoch {}'.format(i)) - - # check stop condition - if converge_status: - LOGGER.debug('stop triggered, stop aggregation') - break + need_stop = self._server_aggregates_data(i, check_converge, converge_func) + if need_stop: + break - LOGGER.info('server aggregation process done') + self.on_loop_end_server() + if self._model is not None: + if self.save_to_local_dir: + self.local_save(model=self.model, epoch_idx=i, converge_status=need_stop) + else: + self.save(model=self.model, epoch_idx=i, converge_status=need_stop) + LOGGER.info('sever side model saved') def _decode(self, data): if isinstance(data, transformers.tokenization_utils_base.BatchEncoding): @@ -550,7 +601,7 @@ def _get_train_data_loader(self, train_set): collate_fn = self._get_collate_fn(train_set) if not distributed_util.is_distributed() or distributed_util.get_num_workers() <= 1: - self.data_loader = DataLoader( + data_loader = DataLoader( train_set, batch_size=self.batch_size, pin_memory=self.pin_memory, @@ -564,7 +615,7 @@ def _get_train_data_loader(self, train_set): num_replicas=dist.get_world_size(), rank=dist.get_rank() ) - self.data_loader = DataLoader( + data_loader 
= DataLoader( train_set, batch_size=self.batch_size, pin_memory=self.pin_memory, @@ -573,15 +624,19 @@ def _get_train_data_loader(self, train_set): sampler=train_sampler ) - def _share_model(self): + return data_loader + + def _share_model(self, sync_trainable_only=True): if distributed_util.is_rank_0(): + for p in self.model.parameters(): - if p.requires_grad: + if (not sync_trainable_only) or (sync_trainable_only and p.requires_grad): scatter_list = [p.data for _ in range(distributed_util.get_num_workers())] dist.scatter(p.data, scatter_list, async_op=False) else: + for p in self.model.parameters(): - if p.requires_grad: + if (not sync_trainable_only) or (sync_trainable_only and p.requires_grad): dist.scatter(p.data, src=0, async_op=False) def _sync_converge_status(self, converge_status=None): diff --git a/python/federatedml/nn/homo/trainer/trainer_base.py b/python/federatedml/nn/homo/trainer/trainer_base.py index 3af07dd355..bd92691f3f 100644 --- a/python/federatedml/nn/homo/trainer/trainer_base.py +++ b/python/federatedml/nn/homo/trainer/trainer_base.py @@ -9,7 +9,7 @@ from federatedml.util import consts from federatedml.util import LOGGER from federatedml.model_base import serialize_models -from federatedml.nn.backend.utils.common import ML_PATH +from federatedml.nn.backend.utils.common import ML_PATH, LLM_PATH from federatedml.feature.instance import Instance from federatedml.evaluation.evaluation import Evaluation from federatedml.model_base import Metric, MetricMeta @@ -52,6 +52,9 @@ def __init__(self, **kwargs): self._model_checkpoint = None self._exporter = None self._evaluation_summary = {} + self._client_num = None + self._optimizer = None + self._loss_fn = None # running status self._set_model_checkpoint_epoch = set() @@ -66,6 +69,9 @@ def __init__(self, **kwargs): self._enable_deepspeed = False self._deepspeed_zero_3 = False + # deepspeed config + self._ds_config = None + @staticmethod def is_pos_int(val): return val > 0 and isinstance(val, int) @@ -119,7 +125,8 @@ def fed_mode(self, val): assert isinstance(val, bool), 'fed mode must be a bool' self._fed_mode = val - def enable_deepspeed(self, is_zero_3=False): + def enable_deepspeed(self, ds_config, is_zero_3=False): + self._ds_config = ds_config self._enable_deepspeed = True self._deepspeed_zero_3 = is_zero_3 @@ -225,10 +232,17 @@ def _local_save( if hasattr(model, "enable_save_pretrained") and model.enable_save_pretrained: unwrap_model.save_pretrained(save_path) else: - model_state_dict = model.state_dict() + if model is None: + model_state_dict = None + else: + model_state_dict = model.state_dict() + if optimizer is None: + optimizer_state_dict = None + else: + optimizer_state_dict = optimizer.state_dict() model_dict = { 'model': model_state_dict, - 'optimizer': optimizer.state_dict(), + 'optimizer': optimizer_state_dict, 'model_define': self.nn_define, 'optimizer_define': self.opt_define, 'loss_define': self.loss_define, @@ -238,6 +252,7 @@ def _local_save( 'best_epoch': best_epoch, 'extra_data': extra_data } + LOGGER.info('save path is {}'.format(save_path)) t.save(model_dict, save_path) local_save_path = save_path if not self._enable_deepspeed else os.environ[consts.FLOW_MODEL_SYNC_PATH] @@ -548,22 +563,43 @@ def unwrap_model(model): def get_trainer_class(trainer_module_name: str): - if trainer_module_name.endswith('.py'): trainer_module_name = trainer_module_name.replace('.py', '') - ds_modules = importlib.import_module( - '{}.homo.trainer.{}'.format( - ML_PATH, trainer_module_name)) + + std_fate_trainer_path = 
'{}.homo.trainer.{}'.format(ML_PATH, trainer_module_name) + + paths_to_check = [std_fate_trainer_path] + errors = [] try: - trainers = [] - for k, v in ds_modules.__dict__.items(): - if isinstance(v, type): - if issubclass(v, TrainerBase) and v is not TrainerBase: - trainers.append(v) - if len(trainers) == 0: - raise ValueError('Did not find any class in {}.py that is the subclass of Trainer class'. - format(trainer_module_name)) - else: - return trainers[-1] # return the last defined trainer - except ValueError as e: - raise e + importlib.import_module(LLM_PATH) + fate_llm_trainer_path = '{}.trainer.{}'.format(LLM_PATH, trainer_module_name) + paths_to_check.append(fate_llm_trainer_path) + except Exception as e: + pass + + trainers = [] + ds_modules = None + + for path in paths_to_check: + try: + ds_modules = importlib.import_module(path) + break + except Exception as e: + errors.append(str(e)) + + if ds_modules is None: + raise ImportError( + 'Could not import from any of the paths: {}, error details {}'.format( + ', '.join(paths_to_check), errors)) + + for k, v in ds_modules.__dict__.items(): + + if isinstance(v, type): + if issubclass(v, TrainerBase) and v is not TrainerBase: + trainers.append(v) + + if len(trainers) == 0: + raise ValueError('Did not find any class in {}.py that is the subclass of Trainer class'. + format(trainer_module_name)) + else: + return trainers[-1] # return the last defined trainer diff --git a/python/federatedml/param/homo_nn_param.py b/python/federatedml/param/homo_nn_param.py index 297ba12c72..64650a5762 100644 --- a/python/federatedml/param/homo_nn_param.py +++ b/python/federatedml/param/homo_nn_param.py @@ -42,7 +42,8 @@ def __init__(self, nn_define: dict = None, loss: dict = None, optimizer: dict = None, - ds_config: dict = None + ds_config: dict = None, + server_init: bool = False ): super(HomoNNParam, self).__init__() @@ -53,6 +54,7 @@ def __init__(self, self.loss = loss self.optimizer = optimizer self.ds_config = ds_config + self.server_init = server_init def check(self): diff --git a/python/requirements-fate-llm.txt b/python/requirements-fate-llm.txt index 12360e9db0..69b6dbcc9f 100644 --- a/python/requirements-fate-llm.txt +++ b/python/requirements-fate-llm.txt @@ -6,3 +6,4 @@ pydantic==1.10.7 deepspeed==0.9.2 sentencepiece==0.1.97 peft==0.2.0 +datasets==2.11.0 \ No newline at end of file
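
The reworked `train_an_epoch` in the patch above reports batch progress as a percentage of `total_batch_len` instead of logging every batch index. A minimal, framework-free sketch of that gating logic (function and logger names here are illustrative, not part of the FATE API):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("progress-demo")


def log_batch_progress(epoch_idx: int, batch_idx: int, total_batch_len: int) -> None:
    """Log roughly one message per percent when an epoch has many batches,
    otherwise log every batch (mirrors the gating condition in the diff)."""
    if total_batch_len > 100:
        # total_batch_len // 100 batches correspond to roughly 1% of the epoch
        if batch_idx % (total_batch_len // 100) == 0:
            percentage = (batch_idx / total_batch_len) * 100
            LOGGER.debug(f"Training progress of epoch {epoch_idx}: {percentage:.1f}%")
    else:
        LOGGER.debug("Training epoch {}: batch {}".format(epoch_idx, batch_idx))


if __name__ == "__main__":
    for batch_idx in range(1000):
        log_batch_progress(epoch_idx=0, batch_idx=batch_idx, total_batch_len=1000)
```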
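Two small behavioural changes in the same loop are easy to miss: batches may now arrive as tuples as well as lists, and when the model is wrapped in `torch.nn.DataParallel` the per-replica `pred.loss` values are averaged with `.mean()`. A standalone sketch of both patterns, assuming a Hugging-Face-style output object that carries a `loss` attribute (helper names are invented for the example):

```python
from collections import namedtuple

import torch
from torch.nn import DataParallel


def unpack_batch(batch):
    """Accept a (data, label) pair delivered as list or tuple, or bare data."""
    if isinstance(batch, (list, tuple)):
        batch_data, batch_label = batch
        return batch_data, batch_label
    return batch, None


def reduce_builtin_loss(pred, model):
    """Models that compute their own loss return one loss per replica under
    DataParallel, so average them into a single scalar before backward()."""
    if isinstance(model, DataParallel):
        return pred.loss.mean()
    return pred.loss


Output = namedtuple("Output", ["loss"])
plain_model = torch.nn.Linear(2, 2)
assert torch.equal(reduce_builtin_loss(Output(loss=torch.tensor(0.5)), plain_model),
                   torch.tensor(0.5))
assert unpack_batch((torch.zeros(1), torch.ones(1)))[1] is not None
```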
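The new `_client_sends_data` and `_server_aggregates_data` helpers gate federation on the same predicate: aggregate unless `aggregate_every_n_epoch` is set and `(epoch_idx + 1)` is not a multiple of it. A minimal sketch of that predicate under an invented name, with a couple of sanity checks:

```python
from typing import Optional


def should_aggregate(epoch_idx: int, aggregate_every_n_epoch: Optional[int]) -> bool:
    """True when this epoch should trigger a federation round."""
    if aggregate_every_n_epoch is None:
        return True
    return (epoch_idx + 1) % aggregate_every_n_epoch == 0


# aggregate every 2nd epoch: rounds fire after epochs 1, 3, 5
assert [e for e in range(6) if should_aggregate(e, 2)] == [1, 3, 5]
# option unset: aggregate after every epoch
assert all(should_aggregate(e, None) for e in range(6))
```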
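The empty `on_loop_begin_client` / `on_loop_end_client` / `on_loop_begin_server` / `on_loop_end_server` methods are extension points: `train()` and `server_aggregate_procedure()` call them around the main loop so a subclass can add extra federation steps without re-implementing the loop. A toy illustration of the pattern (class names are invented, not FATE classes):

```python
class LoopTrainer:
    """Base class whose training loop calls overridable no-op hooks."""

    def on_loop_begin_client(self, **kwargs):
        pass

    def on_loop_end_client(self, **kwargs):
        pass

    def train(self, epochs: int):
        self.on_loop_begin_client()
        for epoch_idx in range(epochs):
            print(f"epoch {epoch_idx}: local training + aggregation")
        self.on_loop_end_client()


class TwoPhaseTrainer(LoopTrainer):
    """Subclass that exchanges extra state around the federated loop, roughly
    how a specialised trainer could reuse the base loop unchanged."""

    def on_loop_begin_client(self, **kwargs):
        print("exchange warm-up payload with the server before epoch 0")

    def on_loop_end_client(self, **kwargs):
        print("upload final trainable weights after the last epoch")


TwoPhaseTrainer().train(epochs=2)
```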
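`_share_model` now takes a `sync_trainable_only` flag: rank 0 scatters each (optionally only trainable) parameter tensor to every worker. A hedged sketch of that synchronisation, assuming an already initialised `torch.distributed` process group and that all ranks hold architecturally identical models and iterate parameters in the same order:

```python
import torch
import torch.distributed as dist


def share_model(model: torch.nn.Module, sync_trainable_only: bool = True) -> None:
    """Push rank 0's parameter values to every other rank.

    Assumes dist.init_process_group(...) has already been called on all ranks.
    """
    world_size = dist.get_world_size()
    for p in model.parameters():
        if sync_trainable_only and not p.requires_grad:
            continue
        if dist.get_rank() == 0:
            # scatter_list must hold one tensor per destination rank
            scatter_list = [p.data for _ in range(world_size)]
            dist.scatter(p.data, scatter_list, src=0, async_op=False)
        else:
            dist.scatter(p.data, src=0, async_op=False)
```

`dist.broadcast(p.data, src=0)` would achieve the same effect with less bookkeeping; `scatter` is kept here only to mirror the structure used in the diff.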
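`_get_train_data_loader` now returns the `DataLoader` instead of mutating `self.data_loader` as a side effect, and pairs it with a `DistributedSampler` when more than one worker participates. A condensed sketch of that branch using plain `torch.distributed` calls in place of FATE's `distributed_util` helpers (the function name is hypothetical):

```python
import torch.distributed as dist
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler


def build_train_loader(train_set: Dataset, batch_size: int, num_workers: int = 0,
                       pin_memory: bool = False, collate_fn=None) -> DataLoader:
    """Return (rather than store) a loader; shard the dataset across ranks
    when running under a multi-worker process group."""
    if not dist.is_initialized() or dist.get_world_size() <= 1:
        return DataLoader(train_set, batch_size=batch_size, pin_memory=pin_memory,
                          num_workers=num_workers, collate_fn=collate_fn)

    sampler = DistributedSampler(train_set,
                                 num_replicas=dist.get_world_size(),
                                 rank=dist.get_rank())
    return DataLoader(train_set, batch_size=batch_size, pin_memory=pin_memory,
                      num_workers=num_workers, collate_fn=collate_fn,
                      sampler=sampler)
```

Returning the loader makes the method easier to reuse and test, which is presumably why the caller in `train()` now does `self.data_loader = self._get_train_data_loader(train_set)`.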
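`_local_save` in `trainer_base.py` now tolerates a missing model or optimizer, since a server-side save may have neither, by guarding the `state_dict()` calls. A small sketch of that guard with a made-up `build_checkpoint` helper:

```python
import io

import torch


def build_checkpoint(model=None, optimizer=None, **extra):
    """Collect whatever state is available; None stays None instead of raising."""
    return {
        'model': model.state_dict() if model is not None else None,
        'optimizer': optimizer.state_dict() if optimizer is not None else None,
        **extra,
    }


# server-style save: a model but no optimizer
ckpt = build_checkpoint(model=torch.nn.Linear(4, 2), optimizer=None, epoch_idx=3)
torch.save(ckpt, io.BytesIO())  # serialises cleanly even with optimizer=None
```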
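Finally, `get_trainer_class` now searches a list of candidate module paths (the standard `federatedml` trainer package first, then the FATE-LLM trainer package if it is importable) and returns the last `TrainerBase` subclass found. A framework-free sketch of that lookup; the local `TrainerBase` and `search_packages` argument stand in for FATE's real base class and the paths derived from `ML_PATH` / `LLM_PATH`:

```python
import importlib
from typing import List, Type


class TrainerBase:          # stand-in for federatedml's TrainerBase
    pass


def load_trainer_class(module_name: str, search_packages: List[str]) -> Type[TrainerBase]:
    """Try each candidate package until one exposes the trainer module,
    then return the last TrainerBase subclass defined in it."""
    module_name = module_name.replace('.py', '')
    errors = []
    module = None
    for pkg in search_packages:
        try:
            module = importlib.import_module('{}.{}'.format(pkg, module_name))
            break
        except Exception as e:          # keep the reason for the final error message
            errors.append(str(e))

    if module is None:
        raise ImportError('Could not import {} from any of {}: {}'.format(
            module_name, search_packages, errors))

    trainers = [v for v in vars(module).values()
                if isinstance(v, type) and issubclass(v, TrainerBase) and v is not TrainerBase]
    if not trainers:
        raise ValueError('No TrainerBase subclass found in {}'.format(module_name))
    return trainers[-1]     # keep the "last defined trainer wins" rule from the diff
```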