Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
51a81df
parser: support graph related ddl/query syntax (#5)
lonng Dec 25, 2021
fb8c416
add more unit test for parser
lonng Dec 26, 2021
8d36682
planner/core: support build graph related ast nodes to logical plans
lonng Dec 26, 2021
5360282
parser: support omit destination of edge pattern
lonng Dec 27, 2021
89e36c6
planner/core: refine the plan of graph pattern
lonng Dec 28, 2021
e267cc4
Merge pull request #6 from tigraph/hackathon2021-planner
vodkaslime Dec 31, 2021
a784e57
planner,ddl: fix some panic and refine the planer logical (#8)
lonng Jan 1, 2022
a14f2f5
codec: support write graph records (#9)
0xPoe Jan 1, 2022
8bb5a34
graph write supports adaptive primary/source/destination keys (#10)
lonng Jan 1, 2022
acb703b
apply change to depend on tipb from tigraph
vodkaslime Jan 2, 2022
c88fe11
Merge pull request #11 from JeepYiheihou/hackathon2021
vodkaslime Jan 2, 2022
c0ff240
*: implement the graph read path (#12)
lonng Jan 2, 2022
a4cc19e
*: reuse the regular table to store graph vertex data (#13)
lonng Jan 3, 2022
96c8375
executor: finish graph executor (#14)
sleepymole Jan 4, 2022
5e016eb
executor: graph supports prune column optimizer rule (#15)
lonng Jan 5, 2022
e1a0802
planner/core: disable the graph edge scan predicates push down
lonng Jan 6, 2022
ed63a26
parser: fix source/destination key column option restore
lonng Jan 6, 2022
7b4a959
executor: fix show create table result of edge table
lonng Jan 7, 2022
b881fbf
planner: prune destination columns if destination is not specified (#17)
sleepymole Jan 7, 2022
636de72
executor: make BOTH iter bidirectional edge (#18)
sleepymole Jan 7, 2022
bc5aeed
ddl: support alter column add graph option (#19)
sleepymole Jan 8, 2022
14f540b
executor,planner: graph support match any shortest (#20)
sleepymole Jan 9, 2022
eef8768
executor: fix graph shortest path dst id
sleepymole Jan 9, 2022
3fca98d
Merge pull request #21 from gozssky/graph-shortest
sleepymole Jan 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,16 +464,17 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
logger := log.With(zap.String("table", tableName))

allRanges := make([]tidbkv.KeyRange, 0)
tableIDs := physicalTableIDs(tbl.Meta())
tableInfo := tbl.Meta()
tableIDs := physicalTableIDs(tableInfo)
// Collect row handle duplicates.
var dataConflictInfos []errormanager.DataConflictInfo
hasDataConflict := false
{
ranges := ranger.FullIntRange(false)
if tbl.Meta().IsCommonHandle {
if tableInfo.IsCommonHandle {
ranges = ranger.FullRange()
}
keyRanges, err := distsql.TableHandleRangesToKVRanges(nil, tableIDs, tbl.Meta().IsCommonHandle, ranges, nil)
keyRanges, err := distsql.TableHandleRangesToKVRanges(nil, tableIDs, tableInfo.IsCommonHandle, tableInfo.Type, ranges, nil)
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -534,7 +535,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
}
}
handles := makePendingIndexHandlesWithCapacity(0)
for _, indexInfo := range tbl.Meta().Indices {
for _, indexInfo := range tableInfo.Indices {
if indexInfo.State != model.StatePublic {
continue
}
Expand Down Expand Up @@ -763,7 +764,7 @@ func (manager *DuplicateManager) makeConn(ctx context.Context, storeID uint64) (
func buildDuplicateRequests(tableInfo *model.TableInfo) ([]*DuplicateRequest, error) {
var reqs []*DuplicateRequest
for _, id := range physicalTableIDs(tableInfo) {
tableReqs, err := buildTableRequests(id, tableInfo.IsCommonHandle)
tableReqs, err := buildTableRequests(id, tableInfo.IsCommonHandle, tableInfo.Type)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
34 changes: 34 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,40 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
return w.doModifyColumnTypeWithData(d, t, job, dbInfo, tblInfo, jobParam.changingCol, oldCol, jobParam.newCol.Name, jobParam.pos, jobParam.changingIdxs)
}

func onModifyColumnAddGraphOption(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var srcCol, dstCol ast.ColumnDef
if err := job.DecodeArgs(&srcCol, &dstCol); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
for _, col := range tblInfo.Columns {
if col.Name.L == srcCol.Name.Name.L {
col.Flag |= mysql.SrcKeyFlag
}
if col.Name.L == dstCol.Name.Name.L {
col.Flag |= mysql.DstKeyFlag
}
}
srcRefTable := srcCol.Options[0].Refer.Table
dstRefTable := dstCol.Options[0].Refer.Table
tblInfo.EdgeOptions = &model.EdgeOptions{
Source: &model.EdgeReference{Schema: srcRefTable.Schema, Table: srcRefTable.Name},
Destination: &model.EdgeReference{Schema: dstRefTable.Schema, Table: dstRefTable.Name},
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

// rollbackModifyColumnJobWithData is used to rollback modify-column job which need to reorg the data.
func rollbackModifyColumnJobWithData(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, jobParam *modifyColumnJobParameter) (ver int64, err error) {
// If the not-null change is included, we should clean the flag info in oldCol.
Expand Down
26 changes: 26 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7641,3 +7641,29 @@ func (s *testDBSuite8) TestCreateTextAdjustLen(c *C) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table if exists t")
}

func (s *testSerialDBSuite) TestCreateGraph(c *C) {
tk := testkit.NewTestKit(c, s.store)
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.AlterPrimaryKey = false
})

tk.MustExec("drop database if exists test_graph")
defer tk.MustExec("drop database if exists test_graph")
tk.MustExec("create database test_graph")
tk.MustExec("set @@tidb_enable_clustered_index=0")
tk.MustExec("use test_graph")
tk.MustExec("create table people1 (id bigint)")
tk.MustExec("create table people2 (id bigint, name varchar(32), uid int)")
people1 := testGetTableByName(c, tk.Se, "test_graph", "people1")
c.Assert(people1.Meta().EdgeOptions, IsNil)

tk.MustExec("create table friend1 (src bigint SOURCE KEY REFERENCES people1, dst bigint DESTINATION KEY REFERENCES people1)")
tk.MustExec("create table friend2 (src bigint SOURCE KEY REFERENCES people1, dst bigint DESTINATION KEY REFERENCES people1, description varchar(32), start timestamp)")
friend1 := testGetTableByName(c, tk.Se, "test_graph", "friend1")

c.Assert(friend1.Meta().EdgeOptions, NotNil)
c.Assert(friend1.Meta().EdgeOptions.Source.Table.O, Equals, "people1")
c.Assert(friend1.Meta().EdgeOptions.Destination.Table.O, Equals, "people1")
}
146 changes: 146 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,10 @@ func buildTableInfoWithStmt(ctx sessionctx.Context, s *ast.CreateTableStmt, dbCh
return nil, errors.Trace(err)
}

if err = handleEdgeOptions(tbInfo, colDefs); err != nil {
return nil, errors.Trace(err)
}

if tbInfo.TempTableType == model.TempTableNone && tbInfo.PlacementPolicyRef == nil && tbInfo.DirectPlacementOpts == nil {
// Set the defaults from Schema. Note: they are mutual exclusive!
if placementPolicyRef != nil {
Expand Down Expand Up @@ -1939,6 +1943,87 @@ func buildTableInfoWithStmt(ctx sessionctx.Context, s *ast.CreateTableStmt, dbCh
return tbInfo, nil
}

func handleEdgeOptions(tbInfo *model.TableInfo, colDefs []*ast.ColumnDef) error {
var (
srcIdx, dstIdx int
edgeOptions = &model.EdgeOptions{}
)
for i, cd := range colDefs {
if len(cd.Options) == 0 {
continue
}
for _, opt := range cd.Options {
switch opt.Tp {
case ast.ColumnOptionSourceKey:
if edgeOptions.Source != nil {
return errors.Errorf("Only one column can be specified SOURCE KEY option")
}
srcIdx = i
edgeOptions.Source = &model.EdgeReference{Schema: opt.Refer.Table.Schema, Table: opt.Refer.Table.Name}
case ast.ColumnOptionDestinationKey:
if edgeOptions.Destination != nil {
return errors.Errorf("Only one column can be specified DESTINATION KEY option")
}
dstIdx = i
edgeOptions.Destination = &model.EdgeReference{Schema: opt.Refer.Table.Schema, Table: opt.Refer.Table.Name}
}
}
}

// Regular table.
if edgeOptions.Source == nil && edgeOptions.Destination == nil {
return nil
}

// Edge table cannot be assigned a primary key.
if tbInfo.IsCommonHandle || tbInfo.PKIsHandle {
return errors.New("can not specified primary key on edge")
}

if edgeOptions.Source == nil || edgeOptions.Destination == nil {
return errors.Errorf("SOURCE KEY and DESTINATION KEY columns need to be specified at the same time")
}

primaryKey := &model.IndexInfo{
Name: model.NewCIStr(mysql.PrimaryKeyName),
Unique: true,
Primary: true,
State: model.StatePublic,
}

edgeKey := &model.IndexInfo{
Name: model.NewCIStr(fmt.Sprintf(mysql.GraphEdgeKeyName)),
Unique: true,
State: model.StatePublic,
}

for _, idx := range []int{srcIdx, dstIdx} {
tbInfo.Columns[idx].Flag |= mysql.PriKeyFlag
tbInfo.Columns[idx].Flag |= mysql.NotNullFlag
primaryKey.Columns = append(primaryKey.Columns, &model.IndexColumn{
Name: model.NewCIStr(tbInfo.Columns[idx].Name.O),
Offset: idx,
Length: types.UnspecifiedLength,
})
}
for _, idx := range []int{dstIdx, srcIdx} {
edgeKey.Columns = append(edgeKey.Columns, &model.IndexColumn{
Name: model.NewCIStr(tbInfo.Columns[idx].Name.O),
Offset: idx,
Length: types.UnspecifiedLength,
})
}

tbInfo.Columns[srcIdx].Flag |= mysql.SrcKeyFlag
tbInfo.Columns[dstIdx].Flag |= mysql.DstKeyFlag
tbInfo.Indices = append(tbInfo.Indices, primaryKey)
tbInfo.Indices = append(tbInfo.Indices, edgeKey)
tbInfo.EdgeOptions = edgeOptions
tbInfo.IsCommonHandle = true

return nil
}

func (d *ddl) assignTableID(tbInfo *model.TableInfo) error {
genIDs, err := d.genGlobalIDs(1)
if err != nil {
Expand Down Expand Up @@ -2645,6 +2730,39 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE")
}

var s0, s1 *ast.AlterTableSpec
if func() bool {
if len(validSpecs) != 2 {
return false
}
s0, s1 = validSpecs[0], validSpecs[1]
if len(s0.NewColumns) != 1 || len(s1.NewColumns) != 1 {
return false
}
col0 := s0.NewColumns[0]
col1 := s1.NewColumns[0]
if len(col0.Options) != 1 || len(col1.Options) != 1 {
return false
}
opt0 := col0.Options[0]
opt1 := col1.Options[0]
if opt0.Tp == ast.ColumnOptionDestinationKey {
opt0, opt1 = opt1, opt0
s0, s1 = s1, s0
}
return opt0.Tp == ast.ColumnOptionSourceKey && opt1.Tp == ast.ColumnOptionDestinationKey
}() {
if err := d.ModifyColumnAddGraphOption(sctx, ident, s0, s1); err != nil {
return err
}
keys := []*ast.IndexPartSpecification{
{Column: s1.NewColumns[0].Name, Length: -1},
{Column: s0.NewColumns[0].Name, Length: -1},
}
return d.CreateIndex(sctx, ident, ast.IndexKeyTypeUnique, model.NewCIStr(fmt.Sprintf(mysql.GraphEdgeKeyName)),
keys, nil, false)
}

err = checkMultiSpecs(sctx, validSpecs)
if err != nil {
return err
Expand Down Expand Up @@ -4526,6 +4644,34 @@ func (d *ddl) ModifyColumn(ctx context.Context, sctx sessionctx.Context, ident a
return errors.Trace(err)
}

func (d *ddl) ModifyColumnAddGraphOption(
ctx sessionctx.Context,
ident ast.Ident,
srcSpec, destSpec *ast.AlterTableSpec,
) error {
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}

tb, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionModifyColumnAddGraphOption,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{srcSpec.NewColumns[0], destSpec.NewColumns[0]},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
specNewColumn := spec.NewColumns[0]
is := d.infoCache.GetLatest()
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onDropColumns(t, job)
case model.ActionModifyColumn:
ver, err = w.onModifyColumn(d, t, job)
case model.ActionModifyColumnAddGraphOption:
ver, err = onModifyColumnAddGraphOption(t, job)
case model.ActionSetDefaultValue:
ver, err = onSetDefaultValue(t, job)
case model.ActionAddIndex:
Expand Down
5 changes: 3 additions & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,13 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
var b distsql.RequestBuilder
var builder *distsql.RequestBuilder
var ranges []*ranger.Range
if tbl.Meta().IsCommonHandle {
tableInfo := tbl.Meta()
if tableInfo.IsCommonHandle {
ranges = ranger.FullNotNullRange()
} else {
ranges = ranger.FullIntRange(false)
}
builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges, nil)
builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tableInfo.IsCommonHandle, ranges, nil)
builder.SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeepOrder(true).
Expand Down
7 changes: 4 additions & 3 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package distsql

import (
"github.com/pingcap/tidb/parser/model"
"testing"

"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestTableHandlesToKVRanges(t *testing.T) {

// Build key ranges.
expect := getExpectedRanges(1, hrs)
actual := TableHandlesToKVRanges(1, handles)
actual := TableHandlesToKVRanges(1, model.TableTypeIsRegular, handles)

// Compare key ranges and expected key ranges.
require.Equal(t, len(expect), len(actual))
Expand Down Expand Up @@ -214,7 +215,7 @@ func TestRequestBuilder1(t *testing.T) {
},
}

actual, err := (&RequestBuilder{}).SetHandleRanges(nil, 12, false, ranges, nil).
actual, err := (&RequestBuilder{}).SetHandleRanges(nil, 12, false, model.TableTypeIsRegular, ranges, nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
Expand Down Expand Up @@ -343,7 +344,7 @@ func TestRequestBuilder3(t *testing.T) {
handles := []kv.Handle{kv.IntHandle(0), kv.IntHandle(2), kv.IntHandle(3), kv.IntHandle(4),
kv.IntHandle(5), kv.IntHandle(10), kv.IntHandle(11), kv.IntHandle(100)}

actual, err := (&RequestBuilder{}).SetTableHandles(15, handles).
actual, err := (&RequestBuilder{}).SetTableHandles(15, model.TableTypeIsRegular, handles).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
Expand Down
Loading