Skip to content
This repository was archived by the owner on Aug 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ dodo diff --min-duration-diff 200ms --original-sqls 'output/sql/*.sql' output/re

# diff of two replay result directories
dodo diff replay1/ replay2/


# Export table data
dodo export --help
```

### Config
Expand Down
34 changes: 12 additions & 22 deletions cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ limitations under the License.
package cmd

import (
"errors"
"fmt"
"os"
"path/filepath"
"slices"
"strings"

"github.com/emirpasic/gods/queues/circularbuffer"
"github.com/samber/lo"
Expand Down Expand Up @@ -151,30 +150,13 @@ func completeCreateConfig() (err error) {
}
}
createTableDDLs = tableDDLs

return err
return nil
}

ddldir := filepath.Join(GlobalConfig.OutputDir, "ddl")

GlobalConfig.DBs, GlobalConfig.Tables = lo.Uniq(GlobalConfig.DBs), lo.Uniq(GlobalConfig.Tables)
dbs, tables := GlobalConfig.DBs, GlobalConfig.Tables
if len(dbs) == 0 && len(tables) == 0 {
return errors.New("expected at least one database or tables, please use --dbs/--tables flag or --ddl flag")
} else if len(dbs) == 1 {
// prepend default database if only one database specified
prefix := dbs[0] + "."
for i, t := range GlobalConfig.Tables {
if !strings.Contains(t, ".") {
GlobalConfig.Tables[i] = prefix + t
}
}
} else {
for _, t := range tables {
if !strings.Contains(t, ".") {
return errors.New("expected database in table name when zero/multiple databases specified, e.g. --tables db1.table1,db2.table2")
}
}
if err := completeDBTables(); err != nil {
return err
}

if len(GlobalConfig.Tables) == 0 {
Expand All @@ -198,6 +180,14 @@ func completeCreateConfig() (err error) {
} else {
for _, table := range GlobalConfig.Tables {
tableddl := filepath.Join(ddldir, fmt.Sprintf("%s.table.sql", table))
if _, err := os.Stat(tableddl); err != nil {
// maybe a view
fmatch := filepath.Join(ddldir, fmt.Sprintf("%s.*view.sql", table))
if viewddls, err := src.FileGlob([]string{fmatch}); err == nil && len(viewddls) > 0 {
createOtherDDLs = append(createOtherDDLs, viewddls...)
}
continue
}
createTableDDLs = append(createTableDDLs, tableddl)
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func init() {
pFlags.StringVar(&DumpConfig.SSHPrivateKey, "ssh-private-key", "~/.ssh/id_rsa", "File path of SSH private key for '--ssh-address'")
addAnonymizeBaseFlags(pFlags, false)

dumpCmd.RegisterFlagCompletionFunc("query-states", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"ok", "eof", "err"}, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveDefault
})

flags := dumpCmd.Flags()
flags.BoolVar(&DumpConfig.Clean, "clean", false, "Clean previous data and output directory")
}
Expand Down
201 changes: 201 additions & 0 deletions cmd/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
Copyright © 2025 Thearas thearas850@gmail.com

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd

import (
"context"
"errors"
"fmt"
"os/signal"
"strings"
"syscall"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/valyala/fasttemplate"

"github.com/Thearas/dodo/src"
)

// ExportConfig holds the configuration values
var ExportConfig = Export{}

// Export holds the configuration for the export command
type Export struct {
Target string
ToURL string
Properties map[string]string
With map[string]string

dbconn *sqlx.DB
}

// TODO: Support BROKER export?
// exportCmd represents the export command
var exportCmd = &cobra.Command{
Use: "export",
Short: "Export data from Doris",
Long: `Export data from Doris via [Export](https://doris.apache.org/docs/sql-manual/sql-statements/data-modification/load-and-export/EXPORT) command.

Example:
dodo export --target s3 --url 's3://bucket/export/{db}/{table}_' -p timeout=60 -w s3.endpoint=xxx -w s3.access_key=xxx -w s3.secret_key=xxx
dodo export --target hdfs --url 'hdfs://path/to/export/{db}/{table}_' -w fs.defaultFS=hdfs://HDFS8000871 -w hadoop.username=xxx`,
Aliases: []string{"e"},
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
return initConfig(cmd)
},
SilenceUsage: true,
RunE: func(cmd *cobra.Command, _ []string) (err error) {
ctx, _ := signal.NotifyContext(cmd.Context(), syscall.SIGABRT, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

if err := completeExportConfig(ctx); err != nil {
return err
}
GlobalConfig.Parallel = min(GlobalConfig.Parallel, len(GlobalConfig.Tables))

logrus.Infof("Export data for %d table(s) to '%s', parallel: %d\n", len(GlobalConfig.Tables), ExportConfig.ToURL, GlobalConfig.Parallel)
if len(GlobalConfig.Tables) == 0 {
return nil
}
if !src.Confirm("Confirm") {
return nil
}

g := src.ParallelGroup(GlobalConfig.Parallel)
for _, t := range GlobalConfig.Tables {
dbtable := strings.SplitN(t, ".", 2)
if len(dbtable) != 2 {
return fmt.Errorf("invalid table format '%s', expected 'db.table'", t)
}
dbname, table := dbtable[0], dbtable[1]
toURL := fasttemplate.ExecuteString(ExportConfig.ToURL, "{", "}", map[string]any{"db": dbname, "table": table})

g.Go(func() error {
logrus.Infof("Exporting table '%s.%s' to '%s'", dbname, table, toURL)
if err := src.Export(ctx, ExportConfig.dbconn, dbname, table, ExportConfig.Target, toURL, ExportConfig.With, ExportConfig.Properties); err != nil {
return fmt.Errorf("export table '%s.%s' failed: %w", dbname, table, err)
}
logrus.Infof("Export completed for table '%s.%s'", dbname, table)
return nil
})
}

return g.Wait()
},
}

