diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index c31a3f9a93..bbcd88970d 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -8,6 +8,7 @@ import ( "net" "time" + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" "golang.org/x/net/netutil" @@ -61,6 +62,7 @@ func initGRPC(c *cfg) { MinTime: 5 * time.Second, // w/o this server sends GoAway with ENHANCE_YOUR_CALM code "too_many_pings" PermitWithoutStream: true, }), + grpc.ForceServerCodecV2(iprotobuf.BufferedCodec{}), } if maxRecvMsgSizeOpt != nil { // TODO(@cthulhu-rider): the setting can be server-global only now, support diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index aec78b3b30..54c3643353 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "math" + "slices" "sync" "sync/atomic" "time" @@ -52,6 +53,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" "go.uber.org/zap" + "google.golang.org/grpc" ) type objectSvc struct { @@ -315,8 +317,25 @@ func initObjectService(c *cfg) { server := objectService.New(objSvc, mNumber, c.cfgObject.pool.search, fsChain, storage, c.metaService, c.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor) os.server = server + svcDesc := protoobject.ObjectService_ServiceDesc + svcDesc.Methods = slices.Clone(protoobject.ObjectService_ServiceDesc.Methods) + + const headMethod = "Head" + headInd := slices.IndexFunc(svcDesc.Methods, func(md grpc.MethodDesc) bool { return md.MethodName == headMethod }) + if headInd < 0 { + fatalOnErr(fmt.Errorf("missing %s method handler in object service desc", headMethod)) + } + + svcDesc.Methods[headInd].Handler = func(_ any, ctx context.Context, dec func(any) error, _ grpc.UnaryServerInterceptor) (any, error) { + req := new(protoobject.HeadRequest) + if err := dec(req); err != nil { + return nil, err + } + return 
server.HeadBuffered(ctx, req) + } + for _, srv := range c.cfgGRPC.servers { - protoobject.RegisterObjectServiceServer(srv, server) + srv.RegisterService(&svcDesc, server) } } diff --git a/internal/object/wire.go b/internal/object/wire.go index 548f337094..9e8d8d8c34 100644 --- a/internal/object/wire.go +++ b/internal/object/wire.go @@ -5,6 +5,7 @@ import ( "fmt" "io" + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-sdk-go/object" protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" "github.com/nspcc-dev/neofs-sdk-go/proto/refs" @@ -12,6 +13,10 @@ import ( "google.golang.org/protobuf/proto" ) +// MaxHeaderVarintLen is varint len of [object.MaxHeaderLen]. +// TODO: unit test. +const MaxHeaderVarintLen = 3 // object.MaxHeaderLen + // Protobuf field numbers for object message. const ( _ = iota @@ -21,6 +26,35 @@ const ( fieldObjectPayload ) +// Protobuf field numbers for header message. +const ( + _ = iota + FieldHeaderVersion + FieldHeaderContainerID + FieldHeaderOwnerID + FieldHeaderCreationEpoch + FieldHeaderPayloadLength + FieldHeaderPayloadHash + FieldHeaderType + FieldHeaderHomoHash + FieldHeaderSessionToken + FieldHeaderAttributes + FieldHeaderSplit + FieldHeaderSessionTokenV2 +) + +// Protobuf field numbers for split header message. +const ( + _ = iota + FieldHeaderSplitParent + FieldHeaderSplitPrevious + FieldHeaderSplitParentSignature + FieldHeaderSplitParentHeader + /* FieldHeaderSplitChildren */ _ + /* FieldHeaderSplitSplitID */ _ + /* FieldHeaderSplitFirst */ _ +) + // WriteWithoutPayload writes the object header to the given writer without the payload. func WriteWithoutPayload(w io.Writer, obj object.Object) error { header := obj.CutPayload().Marshal() @@ -108,3 +142,135 @@ func ReadHeaderPrefix(r io.Reader) (*object.Object, []byte, error) { } return ExtractHeaderAndPayload(buf[:n]) } + +// SeekParentHeaderFields seeks parent ID, signature and header in object +// message with direct field order. 
+func SeekParentHeaderFields(b []byte) (iprotobuf.FieldBounds, iprotobuf.FieldBounds, iprotobuf.FieldBounds, error) { + var idf, sigf, hdrf iprotobuf.FieldBounds + + rootHdrf, err := iprotobuf.SeekBytesField(b, fieldObjectHeader) + if err != nil { + return idf, sigf, hdrf, err + } + + if rootHdrf.IsMissing() { + return idf, sigf, hdrf, nil + } + + splitf, err := iprotobuf.SeekBytesField(b[rootHdrf.ValueFrom:rootHdrf.To], FieldHeaderSplit) + if err != nil { + return idf, sigf, hdrf, err + } + + if splitf.IsMissing() { + return idf, sigf, hdrf, nil + } + + b = b[:rootHdrf.ValueFrom+splitf.To] + off := rootHdrf.ValueFrom + splitf.ValueFrom + var prevNum protowire.Number +loop: + for { + num, typ, n, err := iprotobuf.ParseTag(b, off) + if err != nil { + return idf, sigf, hdrf, err + } + + if num < prevNum { + return idf, sigf, hdrf, iprotobuf.NewUnorderedFieldsError(prevNum, num) + } + if num == prevNum && num <= FieldHeaderSplitParentHeader { + return idf, sigf, hdrf, iprotobuf.NewRepeatedFieldError(num) + } + prevNum = num + + switch num { + case FieldHeaderSplitParent: + idf, err = iprotobuf.ParseLenFieldBounds(b, off, n, num, typ) + if err != nil { + return idf, sigf, hdrf, err + } + off = idf.To + case FieldHeaderSplitPrevious: + off += n + ln, n, err := iprotobuf.ParseLenField(b, off, num, typ) + if err != nil { + return idf, sigf, hdrf, err + } + off += n + ln + case FieldHeaderSplitParentSignature: + sigf, err = iprotobuf.ParseLenFieldBounds(b, off, n, num, typ) + if err != nil { + return idf, sigf, hdrf, err + } + off = sigf.To + case FieldHeaderSplitParentHeader: + hdrf, err = iprotobuf.ParseLenFieldBounds(b, off, n, num, typ) + if err != nil { + return idf, sigf, hdrf, err + } + break loop + default: + break loop + } + + if off == len(b) { + break + } + } + + return idf, sigf, hdrf, nil +} + +// SeekHeaderFields seeks ID, signature and header in object message with +// direct field order.
+func SeekHeaderFields(b []byte) (iprotobuf.FieldBounds, iprotobuf.FieldBounds, iprotobuf.FieldBounds, error) { + var idf, sigf, hdrf iprotobuf.FieldBounds + var off int + var prevNum protowire.Number +loop: + for { + num, typ, n, err := iprotobuf.ParseTag(b, off) + if err != nil { + return idf, sigf, hdrf, err + } + + if num < prevNum { + return idf, sigf, hdrf, iprotobuf.NewUnorderedFieldsError(prevNum, num) + } + if num == prevNum { + return idf, sigf, hdrf, iprotobuf.NewRepeatedFieldError(num) + } + prevNum = num + + switch num { + case fieldObjectID: + idf, err = iprotobuf.ParseLenFieldBounds(b, off, n, num, typ) + if err != nil { + return idf, sigf, hdrf, err + } + off = idf.To + case fieldObjectSignature: + sigf, err = iprotobuf.ParseLenFieldBounds(b, off, n, num, typ) + if err != nil { + return idf, sigf, hdrf, err + } + off = sigf.To + case fieldObjectHeader: + hdrf, err = iprotobuf.ParseLenFieldBounds(b, off, n, num, typ) + if err != nil { + return idf, sigf, hdrf, err + } + + break loop + default: + break loop + } + + if off == len(b) { + break + } + } + + return idf, sigf, hdrf, nil +} diff --git a/internal/object/wire_test.go b/internal/object/wire_test.go index cfeb77f361..4d043ffb4d 100644 --- a/internal/object/wire_test.go +++ b/internal/object/wire_test.go @@ -3,13 +3,22 @@ package object_test import ( "bytes" "crypto/rand" + "encoding/binary" "io" "testing" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" iobject "github.com/nspcc-dev/neofs-node/internal/object" + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" + "github.com/nspcc-dev/neofs-node/internal/testutil" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" + protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" "github.com/stretchr/testify/require" + 
"google.golang.org/protobuf/proto" ) func TestWriteWithoutPayload(t *testing.T) { @@ -97,3 +106,290 @@ func TestReadHeaderPrefix(t *testing.T) { require.Equal(t, expectedSize, len(payloadPrefix)) require.Equal(t, payload[:expectedSize], payloadPrefix) } + +func TestSeekHeaderFields(t *testing.T) { + const pubkeyLen = 33 + id := oidtest.ID() + sig := neofscrypto.NewSignatureFromRawKey(neofscrypto.N3, testutil.RandByteSlice(pubkeyLen), testutil.RandByteSlice(keys.SignatureLen)) + payload := []byte("Hello, world!") + + obj := objecttest.Object() + obj.SetID(id) + obj.SetSignature(&sig) + obj.SetPayload(payload) + + hdrLen := obj.HeaderLen() + hdrLenVarint := binary.PutVarint(make([]byte, binary.MaxVarintLen64), int64(hdrLen)) + + encodeBuffer := func(obj []byte) []byte { + b := testutil.RandByteSlice(object.MaxHeaderLen * 2) + copy(b, obj) + return b + } + + encodeObject := func(obj object.Object) []byte { + return encodeBuffer(obj.Marshal()) + } + + assertID := func(t *testing.T, data []byte, f iprotobuf.FieldBounds, off int) { + require.EqualValues(t, off, f.From) + require.EqualValues(t, f.From+(1+1), f.ValueFrom) + const idFldLen = 1 + 1 + 32 + require.EqualValues(t, f.ValueFrom+idFldLen, f.To) + require.Equal(t, []byte{10, idFldLen}, data[f.From:f.ValueFrom]) + var gotID oid.ID + require.NoError(t, gotID.Unmarshal(data[f.ValueFrom:f.To])) + require.Equal(t, obj.GetID(), gotID) + } + assertSignature := func(t *testing.T, data []byte, f iprotobuf.FieldBounds, off int) { + require.EqualValues(t, off, f.From) + require.EqualValues(t, f.From+(1+1), f.ValueFrom) + const sigFldLen = (1 + 1 + pubkeyLen) + (1 + 1 + keys.SignatureLen) + (1 + 1) + require.EqualValues(t, f.ValueFrom+sigFldLen, f.To) + require.Equal(t, []byte{18, sigFldLen}, data[f.From:f.ValueFrom]) + var gotSig neofscrypto.Signature + require.NoError(t, gotSig.Unmarshal(data[f.ValueFrom:f.To])) + require.Equal(t, sig, gotSig) + } + assertHeader := func(t *testing.T, data []byte, f iprotobuf.FieldBounds, off 
int) { + require.EqualValues(t, off, f.From) + require.EqualValues(t, f.From+(1+hdrLenVarint), f.ValueFrom) + require.EqualValues(t, f.ValueFrom+hdrLen, f.To) + require.Equal(t, binary.AppendUvarint([]byte{26}, uint64(hdrLen)), data[f.From:f.ValueFrom]) + var gotHdr protoobject.Header + require.NoError(t, proto.Unmarshal(data[f.ValueFrom:f.To], &gotHdr)) + require.True(t, proto.Equal(obj.ProtoMessage().Header, &gotHdr)) + } + + t.Run("empty", func(t *testing.T) { + _, _, _, err := iobject.SeekHeaderFields(nil) + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + require.EqualError(t, err, "parse field tag: "+io.ErrUnexpectedEOF.Error()) + + _, _, _, err = iobject.SeekHeaderFields([]byte{}) + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + require.EqualError(t, err, "parse field tag: "+io.ErrUnexpectedEOF.Error()) + }) + t.Run("payload tag", func(t *testing.T) { + idf, sigf, hdrf, err := iobject.SeekHeaderFields([]byte{34}) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + }) + t.Run("payload tag and len", func(t *testing.T) { + idf, sigf, hdrf, err := iobject.SeekHeaderFields([]byte{34, 13}) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + }) + t.Run("payload", func(t *testing.T) { + var obj object.Object + obj.SetPayload(payload) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + }) + t.Run("id,signature,payload", func(t *testing.T) { + var obj object.Object + obj.SetID(id) + obj.SetSignature(&sig) + obj.SetPayload(payload) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.False(t, idf.IsMissing()) + require.False(t, sigf.IsMissing()) + 
require.True(t, hdrf.IsMissing()) + + assertID(t, data, idf, 0) + assertSignature(t, data, sigf, idf.To) + }) + t.Run("id,header,payload", func(t *testing.T) { + obj := obj + obj.SetSignature(nil) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.False(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.False(t, hdrf.IsMissing()) + + assertID(t, data, idf, 0) + assertHeader(t, data, hdrf, idf.To) + }) + t.Run("id,signature", func(t *testing.T) { + var obj object.Object + obj.SetID(id) + obj.SetSignature(&sig) + + data := obj.Marshal() + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.False(t, idf.IsMissing()) + require.False(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + + assertID(t, data, idf, 0) + assertSignature(t, data, sigf, idf.To) + }) + t.Run("id,header", func(t *testing.T) { + obj := obj + obj.SetSignature(nil) + obj.SetPayload(nil) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.False(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.False(t, hdrf.IsMissing()) + + assertID(t, data, idf, 0) + assertHeader(t, data, hdrf, idf.To) + }) + t.Run("id,payload", func(t *testing.T) { + var obj object.Object + obj.SetID(id) + obj.SetPayload(payload) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.False(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + + assertID(t, data, idf, 0) + }) + t.Run("id", func(t *testing.T) { + var obj object.Object + obj.SetID(id) + + data := obj.Marshal() + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.False(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + + assertID(t, data, idf, 0) 
+ }) + t.Run("signature,header,payload", func(t *testing.T) { + obj := obj + obj.SetID(oid.ID{}) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.False(t, sigf.IsMissing()) + require.False(t, hdrf.IsMissing()) + + require.True(t, idf.IsMissing()) + assertSignature(t, data, sigf, 0) + assertHeader(t, data, hdrf, sigf.To) + }) + t.Run("signature,payload", func(t *testing.T) { + var obj object.Object + obj.SetSignature(&sig) + obj.SetPayload(payload) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.False(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + + assertSignature(t, data, sigf, 0) + }) + t.Run("signature", func(t *testing.T) { + var obj object.Object + obj.SetSignature(&sig) + + data := obj.Marshal() + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.False(t, sigf.IsMissing()) + require.True(t, hdrf.IsMissing()) + + assertSignature(t, data, sigf, 0) + }) + t.Run("header,payload", func(t *testing.T) { + obj := obj + obj.SetID(oid.ID{}) + obj.SetSignature(nil) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.False(t, hdrf.IsMissing()) + + assertHeader(t, data, hdrf, 0) + }) + t.Run("header", func(t *testing.T) { + obj := obj + obj.SetID(oid.ID{}) + obj.SetSignature(nil) + obj.SetPayload(nil) + + data := encodeObject(obj) + + idf, sigf, hdrf, err := iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.True(t, idf.IsMissing()) + require.True(t, sigf.IsMissing()) + require.False(t, hdrf.IsMissing()) + + assertHeader(t, data, hdrf, 0) + }) + + data := encodeObject(obj) + + idf, sigf, hdrf, err 
:= iobject.SeekHeaderFields(data) + require.NoError(t, err) + + require.False(t, idf.IsMissing()) + require.False(t, sigf.IsMissing()) + require.False(t, hdrf.IsMissing()) + + assertID(t, data, idf, 0) + assertSignature(t, data, sigf, idf.To) + assertHeader(t, data, hdrf, sigf.To) +} diff --git a/internal/protobuf/api.go b/internal/protobuf/api.go new file mode 100644 index 0000000000..3252270f17 --- /dev/null +++ b/internal/protobuf/api.go @@ -0,0 +1,246 @@ +package protobuf + +import ( + "bytes" + "errors" + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" + "github.com/nspcc-dev/neo-go/pkg/encoding/address" + "github.com/nspcc-dev/neofs-sdk-go/checksum" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/version" + "google.golang.org/protobuf/encoding/protowire" +) + +// Various field numbers. +const ( + FieldVersionMajor = 1 + FieldVersionMinor = 2 + + FieldUserIDValue = 1 + + FieldChecksumType = 1 + FieldChecksumValue = 2 + + FieldAttributeKey = 1 + FieldAttributeValue = 2 +) + +// ParseAPIVersionField parses version.Version from the next field with known +// number and type at given offset. Also returns field length. 
+func ParseAPIVersionField(b []byte, off int, num protowire.Number, typ protowire.Type) (version.Version, int, error) { + lnFull, nFull, err := ParseLenField(b, off, num, typ) + if err != nil || lnFull == 0 { + return version.Version{}, nFull, err + } + off += nFull + + b = b[:off+lnFull] + + var ver version.Version + var prevNum protowire.Number + for { + num, typ, n, err := ParseTag(b, off) + if err != nil { + return version.Version{}, 0, err + } + off += n + + if num < prevNum { + return version.Version{}, 0, NewUnorderedFieldsError(prevNum, num) + } + if num == prevNum { + return version.Version{}, 0, NewRepeatedFieldError(num) + } + prevNum = num + + switch num { + case FieldVersionMajor: + u, n, err := ParseUint32Field(b, off, num, typ) + if err != nil { + return version.Version{}, 0, err + } + off += n + + ver.SetMajor(u) + case FieldVersionMinor: + u, n, err := ParseUint32Field(b, off, num, typ) + if err != nil { + return version.Version{}, 0, err + } + off += n + + ver.SetMinor(u) + default: + return version.Version{}, 0, NewUnsupportedFieldError(num, typ) + } + + if off == len(b) { + break + } + } + + return ver, nFull + lnFull, nil +} + +// ParseUserIDField parses user ID from the next field with known number and +// type at given offset. Also returns field length. 
+func ParseUserIDField(b []byte, off int, num protowire.Number, typ protowire.Type) ([]byte, int, error) { + lnFull, nFull, err := ParseLenField(b, off, num, typ) + if err != nil || lnFull == 0 { + return nil, nFull, err + } + off += nFull + + b = b[:off+lnFull] + + num, typ, n, err := ParseTag(b, off) + if err != nil { + return nil, 0, err + } + off += n + + if num != FieldUserIDValue { + return nil, 0, NewUnsupportedFieldError(num, typ) + } + + ln, n, err := ParseLenField(b, off, num, typ) + if err != nil { + return nil, 0, err + } + off += n + + b = b[off : off+ln] + + // TODO https://github.com/nspcc-dev/neofs-sdk-go/issues/669 + switch { + case len(b) != user.IDSize: + return nil, 0, fmt.Errorf("invalid length %d, expected %d", len(b), user.IDSize) + case b[0] != address.NEO3Prefix: + return nil, 0, fmt.Errorf("invalid prefix byte 0x%X, expected 0x%X", b[0], address.NEO3Prefix) + case !bytes.Equal(b[21:], hash.Checksum(b[:21])): + return nil, 0, errors.New("checksum mismatch") + } + + return b, nFull + lnFull, nil +} + +// ParseChecksum parses checksum.Checksum from the next field with known number +// and type at given offset. Also returns field length. 
+func ParseChecksum(b []byte, off int, num protowire.Number, typ protowire.Type) (checksum.Checksum, int, error) { + lnFull, nFull, err := ParseLenField(b, off, num, typ) + if err != nil || lnFull == 0 { + return checksum.Checksum{}, nFull, err + } + off += nFull + + b = b[:off+lnFull] + + var csTyp checksum.Type + var csVal []byte + var prevNum protowire.Number + for { + num, typ, n, err := ParseTag(b, off) + if err != nil { + return checksum.Checksum{}, 0, err + } + off += n + + if num < prevNum { + return checksum.Checksum{}, 0, NewUnorderedFieldsError(prevNum, num) + } + if num == prevNum { + return checksum.Checksum{}, 0, NewRepeatedFieldError(num) + } + prevNum = num + + switch num { + case FieldChecksumType: + csTyp, n, err = ParseEnumField[checksum.Type](b, off, num, typ) + if err != nil { + return checksum.Checksum{}, 0, err + } + off += n + case FieldChecksumValue: + ln, n, err := ParseLenField(b, off, num, typ) + if err != nil { + return checksum.Checksum{}, 0, err + } + off += n + + csVal = b[off : off+ln] + + off += ln + default: + return checksum.Checksum{}, 0, NewUnsupportedFieldError(num, typ) + } + + if off == len(b) { + break + } + } + + return checksum.New(csTyp, csVal), nFull + lnFull, nil +} + +// ParseAttribute parses key-value attribute from the next field with known +// number and type at given offset. Also returns field length.
+func ParseAttribute(b []byte, off int, num protowire.Number, typ protowire.Type) ([]byte, []byte, int, error) { + lnFull, nFull, err := ParseLenField(b, off, num, typ) + if err != nil || lnFull == 0 { + return nil, nil, nFull, err + } + off += nFull + + b = b[:off+lnFull] + + var k, v []byte + var prevNum protowire.Number + for { + num, typ, n, err := ParseTag(b, off) + if err != nil { + return nil, nil, 0, err + } + off += n + + if num < prevNum { + return nil, nil, 0, NewUnorderedFieldsError(prevNum, num) + } + if num == prevNum { + return nil, nil, 0, NewRepeatedFieldError(num) + } + prevNum = num + + switch num { + case FieldAttributeKey: + ln, n, err := ParseLenField(b, off, num, typ) + if err != nil { + return nil, nil, 0, err + } + off += n + + k = b[off : off+ln] + + off += ln + case FieldAttributeValue: + ln, n, err := ParseLenField(b, off, num, typ) + if err != nil { + return nil, nil, 0, err + } + off += n + + v = b[off : off+ln] + + off += ln + default: + return nil, nil, 0, NewUnsupportedFieldError(num, typ) + } + + if off == len(b) { + break + } + } + + return k, v, nFull + lnFull, nil +} diff --git a/internal/protobuf/buffers.go b/internal/protobuf/buffers.go new file mode 100644 index 0000000000..5974e6b9ff --- /dev/null +++ b/internal/protobuf/buffers.go @@ -0,0 +1,58 @@ +package protobuf + +import ( + "sync" + "sync/atomic" + + "google.golang.org/grpc/mem" +) + +// TODO: docs. 
+type MemBuffer struct { + mem.SliceBuffer + ln int + refs atomic.Int32 + pool *sync.Pool +} + +func (x *MemBuffer) Finalize(ln int) { + x.ln = ln + x.Ref() +} + +func (x *MemBuffer) Len() int { + return x.ln +} + +func (x *MemBuffer) Ref() { + x.refs.Add(1) // TODO: fix in HEAD commit +} + +func (x *MemBuffer) Free() { + if x.refs.Add(-1) == 0 { + x.pool.Put(x) + } +} + +type MemBufferPool struct { + syncPool *sync.Pool +} + +func NewBufferPool(ln int) *MemBufferPool { + return &MemBufferPool{ + syncPool: &sync.Pool{ + New: func() any { + return &MemBuffer{ + SliceBuffer: make([]byte, ln), + } + }, + }, + } +} + +func (x *MemBufferPool) Get() *MemBuffer { + item := x.syncPool.Get().(*MemBuffer) + item.pool = x.syncPool + item.Ref() // TODO: not always needed + return item +} diff --git a/internal/protobuf/codecs.go b/internal/protobuf/codecs.go new file mode 100644 index 0000000000..38e140ee67 --- /dev/null +++ b/internal/protobuf/codecs.go @@ -0,0 +1,29 @@ +package protobuf + +import ( + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" + "google.golang.org/grpc/mem" +) + +// TODO: docs. +type BufferedCodec struct{} + +// TODO: docs. +func (BufferedCodec) Marshal(msg any) (mem.BufferSlice, error) { + if bs, ok := msg.(mem.Buffer); ok { + return mem.BufferSlice{bs}, nil + } + return encoding.GetCodecV2(proto.Name).Marshal(msg) +} + +// TODO: docs. +func (BufferedCodec) Unmarshal(data mem.BufferSlice, msg any) error { + return encoding.GetCodecV2(proto.Name).Unmarshal(data, msg) +} + +// TODO: docs. 
+func (BufferedCodec) Name() string { + // may be any non-empty, conflicts are unlikely to arise + return "neofs_custom_buffers" +} diff --git a/internal/protobuf/errors.go b/internal/protobuf/errors.go new file mode 100644 index 0000000000..5d7019ea4f --- /dev/null +++ b/internal/protobuf/errors.go @@ -0,0 +1,35 @@ +package protobuf + +import ( + "fmt" + + "google.golang.org/protobuf/encoding/protowire" +) + +// NewUnorderedFieldsError returns common error for field order violation when +// field #n2 goes after #n1. +func NewUnorderedFieldsError(n1, n2 protowire.Number) error { + return fmt.Errorf("unordered fields: #%d after #%d", n2, n1) +} + +// NewRepeatedFieldError returns common error for field #n repeated more than +// once. +func NewRepeatedFieldError(n protowire.Number) error { + return fmt.Errorf("repeated field #%d", n) +} + +// NewUnsupportedFieldError returns common error for unsupported field #n of +// type t. +func NewUnsupportedFieldError(n protowire.Number, t protowire.Type) error { + return fmt.Errorf("unsupported field #%d of type %v", n, t) +} + +// WrapParseFieldError wraps cause of parsing field #n of type t. +func WrapParseFieldError(n protowire.Number, t protowire.Type, cause error) error { + return fmt.Errorf("parse field (#%d,type=%v): %w", n, t, cause) +} + +// WrapSeekFieldError wraps cause of seeking field #n. +func WrapSeekFieldError(n protowire.Number, cause error) error { + return fmt.Errorf("seek field %d: %w", n, cause) +} diff --git a/internal/protobuf/field.go b/internal/protobuf/field.go new file mode 100644 index 0000000000..c40c27414c --- /dev/null +++ b/internal/protobuf/field.go @@ -0,0 +1,32 @@ +package protobuf + +// One-byte tags for varlen fields. +const ( + _ = iota<<3 | 2 + TagBytes1 + TagBytes2 + TagBytes3 + TagBytes4 + /* TagBytes5 */ _ + TagBytes6 +) + +// One-byte tags for varint fields. 
+const ( + _ = iota << 3 + TagVarint1 + TagVarint2 +) + +// FieldBounds represents bounds of some field in some continuous protobuf +// message. +type FieldBounds struct { + From int // tag index + ValueFrom int // first value byte index + To int // last byte index +} + +// IsMissing returns field absence flag. +func (x FieldBounds) IsMissing() bool { + return x.From == x.To +} diff --git a/internal/protobuf/parsers.go b/internal/protobuf/parsers.go new file mode 100644 index 0000000000..15e92e4c5b --- /dev/null +++ b/internal/protobuf/parsers.go @@ -0,0 +1,163 @@ +package protobuf + +import ( + "fmt" + "io" + "math" + + "google.golang.org/protobuf/encoding/protowire" +) + +// ParseTag parses tag of the next field at given offset. Returns field +// number, type and tag length. +func ParseTag(b []byte, off int) (protowire.Number, protowire.Type, int, error) { + num, typ, n := protowire.ConsumeTag(b[off:]) + if n < 0 { + return 0, 0, 0, fmt.Errorf("parse field tag: %w", protowire.ParseError(n)) + } + + return num, typ, n, nil +} + +// TODO: docs. +func ParseVarint(b []byte, off int) (uint64, int, error) { + u, n := protowire.ConsumeVarint(b[off:]) + if n < 0 { + return 0, 0, fmt.Errorf("parse varint: %w", protowire.ParseError(n)) + } + + return u, n, nil +} + +// ParseLen parses length of the varlen field at given offset. Returns the +// length itself and the length of its varint prefix. +func ParseLen(b []byte, off int) (int, int, error) { + ln, n, err := ParseVarint(b, off) + if err != nil { + return 0, 0, fmt.Errorf("parse field len: %w", err) + } + + if rem := uint64(len(b) - off - n); ln > rem { + return 0, 0, fmt.Errorf("parse field len: %w (got %d, left buffer %d)", io.ErrUnexpectedEOF, ln, rem) + } + + return int(ln), n, nil +} + +// ParseLenField parses length of the next varlen field with known number and +// type at given offset. Returns the length itself and the length of its varint prefix.
+func ParseLenField(b []byte, off int, num protowire.Number, typ protowire.Type) (int, int, error) { + err := checkFieldType(num, protowire.BytesType, typ) + if err != nil { + return 0, 0, err + } + + ln, n, err := ParseLen(b, off) + if err != nil { + return 0, 0, WrapParseFieldError(num, protowire.BytesType, err) + } + + return ln, n, nil +} + +// ParseLenFieldBounds parses length of the next varlen field with tag length, +// number and type at given offset. +func ParseLenFieldBounds(b []byte, off int, tagLn int, num protowire.Number, typ protowire.Type) (FieldBounds, error) { + ln, nLn, err := ParseLenField(b, off+tagLn, num, typ) + if err != nil { + return FieldBounds{}, err + } + + var f FieldBounds + f.From = off + f.ValueFrom = f.From + tagLn + nLn + f.To = f.ValueFrom + ln + + return f, nil +} + +// ParseAnyField parses value of the next field with known number and type at +// given offset. Returns value length. +func ParseAnyField(b []byte, off int, num protowire.Number, typ protowire.Type) (int, error) { + // TODO: can be optimized by calculating len only? + n := protowire.ConsumeFieldValue(num, typ, b[off:]) + if n < 0 { + return 0, WrapParseFieldError(num, typ, protowire.ParseError(n)) + } + + return n, nil +} + +func parseEnum[T ~int32](b []byte, off int) (T, int, error) { + u, n, err := ParseVarint(b, off) + if err != nil { + return 0, 0, err + } + + if u > math.MaxUint32 { + return 0, 0, fmt.Errorf("value %d overflows int32", u) + } + + return T(u), n, nil +} + +// ParseEnumField parses value of the next enum field with known number and type +// at given offset. Returns value and its length. 
+func ParseEnumField[T ~int32](b []byte, off int, num protowire.Number, typ protowire.Type) (T, int, error) { + err := checkFieldType(num, protowire.VarintType, typ) + if err != nil { + return 0, 0, err + } + + e, n, err := parseEnum[T](b, off) + if err != nil { + return 0, 0, WrapParseFieldError(num, protowire.VarintType, err) + } + + return e, n, nil +} + +func parseUint32(b []byte, off int) (uint32, int, error) { + u, n, err := ParseVarint(b, off) + if err != nil { + return 0, 0, err + } + + if u > math.MaxUint32 { + return 0, 0, fmt.Errorf("value %d overflows uint32", u) + } + + return uint32(u), n, nil +} + +// ParseUint32Field parses value of the next uint32 field with known number and +// type at given offset. Returns value and its length. +func ParseUint32Field(b []byte, off int, num protowire.Number, typ protowire.Type) (uint32, int, error) { + err := checkFieldType(num, protowire.VarintType, typ) + if err != nil { + return 0, 0, err + } + + u, n, err := parseUint32(b, off) + if err != nil { + return 0, 0, WrapParseFieldError(num, protowire.VarintType, err) + } + + return u, n, nil +} + +// ParseUint64Field parses value of the next uint64 field with known number and +// type at given offset. Returns value and its length. +func ParseUint64Field(b []byte, off int, num protowire.Number, typ protowire.Type) (uint64, int, error) { + err := checkFieldType(num, protowire.VarintType, typ) + if err != nil { + return 0, 0, err + } + + u, n, err := ParseVarint(b, off) + if err != nil { + return 0, 0, WrapParseFieldError(num, protowire.VarintType, err) + } + + return u, n, nil +} diff --git a/internal/protobuf/protobuf.go b/internal/protobuf/protobuf.go index 807deefc37..7b85b6e048 100644 --- a/internal/protobuf/protobuf.go +++ b/internal/protobuf/protobuf.go @@ -30,3 +30,53 @@ func GetFirstBytesField(b []byte) ([]byte, error) { return b, nil } + +// SeekBytesField seeks varlen field type by number. 
+func SeekBytesField(b []byte, num protowire.Number) (FieldBounds, error) { + off, tagLn, typ, err := seekField(b, num) + if err != nil { + return FieldBounds{}, WrapSeekFieldError(num, err) + } + + if off < 0 { + return FieldBounds{}, nil + } + + return ParseLenFieldBounds(b, off, tagLn, num, typ) +} + +func seekField(b []byte, seekNum protowire.Number) (int, int, protowire.Type, error) { + var off int + var prevNum protowire.Number + for { + num, typ, n, err := ParseTag(b, off) + if err != nil { + return 0, 0, 0, err + } + + if num == seekNum { + return off, n, typ, nil + } + if num < prevNum { + return 0, 0, 0, NewUnorderedFieldsError(prevNum, num) + } + if num > seekNum { + break + } + prevNum = num + + off += n + + n, err = ParseAnyField(b, off, num, typ) + if err != nil { + return 0, 0, 0, err + } + off += n + + if off == len(b) { + break + } + } + + return -1, 0, 0, nil +} diff --git a/internal/protobuf/util.go b/internal/protobuf/util.go new file mode 100644 index 0000000000..45fb9e32b2 --- /dev/null +++ b/internal/protobuf/util.go @@ -0,0 +1,14 @@ +package protobuf + +import ( + "fmt" + + "google.golang.org/protobuf/encoding/protowire" +) + +func checkFieldType(num protowire.Number, exp, got protowire.Type) error { + if exp == got { + return nil + } + return fmt.Errorf("wrong type of field #%d: expected %v, got %v", num, exp, got) +} diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index cb8f02adbe..aab5c3f32a 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -28,7 +28,9 @@ type Storage interface { Get(oid.Address) (*object.Object, error) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) GetStream(oid.Address) (*object.Object, io.ReadCloser, error) + OpenStream(oid.Address, func() []byte) (int, io.ReadCloser, error) Head(oid.Address) (*object.Object, error) + HeadToBuffer(oid.Address, 
func() []byte) (int, error) Exists(oid.Address) (bool, error) Put(oid.Address, []byte) error PutBatch(map[oid.Address][]byte) error diff --git a/pkg/local_object_storage/blobstor/compression/compress.go b/pkg/local_object_storage/blobstor/compression/compress.go index d0d3c99193..a02dcf1017 100644 --- a/pkg/local_object_storage/blobstor/compression/compress.go +++ b/pkg/local_object_storage/blobstor/compression/compress.go @@ -8,6 +8,9 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" ) +// PrefixLength is a length of compression marker in compressed data. +const PrefixLength = 4 + // Config represents common compression-related configuration. type Config struct { Enabled bool @@ -73,7 +76,7 @@ func (c *Config) NeedsCompression(obj *object.Object) bool { // IsCompressed checks whether given data is compressed. func (c *Config) IsCompressed(data []byte) bool { - return len(data) >= 4 && bytes.Equal(data[:4], zstdFrameMagic) + return len(data) >= PrefixLength && bytes.Equal(data[:PrefixLength], zstdFrameMagic) } // Decompress decompresses data if it starts with the magic diff --git a/pkg/local_object_storage/blobstor/compression/compress_internal_test.go b/pkg/local_object_storage/blobstor/compression/compress_internal_test.go new file mode 100644 index 0000000000..8a775ceb33 --- /dev/null +++ b/pkg/local_object_storage/blobstor/compression/compress_internal_test.go @@ -0,0 +1,11 @@ +package compression + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPrefixLen(t *testing.T) { + require.Len(t, zstdFrameMagic, PrefixLength) +} diff --git a/pkg/local_object_storage/blobstor/fstree/bench_test.go b/pkg/local_object_storage/blobstor/fstree/bench_test.go index 2c8af2c286..305e9d131f 100644 --- a/pkg/local_object_storage/blobstor/fstree/bench_test.go +++ b/pkg/local_object_storage/blobstor/fstree/bench_test.go @@ -18,6 +18,14 @@ func BenchmarkFSTree_Head(b *testing.B) { } } +func BenchmarkFSTree_HeadToBuffer(b *testing.B) { + for _, size 
:= range payloadSizes { + b.Run(generateSizeLabel(size), func(b *testing.B) { + runReadBenchmark(b, "HeadToBuffer", size) + }) + } +} + func BenchmarkFSTree_Get(b *testing.B) { for _, size := range payloadSizes { b.Run(generateSizeLabel(size), func(b *testing.B) { @@ -89,6 +97,18 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addr := prepareSingleObject(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "HeadToBuffer" { + buf := make([]byte, object.MaxHeaderLen*2) + + for b.Loop() { + _, err := fsTree.HeadToBuffer(addr, func() []byte { return buf }) + require.NoError(b, err) + } + + return + } + for b.Loop() { testRead(fsTree, addr) } @@ -99,6 +119,19 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addrs := prepareMultipleObjects(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "HeadToBuffer" { + buf := make([]byte, object.MaxHeaderLen*2) + + b.ResetTimer() + for k := range b.N { + _, err := fsTree.HeadToBuffer(addrs[k%len(addrs)], func() []byte { return buf }) + require.NoError(b, err) + } + + return + } + b.ResetTimer() for k := range b.N { testRead(fsTree, addrs[k%len(addrs)]) @@ -111,6 +144,18 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addr := prepareSingleObject(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "HeadToBuffer" { + buf := make([]byte, object.MaxHeaderLen*2) + + for b.Loop() { + _, err := fsTree.HeadToBuffer(addr, func() []byte { return buf }) + require.NoError(b, err) + } + + return + } + for b.Loop() { testRead(fsTree, addr) } @@ -122,6 +167,19 @@ func runReadBenchmark(b *testing.B, methodName string, payloadSize int) { addrs := prepareMultipleObjects(b, fsTree, payloadSize) b.ReportAllocs() + + if methodName == "HeadToBuffer" { + buf := make([]byte, object.MaxHeaderLen*2) + + b.ResetTimer() + for k := range b.N { + _, err := fsTree.HeadToBuffer(addrs[k%len(addrs)], func() []byte { return buf }) + require.NoError(b, 
err) + } + + return + } + b.ResetTimer() for k := range b.N { testRead(fsTree, addrs[k%len(addrs)]) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 1ca6721037..8449386ff7 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/klauspost/compress/zstd" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -555,6 +556,192 @@ func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.Rea }, nil } +// OpenStream looks up for referenced object in t and returns object data +// stream. The stream must be finally closed by the caller. +// +// If object is missing, OpenStream returns [apistatus.ErrObjectNotFound]. +// TODO: docs about buffer. +// TODO: tests. 
+func (t *FSTree) OpenStream(addr oid.Address, getBuffer func() []byte) (int, io.ReadCloser, error) { + p := t.treePath(addr) + + f, err := os.Open(p) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return 0, nil, logicerr.Wrap(apistatus.ErrObjectNotFound) + } + return 0, nil, fmt.Errorf("open file %q: %w", p, err) + } + + buf := getBuffer() + if len(buf) < combinedDataOff+compression.PrefixLength { + return 0, nil, fmt.Errorf("too short buffer: %d < %d", len(buf), combinedDataOff) + } + + seekID := addr.Object() + var fileOff int64 + var n int + rem := int64(-1) + +nextRead: + for { + n, err = f.ReadAt(buf, fileOff) + if err != nil && !errors.Is(err, io.EOF) { + f.Close() + return 0, nil, fmt.Errorf("read file: %w", err) + } + + for bufOff := 0; ; { + id, ln := parseCombinedPrefix(buf[bufOff:n]) + if id == nil { + if fileOff > 0 { // combined + f.Close() + return 0, nil, errors.New("malformed combined file") // used in several places, share? + } + + if n < len(buf) { // EOF => fully buffered + rem = 0 + } + break nextRead + } + + bufOff += combinedDataOff + + if !bytes.Equal(id, seekID[:]) { + bufOff += int(ln) + if bufOff < n { + continue + } + + if n < len(buf) { // EOF + f.Close() + return 0, nil, io.ErrUnexpectedEOF + } + + fileOff += int64(bufOff) + continue nextRead + } + + if int(ln) <= n-bufOff { // fully buffered + if bufOff > 0 { + n = copy(buf, buf[bufOff:][:ln]) + } + rem = 0 + break nextRead + } + + if n < len(buf) { // EOF + f.Close() + return 0, nil, io.ErrUnexpectedEOF + } + + if bufOff > 0 { + n = copy(buf, buf[bufOff:n]) + } + rem = int64(ln) - int64(n) + fileOff += int64(bufOff + n) + break nextRead + } + } + + compressed := t.IsCompressed(buf[:n]) + + if rem == 0 { // fully buffered + f.Close() + + if !compressed { + return n, nopReadCloser{}, nil + } + + // can be shared with HeadToBuffer() + dec, err := zstd.NewReader(nil) + if err != nil { + return 0, nil, fmt.Errorf("zstd decoder: %w", err) + } + + var decBuf []byte + if n < 
object.MaxHeaderLen { + decBuf = buf[n:] + } else { + decBuf = make([]byte, object.MaxHeaderLen) + } + decBuf, err = dec.DecodeAll(buf[:n], decBuf[:0]) // shouldn't this be io.ReadFull? + if err != nil { + return 0, nil, fmt.Errorf("zstd read: %w", err) + } + if len(decBuf) > len(buf) { + return 0, nil, fmt.Errorf("decompressed %d bytes overflow buffer %d", n, len(buf)) + } + + return copy(buf, decBuf), nopReadCloser{}, nil + } + + if rem < 0 { // non-combined, full file for object + if _, err = f.Seek(int64(n), io.SeekStart); err != nil { + return 0, nil, fmt.Errorf("seek object file: %w", err) + } + + if !compressed { + return n, f, nil + } + + dec, err := zstd.NewReader(io.MultiReader(bytes.NewReader(buf[:n]), f)) + if err != nil { + return 0, nil, fmt.Errorf("zstd decoder: %w", err) + } + + var decBuf []byte + if n < object.MaxHeaderLen { + decBuf = buf[n:][:object.MaxHeaderLen] + } else { + decBuf = make([]byte, object.MaxHeaderLen) + } + n, err = dec.Read(decBuf) + if err != nil { + return 0, nil, fmt.Errorf("zstd read: %w", err) + } + if n > len(buf) { + return 0, nil, fmt.Errorf("decompressed %d bytes overflow buffer %d", n, len(buf)) + } + + return copy(buf, decBuf[:n]), zstdStream{Decoder: dec, src: f}, nil + } + + if _, err = f.Seek(fileOff, io.SeekStart); err != nil { + return 0, nil, fmt.Errorf("seek combined file: %w", err) + } + + rdr := io.LimitReader(f, rem) + + if !compressed { + return n, struct { + io.Reader + io.Closer + }{Reader: rdr, Closer: f}, nil + } + + dec, err := zstd.NewReader(io.MultiReader(bytes.NewReader(buf[:n]), rdr)) + if err != nil { + return 0, nil, fmt.Errorf("zstd decoder: %w", err) + } + + var decBuf []byte + if n < object.MaxHeaderLen { + decBuf = buf[n:][:object.MaxHeaderLen] + } else { + decBuf = make([]byte, object.MaxHeaderLen) + } + n, err = dec.Read(decBuf) + if err != nil { + return 0, nil, fmt.Errorf("zstd read: %w", err) + } + if n > len(buf) { + return 0, nil, fmt.Errorf("decompressed %d bytes overflow buffer %d", 
n, len(buf)) + } + + return copy(buf, decBuf[:n]), zstdStream{Decoder: dec, src: f}, nil +} + // Type is fstree storage type used in logs and configuration. const Type = "fstree" diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 219a85519e..e3fec90d7b 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -1,12 +1,15 @@ package fstree import ( + "bytes" + "io" "testing" "testing/iotest" "github.com/nspcc-dev/neofs-node/internal/testutil" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/stretchr/testify/require" @@ -88,3 +91,50 @@ func testGetRangeStream(t *testing.T, fst *FSTree) { _, err = fst.GetRangeStream(addr, 1, pldLen-1) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) } + +func assertOpenStreamOK(t *testing.T, fst *FSTree, addr oid.Address, data []byte) { + _assertOpenStreamOK(t, fst, addr, data, -1, -1) +} + +func assertOpenStreamOKWithBufferLen(t *testing.T, fst *FSTree, addr oid.Address, data []byte, bufLen int) { + _assertOpenStreamOK(t, fst, addr, data, bufLen, -1) +} + +func assertOpenStreamOKWithPrefixLen(t *testing.T, fst *FSTree, addr oid.Address, data []byte, prefixLen int) { + _assertOpenStreamOK(t, fst, addr, data, -1, prefixLen) +} + +func _assertOpenStreamOK(t *testing.T, fst *FSTree, addr oid.Address, data []byte, bufLen int, prefixLen int) { + buf, n, reader, err := _openStream(fst, addr, bufLen) + require.NoError(t, err) + require.NotNil(t, reader) + require.GreaterOrEqual(t, len(buf), n) + if prefixLen >= 0 { + require.EqualValues(t, prefixLen, n) + } + + require.NoError(t, 
iotest.TestReader(io.MultiReader(bytes.NewReader(buf[:n]), reader), data)) + require.NoError(t, err) + require.NoError(t, reader.Close()) +} + +func openStream(fst *FSTree, addr oid.Address) ([]byte, int, io.ReadCloser, error) { + return _openStream(fst, addr, -1) +} + +func _openStream(fst *FSTree, addr oid.Address, bufLen int) ([]byte, int, io.ReadCloser, error) { + if bufLen < 0 { + bufLen = 42 + } + + var buf []byte + + n, stream, err := fst.OpenStream(addr, func() []byte { + if buf == nil { + buf = make([]byte, bufLen) + } + return buf + }) + + return buf, n, stream, err +} diff --git a/pkg/local_object_storage/blobstor/fstree/getstream_test.go b/pkg/local_object_storage/blobstor/fstree/getstream_test.go index 18240a3312..6a7105307f 100644 --- a/pkg/local_object_storage/blobstor/fstree/getstream_test.go +++ b/pkg/local_object_storage/blobstor/fstree/getstream_test.go @@ -1,6 +1,7 @@ package fstree import ( + "bytes" "crypto/rand" "fmt" "io" @@ -44,7 +45,9 @@ func TestGetStream(t *testing.T) { obj.SetID(addr.Object()) obj.SetPayload(payload) - require.NoError(t, tree.Put(addr, obj.Marshal())) + data := obj.Marshal() + + require.NoError(t, tree.Put(addr, data)) retrievedObj, reader, err := tree.GetStream(addr) require.NoError(t, err) @@ -56,6 +59,8 @@ func TestGetStream(t *testing.T) { require.NoError(t, err) require.Equal(t, payload, streamedPayload) require.NoError(t, reader.Close()) + + assertOpenStreamOK(t, tree, addr, data) } t.Run("different objects", func(t *testing.T) { @@ -149,7 +154,8 @@ func TestGetStreamAfterErrors(t *testing.T) { f, err := os.Create(objPath) require.NoError(t, err) - _, err = f.Write([]byte("corrupt data that isn't a valid object")) + data := []byte("corrupt data that isn't a valid object") + _, err = f.Write(data) require.NoError(t, err) require.NoError(t, f.Close()) @@ -157,6 +163,9 @@ func TestGetStreamAfterErrors(t *testing.T) { require.Error(t, err) require.Nil(t, obj) require.Nil(t, reader) + + assertOpenStreamOK(t, tree, 
addr, data) + assertOpenStreamOKWithPrefixLen(t, tree, addr, data, len(data)) }) t.Run("corrupt compressed data", func(t *testing.T) { @@ -182,6 +191,12 @@ func TestGetStreamAfterErrors(t *testing.T) { _, _, err = tree.GetStream(addr) require.Error(t, err) + + buf, n, reader, err2 := openStream(tree, addr) + require.NoError(t, err2) + t.Cleanup(func() { reader.Close() }) + _, err2 = io.ReadAll(io.MultiReader(bytes.NewReader(buf[:n]), reader)) + require.ErrorIs(t, err, err2) }) t.Run("ID not found in combined object", func(t *testing.T) { @@ -210,6 +225,9 @@ func TestGetStreamAfterErrors(t *testing.T) { _, _, err = fsTree.GetStream(newAddr) require.ErrorIs(t, err, io.ErrUnexpectedEOF) + + _, _, _, err = openStream(fsTree, newAddr) + require.EqualError(t, err, "malformed combined file") }) } diff --git a/pkg/local_object_storage/blobstor/fstree/head.go b/pkg/local_object_storage/blobstor/fstree/head.go index d05db9ec80..7a6e01a652 100644 --- a/pkg/local_object_storage/blobstor/fstree/head.go +++ b/pkg/local_object_storage/blobstor/fstree/head.go @@ -27,6 +27,75 @@ func (t *FSTree) Head(addr oid.Address) (*object.Object, error) { return obj, nil } +// TODO: docs. 
+func (t *FSTree) HeadToBuffer(addr oid.Address, getBuffer func() []byte) (int, error) { + p := t.treePath(addr) + + f, err := os.Open(p) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return 0, logicerr.Wrap(apistatus.ErrObjectNotFound) + } + return 0, fmt.Errorf("read file %q: %w", p, err) + } + + buf := getBuffer() + + from, to, err := t.readHeader(addr.Object(), f, buf) + if err != nil { + f.Close() + return 0, fmt.Errorf("read header from file %q: %w", p, err) + } + + // following is mostly copied from readHeaderAndPayload() + + compressed := t.IsCompressed(buf[from:to]) + if !compressed { + f.Close() + + if from > 0 { + copy(buf, buf[from:to]) + } + + return to - from, nil + } + + if to-from < object.MaxHeaderLen { + f.Close() + + dec, err := t.DecompressForce(buf[from:to]) + if err != nil { + return 0, fmt.Errorf("decompress initial data: %w", err) + } + if len(dec) > len(buf) { + return 0, fmt.Errorf("decompressed %d bytes overflow buffer %d", len(dec), len(buf)) + } + + return copy(buf, dec), nil + } + + decoder, err := zstd.NewReader(io.MultiReader(bytes.NewReader(buf[from:to]), f)) + if err != nil { + f.Close() + return 0, fmt.Errorf("zstd decoder: %w", err) + } + + decBuf := make([]byte, object.MaxHeaderLen) + + n, err := decoder.Read(decBuf) // shouldn't this be io.ReadFull? + + f.Close() + + if err != nil && !errors.Is(err, io.EOF) { + return 0, fmt.Errorf("zstd read: %w", err) + } + if n > len(buf) { + return 0, fmt.Errorf("decompressed %d bytes overflow buffer %d", n, len(buf)) + } + + return copy(buf, decBuf[:n]), nil +} + // getObjectStream reads an object from the storage by address as a stream. // It returns the object with header only, and a reader for the payload. 
// readHeader reads the binary header of object id from f into buf and returns
// the [from, to) bounds of the header data within buf. For combined files it
// scans the per-object prefixes until id is found, refilling buf as it goes.
//
// NOTE(review): the scan reads buf[n:n+object.MaxHeaderLen] with n up to
// object.MaxHeaderLen, so buf must be at least 2*object.MaxHeaderLen long —
// callers pass exactly that; confirm and document as a hard requirement.
func (t *FSTree) readHeader(id oid.ID, f *os.File, buf []byte) (int, int, error) {
	// copy-pasted from extractHeaderAndStream(). Consider merging
	n, err := io.ReadFull(f, buf[:object.MaxHeaderLen])
	if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
		return 0, 0, err
	}
	if n < combinedDataOff {
		// too short to carry a combined prefix: plain object file
		return 0, n, nil
	}

	thisOID, l := parseCombinedPrefix(buf)
	if thisOID == nil {
		// no combined marker: the whole file is the object itself
		return 0, n, nil
	}

	// combined file: walk the (prefix, data) entries looking for id
	offset := combinedDataOff // current entry's data start within buf
	for {
		if bytes.Equal(thisOID, id[:]) {
			// found: make sure the header part of the entry is buffered
			size := min(offset+int(l), offset+object.MaxHeaderLen)
			if n < size {
				_, err = io.ReadFull(f, buf[n:size])
				if err != nil {
					return 0, 0, fmt.Errorf("read up to size: %w", err)
				}
			}
			return offset, size, nil
		}

		// skip this entry's data
		offset += int(l)
		if n-offset < combinedDataOff {
			// next prefix is not fully buffered: slide the window forward
			if offset > n {
				// entry extends beyond the buffer, seek over the unread part
				_, err = f.Seek(int64(offset-n), io.SeekCurrent)
				if err != nil {
					return 0, 0, err
				}
			}
			n = copy(buf, buf[min(offset, n):n])
			offset = 0
			k, err := io.ReadFull(f, buf[n:n+object.MaxHeaderLen])
			if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
				return 0, 0, fmt.Errorf("read full: %w", err)
			}
			if k == 0 {
				// nothing left to read and id was never seen
				return 0, 0, fmt.Errorf("file was found, but this object is not in it: %w", io.ErrUnexpectedEOF)
			}
			n += k
		}

		thisOID, l = parseCombinedPrefix(buf[offset:])
		if thisOID == nil {
			return 0, 0, errors.New("malformed combined file")
		}

		offset += combinedDataOff
	}
}
func (t *FSTree) readHeaderAndPayload(f io.ReadCloser, initial []byte) (*object.Object, io.ReadSeekCloser, error) { diff --git a/pkg/local_object_storage/blobstor/fstree/head_bench_test.go b/pkg/local_object_storage/blobstor/fstree/head_bench_test.go index 0076bbef9d..7a0d047a3c 100644 --- a/pkg/local_object_storage/blobstor/fstree/head_bench_test.go +++ b/pkg/local_object_storage/blobstor/fstree/head_bench_test.go @@ -2,6 +2,8 @@ package fstree_test import ( "testing" + + "github.com/nspcc-dev/neofs-sdk-go/object" ) func BenchmarkFSTree_HeadVsGet(b *testing.B) { @@ -41,6 +43,20 @@ func runHeadVsGetBenchmark(b *testing.B, payloadSize int, compressed bool) { } }) + b.Run("HeadToBuffer"+suffix, func(b *testing.B) { + buf := make([]byte, object.MaxHeaderLen*2) + + b.ReportAllocs() + for b.Loop() { + _, err := fsTree.HeadToBuffer(addr, func() []byte { + return buf + }) + if err != nil { + b.Fatal(err) + } + } + }) + b.Run("Get"+suffix, func(b *testing.B) { b.ReportAllocs() for b.Loop() { diff --git a/pkg/local_object_storage/blobstor/fstree/head_test.go b/pkg/local_object_storage/blobstor/fstree/head_test.go index 76a5e44ed5..885f6719ed 100644 --- a/pkg/local_object_storage/blobstor/fstree/head_test.go +++ b/pkg/local_object_storage/blobstor/fstree/head_test.go @@ -1,10 +1,13 @@ package fstree_test import ( + "bytes" + "encoding/binary" "fmt" "testing" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/stretchr/testify/require" @@ -33,6 +36,8 @@ func TestHeadStorage(t *testing.T) { fullObj, err := fsTree.Get(obj.Address()) require.NoError(t, err) require.Equal(t, obj, fullObj) + + testHeadToBufferOK(t, fsTree, *obj) } testCombinedObjects := func(t *testing.T, fsTree *fstree.FSTree, size int) { @@ -60,6 +65,8 @@ func TestHeadStorage(t *testing.T) { require.Len(t, attrs, 1) 
// testHeadToBufferOK checks that HeadToBuffer returns a binary prefix of the
// stored object: the payload-less header optionally followed by a (possibly
// truncated) payload field.
func testHeadToBufferOK(t *testing.T, fst *fstree.FSTree, obj object.Object) {
	var buf []byte
	n, err := fst.HeadToBuffer(obj.Address(), func() []byte {
		// double MaxHeaderLen: readHeader uses the buffer tail as scratch
		buf = make([]byte, object.MaxHeaderLen*2)
		return buf
	})
	require.NoError(t, err)

	// the buffered data must contain the marshaled payload-less header
	_, tail, ok := bytes.Cut(buf[:n], obj.CutPayload().Marshal())
	require.True(t, ok)

	// expected wire prefix of the payload field: tag byte + length varint
	prefix := make([]byte, 1+binary.MaxVarintLen64)
	prefix[0] = 34 // payload field tag
	prefix = prefix[:1+binary.PutUvarint(prefix[1:], uint64(len(obj.Payload())))]

	if len(tail) < len(prefix) {
		// buffer ended inside the payload field prefix itself
		require.True(t, bytes.HasPrefix(prefix, tail))
		return
	}

	// whatever remains after the prefix must be the beginning of the payload
	tail, ok = bytes.CutPrefix(tail, prefix)
	require.True(t, ok)
	require.True(t, bytes.HasPrefix(obj.Payload(), tail))
}
(nopReadCloser) Close() error { return nil } + +type zstdStream struct { + *zstd.Decoder + src io.ReadCloser +} + +func (x zstdStream) Close() error { + x.Decoder.Close() + return x.src.Close() +} diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 85c675df14..956634e40c 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -15,6 +15,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -141,12 +142,16 @@ func TestExecBlocks(t *testing.T) { // try to exec some op _, err := e.Head(addr, false) require.ErrorIs(t, err, errBlock) + _, err = e.HeadToBuffer(addr, false, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) + require.ErrorIs(t, err, errBlock) // resume executions require.NoError(t, e.ResumeExecution()) _, err = e.Head(addr, false) // can be any data-related op require.NoError(t, err) + _, err = e.HeadToBuffer(addr, false, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) + require.NoError(t, err) // close require.NoError(t, e.Close()) @@ -154,6 +159,8 @@ func TestExecBlocks(t *testing.T) { // try exec after close _, err = e.Head(addr, false) require.Error(t, err) + _, err = e.HeadToBuffer(addr, false, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) + require.Error(t, err) // try to resume require.Error(t, e.ResumeExecution()) diff --git a/pkg/local_object_storage/engine/ec_test.go b/pkg/local_object_storage/engine/ec_test.go index 905742bd87..de604c6d25 100644 --- a/pkg/local_object_storage/engine/ec_test.go +++ b/pkg/local_object_storage/engine/ec_test.go @@ -1345,8 +1345,12 @@ func testPutTombstoneEC(t *testing.T) { 
require.ErrorIs(t, err, target) _, err = s.Head(parentAddr, false) require.ErrorIs(t, err, target) + _, err = s.HeadToBuffer(parentAddr, false, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) + require.ErrorIs(t, err, target) _, err = s.Head(parentAddr, true) require.ErrorIs(t, err, target) + _, err = s.HeadToBuffer(parentAddr, true, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) + require.ErrorIs(t, err, target) for i := range parts { _, err := s.Get(partAddrs[i]) @@ -1359,8 +1363,12 @@ func testPutTombstoneEC(t *testing.T) { require.ErrorIs(t, err, target) _, err = s.Head(partAddrs[i], false) require.ErrorIs(t, err, target) + _, err = s.HeadToBuffer(partAddrs[i], false, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) + require.ErrorIs(t, err, target) _, err = s.Head(partAddrs[i], true) require.ErrorIs(t, err, target) + _, err = s.HeadToBuffer(partAddrs[i], true, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) + require.ErrorIs(t, err, target) _, _, err = s.GetECPart(cnr, parent.GetID(), iec.PartInfo{RuleIndex: ruleIdx, Index: i}) require.ErrorIs(t, err, target) } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 5737642fdf..15897ddf25 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -237,3 +237,28 @@ func (e *StorageEngine) getRangeStream(addr oid.Address, off, ln uint64) (io.Rea return stream, err } + +// TODO: docs. 
// OpenStream looks up the referenced object across the engine's shards and
// returns the beginning of its binary data in the buffer provided by
// getBuffer along with a stream (to be closed by the caller) for the
// remaining data. It delegates to shard.OpenStream and fails if executions
// are blocked.
func (e *StorageEngine) OpenStream(addr oid.Address, getBuffer func() []byte) (int, io.ReadCloser, error) {
	if e.metrics != nil {
		defer elapsed(e.metrics.AddGetStreamDuration)()
	}

	e.blockMtx.RLock()
	defer e.blockMtx.RUnlock()

	if e.blockErr != nil {
		return 0, nil, e.blockErr
	}

	var n int
	var stream io.ReadCloser

	err := e.get(addr, func(s *shard.Shard, ignoreMetadata bool) error {
		var err error
		n, stream, err = s.OpenStream(addr, ignoreMetadata, getBuffer)
		return err
	})

	return n, stream, err
}

// HeadToBuffer reads the binary header of the object referenced by addr into
// the buffer provided by getBuffer and returns the number of bytes written.
// Shards are tried in sorted order; split-info errors from multiple shards
// are merged, removed objects stop the search, expired ones are reported as
// not found. Behaves like Head otherwise.
func (e *StorageEngine) HeadToBuffer(addr oid.Address, raw bool, getBuffer func() []byte) (int, error) {
	// implementation is similar to Head()
	if e.metrics != nil {
		defer elapsed(e.metrics.AddHeadDuration)()
	}

	e.blockMtx.RLock()
	defer e.blockMtx.RUnlock()

	if e.blockErr != nil {
		return 0, e.blockErr
	}

	var splitInfo *object.SplitInfo

	for _, sh := range e.sortedShards(addr) {
		n, err := sh.HeadToBuffer(addr, raw, getBuffer)
		if err != nil {
			var siErr *object.SplitInfoError

			switch {
			case shard.IsErrNotFound(err):
				continue // ignore, go to next shard
			case errors.As(err, &siErr):
				if splitInfo == nil {
					splitInfo = object.NewSplitInfo()
				}

				util.MergeSplitInfo(siErr.SplitInfo(), splitInfo)

				// stop iterating over shards if SplitInfo structure is complete
				if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() {
					return 0, logicerr.Wrap(object.NewSplitInfoError(splitInfo))
				}
				continue
			case shard.IsErrRemoved(err):
				return 0, err // stop, return it back
			case shard.IsErrObjectExpired(err):
				// object is found but should not
				// be returned
				return 0, apistatus.ObjectNotFound{}
			default:
				e.reportShardError(sh, "could not head object from shard", err, zap.Stringer("addr", addr))
				continue
			}
		}

		return n, nil
	}

	if splitInfo != nil {
		return 0, logicerr.Wrap(object.NewSplitInfoError(splitInfo))
	}

	return 0, apistatus.ObjectNotFound{}
}
index dd0717c069..0847c19a2b 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -311,6 +311,7 @@ func testLockRemoved(t *testing.T, shardNum int) { lockErr := s.Put(lockObj, nil) locked, lockedErr := s.IsLocked(objAddr) _, lockHeadErr := s.Head(lockAddr, false) + _, lockHeadToBufferErr := s.HeadToBuffer(lockAddr, false, func() []byte { return make([]byte, object.MaxHeaderLen*2) }) _, lockGetErr := s.Get(lockAddr) if tc.assertLockErr != nil { @@ -319,6 +320,7 @@ func testLockRemoved(t *testing.T, shardNum int) { require.NoError(t, lockedErr) require.False(t, locked) require.ErrorIs(t, lockHeadErr, apistatus.ErrObjectNotFound) + require.ErrorIs(t, lockHeadToBufferErr, apistatus.ErrObjectNotFound) require.ErrorIs(t, lockGetErr, apistatus.ErrObjectNotFound) } else { require.NoError(t, lockErr) @@ -326,6 +328,7 @@ func testLockRemoved(t *testing.T, shardNum int) { require.NoError(t, lockedErr) require.True(t, locked) require.NoError(t, lockHeadErr) + require.NoError(t, lockHeadToBufferErr) require.NoError(t, lockGetErr) } }) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index ba8e52794c..6e695ca584 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -13,6 +13,7 @@ type MetricRegister interface { AddGetDuration(d time.Duration) AddHeadDuration(d time.Duration) AddGetStreamDuration(d time.Duration) + AddOpenStreamDuration(d time.Duration) AddGetRangeStreamDuration(d time.Duration) AddInhumeDuration(d time.Duration) AddPutDuration(d time.Duration) diff --git a/pkg/local_object_storage/shard/bench_test.go b/pkg/local_object_storage/shard/bench_test.go index a86eeda0ee..311568e806 100644 --- a/pkg/local_object_storage/shard/bench_test.go +++ b/pkg/local_object_storage/shard/bench_test.go @@ -123,6 +123,7 @@ func BenchmarkHead(b *testing.B) { b.Run(name, func(b *testing.B) { ptt, objs := prepareObjects(b, 
creat, tc.objSize, nObjects) + b.ReportAllocs() b.ResetTimer() for n := range b.N { var wg sync.WaitGroup @@ -145,6 +146,46 @@ func BenchmarkHead(b *testing.B) { } } +func BenchmarkFSTree_HeadToBuffer(b *testing.B) { + const nObjects = 10000 + + for _, tc := range tests { + b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) { + for name, creat := range map[string]func(testing.TB) common.Storage{ + "fstree": newTestFSTree, + } { + b.Run(name, func(b *testing.B) { + ptt, objs := prepareObjects(b, creat, tc.objSize, nObjects) + + bufs := make([][]byte, tc.nThreads) + for i := range bufs { + bufs[i] = make([]byte, object.MaxHeaderLen*2) + } + + b.ResetTimer() + for n := range b.N { + var wg sync.WaitGroup + + for i := range tc.nThreads { + wg.Add(1) + go func(ind int) { + defer wg.Done() + + _, err := ptt.HeadToBuffer(objs[nObjects/tc.nThreads*ind+n%(nObjects/tc.nThreads)], func() []byte { + return bufs[ind] + }) + require.NoError(b, err) + }(i) + } + + wg.Wait() + } + }) + } + }) + } +} + func prepareObjects(b *testing.B, creat func(testing.TB) common.Storage, objSize, nObjects uint64) (common.Storage, []oid.Address) { var objs = make([]oid.Address, 0, nObjects) diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 9996c54c51..be25efaae6 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -202,3 +202,33 @@ func (s *Shard) GetStream(addr oid.Address, skipMeta bool) (*object.Object, io.R return res, reader, err } + +// TODO: docs. 
+func (s *Shard) OpenStream(addr oid.Address, skipMeta bool, getBuffer func() []byte) (int, io.ReadCloser, error) { + s.m.RLock() + defer s.m.RUnlock() + + var n int + var stream io.ReadCloser + + cb := func(stor common.Storage) error { + var err error + n, stream, err = stor.OpenStream(addr, getBuffer) + return err + } + + wc := func(c writecache.Cache) error { + var err error + n, stream, err = c.OpenStream(addr, getBuffer) + return err + } + + skipMeta = skipMeta || s.info.Mode.NoMetabase() + + gotMeta, err := s.fetchObjectData(addr, skipMeta, cb, wc) + if err != nil && gotMeta { + err = fmt.Errorf("%w, %w", err, ErrMetaWithNoObject) + } + + return n, stream, err +} diff --git a/pkg/local_object_storage/shard/head.go b/pkg/local_object_storage/shard/head.go index 3017dde7cc..d1b8dc6e85 100644 --- a/pkg/local_object_storage/shard/head.go +++ b/pkg/local_object_storage/shard/head.go @@ -4,6 +4,8 @@ import ( "errors" iec "github.com/nspcc-dev/neofs-node/internal/ec" + iobject "github.com/nspcc-dev/neofs-node/internal/object" + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -89,3 +91,103 @@ func (s *Shard) Head(addr oid.Address, raw bool) (*object.Object, error) { return s.blobStor.Head(addr) } + +// TODO: docs. 
+func (s *Shard) HeadToBuffer(addr oid.Address, raw bool, getBuffer func() []byte) (int, error) { + // implementation is similar to Head() + var ( + errSplitInfo *object.SplitInfoError + children []oid.Address + ) + if !s.GetMode().NoMetabase() { + available, err := s.metaBase.Exists(addr, false) + if err != nil { + var errECParts iec.ErrParts + switch { + default: + return 0, err + case errors.As(err, &errSplitInfo): + if raw { + return 0, err + } + var si = errSplitInfo.SplitInfo() + + children = []oid.Address{oid.NewAddress(addr.Container(), si.GetLastPart()), + oid.NewAddress(addr.Container(), si.GetLink())} + case errors.As(err, &errECParts): + if len(errECParts) == 0 { + panic(errors.New("empty EC part set")) + } + + children = make([]oid.Address, len(errECParts)) + for i := range errECParts { + children[i] = oid.NewAddress(addr.Container(), errECParts[i]) + } + } + } else if !available { + return 0, logicerr.Wrap(apistatus.ObjectNotFound{}) + } + } + + for _, child := range children { + if child.Object().IsZero() { + continue + } + + var buf []byte + if s.hasWriteCache() { + n, err := s.writeCache.HeadToBuffer(child, func() []byte { buf = getBuffer(); return buf }) + if err == nil { + return shiftParentHeader(buf[:n]) + } + } + + n, err := s.blobStor.HeadToBuffer(child, func() []byte { buf = getBuffer(); return buf }) + if err == nil { + return shiftParentHeader(buf[:n]) + } + } + + if len(children) != 0 { + if errSplitInfo == nil { + return 0, logicerr.Wrap(apistatus.ErrObjectNotFound) + } + // SI present, but no objects found -> let caller handle SI. 
+ return 0, errSplitInfo + } + + if s.hasWriteCache() { + n, err := s.writeCache.HeadToBuffer(addr, getBuffer) + if err == nil { + return n, nil + } + } + + return s.blobStor.HeadToBuffer(addr, getBuffer) +} + +func shiftParentHeader(b []byte) (int, error) { + idf, sigf, hdrf, err := iobject.SeekParentHeaderFields(b) + if err != nil { + return 0, err + } + + var n int + + if !idf.IsMissing() { + // ID has same tag in header and split header + n = copy(b, b[idf.From:idf.To]) + } + + if !sigf.IsMissing() { + b[sigf.From] = iprotobuf.TagBytes2 + n += copy(b[n:], b[sigf.From:sigf.To]) + } + + if !hdrf.IsMissing() { + b[hdrf.From] = iprotobuf.TagBytes3 + n += copy(b[n:], b[hdrf.From:hdrf.To]) + } + + return n, nil +} diff --git a/pkg/local_object_storage/shard/shard_internal_test.go b/pkg/local_object_storage/shard/shard_internal_test.go index ff288dc809..a8b6db2f27 100644 --- a/pkg/local_object_storage/shard/shard_internal_test.go +++ b/pkg/local_object_storage/shard/shard_internal_test.go @@ -243,6 +243,10 @@ func (unimplementedBLOBStore) Head(oid.Address) (*object.Object, error) { panic("unimplemented") } +func (unimplementedBLOBStore) HeadToBuffer(oid.Address, func() []byte) (int, error) { + panic("unimplemented") +} + func (unimplementedBLOBStore) Exists(oid.Address) (bool, error) { panic("unimplemented") } @@ -289,6 +293,10 @@ func (unimplementedWriteCache) Head(oid.Address) (*object.Object, error) { panic("unimplemented") } +func (unimplementedWriteCache) HeadToBuffer(oid.Address, func() []byte) (int, error) { + panic("unimplemented") +} + func (unimplementedWriteCache) Delete(oid.Address) error { panic("unimplemented") } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 615236ec9b..5358018fec 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -40,6 +40,20 @@ func (c *cache) Head(addr oid.Address) (*object.Object, error) { return obj, nil } +// TODO: 
docs. +// TODO: tests. +func (c *cache) HeadToBuffer(addr oid.Address, getBuffer func() []byte) (int, error) { + if !c.objCounters.HasAddress(addr) { + return 0, logicerr.Wrap(apistatus.ObjectNotFound{}) + } + n, err := c.fsTree.HeadToBuffer(addr, getBuffer) + if err != nil { + return 0, fmt.Errorf("read header from underlying FS tree: %w", err) + } + + return n, nil +} + func (c *cache) GetBytes(addr oid.Address) ([]byte, error) { if !c.objCounters.HasAddress(addr) { return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) @@ -92,3 +106,17 @@ func (c *cache) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.Read return stream, nil } + +// TODO: docs. +// TODO: tests. +func (c *cache) OpenStream(addr oid.Address, getBuffer func() []byte) (int, io.ReadCloser, error) { + if !c.objCounters.HasAddress(addr) { + return 0, nil, logicerr.Wrap(apistatus.ErrObjectNotFound) + } + n, stream, err := c.fsTree.OpenStream(addr, getBuffer) + if err != nil { + return 0, nil, fmt.Errorf("open stream in underlying FS tree: %w", err) + } + + return n, stream, nil +} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index c02e9384a8..c4210658c4 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -27,8 +27,10 @@ type Cache interface { GetBytes(oid.Address) ([]byte, error) // GetStream returns an object and a stream to read its payload. GetStream(oid.Address) (*object.Object, io.ReadCloser, error) + OpenStream(oid.Address, func() []byte) (int, io.ReadCloser, error) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) Head(oid.Address) (*object.Object, error) + HeadToBuffer(oid.Address, func() []byte) (int, error) // Delete removes object referenced by the given oid.Address from the // Cache. Returns any error encountered that prevented the object to be // removed. 
diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 4051b22cb2..e4eb1071c3 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -16,6 +16,7 @@ type ( getDuration prometheus.Histogram headDuration prometheus.Histogram getStreamDuration prometheus.Histogram + openStreamDuration prometheus.Histogram getRangeStreamDuration prometheus.Histogram inhumeDuration prometheus.Histogram putDuration prometheus.Histogram @@ -92,6 +93,13 @@ func newEngineMetrics() engineMetrics { Help: "Engine 'get stream' operations handling time", }) + openStreamDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: storageNodeNameSpace, + Subsystem: engineSubsystem, + Name: "open_stream_time", + Help: "Engine 'open stream' operations handling time", + }) + getRangeStreamDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: storageNodeNameSpace, Subsystem: engineSubsystem, @@ -188,6 +196,7 @@ func newEngineMetrics() engineMetrics { getDuration: getDuration, headDuration: headDuration, getStreamDuration: getStreamDuration, + openStreamDuration: openStreamDuration, getRangeStreamDuration: getRangeStreamDuration, inhumeDuration: inhumeDuration, putDuration: putDuration, @@ -212,6 +221,7 @@ func (m engineMetrics) register() { prometheus.MustRegister(m.getDuration) prometheus.MustRegister(m.headDuration) prometheus.MustRegister(m.getStreamDuration) + prometheus.MustRegister(m.openStreamDuration) prometheus.MustRegister(m.getRangeStreamDuration) prometheus.MustRegister(m.inhumeDuration) prometheus.MustRegister(m.putDuration) @@ -258,6 +268,10 @@ func (m engineMetrics) AddGetStreamDuration(d time.Duration) { m.getStreamDuration.Observe(d.Seconds()) } +func (m engineMetrics) AddOpenStreamDuration(d time.Duration) { + m.openStreamDuration.Observe(d.Seconds()) +} + func (m engineMetrics) AddGetRangeStreamDuration(d time.Duration) { m.getRangeStreamDuration.Observe(d.Seconds()) } diff --git a/pkg/services/object/acl/acl.go 
b/pkg/services/object/acl/acl.go index a020448728..d00db6a42f 100644 --- a/pkg/services/object/acl/acl.go +++ b/pkg/services/object/acl/acl.go @@ -163,6 +163,8 @@ func (c *Checker) CheckEACL(msg any, reqInfo v2.RequestInfo) error { if req, ok := msg.(eaclV2.Request); ok { hdrSrcOpts = append(hdrSrcOpts, eaclV2.WithServiceRequest(req)) + } else if b, ok := msg.([]byte); ok { + hdrSrcOpts = append(hdrSrcOpts, eaclV2.WithObjectHeaderBinary(b)) } else { hdrSrcOpts = append(hdrSrcOpts, eaclV2.WithServiceResponse( diff --git a/pkg/services/object/acl/eacl/v2/headers.go b/pkg/services/object/acl/eacl/v2/headers.go index b76fe2a395..2a759d3935 100644 --- a/pkg/services/object/acl/eacl/v2/headers.go +++ b/pkg/services/object/acl/eacl/v2/headers.go @@ -19,7 +19,7 @@ type cfg struct { storage ObjectStorage headerSource HeaderSource - msg xHeaderSource + msg any cnr cid.ID obj *oid.ID @@ -81,7 +81,9 @@ func (h *headerSource) HeadersOfType(typ eaclSDK.FilterHeaderType) ([]eaclSDK.He return nil, true, nil case eaclSDK.HeaderFromRequest: if h.requestHeaders == nil { - h.requestHeaders = requestHeaders(h.cfg.msg) + if x, ok := h.cfg.msg.(xHeaderSource); ok { + h.requestHeaders = requestHeaders(x) + } } return h.requestHeaders, true, nil case eaclSDK.HeaderFromObject: @@ -115,6 +117,12 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { switch m := h.msg.(type) { default: panic(fmt.Sprintf("unexpected message type %T", h.msg)) + case binaryHeader: + var err error + dst.objectHeaders, err = headersFromBinaryObjectHeader(m, h.cnr, h.obj) + if err != nil { + return err + } case requestXHeaderSource: switch req := m.req.(type) { case diff --git a/pkg/services/object/acl/eacl/v2/object.go b/pkg/services/object/acl/eacl/v2/object.go index 1328647768..376a3911b4 100644 --- a/pkg/services/object/acl/eacl/v2/object.go +++ b/pkg/services/object/acl/eacl/v2/object.go @@ -3,11 +3,15 @@ package v2 import ( "strconv" + "github.com/mr-tron/base58" + iobject 
"github.com/nspcc-dev/neofs-node/internal/object" + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/version" + "google.golang.org/protobuf/encoding/protowire" ) type sysObjHdr struct { @@ -94,3 +98,158 @@ func headersFromObject(obj *object.Object, cnr cid.ID, oid *oid.ID) []eaclSDK.He return res } + +type binaryHeader []byte + +func headersFromBinaryObjectHeader(b []byte, cnr cid.ID, id *oid.ID) ([]eaclSDK.Header, error) { + res, err := _headersFromBinaryObjectHeader(b) + if err != nil { + return nil, err + } + + if id != nil { + return append(res, cidHeader(cnr), oidHeader(*id)), nil + } + + return append(res, cidHeader(cnr)), nil +} + +func _headersFromBinaryObjectHeader(b []byte) ([]eaclSDK.Header, error) { + var ver version.Version + var creationEpoch uint64 + var payloadLen uint64 + var objTyp object.Type + res := make([]eaclSDK.Header, 0, 10) + + var off int + var prevNum protowire.Number + for { + num, typ, n, err := iprotobuf.ParseTag(b, off) + if err != nil { + return nil, err + } + off += n + + if num < prevNum { + return nil, iprotobuf.NewUnorderedFieldsError(prevNum, num) + } + if num == prevNum && num != iobject.FieldHeaderAttributes { + return nil, iprotobuf.NewRepeatedFieldError(num) + } + prevNum = num + + switch num { + case iobject.FieldHeaderVersion: + ver, n, err = iprotobuf.ParseAPIVersionField(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + case iobject.FieldHeaderContainerID: + ln, n, err := iprotobuf.ParseLenField(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + ln + case iobject.FieldHeaderOwnerID: + owner, n, err := iprotobuf.ParseUserIDField(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + + res = append(res, sysObjHdr{k: 
eaclSDK.FilterObjectOwnerID, v: base58.Encode(owner)}) + case iobject.FieldHeaderCreationEpoch: + creationEpoch, n, err = iprotobuf.ParseUint64Field(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + case iobject.FieldHeaderPayloadLength: + payloadLen, n, err = iprotobuf.ParseUint64Field(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + case iobject.FieldHeaderPayloadHash: + cs, n, err := iprotobuf.ParseChecksum(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + + res = append(res, sysObjHdr{k: eaclSDK.FilterObjectPayloadChecksum, v: cs.String()}) + case iobject.FieldHeaderType: + objTyp, n, err = iprotobuf.ParseEnumField[object.Type](b, off, num, typ) + if err != nil { + return nil, err + } + off += n + case iobject.FieldHeaderHomoHash: + cs, n, err := iprotobuf.ParseChecksum(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + + res = append(res, sysObjHdr{k: eaclSDK.FilterObjectPayloadHomomorphicChecksum, v: cs.String()}) + case iobject.FieldHeaderSessionToken: + ln, n, err := iprotobuf.ParseLenField(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + ln + case iobject.FieldHeaderAttributes: + k, v, n, err := iprotobuf.ParseAttribute(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + + res = append(res, sysObjHdr{k: string(k), v: string(v)}) + case iobject.FieldHeaderSplit: + ln, n, err := iprotobuf.ParseLenField(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + + parHdrf, err := iprotobuf.SeekBytesField(b[off:off+ln], iobject.FieldHeaderSplitParentHeader) + if err != nil { + return nil, iprotobuf.WrapParseFieldError(iobject.FieldHeaderSplit, protowire.BytesType, err) + } + + if !parHdrf.IsMissing() { + parRes, err := _headersFromBinaryObjectHeader(b[off:][parHdrf.ValueFrom:parHdrf.To]) + if err != nil { + return nil, iprotobuf.WrapParseFieldError(iobject.FieldHeaderSplit, protowire.BytesType, err) + } + + res = append(res, 
parRes...) + } + + off += ln + case iobject.FieldHeaderSessionTokenV2: + ln, n, err := iprotobuf.ParseLenField(b, off, num, typ) + if err != nil { + return nil, err + } + off += n + ln + default: + return nil, iprotobuf.NewUnsupportedFieldError(num, typ) + } + + if off == len(b) { + break + } + } + + res = append(res, + sysObjHdr{k: eaclSDK.FilterObjectCreationEpoch, v: u64Value(creationEpoch)}, + sysObjHdr{k: eaclSDK.FilterObjectPayloadSize, v: u64Value(payloadLen)}, + sysObjHdr{k: eaclSDK.FilterObjectVersion, v: ver.String()}, + sysObjHdr{k: eaclSDK.FilterObjectType, v: objTyp.String()}, + ) + + return res, nil +} diff --git a/pkg/services/object/acl/eacl/v2/opts.go b/pkg/services/object/acl/eacl/v2/opts.go index 5e6c286f04..af53c243ac 100644 --- a/pkg/services/object/acl/eacl/v2/opts.go +++ b/pkg/services/object/acl/eacl/v2/opts.go @@ -43,6 +43,12 @@ func WithServiceResponse(resp Response, req Request) Option { } } +func WithObjectHeaderBinary(b []byte) Option { + return func(c *cfg) { + c.msg = binaryHeader(b) + } +} + func WithCID(v cid.ID) Option { return func(c *cfg) { c.cnr = v diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 27c1ac941b..7a24ab0c36 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -247,6 +247,14 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { } if prm.common.LocalOnly() { + if prm.getBufferFn != nil { + n, err := s.localObjects.HeadToBuffer(prm.addr, prm.raw, prm.getBufferFn) + if err == nil { + prm.putBytesReadFn(n) + } + return err + } + return s.copyLocalObjectHeader(prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.raw) } diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 8e00326cd1..ec5bbd4a10 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -3,6 +3,7 @@ package getsvc import ( "errors" "fmt" + "io" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" 
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -15,7 +16,17 @@ import ( func (exec *execCtx) executeLocal() { var err error - exec.collectedHeader, exec.collectedReader, err = exec.svc.localStorage.get(exec) + localGET := exec.isLocal() && !exec.headOnly() && exec.ctxRange() == nil + if localGET { + var n int + var stream io.ReadCloser + n, stream, err = exec.svc.localObjects.OpenStream(exec.address(), exec.prm.getBufferFn) + if err == nil { + exec.prm.putBytesReadWithStreamFn(n, stream) + } + } else { + exec.collectedHeader, exec.collectedReader, err = exec.svc.localStorage.get(exec) + } var errSplitInfo *object.SplitInfoError @@ -30,6 +41,11 @@ func (exec *execCtx) executeLocal() { case err == nil: exec.status = statusOK exec.err = nil + + if localGET { + break + } + exec.writeCollectedObject() case errors.Is(err, apistatus.Error): if errors.Is(err, apistatus.ErrObjectNotFound) { diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 50841d41e1..73c1a999e2 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "hash" + "io" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" @@ -61,6 +62,10 @@ type commonPrm struct { // requests (if any), could be nil if incoming request handling // routine does not include any key fetching operations signerKey *ecdsa.PrivateKey + + getBufferFn func() []byte + putBytesReadFn func(int) + putBytesReadWithStreamFn func(int, io.ReadCloser) } // ChunkWriter is an interface of target component @@ -80,6 +85,12 @@ func (p *Prm) SetObjectWriter(w ObjectWriter) { p.objWriter = w } +// TODO: docs. 
+func (p *Prm) WithBuffersFuncs(getBufferFn func() []byte, putBytesReadWithStreamFn func(int, io.ReadCloser)) { + p.getBufferFn = getBufferFn + p.putBytesReadWithStreamFn = putBytesReadWithStreamFn +} + // SetChunkWriter sets target component to write the object payload range. func (p *RangePrm) SetChunkWriter(w ChunkWriter) { p.objWriter = &partWriter{ @@ -141,3 +152,9 @@ func (p *HeadPrm) SetHeaderWriter(w internal.HeaderWriter) { headWriter: w, } } + +// TODO: docs. +func (p *HeadPrm) WithBuffersFuncs(getBufferFn func() []byte, putBytesReadFn func(int)) { + p.getBufferFn = getBufferFn + p.putBytesReadFn = putBytesReadFn +} diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index a1178ed684..b476e0f78f 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -82,6 +82,8 @@ type cfg struct { Head(oid.Address, bool) (*object.Object, error) // HeadECPart is similar to GetECPart but returns only the header. HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) + HeadToBuffer(oid.Address, bool, func() []byte) (int, error) + OpenStream(oid.Address, func() []byte) (int, io.ReadCloser, error) } localStorage interface { get(*execCtx) (*object.Object, io.ReadCloser, error) diff --git a/pkg/services/object/get/service_test.go b/pkg/services/object/get/service_test.go index 715022b687..19a2cc0390 100644 --- a/pkg/services/object/get/service_test.go +++ b/pkg/services/object/get/service_test.go @@ -194,3 +194,7 @@ func (unimplementedLocalStorage) Head(oid.Address, bool) (*object.Object, error) func (unimplementedLocalStorage) HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) { panic("unimplemented") } + +func (unimplementedLocalStorage) HeadToBuffer(oid.Address, bool, func() []byte) (int, error) { + panic("unimplemented") +} diff --git a/pkg/services/object/protobuf.go b/pkg/services/object/protobuf.go new file mode 100644 index 0000000000..f21cbe2949 --- /dev/null +++ 
b/pkg/services/object/protobuf.go @@ -0,0 +1,183 @@ +package object + +import ( + "encoding/binary" + "fmt" + + iobject "github.com/nspcc-dev/neofs-node/internal/object" + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" + "github.com/nspcc-dev/neofs-sdk-go/proto/refs" + protostatus "github.com/nspcc-dev/neofs-sdk-go/proto/status" + "github.com/nspcc-dev/neofs-sdk-go/version" + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" +) + +const ( + maxHeadResponseBodyVarintLen = iobject.MaxHeaderVarintLen + maxHeaderOffsetInHeadResponse = 1 + maxHeadResponseBodyVarintLen + 1 + iobject.MaxHeaderVarintLen // 1 for iprotobuf.TagBytes1 + // TODO: test it is sufficient for everything + headResponseBufferLen = maxHeaderOffsetInHeadResponse + object.MaxHeaderLen*2 + + // TODO: share header buffers for HEAD and GET + maxGetResponseHeaderVarintLen = iobject.MaxHeaderVarintLen + maxHeaderOffsetInGetResponse = 1 + maxGetResponseHeaderVarintLen + 1 + maxGetResponseHeaderVarintLen // 1 for iprotobuf.TagBytes1 + getResponseHeaderBufferLen = maxHeaderOffsetInGetResponse + +object.MaxHeaderLen*2 + + maxGetResponseChunkLen = 256 << 10 // we already have such const, share? 
+ maxGetResponseChunkVarintLen = 3 + maxChunkOffsetInGetResponse = 1 + maxGetResponseChunkVarintLen + // 1 for iprotobuf.TagBytes1 + 1 + maxGetResponseChunkVarintLen // 1 for iprotobuf.TagBytes2 + getResponseChunkBufferLen = maxChunkOffsetInGetResponse + maxGetResponseChunkLen +) + +var currentVersionResponseMetaHeader []byte + +func init() { + ver := version.Current() + mjr := ver.Major() + mnr := ver.Minor() + + verLn := 1 + protowire.SizeVarint(uint64(mjr)) + 1 + protowire.SizeVarint(uint64(mnr)) + + b := make([]byte, 1+protowire.SizeBytes(verLn)) + + b[0] = iprotobuf.TagBytes1 + off := 1 + binary.PutUvarint(b[1:], uint64(verLn)) + b[off] = iprotobuf.TagVarint1 + off += 1 + binary.PutUvarint(b[off+1:], uint64(mjr)) + b[off] = iprotobuf.TagVarint2 + off += 1 + binary.PutUvarint(b[off+1:], uint64(mnr)) + + currentVersionResponseMetaHeader = b[:off] +} + +func writeMetaHeaderToResponseBuffer(b []byte, epoch uint64, st *protostatus.Status) int { + ln := len(currentVersionResponseMetaHeader) + 1 + protowire.SizeVarint(epoch) + stLn := st.MarshaledSize() + if stLn != 0 { + ln += 1 + protowire.SizeBytes(stLn) + } + + b[0] = iprotobuf.TagBytes2 + off := 1 + binary.PutUvarint(b[1:], uint64(ln)) + off += copy(b[off:], currentVersionResponseMetaHeader) + b[off] = iprotobuf.TagVarint2 + off += 1 + binary.PutUvarint(b[off+1:], epoch) + if st != nil { + b[off] = iprotobuf.TagBytes6 + off += 1 + binary.PutUvarint(b[off+1:], uint64(stLn)) + st.MarshalStable(b[off:]) + off += stLn + } + + return off +} + +func shiftHeadResponseBuffer(respBuf, hdrBuf []byte, sigf, hdrf iprotobuf.FieldBounds) int { + if !hdrf.IsMissing() { + hdrBuf[hdrf.From] = iprotobuf.TagBytes1 + } + + hdrLen := (sigf.To - sigf.From) + (hdrf.To - hdrf.From) + + respBuf[0] = iprotobuf.TagBytes1 + off := 1 + binary.PutUvarint(respBuf[1:], uint64(1+protowire.SizeBytes(hdrLen))) + respBuf[off] = iprotobuf.TagBytes1 + off += 1 + binary.PutUvarint(respBuf[off+1:], uint64(hdrLen)) + + if !sigf.IsMissing() { + off += 
copy(respBuf[off:], hdrBuf[sigf.From:sigf.To]) + } + if !hdrf.IsMissing() { + off += copy(respBuf[off:], hdrBuf[hdrf.From:hdrf.To]) + } + + return off +} + +var headResponseBufferPool = iprotobuf.NewBufferPool(headResponseBufferLen) + +func getBufferForHeadResponse() (*iprotobuf.MemBuffer, []byte) { + item := headResponseBufferPool.Get() + return item, item.SliceBuffer[maxHeaderOffsetInHeadResponse:] +} + +var getResponseHeaderBufferPool = iprotobuf.NewBufferPool(getResponseHeaderBufferLen) + +func getBufferForGetResponseHeader() (*iprotobuf.MemBuffer, []byte) { + item := getResponseHeaderBufferPool.Get() + return item, item.SliceBuffer[maxHeaderOffsetInGetResponse:] +} + +func shiftGetResponseHeaderBuffer(respBuf, hdrBuf []byte) int { + bodyLen := 1 + protowire.SizeBytes(len(hdrBuf)) + + respBuf[0] = iprotobuf.TagBytes1 + off := 1 + binary.PutUvarint(respBuf[1:], uint64(bodyLen)) + respBuf[off] = iprotobuf.TagBytes1 + off += 1 + binary.PutUvarint(respBuf[off+1:], uint64(len(hdrBuf))) + + return off + copy(respBuf[off:], hdrBuf) +} + +var getResponseChunkBufferPool = iprotobuf.NewBufferPool(getResponseChunkBufferLen) + +func getBufferForGetResponseChunk() (*iprotobuf.MemBuffer, []byte) { + item := getResponseChunkBufferPool.Get() + return item, item.SliceBuffer[maxChunkOffsetInGetResponse:] +} + +func shiftGetResponseChunkBuffer(respBuf, chunk []byte) int { + bodyLen := 1 + protowire.SizeBytes(len(chunk)) + + respBuf[0] = iprotobuf.TagBytes1 + off := 1 + binary.PutUvarint(respBuf[1:], uint64(bodyLen)) + respBuf[off] = iprotobuf.TagBytes2 + off += 1 + binary.PutUvarint(respBuf[off+1:], uint64(len(chunk))) + + return off + copy(respBuf[off:], chunk) +} + +func parseHeaderBinary(b []byte) (iprotobuf.FieldBounds, iprotobuf.FieldBounds, iprotobuf.FieldBounds, error) { + idf, sigf, hdrf, err := iobject.SeekHeaderFields(b) + if err != nil { + return idf, sigf, hdrf, fmt.Errorf("restore layout from received binary: %w", err) + } + + if !idf.IsMissing() { + m := 
new(refs.ObjectID) + if err = proto.Unmarshal(b[idf.ValueFrom:idf.To], m); err != nil { + return idf, sigf, hdrf, fmt.Errorf("unmarshal ID from received binary: %w", err) + } + if err = new(oid.ID).FromProtoMessage(m); err != nil { + return idf, sigf, hdrf, fmt.Errorf("invalid ID in received binary: %w", err) + } + } + + if !sigf.IsMissing() { + m := new(refs.Signature) + if err = proto.Unmarshal(b[sigf.ValueFrom:sigf.To], m); err != nil { + return idf, sigf, hdrf, fmt.Errorf("unmarshal signature from received binary: %w", err) + } + if err = new(neofscrypto.Signature).FromProtoMessage(m); err != nil { + return idf, sigf, hdrf, fmt.Errorf("invalid signature in received binary: %w", err) + } + } + + if !hdrf.IsMissing() { + m := new(protoobject.Header) + if err = proto.Unmarshal(b[hdrf.ValueFrom:hdrf.To], m); err != nil { + return idf, sigf, hdrf, fmt.Errorf("unmarshal header from received binary: %w", err) + } + if err = new(object.Object).FromProtoMessage(&protoobject.Object{Header: m}); err != nil { + return idf, sigf, hdrf, fmt.Errorf("invalid header in received binary: %w", err) + } + } + + return idf, sigf, hdrf, nil +} diff --git a/pkg/services/object/protobuf_test.go b/pkg/services/object/protobuf_test.go new file mode 100644 index 0000000000..934164f1f2 --- /dev/null +++ b/pkg/services/object/protobuf_test.go @@ -0,0 +1,13 @@ +package object + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLimits(t *testing.T) { + varintB := make([]byte, binary.MaxVarintLen64) + require.EqualValues(t, binary.PutUvarint(varintB, maxGetResponseChunkLen), maxGetResponseChunkVarintLen) +} diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 48e6f25d86..72f744c8ec 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -17,6 +17,7 @@ import ( "github.com/google/uuid" icrypto "github.com/nspcc-dev/neofs-node/internal/crypto" + iprotobuf 
"github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -49,6 +50,7 @@ import ( "github.com/nspcc-dev/tzhash/tz" "github.com/panjf2000/ants/v2" "google.golang.org/grpc" + "google.golang.org/protobuf/proto" ) // Handlers represents storage node's internal handler Object service op @@ -598,7 +600,11 @@ func (s *Server) makeStatusHeadResponse(err error, sign bool) *protoobject.HeadR }, sign) } -func (s *Server) Head(ctx context.Context, req *protoobject.HeadRequest) (*protoobject.HeadResponse, error) { +func (s *Server) Head(context.Context, *protoobject.HeadRequest) (*protoobject.HeadResponse, error) { + panic("must not be called") +} + +func (s *Server) HeadBuffered(ctx context.Context, req *protoobject.HeadRequest) (any, error) { var ( err error recheckEACL bool @@ -638,8 +644,7 @@ func (s *Server) Head(ctx context.Context, req *protoobject.HeadRequest) (*proto recheckEACL = true } - var resp protoobject.HeadResponse - p, err := convertHeadPrm(s.signer, req, &resp) + p, err := convertHeadPrm(s.signer, req) if err != nil { if !errors.Is(err, apistatus.Error) { var bad = new(apistatus.BadRequest) @@ -648,11 +653,70 @@ func (s *Server) Head(ctx context.Context, req *protoobject.HeadRequest) (*proto } return s.makeStatusHeadResponse(err, needSignResp), nil } + + var resp protoobject.HeadResponse + + respDst := &headResponse{ + dst: &resp, + } + p.SetHeaderWriter(respDst) + + var respBuf *iprotobuf.MemBuffer + defer func() { + if respBuf != nil { + respBuf.Free() + } + }() + + var hdrBuf []byte + hdrLen := -1 // to panic below if it is not updated along with hdrBuf + + p.WithBuffersFuncs(func() []byte { + if hdrBuf == nil { + respBuf, hdrBuf = getBufferForHeadResponse() + } + return hdrBuf + }, func(ln int) { + hdrLen = ln + }) + err = s.handlers.Head(ctx, p) if err != nil { return s.makeStatusHeadResponse(err, 
needSignResp), nil } + if hdrBuf != nil { + _, sigf, hdrf, err := parseHeaderBinary(hdrBuf[:hdrLen]) + if err != nil { + return s.makeStatusHeadResponse(err, needSignResp), nil + } + + if recheckEACL { // previous check didn't match, but we have a header now. + err = s.aclChecker.CheckEACL(hdrBuf[hdrf.ValueFrom:hdrf.To], reqInfo) + if err != nil && !errors.Is(err, aclsvc.ErrNotMatched) { // Not matched -> follow basic ACL. + err = eACLErr(reqInfo, err) // defer + return s.makeStatusHeadResponse(err, needSignResp), nil + } + } + + n := shiftHeadResponseBuffer(respBuf.SliceBuffer, hdrBuf, sigf, hdrf) + + n += writeMetaHeaderToResponseBuffer(respBuf.SliceBuffer[n:], s.fsChain.CurrentEpoch(), nil) + + if !needSignResp { + respBuf.Finalize(n) + buf := respBuf + respBuf = nil // ownership passes to the gRPC codec; the deferred Free must not run + return buf, nil + } + + if err = proto.Unmarshal(respBuf.SliceBuffer[:n], &resp); err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } + + recheckEACL = false + } + if recheckEACL { // previous check didn't match, but we have a header now. err = s.aclChecker.CheckEACL(&resp, reqInfo) if err != nil && !errors.Is(err, aclsvc.ErrNotMatched) { // Not matched -> follow basic ACL. @@ -670,20 +734,23 @@ type headResponse struct { func (x *headResponse) WriteHeader(hdr *object.Object) error { mo := hdr.ProtoMessage() - x.dst.Body = &protoobject.HeadResponse_Body{ + fillHeadResponse(x.dst, mo.GetHeader(), mo.GetSignature()) + return nil +} + +func fillHeadResponse(resp *protoobject.HeadResponse, hdr *protoobject.Header, sig *refs.Signature) { + resp.Body = &protoobject.HeadResponse_Body{ Head: &protoobject.HeadResponse_Body_Header{ Header: &protoobject.HeaderWithSignature{ - Header: mo.GetHeader(), - Signature: mo.GetSignature(), + Header: hdr, + Signature: sig, }, }, } - return nil } // converts original request into parameters accepted by the internal handler. -// Note that the response is untouched within this call. 
-func convertHeadPrm(signer ecdsa.PrivateKey, req *protoobject.HeadRequest, resp *protoobject.HeadResponse) (getsvc.HeadPrm, error) { +func convertHeadPrm(signer ecdsa.PrivateKey, req *protoobject.HeadRequest) (getsvc.HeadPrm, error) { body := req.GetBody() ma := body.GetAddress() if ma == nil { // includes nil body @@ -704,9 +771,6 @@ func convertHeadPrm(signer ecdsa.PrivateKey, req *protoobject.HeadRequest, resp p.SetCommonParameters(cp) p.WithAddress(addr) p.WithRawFlag(body.Raw) - p.SetHeaderWriter(&headResponse{ - dst: resp, - }) if cp.LocalOnly() { return p, nil } @@ -991,7 +1055,7 @@ func (s *Server) sendStatusGetResponse(stream protoobject.ObjectService_GetServe }, sign) } return s.sendGetResponse(stream, &protoobject.GetResponse{ - MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), + MetaHeader: s.makeResponseMetaHeader(apistatus.FromError(err)), }, sign) } @@ -1081,13 +1145,15 @@ func (s *Server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ recheckEACL = true } - p, err := convertGetPrm(s.signer, req, &getStream{ + respStream := &getStream{ base: gStream, srv: s, reqInfo: reqInfo, recheckEACL: recheckEACL, signResponse: needSignResp, - }) + } + + p, err := convertGetPrm(s.signer, req, respStream) if err != nil { if !errors.Is(err, apistatus.Error) { var bad = new(apistatus.BadRequest) @@ -1096,10 +1162,159 @@ func (s *Server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ } return s.sendStatusGetResponse(gStream, err, needSignResp) } + p.SetObjectWriter(respStream) + + var pldStream io.ReadCloser + var hdrRespBuf *iprotobuf.MemBuffer + // TODO: explore how soon it can be freed after write + defer func() { + if pldStream != nil { + pldStream.Close() + } + if hdrRespBuf != nil { + hdrRespBuf.Free() + } + }() + + var hdrBuf []byte + hdrLen := -1 + + if !needSignResp { + p.WithBuffersFuncs(func() []byte { + if hdrBuf == nil { + hdrRespBuf, hdrBuf = getBufferForGetResponseHeader() + } + return hdrBuf + }, func(ln int, 
rc io.ReadCloser) { + hdrLen, pldStream = ln, rc + }) + } + err = s.handlers.Get(gStream.Context(), p) if err != nil { return s.sendStatusGetResponse(gStream, err, needSignResp) } + + if needSignResp || hdrLen < 0 { + return nil + } + + idf, sigf, hdrf, err := parseHeaderBinary(hdrBuf[:hdrLen]) + if err != nil { + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + + pldPrefixSkipped := false + pldOff := max(idf.To, sigf.To, hdrf.To) + var pldValOff int + + if pldOff < hdrLen { + if hdrBuf[pldOff] != iprotobuf.TagBytes4 { + err = fmt.Errorf("unexpected byte %d after header instead of payload field tag %d", hdrBuf[pldOff], iprotobuf.TagBytes4) // defer + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + + var n int + _, n, err = iprotobuf.ParseVarint(hdrBuf[:hdrLen], pldOff+1) + if err != nil { + if !errors.Is(err, io.ErrUnexpectedEOF) { + err = fmt.Errorf("parse payload field len: %w", err) // defer + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + + pldValOff = pldOff + } else { + pldValOff = pldOff + 1 + n + pldPrefixSkipped = true + } + } else { + pldValOff = pldOff // hdrBuf holds header bytes only; keep pldOff == hdrLen so the full header is sent below and nothing is replayed as payload + } + + if recheckEACL { // previous check didn't match, but we have a header now. + err = s.aclChecker.CheckEACL(hdrBuf[hdrf.ValueFrom:hdrf.To], reqInfo) + if err != nil && !errors.Is(err, aclsvc.ErrNotMatched) { // Not matched -> follow basic ACL. + err = eACLErr(reqInfo, err) // defer + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + } + + n := shiftGetResponseHeaderBuffer(hdrRespBuf.SliceBuffer, hdrBuf[:pldOff]) + // FIXME: meta header + // n += writeMetaHeaderToResponseBuffer(hdrRespBuf.SliceBuffer[n:], s.fsChain.CurrentEpoch(), nil) + hdrRespBuf.Finalize(n) + if err = gStream.SendMsg(hdrRespBuf); err != nil { + return err + } + + // TODO: check whether full object is buffered. If so, additional buffer is not + // needed. It's better to reuse the heading buffer instead. 
+ + pldRespBuf, pldBuf := getBufferForGetResponseChunk() + defer pldRespBuf.Free() + + pldOff = copy(pldBuf, hdrBuf[pldValOff:hdrLen]) + + if !pldPrefixSkipped { + n, err = io.ReadFull(pldStream, pldBuf[pldOff:]) + done := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) + if err != nil && !done { + err = fmt.Errorf("read payload stream: %w", err) // defer + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + + pldOff += n + if pldOff == 0 { // no payload field + return nil + } + + if pldBuf[0] != iprotobuf.TagBytes4 { + err = fmt.Errorf("unexpected byte %d after header instead of payload field tag %d", pldBuf[0], iprotobuf.TagBytes4) // defer + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + + _, n, err = iprotobuf.ParseVarint(pldBuf[:pldOff], 1) + if err != nil { + err = fmt.Errorf("parse payload field len: %w", err) // defer + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + + pldValOff = 1 + n + + if done { + if pldValOff == pldOff { // empty payload + return nil + } + + n = shiftGetResponseChunkBuffer(pldRespBuf.SliceBuffer, pldBuf[pldValOff:pldOff]) + pldRespBuf.Finalize(n) + err = gStream.SendMsg(pldRespBuf) // defer + return err + } + + pldOff = copy(pldBuf, pldBuf[pldValOff:pldOff]) + } + + for { + n, err = io.ReadFull(pldStream, pldBuf[pldOff:]) + if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { + err = fmt.Errorf("read payload stream: %w", err) // defer + return s.sendStatusGetResponse(gStream, err, needSignResp) + } + + pldOff += n + + n = shiftGetResponseChunkBuffer(pldRespBuf.SliceBuffer, pldBuf[:pldOff]) + pldRespBuf.Finalize(n) + if err = gStream.SendMsg(pldRespBuf); err != nil || pldOff < len(pldBuf) { + return err + } + + pldRespBuf, pldBuf = getBufferForGetResponseChunk() + defer pldRespBuf.Free() // TODO: avoid defer in for? 
+ pldOff = 0 + } + return nil } @@ -1127,7 +1342,6 @@ func convertGetPrm(signer ecdsa.PrivateKey, req *protoobject.GetRequest, stream p.SetCommonParameters(cp) p.WithAddress(addr) p.WithRawFlag(body.Raw) - p.SetObjectWriter(stream) if cp.LocalOnly() { return p, nil }