From 5185d1477206757dc6288d6ea54888583a255a04 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Sun, 13 Oct 2024 15:05:43 +0300 Subject: [PATCH 1/4] let us configurate --- cmd/lakectl/cmd/fs_download.go | 1 + cmd/lakectl/cmd/fs_upload.go | 7 ++ cmd/lakectl/cmd/local_checkout.go | 1 + cmd/lakectl/cmd/local_clone.go | 1 + cmd/lakectl/cmd/local_commit.go | 1 + cmd/lakectl/cmd/local_pull.go | 1 + cmd/lakectl/cmd/root.go | 11 ++- pkg/actions/gomock_reflect_2332145910/prog.go | 68 +++++++++++++++++++ pkg/local/config.go | 1 + pkg/local/sync.go | 6 +- 10 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 pkg/actions/gomock_reflect_2332145910/prog.go diff --git a/cmd/lakectl/cmd/fs_download.go b/cmd/lakectl/cmd/fs_download.go index aeaae79cf52..79cd047200b 100644 --- a/cmd/lakectl/cmd/fs_download.go +++ b/cmd/lakectl/cmd/fs_download.go @@ -105,6 +105,7 @@ var fsDownloadCmd = &cobra.Command{ s := local.NewSyncManager(ctx, client, getHTTPClient(), local.Config{ SyncFlags: syncFlags, + Parallelism: cfg.Options.Parallelism, SkipNonRegularFiles: cfg.Local.SkipNonRegularFiles, IncludePerm: false, }) diff --git a/cmd/lakectl/cmd/fs_upload.go b/cmd/lakectl/cmd/fs_upload.go index dc8c4260f9d..d94e3fd56fe 100644 --- a/cmd/lakectl/cmd/fs_upload.go +++ b/cmd/lakectl/cmd/fs_upload.go @@ -2,10 +2,12 @@ package cmd import ( "context" + "fmt" "os" "os/signal" "path/filepath" "strings" + "time" "github.com/spf13/cobra" "github.com/treeverse/lakefs/pkg/api/apigen" @@ -21,6 +23,7 @@ var fsUploadCmd = &cobra.Command{ Args: cobra.ExactArgs(1), ValidArgsFunction: ValidArgsRepository, Run: func(cmd *cobra.Command, args []string) { + start := time.Now() client := getClient() pathURI, _ := getSyncArgs(args, true, false) syncFlags := getSyncFlags(cmd, client) @@ -63,6 +66,7 @@ var fsUploadCmd = &cobra.Command{ }() s := local.NewSyncManager(ctx, client, getHTTPClient(), local.Config{ SyncFlags: syncFlags, + Parallelism: cfg.Options.Parallelism, SkipNonRegularFiles: cfg.Local.SkipNonRegularFiles, IncludePerm: false, }) @@ -81,6 +85,9 @@ var fsUploadCmd = &cobra.Command{ Operation: "Upload", Tasks: s.Summary(), }) + finish := time.Since(start) + fmt.Printf("Execution time: %s\n", finish) + }, } diff --git a/cmd/lakectl/cmd/local_checkout.go b/cmd/lakectl/cmd/local_checkout.go index 1a40e739203..4e0c5246093 100644 --- a/cmd/lakectl/cmd/local_checkout.go +++ b/cmd/lakectl/cmd/local_checkout.go @@ -59,6 +59,7 @@ func localCheckout(cmd *cobra.Command, localPath string, specifiedRef string, co sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(checkoutOperation)) syncMgr := local.NewSyncManager(sigCtx, client, getHTTPClient(), local.Config{ SyncFlags: syncFlags, + Parallelism: cfg.Options.Parallelism, SkipNonRegularFiles: cfg.Local.SkipNonRegularFiles, IncludePerm: cfg.Experimental.Local.POSIXPerm.Enabled, IncludeUID: cfg.Experimental.Local.POSIXPerm.IncludeUID, diff --git a/cmd/lakectl/cmd/local_clone.go b/cmd/lakectl/cmd/local_clone.go index 0b223a4907c..aae3f721283 100644 --- a/cmd/lakectl/cmd/local_clone.go +++ b/cmd/lakectl/cmd/local_clone.go @@ -88,6 +88,7 @@ var localCloneCmd = &cobra.Command{ sigCtx := localHandleSyncInterrupt(ctx, idx, string(cloneOperation)) s := local.NewSyncManager(sigCtx, client, getHTTPClient(), local.Config{ SyncFlags: syncFlags, + Parallelism: cfg.Options.Parallelism, SkipNonRegularFiles: cfg.Local.SkipNonRegularFiles, IncludePerm: cfg.Experimental.Local.POSIXPerm.Enabled, IncludeUID: cfg.Experimental.Local.POSIXPerm.IncludeUID, diff --git a/cmd/lakectl/cmd/local_commit.go b/cmd/lakectl/cmd/local_commit.go index 8e037fbc67b..20c6424b5f5 100644 --- a/cmd/lakectl/cmd/local_commit.go +++ b/cmd/lakectl/cmd/local_commit.go @@ -174,6 +174,7 @@ var localCommitCmd = &cobra.Command{ sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(commitOperation)) s := local.NewSyncManager(sigCtx, client, getHTTPClient(), local.Config{ SyncFlags: syncFlags, + Parallelism: cfg.Options.Parallelism, SkipNonRegularFiles: cfg.Local.SkipNonRegularFiles, IncludePerm: cfg.Experimental.Local.POSIXPerm.Enabled, IncludeUID: cfg.Experimental.Local.POSIXPerm.IncludeUID, diff --git a/cmd/lakectl/cmd/local_pull.go b/cmd/lakectl/cmd/local_pull.go index 103928ff85d..fa26726bd9c 100644 --- a/cmd/lakectl/cmd/local_pull.go +++ b/cmd/lakectl/cmd/local_pull.go @@ -68,6 +68,7 @@ var localPullCmd = &cobra.Command{ sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(pullOperation)) s := local.NewSyncManager(sigCtx, client, getHTTPClient(), local.Config{ SyncFlags: syncFlags, + Parallelism: cfg.Options.Parallelism, SkipNonRegularFiles: cfg.Local.SkipNonRegularFiles, IncludePerm: cfg.Experimental.Local.POSIXPerm.Enabled, IncludeUID: cfg.Experimental.Local.POSIXPerm.IncludeUID, diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index 844f04225a9..10d2271f705 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -72,6 +72,9 @@ type Configuration struct { EndpointURL lakefsconfig.OnlyString `mapstructure:"endpoint_url"` Retries RetriesCfg `mapstructure:"retries"` } `mapstructure:"server"` + Options struct { + Parallelism int `mapstructure:"parallelism"` + } `mapstructure:"Job"` Metastore struct { Type lakefsconfig.OnlyString `mapstructure:"type"` Hive struct { @@ -153,9 +156,10 @@ const ( parallelismFlagName = "parallelism" noProgressBarFlagName = "no-progress" - defaultSyncParallelism = 25 - defaultSyncPresign = true - defaultNoProgress = false + defaultParallelismConfig = -1 + defaultSyncParallelism = 25 + defaultSyncPresign = true + defaultNoProgress = false myRepoExample = "lakefs://my-repo" myBucketExample = "s3://my-bucket" @@ -574,6 +578,7 @@ func initConfig() { viper.SetDefault("server.retries.min_wait_interval", defaultMinRetryInterval) viper.SetDefault("experimental.local.posix_permissions.enabled", false) viper.SetDefault("local.skip_non_regular_files", false) + viper.SetDefault("job.parallelism", defaultParallelismConfig) cfgErr = viper.ReadInConfig() } diff --git a/pkg/actions/gomock_reflect_2332145910/prog.go b/pkg/actions/gomock_reflect_2332145910/prog.go new file mode 100644 index 00000000000..9532872d7f2 --- /dev/null +++ b/pkg/actions/gomock_reflect_2332145910/prog.go @@ -0,0 +1,68 @@ + +package main + +import ( + "encoding/gob" + "flag" + "fmt" + "os" + "path" + "reflect" + + "github.com/golang/mock/mockgen/model" + + pkg_ "github.com/treeverse/lakefs/pkg/actions" +) + +var output = flag.String("output", "", "The output file name, or empty to use stdout.") + +func main() { + flag.Parse() + + its := []struct{ + sym string + typ reflect.Type + }{ + + { "Source", reflect.TypeOf((*pkg_.Source)(nil)).Elem()}, + + { "OutputWriter", reflect.TypeOf((*pkg_.OutputWriter)(nil)).Elem()}, + + } + pkg := &model.Package{ + // NOTE: This behaves contrary to documented behaviour if the + // package name is not the final component of the import path. + // The reflect package doesn't expose the package name, though. + Name: path.Base("github.com/treeverse/lakefs/pkg/actions"), + } + + for _, it := range its { + intf, err := model.InterfaceFromInterfaceType(it.typ) + if err != nil { + fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) + os.Exit(1) + } + intf.Name = it.sym + pkg.Interfaces = append(pkg.Interfaces, intf) + } + + outfile := os.Stdout + if len(*output) != 0 { + var err error + outfile, err = os.Create(*output) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) + } + defer func() { + if err := outfile.Close(); err != nil { + fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) + os.Exit(1) + } + }() + } + + if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { + fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) + os.Exit(1) + } +} diff --git a/pkg/local/config.go b/pkg/local/config.go index b0533a54e8a..b3eaa914f94 100644 --- a/pkg/local/config.go +++ b/pkg/local/config.go @@ -17,6 +17,7 @@ type SyncFlags struct { type Config struct { SyncFlags + Parallelism int // SkipNonRegularFiles - By default lakectl local fails if local directory contains irregular files. When set, lakectl will skip these files instead. SkipNonRegularFiles bool // IncludePerm - Experimental: preserve Unix file permissions diff --git a/pkg/local/sync.go b/pkg/local/sync.go index fa82508334d..f04f7718102 100644 --- a/pkg/local/sync.go +++ b/pkg/local/sync.go @@ -74,7 +74,11 @@ func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *C defer s.progressBar.Stop() wg, ctx := errgroup.WithContext(s.ctx) - for i := 0; i < s.cfg.SyncFlags.Parallelism; i++ { + parallelismToUse := s.cfg.Parallelism + if parallelismToUse <= 0 { + parallelismToUse = s.cfg.SyncFlags.Parallelism + } + for i := 0; i < parallelismToUse; i++ { wg.Go(func() error { for change := range changeSet { if err := s.apply(ctx, rootPath, remote, change); err != nil { From 41993e0463173982437cc5fed2a328bdc33e1071 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Sun, 13 Oct 2024 15:14:07 +0300 Subject: [PATCH 2/4] parallelism --- cmd/lakectl/cmd/fs_upload.go | 6 ------ cmd/lakectl/cmd/root.go | 4 ++-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/cmd/lakectl/cmd/fs_upload.go b/cmd/lakectl/cmd/fs_upload.go index d94e3fd56fe..a04a6987c13 100644 --- a/cmd/lakectl/cmd/fs_upload.go +++ b/cmd/lakectl/cmd/fs_upload.go @@ -2,12 +2,10 @@ package cmd import ( "context" - "fmt" "os" "os/signal" "path/filepath" "strings" - "time" "github.com/spf13/cobra" "github.com/treeverse/lakefs/pkg/api/apigen" @@ -23,7 +21,6 @@ var fsUploadCmd = &cobra.Command{ Args: cobra.ExactArgs(1), ValidArgsFunction: ValidArgsRepository, Run: func(cmd *cobra.Command, args []string) { - start := time.Now() client := getClient() pathURI, _ := getSyncArgs(args, true, false) syncFlags := getSyncFlags(cmd, client) @@ -85,9 +82,6 @@ var fsUploadCmd = &cobra.Command{ Operation: "Upload", Tasks: s.Summary(), }) - finish := time.Since(start) - fmt.Printf("Execution time: %s\n", finish) - }, } diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index 10d2271f705..dc42094ff9c 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -74,7 +74,7 @@ type Configuration struct { } `mapstructure:"server"` Options struct { Parallelism int `mapstructure:"parallelism"` - } `mapstructure:"Job"` + } `mapstructure:"options"` Metastore struct { Type lakefsconfig.OnlyString `mapstructure:"type"` Hive struct { @@ -578,7 +578,7 @@ func initConfig() { viper.SetDefault("server.retries.min_wait_interval", defaultMinRetryInterval) viper.SetDefault("experimental.local.posix_permissions.enabled", false) viper.SetDefault("local.skip_non_regular_files", false) - viper.SetDefault("job.parallelism", defaultParallelismConfig) + viper.SetDefault("options.parallelism", defaultParallelismConfig) cfgErr = viper.ReadInConfig() } From c76c45882dd402e00796ea8795452b4972e08150 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Sun, 13 Oct 2024 16:18:12 +0300 Subject: [PATCH 3/4] no linters no splinters --- pkg/actions/gomock_reflect_2332145910/prog.go | 68 ------------------- 1 file changed, 68 deletions(-) delete mode 100644 pkg/actions/gomock_reflect_2332145910/prog.go diff --git a/pkg/actions/gomock_reflect_2332145910/prog.go b/pkg/actions/gomock_reflect_2332145910/prog.go deleted file mode 100644 index 9532872d7f2..00000000000 --- a/pkg/actions/gomock_reflect_2332145910/prog.go +++ /dev/null @@ -1,68 +0,0 @@ - -package main - -import ( - "encoding/gob" - "flag" - "fmt" - "os" - "path" - "reflect" - - "github.com/golang/mock/mockgen/model" - - pkg_ "github.com/treeverse/lakefs/pkg/actions" -) - -var output = flag.String("output", "", "The output file name, or empty to use stdout.") - -func main() { - flag.Parse() - - its := []struct{ - sym string - typ reflect.Type - }{ - - { "Source", reflect.TypeOf((*pkg_.Source)(nil)).Elem()}, - - { "OutputWriter", reflect.TypeOf((*pkg_.OutputWriter)(nil)).Elem()}, - - } - pkg := &model.Package{ - // NOTE: This behaves contrary to documented behaviour if the - // package name is not the final component of the import path. - // The reflect package doesn't expose the package name, though. - Name: path.Base("github.com/treeverse/lakefs/pkg/actions"), - } - - for _, it := range its { - intf, err := model.InterfaceFromInterfaceType(it.typ) - if err != nil { - fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) - os.Exit(1) - } - intf.Name = it.sym - pkg.Interfaces = append(pkg.Interfaces, intf) - } - - outfile := os.Stdout - if len(*output) != 0 { - var err error - outfile, err = os.Create(*output) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) - } - defer func() { - if err := outfile.Close(); err != nil { - fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) - os.Exit(1) - } - }() - } - - if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { - fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) - os.Exit(1) - } -} From 56bf6e1c9ecfaca7b4fcd38540269111f37100af Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 15 Oct 2024 17:10:01 +0300 Subject: [PATCH 4/4] one default value --- cmd/lakectl/cmd/root.go | 13 +++++++------ pkg/local/config.go | 1 + pkg/local/sync.go | 8 ++++---- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/cmd/lakectl/cmd/root.go b/cmd/lakectl/cmd/root.go index dc42094ff9c..d9e427be34d 100644 --- a/cmd/lakectl/cmd/root.go +++ b/cmd/lakectl/cmd/root.go @@ -156,10 +156,9 @@ const ( parallelismFlagName = "parallelism" noProgressBarFlagName = "no-progress" - defaultParallelismConfig = -1 - defaultSyncParallelism = 25 - defaultSyncPresign = true - defaultNoProgress = false + defaultParallelism = 25 + defaultSyncPresign = true + defaultNoProgress = false myRepoExample = "lakefs://my-repo" myBucketExample = "s3://my-bucket" @@ -182,7 +181,7 @@ func withRecursiveFlag(cmd *cobra.Command, usage string) { } func withParallelismFlag(cmd *cobra.Command) { - cmd.Flags().IntP(parallelismFlagName, "p", defaultSyncParallelism, + cmd.Flags().IntP(parallelismFlagName, "p", defaultParallelism, "Max concurrent operations to perform") } @@ -249,9 +248,11 @@ func getSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) local. if parallelism < 1 { DieFmt("Invalid value for parallelism (%d), minimum is 1.\n", parallelism) } + setParallelism := cmd.Flags().Changed(parallelismFlagName) presignMode := getPresignMode(cmd, client) return local.SyncFlags{ + SetParallelism: setParallelism, Parallelism: parallelism, Presign: presignMode.Enabled, PresignMultipart: presignMode.Multipart, @@ -578,7 +579,7 @@ func initConfig() { viper.SetDefault("server.retries.min_wait_interval", defaultMinRetryInterval) viper.SetDefault("experimental.local.posix_permissions.enabled", false) viper.SetDefault("local.skip_non_regular_files", false) - viper.SetDefault("options.parallelism", defaultParallelismConfig) + viper.SetDefault("options.parallelism", defaultParallelism) cfgErr = viper.ReadInConfig() } diff --git a/pkg/local/config.go b/pkg/local/config.go index b3eaa914f94..2742bc913c3 100644 --- a/pkg/local/config.go +++ b/pkg/local/config.go @@ -9,6 +9,7 @@ const ( ) type SyncFlags struct { + SetParallelism bool Parallelism int Presign bool PresignMultipart bool diff --git a/pkg/local/sync.go b/pkg/local/sync.go index f04f7718102..64131755357 100644 --- a/pkg/local/sync.go +++ b/pkg/local/sync.go @@ -74,11 +74,11 @@ func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *C defer s.progressBar.Stop() wg, ctx := errgroup.WithContext(s.ctx) - parallelismToUse := s.cfg.Parallelism - if parallelismToUse <= 0 { - parallelismToUse = s.cfg.SyncFlags.Parallelism + parallelism := s.cfg.Parallelism + if s.cfg.SyncFlags.SetParallelism { + parallelism = s.cfg.SyncFlags.Parallelism } - for i := 0; i < parallelismToUse; i++ { + for i := 0; i < parallelism; i++ { wg.Go(func() error { for change := range changeSet { if err := s.apply(ctx, rootPath, remote, change); err != nil {