func init() {
rootCmd.AddCommand(exportCmd)
exportCmd.PersistentFlags().SortFlags = false
exportCmd.Flags().SortFlags = false

pFlags := exportCmd.PersistentFlags()
pFlags.StringVarP(&ExportConfig.Target, "target", "t", "s3", "Target storage for the export, e.g. 's3', 'hdfs'")
pFlags.StringVarP(&ExportConfig.ToURL, "url", "u", "", "Target URL that Doris export to, can use placeholders {db} and {table}, e.g. 's3://bucket/export/{db}/{table}_', 'hdfs://path/to/{db}/{table}_'")
pFlags.StringToStringVarP(&ExportConfig.Properties, "props", "p", map[string]string{}, "Additional properties, e.g. 'format=parquet'")
pFlags.StringToStringVarP(&ExportConfig.With, "with", "w", map[string]string{}, "Additional options for export target, e.g. 's3.endpoint=xxx'")

exportCmd.RegisterFlagCompletionFunc("target", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"s3", "hdfs", "local"}, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveDefault
})

exportCmd.RegisterFlagCompletionFunc("url", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
compopts := cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveDefault
switch ExportConfig.Target {
case "s3":
return []string{"s3://"}, compopts
case "hdfs":
return []string{"hdfs://"}, compopts
case "local":
return []string{"file://"}, compopts
}
return []string{}, cobra.ShellCompDirectiveError
})

compopts := cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveDefault | cobra.ShellCompDirectiveNoSpace | cobra.ShellCompDirectiveKeepOrder
exportCmd.RegisterFlagCompletionFunc("props", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
// https://doris.apache.org/docs/sql-manual/sql-statements/data-modification/load-and-export/EXPORT#optional-parameters
return []string{
"label=",
"column_separator=",
"line_delimiter=",
"timeout=",
"columns=",
"format=",
"parallelism=",
"delete_existing_files=",
"max_file_size=",
"with_bom=",
"compress_type=",
},
compopts
})

exportCmd.RegisterFlagCompletionFunc("with", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
switch ExportConfig.Target {
case "s3":
return []string{"s3.endpoint=", "s3.access_key=", "s3.secret_key=", "s3.region="}, compopts
case "hdfs":
return []string{"fs.defaultFS=", "hadoop.username=", "fs.", "dfs.", "hadoop."}, compopts
}
return []string{}, cobra.ShellCompDirectiveError
})
}

