From 74512e282fe82b5cf8f15ee74d74add920d8cb24 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 20 Mar 2023 19:40:20 +0000 Subject: [PATCH 1/6] Rework auth cache solutions --- e2core/auth/access.go | 82 ++---- e2core/auth/api.go | 75 ++++++ e2core/auth/authorizer_test.go | 170 ++++++------ e2core/auth/bigcache.go | 105 ++++++++ e2core/auth/cache.go | 79 ------ e2core/auth/go-cache.go | 60 +++++ e2core/server/server.go | 9 +- foundation/common/cache.go | 317 ---------------------- foundation/common/cache_test.go | 463 -------------------------------- go.mod | 2 + go.sum | 4 + 11 files changed, 360 insertions(+), 1006 deletions(-) create mode 100644 e2core/auth/api.go create mode 100644 e2core/auth/bigcache.go delete mode 100644 e2core/auth/cache.go create mode 100644 e2core/auth/go-cache.go delete mode 100644 foundation/common/cache.go delete mode 100644 foundation/common/cache_test.go diff --git a/e2core/auth/access.go b/e2core/auth/access.go index b88b7949..1076d704 100644 --- a/e2core/auth/access.go +++ b/e2core/auth/access.go @@ -1,7 +1,6 @@ package auth import ( - "encoding/json" "fmt" "net/http" "path/filepath" @@ -9,12 +8,16 @@ import ( "time" "github.com/labstack/echo/v4" + "github.com/pkg/errors" - "github.com/suborbital/e2core/e2core/options" - "github.com/suborbital/e2core/foundation/common" "github.com/suborbital/systemspec/system" ) +const ( + DefaultCacheTTL = 10 * time.Minute + DefaultCacheTTClean = 2 * time.Minute +) + type TenantInfo struct { AuthorizedParty string `json:"authorized_party"` Environment string `json:"environment"` @@ -22,9 +25,11 @@ type TenantInfo struct { Name string `json:"name"` } -func AuthorizationMiddleware(opts *options.Options) echo.MiddlewareFunc { - authorizer := NewApiAuthClient(opts) +type Authorizer interface { + Authorize(token system.Credential, identifier, namespace, name string) (TenantInfo, error) +} +func AuthorizationMiddleware(authorizer Authorizer) echo.MiddlewareFunc { return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { identifier := c.Param("ident") @@ -43,63 +48,6 @@ func AuthorizationMiddleware(opts *options.Options) echo.MiddlewareFunc { } } -func NewApiAuthClient(opts *options.Options) *AuthzClient { - return &AuthzClient{ - httpClient: &http.Client{ - Timeout: 20 * time.Second, - Transport: http.DefaultTransport, - }, - location: opts.ControlPlane + "/environment/v1/tenant/", - cache: NewAuthorizationCache(opts.AuthCacheTTL), - } -} - -type AuthzClient struct { - location string - httpClient *http.Client - cache *AuthorizationCache -} - -func (client *AuthzClient) Authorize(token system.Credential, identifier, namespace, name string) (*TenantInfo, error) { - if token == nil { - return nil, common.Error(common.ErrAccess, "no credentials provided") - } - - key := filepath.Join(identifier, namespace, name, token.Value()) - - return client.cache.Get(key, client.loadAuth(token, identifier)) -} - -func (client *AuthzClient) loadAuth(token system.Credential, identifier string) func() (*TenantInfo, error) { - return func() (*TenantInfo, error) { - authzReq, err := http.NewRequest(http.MethodGet, client.location+identifier, nil) - if err != nil { - return nil, common.Error(err, "post authorization request") - } - - // pass token along - headerVal := fmt.Sprintf("%s %s", token.Scheme(), token.Value()) - authzReq.Header.Set(http.CanonicalHeaderKey("Authorization"), headerVal) - - resp, err := client.httpClient.Do(authzReq) - if err != nil { - return nil, common.Error(err, "dispatch remote authz request") - } - - if resp.StatusCode != http.StatusOK { - return nil, common.Error(common.ErrAccess, "non-200 response %d for authorization service", resp.StatusCode) - } - defer resp.Body.Close() - - var claims *TenantInfo - if err = json.NewDecoder(resp.Body).Decode(&claims); err != nil { - return nil, common.Error(err, "deserialized authorization response") - } - - return claims, nil - } -} - func NewAccessToken(token string) system.Credential { if token != "" { return &AccessToken{ @@ -146,3 +94,13 @@ func ExtractAccessToken(header http.Header) system.Credential { value: authInfo[splitAt+1:], } } + +// deriveKey is a utility function that takes a system.Credential token, identifier, namespace, and plugin name as args +// and returns one long string we can use as cache keys. +func deriveKey(token system.Credential, identifier, namespace, name string) (string, error) { + if token == nil { + return "", errors.New("token provided was nil") + } + + return filepath.Join(identifier, namespace, name, token.Value()), nil +} diff --git a/e2core/auth/api.go b/e2core/auth/api.go new file mode 100644 index 00000000..4b4af4d4 --- /dev/null +++ b/e2core/auth/api.go @@ -0,0 +1,75 @@ +package auth + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/pkg/errors" + + "github.com/suborbital/e2core/e2core/options" + "github.com/suborbital/systemspec/system" +) + +var ErrUnauthorized = errors.New("received 401 Unauthorized from API") + +// NewApiAuthClient returns a configured SE2 Auth client that will ask the tenant endpoint for info about a tenant by +// its ID. +func NewApiAuthClient(opts *options.Options) *APIAuthorizer { + return &APIAuthorizer{ + httpClient: &http.Client{ + Timeout: 20 * time.Second, + }, + location: opts.ControlPlane + "/environment/v1/tenant/%s", + } +} + +// APIAuthorizer is a bridge between e2core and a REST API. The only thing this one does is asks SE2 information about a +// tenant identified by the identifier over plain old REST API. +type APIAuthorizer struct { + location string + httpClient *http.Client +} + +// Authorize implements the Authorizer interface. It doesn't use the namespace and the function name arguments as its +// only job is to use the token it received and the identifier and send a message to SE2 asking info about the tenant. +// +// SE2, in turn, is the system that actually does the check for the token. +func (client *APIAuthorizer) Authorize(token system.Credential, identifier, _, _ string) (TenantInfo, error) { + if token == nil { + return TenantInfo{}, errors.New("no credentials provided") + } + + authzReq, err := http.NewRequest(http.MethodGet, fmt.Sprintf(client.location, identifier), nil) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "http.NewRequest GET control plane environment tenant") + } + + // pass token along + authzReq.Header.Set("Authorization", fmt.Sprintf("%s %s", token.Scheme(), token.Value())) + + resp, err := client.httpClient.Do(authzReq) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "client.httpClient.Do") + } + + if resp.StatusCode == http.StatusUnauthorized { + return TenantInfo{}, ErrUnauthorized + } + + if resp.StatusCode != http.StatusOK { + return TenantInfo{}, fmt.Errorf("received non-200 and non-401 status code %d from API", resp.StatusCode) + } + + defer func() { + _ = resp.Body.Close() + }() + + var claims TenantInfo + if err = json.NewDecoder(resp.Body).Decode(&claims); err != nil { + return TenantInfo{}, errors.Wrap(err, "json.Decode into TenantInfo") + } + + return claims, nil +} diff --git a/e2core/auth/authorizer_test.go b/e2core/auth/authorizer_test.go index d3f0e8bd..78d0c357 100644 --- a/e2core/auth/authorizer_test.go +++ b/e2core/auth/authorizer_test.go @@ -10,52 +10,45 @@ import ( "testing" "time" + "github.com/allegro/bigcache/v3" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/suborbital/e2core/foundation/common" ) type CompareAssertionFunc[T any] func(t *testing.T, actual T) bool -func TestAuthorizerCache_ConcurrentRequests(t *testing.T) { - type args struct { - token string - } - - type test struct { +func TestAuthorizerGoCache_ConcurrentRequests(t *testing.T) { + tests := []struct { name string - args args + token string handler http.HandlerFunc assertOpts CompareAssertionFunc[uint64] assertErr assert.ErrorAssertionFunc - } - - tests := []test{ + }{ { - name: "Ensure duplicate requests are pipelined", - args: args{ - token: "token", - }, + name: "Ensure duplicate requests are pipelined", + token: "token", handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(&TenantInfo{ AuthorizedParty: "tester", Environment: "env", ID: "tnt", + Name: "fnName", }) }, assertOpts: func(t *testing.T, actual uint64) bool { - return assert.Equal(t, uint64(1), actual) + return assert.Equalf(t, uint64(1), actual, "expected %d, got %d", 1, actual) }, assertErr: func(_ assert.TestingT, err error, _ ...interface{}) bool { return err == nil }, }, { - name: "Ensure non-credentialed requests are not dispatched", - args: args{ - token: "", - }, + name: "Ensure non-credentialed requests are not dispatched", + token: "", handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(&TenantInfo{}) @@ -68,10 +61,8 @@ func TestAuthorizerCache_ConcurrentRequests(t *testing.T) { }, }, { - name: "Ensure denied requests return ErrAccess", - args: args{ - token: "token", - }, + name: "Ensure denied requests return ErrAccess", + token: "token", handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusForbidden) }, @@ -86,38 +77,43 @@ func TestAuthorizerCache_ConcurrentRequests(t *testing.T) { }, } - for _, tc := range tests { - var opts uint64 = 0 - svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddUint64(&opts, 1) - tc.handler(w, r) - })) - - authorizer := &AuthzClient{ - httpClient: svr.Client(), - location: svr.URL + "/environment/v1/tenant/", - cache: newAuthorizationCache(common.StableTime(time.Now()), 10*time.Minute), - } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + test.handler(w, r) + })) + + apiAuthorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/environment/v1/tenant/%s", + } + + // NewGoCacheAuthorizer always returns nil error. + goCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultConfig) + require.NoError(t, err, "initialising new big cache authorizer") + + wg := sync.WaitGroup{} + for i := 0; i < 30; i++ { + wg.Add(1) + go func() { + _, err = goCacheAuthorizer.Authorize(NewAccessToken(test.token), "env.app", "namespace", "mod") + wg.Done() + }() + } + wg.Wait() + + svr.Close() + + test.assertErr(t, err) + test.assertOpts(t, opts) + }) - var err error - wg := sync.WaitGroup{} - for i := 0; i < 1000; i++ { - wg.Add(1) - go func() { - _, err = authorizer.Authorize(NewAccessToken(tc.args.token), "env.app", "namespace", "mod") - wg.Done() - }() - } - wg.Wait() - - svr.Close() - - tc.assertErr(t, err) - tc.assertOpts(t, opts) } } -func TestAuthorizerCache(t *testing.T) { +func TestAuthorizerGoCache(t *testing.T) { type args struct { token string identifier string @@ -211,7 +207,7 @@ func TestAuthorizerCache(t *testing.T) { assertOpts: func(t *testing.T, actual uint64) bool { return assert.Equal(t, uint64(3), actual) }, - wantErr: common.ErrAccess, + wantErr: ErrUnauthorized, }, { name: "Ensure success after failure", @@ -258,27 +254,31 @@ func TestAuthorizerCache(t *testing.T) { } for _, tc := range tests { - var opts uint64 = 0 - svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddUint64(&opts, 1) - tc.handler(w, r) - })) + t.Run(tc.name, func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + tc.handler(w, r) + })) - authorizer := &AuthzClient{ - httpClient: svr.Client(), - location: svr.URL + "/api/v2/tenant/", - cache: newAuthorizationCache(common.StableTime(time.Now()), 10*time.Minute), - } + apiAuthorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/api/v2/tenant/%s", + } - var err error - for _, arg := range tc.args { - _, err = authorizer.Authorize(NewAccessToken(arg.token), arg.identifier, arg.namespace, arg.mod) - } + goCacheAuthorizer, _ := NewGoCacheAuthorizer(apiAuthorizer) - svr.Close() + var err error + for _, arg := range tc.args { + _, err = goCacheAuthorizer.Authorize(NewAccessToken(arg.token), arg.identifier, arg.namespace, arg.mod) + } + + svr.Close() + + assert.ErrorIs(t, err, tc.wantErr) + assert.True(t, tc.assertOpts(t, opts)) + }) - assert.ErrorIs(t, err, tc.wantErr) - assert.True(t, tc.assertOpts(t, opts)) } } @@ -321,28 +321,30 @@ func TestAuthorizerCache_ExpiringEntry(t *testing.T) { tc.handler(w, r) })) - clock := common.StableTime(time.Now()) - - authzCache := newAuthorizationCache(clock, 10*time.Minute) - authzCache.clock = clock - - authorizer := &AuthzClient{ + authorizer := &APIAuthorizer{ httpClient: svr.Client(), - location: svr.URL + "/api/v2/tenant/", - cache: authzCache, + location: svr.URL + "/environment/v1/tenant/%s", } + bigCacheAuthorizer, err := NewBigCacheAuthorizer(authorizer, bigcache.Config{ + Shards: 4, + LifeWindow: 3 * time.Second, + CleanWindow: time.Second, + MaxEntriesInWindow: 200, + MaxEntrySize: 500, + }) + // 1 auth op - _, err := authorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = authorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = authorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - clock.Tick(authzCache.ttl + 1*time.Second) + time.Sleep(4 * time.Second) // 2 auth op - _, err = authorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = authorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = authorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") svr.Close() diff --git a/e2core/auth/bigcache.go b/e2core/auth/bigcache.go new file mode 100644 index 00000000..e17531e8 --- /dev/null +++ b/e2core/auth/bigcache.go @@ -0,0 +1,105 @@ +package auth + +import ( + "bytes" + "context" + "encoding/gob" + "sync" + + "github.com/allegro/bigcache/v3" + "github.com/pkg/errors" + + "github.com/suborbital/systemspec/system" +) + +var DefaultConfig = bigcache.Config{ + Shards: 1, + LifeWindow: DefaultCacheTTL, + CleanWindow: DefaultCacheTTClean, + MaxEntriesInWindow: 4096, + MaxEntrySize: 512, // 3 * uuidv4 with -, (36), + name + struct size as ballpark + HardMaxCacheSize: 2048, // in MB, this is 2G +} + +// BigCacheAuthorizer has a big cache (see https://github.com/allegro/bigcache) implementation. It wraps the live authz +// client, though technically we could create an infinitely deep onion of different caches. +type BigCacheAuthorizer struct { + embedded Authorizer + mtx *sync.Mutex + cache *bigcache.BigCache +} + +// Authorize method implements the Authorizer interface for the Big Cache implementation. If the cache doesn't have a +// record for the arguments, it will ask the embedded (expected to be the Authz) implementation. +func (c *BigCacheAuthorizer) Authorize(token system.Credential, identifier, namespace, name string) (TenantInfo, error) { + c.mtx.Lock() + defer c.mtx.Unlock() + // construct key to check / retrieve / set value by. + key, err := deriveKey(token, identifier, namespace, name) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "deriveKey") + } + + // create TenantInfo variable. + var ti TenantInfo + + // check if the bigcache implementation has a record about it. + stored, err := c.cache.Get(key) + + // something went wrong. + if err != nil { + // it does not have an entry, this is expected. + if errors.Is(err, bigcache.ErrEntryNotFound) { + // ask the wrapped authorizer for the entry. + ti, err = c.embedded.Authorize(token, identifier, namespace, name) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "c.embedded.Authorize") + } + + // encode the retrieved entry into []byte to prep for storage into bigcache. + var b bytes.Buffer + enc := gob.NewEncoder(&b) + err = enc.Encode(ti) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "enc.Encode TenantInfo into []byte using gob.Encoder") + } + + // store the encoded [] in bigcache. + err = c.cache.Set(key, b.Bytes()) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "c.cache.Set") + } + + // return the TenantInfo. This is now cached going forward. + return ti, nil + } + + // The error was NOT an ErrEntryNotFound one, which is unexpected, so something else went wrong. + return TenantInfo{}, errors.Wrap(err, "c.cache.Get") + } + + // cache.Get did not return an error, so it had the data we were looking for. Let's decode it. + dec := gob.NewDecoder(bytes.NewReader(stored)) + err = dec.Decode(&ti) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "dec.Decode bytes found in cache into TenantInfo") + } + + // return the cached and decoded TenantInfo. + return ti, nil +} + +// NewBigCacheAuthorizer returns a new configured BigCacheAuthorizer pointer. There are no configurations to pass in, +// the constructor takes a set of values that have already been decided based on the use case. +func NewBigCacheAuthorizer(embedded Authorizer, config bigcache.Config) (*BigCacheAuthorizer, error) { + cache, err := bigcache.New(context.Background(), config) + if err != nil { + return nil, errors.Wrap(err, "bigcache.New") + } + + return &BigCacheAuthorizer{ + embedded: embedded, + cache: cache, + mtx: &sync.Mutex{}, + }, nil +} diff --git a/e2core/auth/cache.go b/e2core/auth/cache.go deleted file mode 100644 index 3043abc6..00000000 --- a/e2core/auth/cache.go +++ /dev/null @@ -1,79 +0,0 @@ -package auth - -import ( - "time" - - "github.com/suborbital/e2core/foundation/common" -) - -// NewAuthorizationCache creates a new AuthorizationCache -func NewAuthorizationCache(ttl time.Duration) *AuthorizationCache { - return newAuthorizationCache(common.SystemTime(), ttl) -} - -// NewAuthorizationCache creates a new AuthorizationCache -func newAuthorizationCache(clock common.Clock, ttl time.Duration) *AuthorizationCache { - store := common.NewTreeStore[*expiringContext]() - cache := common.NewLoadingCache[*expiringContext](store) - - return &AuthorizationCache{ - clock: clock, - ttl: ttl, - cache: cache, - } -} - -// AuthorizationCache caches authorization successful policy decisions for up to 10 minutes. -type AuthorizationCache struct { - clock common.Clock - ttl time.Duration - cache *common.LoadingCache[*expiringContext] -} - -// Get fetches a cached result if present; otherwise it executes newFunc to obtain the result. -func (cache AuthorizationCache) Get(key string, newFunc func() (*TenantInfo, error)) (*TenantInfo, error) { - // register newFunc if not previously known - if !cache.cache.Check(key) { - _ = cache.cache.Put(key, cache.loadingFunc(newFunc)) - } - - // return promptly if entry is found and valid - entry := cache.cache.Get(key) - if entry.Value != nil { - if entry.Value.exp.After(cache.clock.Now()) { - return entry.Value.ctx, entry.Error - } - // entry found but expired, refresh and await result - _ = cache.cache.Refresh(key) - entry = cache.cache.Get(key) - } - - if entry.Error != nil { - // reset entry state so subsequent requests run - cache.cache.Replace(key, cache.loadingFunc(newFunc)) - return nil, entry.Error - } - - return entry.Value.ctx, entry.Error -} - -// loadingFun wraps a loader func with an expiringContext loader -func (cache AuthorizationCache) loadingFunc(inner func() (*TenantInfo, error)) func() (*expiringContext, error) { - return func() (*expiringContext, error) { - ctx, err := inner() - if err != nil { - return nil, err - } - - return &expiringContext{ - exp: cache.clock.In(cache.ttl), - ctx: ctx, - }, nil - } -} - -// expiringContext wraps a value with an expiry -type expiringContext struct { - exp time.Time - ctx *TenantInfo -} diff --git a/e2core/auth/go-cache.go b/e2core/auth/go-cache.go new file mode 100644 index 00000000..a4aa9baf --- /dev/null +++ b/e2core/auth/go-cache.go @@ -0,0 +1,60 @@ +package auth + +import ( + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + + "github.com/suborbital/systemspec/system" +) + +// GoCacheAuthorizer has a go-cache (see https://github.com/patrickmn/go-cache) implementation. It wraps the live Authz Authorizer implementation. +type GoCacheAuthorizer struct { + cache *cache.Cache + embedded Authorizer +} + +// Authorize implements the Authorizer interface for GoCacheAuthorizer. +func (g GoCacheAuthorizer) Authorize(token system.Credential, identifier, namespace, name string) (TenantInfo, error) { + // construct key to check / retrieve / set value by. + key, err := deriveKey(token, identifier, namespace, name) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "deriveKey") + } + + // grab the stored pointer for the key, if any. + ptr, found := g.cache.Get(key) + + // there wasn't any. + if !found { + // ask the embedded Authorizer for the TenantInfo. + ti, err := g.embedded.Authorize(token, identifier, namespace, name) + if err != nil { + return TenantInfo{}, errors.Wrap(err, "g.embedded.Authorize") + } + + // store in the cache. This does not return error. Set pointer for performance per the documentation. + g.cache.Set(key, &ti, cache.DefaultExpiration) + + // return a now stored TenantInfo. + return ti, nil + } + + // assert that the pointer stored in the cache is a pointer to a TenantInfo. + tiPtr, ok := ptr.(*TenantInfo) + if !ok { + return TenantInfo{}, errors.New("ptr.(*TenantInfo) not ok: pointer stored was not for a TenantInfo") + } + + // return the value of the pointer. TenantInfo is just a bunch of strings in a struct, passing values is fine. + return *tiPtr, nil +} + +// NewGoCacheAuthorizer returns a cache implementation of the Authorizer interface that wraps another Authorizer +// implementation (expected to be the Authz client). The error is not used, but leaving it here to match the signature +// of the Authz, and BigCache implementation constructors. +func NewGoCacheAuthorizer(embedded Authorizer) (*GoCacheAuthorizer, error) { + return &GoCacheAuthorizer{ + cache: cache.New(DefaultCacheTTL, DefaultCacheTTClean), + embedded: embedded, + }, nil +} diff --git a/e2core/server/server.go b/e2core/server/server.go index 0c879aa4..f6652b1c 100644 --- a/e2core/server/server.go +++ b/e2core/server/server.go @@ -70,7 +70,14 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, } if opts.AdminEnabled() { - e.POST("/name/:ident/:namespace/:name", server.executePluginByNameHandler(), auth.AuthorizationMiddleware(opts)) + authAPI := auth.NewApiAuthClient(opts) + + authCache, err := auth.NewBigCacheAuthorizer(authAPI, auth.DefaultConfig) + if err != nil { + return nil, errors.Wrap(err, "auth.NewBigCacheAuthorizer") + } + + e.POST("/name/:ident/:namespace/:name", server.executePluginByNameHandler(), auth.AuthorizationMiddleware(authCache)) } else { e.POST("/name/:ident/:namespace/:name", server.executePluginByNameHandler()) e.POST("/ref/:ref", server.executePluginByRefHandler(ll)) diff --git a/foundation/common/cache.go b/foundation/common/cache.go deleted file mode 100644 index e7de9944..00000000 --- a/foundation/common/cache.go +++ /dev/null @@ -1,317 +0,0 @@ -package common - -import ( - "sync" - - art "github.com/plar/go-adaptive-radix-tree" -) - -const ( - // EntryInit marks a cache entry as initialized but not yet populated. - EntryInit EntryState = iota - // EntryPending indicates the cache entry is the process of being updated. - EntryPending - // EntryError indicates the last attempt to update a cache entry failed. - EntryError - // EntryCanceled indicates an update was attempted but canceled before completing. - EntryCanceled - // EntryReady indicates the cache entry is ready for use. - EntryReady -) - -type ( - // EntryState represents the current state of a CacheEntry. - EntryState uint8 - - // Value is an immutable snapshot of a CacheEntry's state. - Value[V any] struct { - State EntryState - Value V - Error error - } - - // Entry represents a cached entry. - Entry[V any] struct { - cond *sync.Cond - state EntryState - loadingFunc func() (V, error) - value V - err error - } - - // LoadingCache is an in-memory key-value store. - // Value lifecycles are managed by the cache until their key is dropped. - LoadingCache[V any] struct { - lock *sync.Mutex - entries CacheStore[V] - } -) - -type CacheStore[V any] interface { - Get(string) *Entry[V] - Put(string, *Entry[V]) - Delete(string) -} - -func NewMapStore[V any]() MapStore[V] { - return MapStore[V]{ - store: make(map[string]*Entry[V]), - } -} - -type MapStore[V any] struct { - store map[string]*Entry[V] -} - -func (store MapStore[V]) Get(key string) *Entry[V] { - return store.store[key] -} - -func (store MapStore[V]) Put(key string, val *Entry[V]) { - store.store[key] = val -} - -func (store MapStore[V]) Delete(key string) { - delete(store.store, key) -} - -func NewTreeStore[V any]() TreeStore[V] { - return TreeStore[V]{ - store: art.New(), - } -} - -type TreeStore[V any] struct { - store art.Tree -} - -func (store TreeStore[V]) Get(key string) *Entry[V] { - if val, ok := store.store.Search(art.Key(key)); ok { - return val.(*Entry[V]) - } - - return nil -} - -func (store TreeStore[V]) Put(key string, val *Entry[V]) { - store.store.Insert(art.Key(key), val) -} - -func (store TreeStore[V]) Delete(key string) { - store.store.Delete(art.Key(key)) -} - -// String returns EntryState as a string. -func (state EntryState) String() string { - var name string - switch state { - case EntryInit: - name = "[EntryState=INIT]" - case EntryPending: - name = "[EntryState=PENDING]" - case EntryError: - name = "[EntryState=ERROR]" - case EntryCanceled: - name = "[EntryState=CANCELED]" - case EntryReady: - name = "[EntryState=READY]" - } - - return name -} - -// NewLoadingCache returns a new instance of LoadingCache[V]. -func NewLoadingCache[V any](store CacheStore[V]) *LoadingCache[V] { - return &LoadingCache[V]{ - lock: new(sync.Mutex), - entries: store, - } -} - -// Get the Entry associated with Key, loading a new instance if unknown. -// If Entry exists and is EntryPending this call parks the caller until the update is complete. -func (cache *LoadingCache[V]) Get(key string) Value[V] { - cache.lock.Lock() - defer cache.lock.Unlock() - - entry := cache.entries.Get(key) - if entry == nil { - return Value[V]{ - State: EntryError, - Error: DoesNotExistError("[LoadingCache] Get"), - } - } - - switch entry.state { - case EntryError: - return Value[V]{ - State: entry.state, - Value: entry.value, - Error: entry.err, - } - case EntryInit: - asyncLoad(entry) - fallthrough - case EntryPending: - // park routine until update completes - for entry.state == EntryPending { - entry.cond.Wait() - } - fallthrough - default: // Ready | Failed | Canceled - return Value[V]{ - State: entry.state, - Value: entry.value, - Error: entry.err, - } - } -} - -// Check returns true if a key is present in the cache, else false. -func (cache *LoadingCache[V]) Check(key string) bool { - cache.lock.Lock() - found := cache.entries.Get(key) != nil - cache.lock.Unlock() - - return found -} - -// Put creates a new entry in the LoadingCache using the supplied loadingFunc. -// All entries are loaded lazily meaning loadingFunc is not invoked until LoadingCache.Get(key) is called. -func (cache *LoadingCache[V]) Put(key string, loadingFunc func() (V, error)) error { - // Optimistic look up to avoid lock contention. - entry := cache.entries.Get(key) - if entry != nil { - return DuplicateEntryError("[LoadingCache] Put") - } - - cache.lock.Lock() - defer cache.lock.Unlock() - - // verify observed state is still valid - entry = cache.entries.Get(key) - if entry != nil { - return DuplicateEntryError("[LoadingCache] Put") - } - - cache.entries.Put(key, &Entry[V]{ - cond: sync.NewCond(cache.lock), - state: EntryInit, - loadingFunc: loadingFunc}) - - return nil -} - -// Replace overwrites the loadingFunc for an existing key. -// If there is no key present a new one will be created. -func (cache *LoadingCache[V]) Replace(key string, loadingFunc func() (V, error)) { - cache.lock.Lock() - - entry := cache.entries.Get(key) - if entry == nil { - cache.entries.Put(key, &Entry[V]{ - cond: sync.NewCond(cache.lock), - state: EntryInit, - loadingFunc: loadingFunc, - }) - } else { - entry.loadingFunc = loadingFunc - entry.state = EntryInit - } - - cache.lock.Unlock() -} - -// Refresh updates a cache Entry asynchronously. -// Calling LoadingCache.Get(key) immediately allows the caller to await the result. -func (cache *LoadingCache[V]) Refresh(key string) error { - cache.lock.Lock() - - entry := cache.entries.Get(key) - if entry == nil { - cache.lock.Unlock() - return DoesNotExistError("[LoadingCache] Pending") - } - - asyncLoad(entry) - cache.lock.Unlock() - - return nil -} - -// Cancel cancels an update and notifies all watchers. -func (cache *LoadingCache[V]) Cancel(key string) { - cache.lock.Lock() - - entry := cache.entries.Get(key) - if entry == nil { - cache.lock.Unlock() - return - } - - if entry.state <= EntryPending { - entry.err = ErrCanceled - entry.state = EntryCanceled - } - - cache.lock.Unlock() - entry.cond.Broadcast() -} - -// Drop removes the entry associated with key from the cache. -func (cache *LoadingCache[V]) Drop(key string) { - cache.lock.Lock() - defer cache.lock.Unlock() - - entry := cache.entries.Get(key) - if entry == nil { - return - } - - cache.entries.Delete(key) - // ensure asyncLoad does not repopulate entry on completion. - if entry.state == EntryPending { - entry.state = EntryCanceled - } - - entry.cond.Broadcast() -} - -// Cache lock must be held when calling asyncLoad. -func asyncLoad[V any](entry *Entry[V]) { - if entry == nil { - return - } - - entry.state = EntryPending - - go func() { - // execute potentially expensive operation without holding lock - value, err := entry.loadingFunc() - - // reacquire lock - entry.cond.L.Lock() - - if entry.state == EntryCanceled { - // no-op, throw-away results - entry.err = ErrCanceled - entry.cond.L.Unlock() - return - } - - // do not override value on err - if err == nil { - entry.state = EntryReady - entry.value = value - entry.err = err - } else { - entry.state = EntryError - entry.err = err - } - - // notify watchers - entry.cond.Broadcast() - - entry.cond.L.Unlock() - }() -} diff --git a/foundation/common/cache_test.go b/foundation/common/cache_test.go deleted file mode 100644 index cfe98242..00000000 --- a/foundation/common/cache_test.go +++ /dev/null @@ -1,463 +0,0 @@ -package common - -import ( - "sync" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" -) - -func TestLoadingCache_Get(t *testing.T) { - type args struct { - putKey string - getKey string - failLoader bool - } - - type test struct { - name string - args args - want Value[int] - } - - tests := []test{ - { - name: "Test LoadingCache Get", - args: args{ - putKey: "count", - getKey: "count", - }, - want: Value[int]{ - State: EntryReady, - Value: 1, - Error: nil, - }, - }, - { - name: "Test LoadingCache Get invalid key", - args: args{ - putKey: "count", - getKey: "invalid", - }, - want: Value[int]{ - State: EntryError, - Value: 0, - Error: ErrNotExists, - }, - }, - { - name: "Test LoadingCache failed load operation", - args: args{ - putKey: "count", - getKey: "count", - failLoader: true, - }, - want: Value[int]{ - State: EntryError, - Value: 0, - Error: ErrNotExists, - }, - }, - } - - for _, tc := range tests { - // instructs the loader function to complete - complete := make(chan struct{}, 1) - // signals the test to proceed - step := make(chan struct{}, 1) - - cache := NewLoadingCache[int](NewTreeStore[int]()) - count := 0 - - cache.Put(tc.args.putKey, func() (int, error) { - <-complete - count += 1 - if tc.args.failLoader { - // value should not be overwritten on failure to load - return -1, tc.want.Error - } - - return count, nil - - }) - - var value Value[int] - go func() { - step <- struct{}{} - value = cache.Get(tc.args.getKey) - - assert.Equal(t, tc.want.State, value.State) - assert.Equal(t, tc.want.Value, value.Value) - assert.ErrorIs(t, value.Error, tc.want.Error) - - step <- struct{}{} - }() - - <-step - - // Ensure unrelated keys aren't blocked by loading func - other := uuid.NewString() - cache.Put(other, func() (int, error) { - return -1, nil - }) - assert.Equal(t, -1, cache.Get(other).Value) - - // cache.Get should be blocked awaiting a response - assert.Zero(t, value.Value) - - // complete loader func - complete <- struct{}{} - close(complete) - - <-step - - // should not invoke loader again; deadlock detector will panic since there are no more references to complete - value = cache.Get(tc.args.getKey) - - assert.Equal(t, tc.want.Value, value.Value) - assert.Equal(t, tc.want.State, value.State) - assert.ErrorIs(t, value.Error, tc.want.Error) - } -} - -func TestLoadingCache_Refresh(t *testing.T) { - type args struct { - putKey string - getKey string - failLoader bool - } - - type test struct { - name string - args args - want Value[int] - } - - tests := []test{ - { - name: "Test LoadingCache Get", - args: args{ - putKey: "count", - getKey: "count", - }, - want: Value[int]{ - State: EntryReady, - Value: 2, - Error: nil, - }, - }, - { - name: "Test LoadingCache Get invalid key", - args: args{ - putKey: "count", - getKey: "invalid", - }, - want: Value[int]{ - State: EntryError, - Value: 0, - Error: ErrNotExists, - }, - }, - { - name: "Test LoadingCache failed load operation", - args: args{ - putKey: "count", - getKey: "count", - failLoader: true, - }, - want: Value[int]{ - State: EntryError, - Value: 1, - Error: ErrNotExists, - }, - }, - } - - for _, tc := range tests { - // instructs the loader function to complete - complete := make(chan struct{}, 1) - // signals the test to proceed - step := make(chan struct{}, 1) - - cache := NewLoadingCache[int](NewTreeStore[int]()) - - count := -2 - cache.Put(tc.args.putKey, func() (int, error) { - if count == -2 { - count = 1 - return count, nil - } - - <-complete - - count += 1 - if tc.args.failLoader { - // value should not be overwritten on failure to load - return -1, tc.want.Error - } - - return count, nil - - }) - - // value initialization should always succeed - value := cache.Get(tc.args.putKey) - - go func() { - step <- struct{}{} - _ = cache.Refresh(tc.args.putKey) - }() - - <-step - - go func() { - step <- struct{}{} - value = cache.Get(tc.args.getKey) - }() - - <-step - close(step) - - complete <- struct{}{} - close(complete) - - value = cache.Get(tc.args.getKey) - assert.Equal(t, tc.want.State, value.State) - assert.Equal(t, tc.want.Value, value.Value) - assert.ErrorIs(t, value.Error, tc.want.Error) - } -} - -func TestLoadingCache_Cancel(t *testing.T) { - type args struct { - putKey string - getKey string - failLoader bool - } - - type test struct { - name string - args args - want Value[int] - } - - tests := []test{ - { - name: "Test LoadingCache Cancel", - args: args{ - putKey: "count", - getKey: "count", - }, - want: Value[int]{ - State: EntryCanceled, - Value: 0, - Error: ErrCanceled, - }, - }, - { - name: "Test LoadingCache Cancel invalid key", - args: args{ - putKey: "count", - getKey: "invalid", - }, - want: Value[int]{ - State: EntryError, - Value: 0, - Error: ErrNotExists, - }, - }, - { - name: "Test LoadingCache failed load operation", - args: args{ - putKey: "count", - getKey: "count", - failLoader: true, - }, - want: Value[int]{ - State: EntryCanceled, - Value: 0, - Error: ErrCanceled, - }, - }, - } - - for _, tc := range tests { - result := make(chan Value[int], 1) - // instructs the loader function to complete - complete := make(chan struct{}, 1) - // signals the test to proceed - step := make(chan struct{}, 1) - - cache := NewLoadingCache[int](NewTreeStore[int]()) - - cache.Put(tc.args.putKey, func() (int, error) { - <-complete - if tc.args.failLoader { - // value should not be overwritten on failure to load - return -1, tc.want.Error - } - - return 2, nil - }) - - go func() { - step <- struct{}{} - result <- cache.Get(tc.args.getKey) - }() - - // cache.Get() parked, load func still blocked - <-step - - // wake up cache.Get(key) watchers, cancel update - cache.Cancel(tc.args.getKey) - value := <-result - - close(step) - // close unblocks receiver - close(complete) - close(result) - - assert.Equal(t, tc.want.State, value.State) - assert.Equal(t, tc.want.Value, value.Value) - assert.ErrorIs(t, value.Error, tc.want.Error) - } -} - -func TestLoadingCache_Drop(t *testing.T) { - type args struct { - putKey string - getKey string - failLoader bool - } - - type test struct { - name string - args args - want Value[int] - } - - tests := []test{ - { - name: "Test LoadingCache Drop", - args: args{ - putKey: "count", - getKey: "count", - }, - want: Value[int]{ - State: EntryCanceled, - Value: 0, - Error: nil, - }, - }, - { - name: "Test LoadingCache Drop, invalid key", - args: args{ - putKey: "count", - getKey: "invalid", - }, - want: Value[int]{ - State: EntryError, - Value: 0, - Error: ErrNotExists, - }, - }, - { - name: "Test LoadingCache Drop, failed load operation", - args: args{ - putKey: "count", - getKey: "count", - failLoader: true, - }, - want: Value[int]{ - State: EntryCanceled, - Value: 0, - Error: nil, - }, - }, - } - - for _, tc := range tests { - // signals async cache.Get() has returned - result := make(chan Value[int], 1) - // instructs the loader function to complete - complete := make(chan struct{}, 1) - // signals the test to proceed - step := make(chan struct{}, 1) - - cache := NewLoadingCache[int](NewTreeStore[int]()) - - cache.Put(tc.args.putKey, func() (int, error) { - <-complete - if tc.args.failLoader { - // value should not be overwritten on failure to load - return -1, tc.want.Error - } - - return 2, nil - }) - - go func() { - step <- struct{}{} - result <- cache.Get(tc.args.getKey) - }() - - // cache.Get() scheduled, load func still blocked - <-step - close(step) - - // wake up cache.Get(key) watchers, cancel update - cache.Drop(tc.args.getKey) - - // pending request should be canceled - value := <-result - close(result) - - assert.Equal(t, tc.want.State, value.State) - assert.Equal(t, tc.want.Value, value.Value) - assert.ErrorIs(t, value.Error, tc.want.Error) - - // unblock loader func - complete <- struct{}{} - close(complete) - - // subsequent requests should fail because key is no longer present - value = cache.Get(tc.args.getKey) - assert.Equal(t, EntryError, value.State) - assert.Equal(t, 0, value.Value) - assert.ErrorIs(t, value.Error, ErrNotExists) - } -} - -func TestLoadingCache_DuplicateRegistration(t *testing.T) { - wg := sync.WaitGroup{} - // synchronized collection to capture response - errs := make(chan error, 10) - - cache := NewLoadingCache[int](NewTreeStore[int]()) - - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - errs <- cache.Put("key", func() (int, error) { - return 1, nil - }) - wg.Done() - }() - } - - // wait for all routines to complete - wg.Wait() - close(errs) - - errCount := 0 - for err := range errs { - // record presence of target error - if IsError(err, ErrExists) { - errCount += 1 - } - } - - // ensure all but one put failed with the correct error type - assert.Equal(t, 9, errCount) -} diff --git a/go.mod b/go.mod index 0ec8a001..026aece6 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,13 @@ module github.com/suborbital/e2core go 1.20 require ( + github.com/allegro/bigcache/v3 v3.1.0 github.com/bytecodealliance/wasmtime-go v1.0.0 github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.5.0 github.com/labstack/echo/v4 v4.10.2 github.com/nats-io/nats.go v1.24.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/plar/go-adaptive-radix-tree v1.0.5 github.com/rs/zerolog v1.29.0 diff --git a/go.sum b/go.sum index 46601407..5388367a 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VM github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Microsoft/hcsshim v0.9.7 h1:mKNHW/Xvv1aFH87Jb6ERDzXTJTLPlmzfZ28VBFD/bfg= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk= +github.com/allegro/bigcache/v3 v3.1.0/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/bytecodealliance/wasmtime-go v1.0.0 h1:9u9gqaUiaJeN5IoD1L7egD8atOnTGyJcNp8BhkL9cUU= github.com/bytecodealliance/wasmtime-go v1.0.0/go.mod h1:jjlqQbWUfVSbehpErw3UoWFndBXRRMvfikYH6KsCwOg= @@ -233,6 +235,8 @@ github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b github.com/opencontainers/runc v1.1.3/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= From 379f9da02421a90b9bb61f1c65c6ad59a3473f4c Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 20 Mar 2023 20:05:12 +0000 Subject: [PATCH 2/6] Add both cache tests and refine go tests --- e2core/auth/authorizer_test.go | 270 ++++++++++++++++++++++----------- e2core/auth/go-cache.go | 12 +- 2 files changed, 193 insertions(+), 89 deletions(-) diff --git a/e2core/auth/authorizer_test.go b/e2core/auth/authorizer_test.go index 78d0c357..516e73f3 100644 --- a/e2core/auth/authorizer_test.go +++ b/e2core/auth/authorizer_test.go @@ -19,7 +19,9 @@ import ( type CompareAssertionFunc[T any] func(t *testing.T, actual T) bool -func TestAuthorizerGoCache_ConcurrentRequests(t *testing.T) { +func TestAuthorizerCache_ConcurrentRequests(t *testing.T) { + const loopTimes = 1000 + tests := []struct { name string token string @@ -66,10 +68,9 @@ func TestAuthorizerGoCache_ConcurrentRequests(t *testing.T) { handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusForbidden) }, - // Although each request is effectively sent at the same time this is ultimately up to the scheduler. - // We allow for up to 10% of the requests to go through to avoid flakiness in CI environments. + // A request denied response is not cached, which means all 1000 tries need to be accounted for. assertOpts: func(t *testing.T, actual uint64) bool { - return assert.LessOrEqual(t, actual, uint64(100)) + return assert.LessOrEqual(t, actual, uint64(loopTimes)) }, assertErr: func(_ assert.TestingT, err error, _ ...interface{}) bool { return common.IsError(err, common.ErrAccess) || common.IsError(err, common.ErrCanceled) @@ -79,41 +80,76 @@ func TestAuthorizerGoCache_ConcurrentRequests(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - var opts uint64 = 0 - svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddUint64(&opts, 1) - test.handler(w, r) - })) - - apiAuthorizer := &APIAuthorizer{ - httpClient: svr.Client(), - location: svr.URL + "/environment/v1/tenant/%s", - } - - // NewGoCacheAuthorizer always returns nil error. - goCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultConfig) - require.NoError(t, err, "initialising new big cache authorizer") - - wg := sync.WaitGroup{} - for i := 0; i < 30; i++ { - wg.Add(1) - go func() { - _, err = goCacheAuthorizer.Authorize(NewAccessToken(test.token), "env.app", "namespace", "mod") - wg.Done() - }() - } - wg.Wait() - - svr.Close() - - test.assertErr(t, err) - test.assertOpts(t, opts) + t.Run("using Big Cache", func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + test.handler(w, r) + })) + + apiAuthorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/environment/v1/tenant/%s", + } + + // NewGoCacheAuthorizer always returns nil error. + bigCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultConfig) + require.NoError(t, err, "initialising new big cache authorizer") + + wg := sync.WaitGroup{} + for i := 0; i < loopTimes; i++ { + wg.Add(1) + go func() { + _, err = bigCacheAuthorizer.Authorize(NewAccessToken(test.token), "env.app", "namespace", "mod") + wg.Done() + }() + } + wg.Wait() + + svr.Close() + + test.assertErr(t, err) + test.assertOpts(t, opts) + }) + + t.Run("using Go Cache", func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + test.handler(w, r) + })) + + apiAuthorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/environment/v1/tenant/%s", + } + + // NewGoCacheAuthorizer always returns nil error. + goCacheAuthorizer, err := NewGoCacheAuthorizer(apiAuthorizer, DefaultCacheTTL, DefaultCacheTTClean) + require.NoError(t, err, "initialising new go cache authorizer") + + wg := sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + _, err = goCacheAuthorizer.Authorize(NewAccessToken(test.token), "env.app", "namespace", "mod") + wg.Done() + }() + } + wg.Wait() + + svr.Close() + + test.assertErr(t, err) + test.assertOpts(t, opts) + }) + }) } } -func TestAuthorizerGoCache(t *testing.T) { +func TestAuthorizerCache(t *testing.T) { type args struct { token string identifier string @@ -255,41 +291,65 @@ func TestAuthorizerGoCache(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - var opts uint64 = 0 - svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddUint64(&opts, 1) - tc.handler(w, r) - })) + t.Run("using Big cache", func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + tc.handler(w, r) + })) + + apiAuthorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/api/v2/tenant/%s", + } + + bigCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultConfig) + require.NoError(t, err, "new big cache authorizer") + + for _, arg := range tc.args { + _, err = bigCacheAuthorizer.Authorize(NewAccessToken(arg.token), arg.identifier, arg.namespace, arg.mod) + } + + svr.Close() + + assert.ErrorIs(t, err, tc.wantErr) + assert.True(t, tc.assertOpts(t, opts)) + }) + + t.Run("using Big Cache", func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + tc.handler(w, r) + })) - apiAuthorizer := &APIAuthorizer{ - httpClient: svr.Client(), - location: svr.URL + "/api/v2/tenant/%s", - } + apiAuthorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/api/v2/tenant/%s", + } + + goCacheAuthorizer, _ := NewGoCacheAuthorizer(apiAuthorizer, DefaultCacheTTL, DefaultCacheTTClean) - goCacheAuthorizer, _ := NewGoCacheAuthorizer(apiAuthorizer) + var err error + for _, arg := range tc.args { + _, err = goCacheAuthorizer.Authorize(NewAccessToken(arg.token), arg.identifier, arg.namespace, arg.mod) + } - var err error - for _, arg := range tc.args { - _, err = goCacheAuthorizer.Authorize(NewAccessToken(arg.token), arg.identifier, arg.namespace, arg.mod) - } + svr.Close() - svr.Close() + assert.ErrorIs(t, err, tc.wantErr) + assert.True(t, tc.assertOpts(t, opts)) + }) - assert.ErrorIs(t, err, tc.wantErr) - assert.True(t, tc.assertOpts(t, opts)) }) } } func TestAuthorizerCache_ExpiringEntry(t *testing.T) { - type args struct { - ttl time.Duration - } - type test struct { name string - args args + ttl time.Duration handler http.HandlerFunc assertOpts CompareAssertionFunc[uint64] wantErr error @@ -298,7 +358,7 @@ func TestAuthorizerCache_ExpiringEntry(t *testing.T) { tests := []test{ { name: "Ensure expired tokens are refreshed", - args: args{ttl: 1 * time.Second}, + ttl: 1 * time.Second, handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(&TenantInfo{ @@ -315,40 +375,76 @@ func TestAuthorizerCache_ExpiringEntry(t *testing.T) { } for _, tc := range tests { - var opts uint64 = 0 - svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddUint64(&opts, 1) - tc.handler(w, r) - })) - - authorizer := &APIAuthorizer{ - httpClient: svr.Client(), - location: svr.URL + "/environment/v1/tenant/%s", - } - - bigCacheAuthorizer, err := NewBigCacheAuthorizer(authorizer, bigcache.Config{ - Shards: 4, - LifeWindow: 3 * time.Second, - CleanWindow: time.Second, - MaxEntriesInWindow: 200, - MaxEntrySize: 500, - }) + t.Run(tc.name, func(t *testing.T) { + t.Run("using Go cache", func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + tc.handler(w, r) + })) + + authorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/environment/v1/tenant/%s", + } + + goCacheAuthorizer, err := NewGoCacheAuthorizer(authorizer, tc.ttl, tc.ttl) + + // 1 auth op + _, err = goCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = goCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = goCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - // 1 auth op - _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + time.Sleep(tc.ttl + time.Second) - time.Sleep(4 * time.Second) + // 2 auth op + _, err = goCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = goCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = goCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - // 2 auth op - _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") - _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + svr.Close() - svr.Close() + assert.ErrorIs(t, err, tc.wantErr) + assert.True(t, tc.assertOpts(t, opts)) + }) - assert.ErrorIs(t, err, tc.wantErr) - assert.True(t, tc.assertOpts(t, opts)) + t.Run("using Big cache", func(t *testing.T) { + var opts uint64 = 0 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&opts, 1) + tc.handler(w, r) + })) + + authorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/environment/v1/tenant/%s", + } + + bigCacheAuthorizer, err := NewBigCacheAuthorizer(authorizer, bigcache.Config{ + Shards: 1, + LifeWindow: tc.ttl, + CleanWindow: time.Second, + MaxEntriesInWindow: 200, + MaxEntrySize: 500, + }) + + // 1 auth op + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + + time.Sleep(tc.ttl + time.Second) + + // 2 auth op + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + _, err = bigCacheAuthorizer.Authorize(NewAccessToken("token"), "env.app", "namespace", "mod") + + svr.Close() + + assert.ErrorIs(t, err, tc.wantErr) + assert.True(t, tc.assertOpts(t, opts)) + }) + }) } } diff --git a/e2core/auth/go-cache.go b/e2core/auth/go-cache.go index a4aa9baf..b14373cd 100644 --- a/e2core/auth/go-cache.go +++ b/e2core/auth/go-cache.go @@ -1,6 +1,9 @@ package auth import ( + "sync" + "time" + "github.com/patrickmn/go-cache" "github.com/pkg/errors" @@ -11,10 +14,14 @@ import ( type GoCacheAuthorizer struct { cache *cache.Cache embedded Authorizer + mtx *sync.Mutex } // Authorize implements the Authorizer interface for GoCacheAuthorizer. func (g GoCacheAuthorizer) Authorize(token system.Credential, identifier, namespace, name string) (TenantInfo, error) { + g.mtx.Lock() + defer g.mtx.Unlock() + // construct key to check / retrieve / set value by. key, err := deriveKey(token, identifier, namespace, name) if err != nil { @@ -52,9 +59,10 @@ func (g GoCacheAuthorizer) Authorize(token system.Credential, identifier, namesp // NewGoCacheAuthorizer returns a cache implementation of the Authorizer interface that wraps another Authorizer // implementation (expected to be the Authz client). The error is not used, but leaving it here to match the signature // of the Authz, and BigCache implementation constructors. -func NewGoCacheAuthorizer(embedded Authorizer) (*GoCacheAuthorizer, error) { +func NewGoCacheAuthorizer(embedded Authorizer, ttl, cleanTTL time.Duration) (*GoCacheAuthorizer, error) { return &GoCacheAuthorizer{ - cache: cache.New(DefaultCacheTTL, DefaultCacheTTClean), + cache: cache.New(ttl, cleanTTL), embedded: embedded, + mtx: &sync.Mutex{}, }, nil } From b202291ef4f1f9112114a82e6f5e320946e8606a Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 20 Mar 2023 20:16:14 +0000 Subject: [PATCH 3/6] Add benchmark for gocache vs bigcache --- e2core/auth/authorizer_test.go | 55 ++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/e2core/auth/authorizer_test.go b/e2core/auth/authorizer_test.go index 516e73f3..b3142563 100644 --- a/e2core/auth/authorizer_test.go +++ b/e2core/auth/authorizer_test.go @@ -2,6 +2,7 @@ package auth import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" "strings" @@ -448,3 +449,57 @@ func TestAuthorizerCache_ExpiringEntry(t *testing.T) { }) } } + +func Benchmark(b *testing.B) { + opts := int32(0) + + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&opts, 1) + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(&TenantInfo{ + AuthorizedParty: fmt.Sprintf("tester-%d", opts), + Environment: fmt.Sprintf("env-%d", opts), + ID: fmt.Sprintf("123-%d", opts), + Name: fmt.Sprintf("functionname-%d", opts), + }) + })) + + authorizer := &APIAuthorizer{ + httpClient: svr.Client(), + location: svr.URL + "/environment/v1/tenant/%s", + } + + benchmarks := []struct { + name string + cacheProvider func() Authorizer + }{ + { + name: "using Go cache", + cacheProvider: func() Authorizer { + goc, _ := NewGoCacheAuthorizer(authorizer, DefaultCacheTTL, DefaultCacheTTClean) + return goc + }, + }, + { + name: "using Big cache", + cacheProvider: func() Authorizer { + bigc, _ := NewBigCacheAuthorizer(authorizer, DefaultConfig) + return bigc + }, + }, + } + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + a := bm.cacheProvider() + for i := 0; i < b.N; i++ { + sfx := b.N / 1000 + _, _ = a.Authorize( + NewAccessToken(fmt.Sprintf("sometoken-%d", sfx)), + fmt.Sprintf("ident-%d", sfx), + fmt.Sprintf("namespace-%d", sfx), + fmt.Sprintf("fnName-%d", sfx), + ) + } + }) + } +} From 8d9100036f25c3e298147d86c65a3e282054c832 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 20 Mar 2023 20:16:49 +0000 Subject: [PATCH 4/6] Rename default config to be big cache specific --- e2core/auth/authorizer_test.go | 6 +++--- e2core/auth/bigcache.go | 2 +- e2core/server/server.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/e2core/auth/authorizer_test.go b/e2core/auth/authorizer_test.go index b3142563..aad523a3 100644 --- a/e2core/auth/authorizer_test.go +++ b/e2core/auth/authorizer_test.go @@ -94,7 +94,7 @@ func TestAuthorizerCache_ConcurrentRequests(t *testing.T) { } // NewGoCacheAuthorizer always returns nil error. - bigCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultConfig) + bigCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultBigCacheConfig) require.NoError(t, err, "initialising new big cache authorizer") wg := sync.WaitGroup{} @@ -304,7 +304,7 @@ func TestAuthorizerCache(t *testing.T) { location: svr.URL + "/api/v2/tenant/%s", } - bigCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultConfig) + bigCacheAuthorizer, err := NewBigCacheAuthorizer(apiAuthorizer, DefaultBigCacheConfig) require.NoError(t, err, "new big cache authorizer") for _, arg := range tc.args { @@ -483,7 +483,7 @@ func Benchmark(b *testing.B) { { name: "using Big cache", cacheProvider: func() Authorizer { - bigc, _ := NewBigCacheAuthorizer(authorizer, DefaultConfig) + bigc, _ := NewBigCacheAuthorizer(authorizer, DefaultBigCacheConfig) return bigc }, }, diff --git a/e2core/auth/bigcache.go b/e2core/auth/bigcache.go index e17531e8..c82ed634 100644 --- a/e2core/auth/bigcache.go +++ b/e2core/auth/bigcache.go @@ -12,7 +12,7 @@ import ( "github.com/suborbital/systemspec/system" ) -var DefaultConfig = bigcache.Config{ +var DefaultBigCacheConfig = bigcache.Config{ Shards: 1, LifeWindow: DefaultCacheTTL, CleanWindow: DefaultCacheTTClean, diff --git a/e2core/server/server.go b/e2core/server/server.go index f6652b1c..9093996b 100644 --- a/e2core/server/server.go +++ b/e2core/server/server.go @@ -72,7 +72,7 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, if opts.AdminEnabled() { authAPI := auth.NewApiAuthClient(opts) - authCache, err := auth.NewBigCacheAuthorizer(authAPI, auth.DefaultConfig) + authCache, err := auth.NewBigCacheAuthorizer(authAPI, auth.DefaultBigCacheConfig) if err != nil { return nil, errors.Wrap(err, "auth.NewBigCacheAuthorizer") } From f39345abe84d87e5e9d4a727460355ab9c0ecde5 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 20 Mar 2023 20:22:19 +0000 Subject: [PATCH 5/6] Use the result of the benchmark --- e2core/server/server.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/e2core/server/server.go b/e2core/server/server.go index 9093996b..b5f3392e 100644 --- a/e2core/server/server.go +++ b/e2core/server/server.go @@ -72,10 +72,8 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, if opts.AdminEnabled() { authAPI := auth.NewApiAuthClient(opts) - authCache, err := auth.NewBigCacheAuthorizer(authAPI, auth.DefaultBigCacheConfig) - if err != nil { - return nil, errors.Wrap(err, "auth.NewBigCacheAuthorizer") - } + // Go Cache authorizer always returns nil error. + authCache, _ := auth.NewGoCacheAuthorizer(authAPI, auth.DefaultCacheTTL, auth.DefaultCacheTTClean) e.POST("/name/:ident/:namespace/:name", server.executePluginByNameHandler(), auth.AuthorizationMiddleware(authCache)) } else { From eecaca19a14b8092d86768aedab6268acb2c672e Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 20 Mar 2023 20:45:40 +0000 Subject: [PATCH 6/6] Handle error and accidentally fix a flakey failure --- foundation/bus/bus/reqreply_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/foundation/bus/bus/reqreply_test.go b/foundation/bus/bus/reqreply_test.go index c426b07f..52df4e32 100644 --- a/foundation/bus/bus/reqreply_test.go +++ b/foundation/bus/bus/reqreply_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/require" + "github.com/suborbital/e2core/foundation/bus/testutil" ) @@ -41,10 +43,11 @@ func TestRequestReplyAsync(t *testing.T) { counter := testutil.NewAsyncCounter(10) - p1.Send(NewMsg(MsgTypeDefault, []byte("joey"))).OnReply(func(msg Message) error { + err := p1.Send(NewMsg(MsgTypeDefault, []byte("joey"))).OnReply(func(msg Message) error { counter.Count() return nil }) + require.NoError(t, err, "p1.Send encountered an error") p2 := g.ConnectWithReplay() p2.On(func(msg Message) error {