func completeExportConfig(ctx context.Context) (err error) {
if err = completeDBTables(); err != nil {
return err
}

ExportConfig.Target = strings.ToLower(ExportConfig.Target)
if ExportConfig.Target == "" {
return errors.New("export target is required, use --target or -t to specify it")
}
urlPrefix := ExportConfig.Target
if urlPrefix == "local" {
urlPrefix = "file"
}
if !strings.HasPrefix(ExportConfig.ToURL, urlPrefix+"://") {
return fmt.Errorf("export URL must start with '%s://', got: '%s'", urlPrefix, ExportConfig.ToURL)
}

ExportConfig.dbconn, err = connectDBWithoutDBName()
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}

// find tables if not provided
if len(GlobalConfig.Tables) > 0 {
return nil
}
for _, db := range GlobalConfig.DBs {
schemas, err := src.ShowTables(ctx, ExportConfig.dbconn, db)
if err != nil {
return fmt.Errorf("failed to get tables for database '%s': %w", db, err)
}
tables := lo.FilterMap(schemas, func(s *src.Schema, _ int) (string, bool) {
return s.Name, s.Type == src.SchemaTypeTable
})
logrus.Infof("Found %d table(s) in database '%s'", len(tables), db)
for _, table := range tables {
GlobalConfig.Tables = append(GlobalConfig.Tables, db+"."+table)
}
}

return nil
}
24 changes: 6 additions & 18 deletions cmd/gendata.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ func init() {
pFlags.StringVarP(&GendataConfig.Query, "query", "q", "", "SQL query file to generate data, only can be used when LLM is on")
pFlags.StringVarP(&GendataConfig.Prompt, "prompt", "p", "", "Additional user prompt for LLM")
addAnonymizeBaseFlags(pFlags, false)

gendataCmd.RegisterFlagCompletionFunc("llm", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"deepseek-reasoner", "deepseek-chat"}, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveDefault
})
}

// completeGendataConfig validates and completes the gendata configuration
Expand Down Expand Up @@ -299,24 +303,8 @@ func completeGendataConfig() (err error) {
return nil
}

GlobalConfig.DBs, GlobalConfig.Tables = lo.Uniq(GlobalConfig.DBs), lo.Uniq(GlobalConfig.Tables)
dbs, tables := GlobalConfig.DBs, GlobalConfig.Tables
if len(dbs) == 0 && len(tables) == 0 {
return errors.New("expected at least one database or tables, please use --dbs/--tables flag or --ddl flag with '.sql' file(s)")
} else if len(dbs) == 1 {
// prepend default database if only one database specified
prefix := dbs[0] + "."
for i, t := range GlobalConfig.Tables {
if !strings.Contains(t, ".") {
GlobalConfig.Tables[i] = prefix + t
}
}
} else {
for _, t := range tables {
if !strings.Contains(t, ".") {
return errors.New("expected database in table name when zero/multiple databases specified, e.g. --tables db1.table1,db2.table2")
}
}
if err := completeDBTables(); err != nil {
return err
}

ddls := []string{}
Expand Down
20 changes: 2 additions & 18 deletions cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,8 @@ func completeImportConfig() (err error) {
ImportConfig.Data = filepath.Join(GlobalConfig.OutputDir, "gendata")
}

GlobalConfig.DBs, GlobalConfig.Tables = lo.Uniq(GlobalConfig.DBs), lo.Uniq(GlobalConfig.Tables)
dbs, tables := GlobalConfig.DBs, GlobalConfig.Tables
if len(dbs) == 0 && len(tables) == 0 {
return errors.New("expected at least one database or tables, please use --dbs/--tables flag")
} else if len(dbs) == 1 {
// prepend default database if only one database specified
prefix := dbs[0] + "."
for i, t := range GlobalConfig.Tables {
if !strings.Contains(t, ".") {
GlobalConfig.Tables[i] = prefix + t
}
}
} else {
for _, t := range tables {
if !strings.Contains(t, ".") {
return errors.New("expected database in table name when zero/multiple databases specified, e.g. --tables db1.table1,db2.table2")
}
}
if err := completeDBTables("expected at least one database or tables, please use --dbs/--tables flag"); err != nil {
return err
}

table2datafiles := map[string][]string{}
Expand Down
Loading