Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
121 commits
Select commit Hold shift + click to select a range
5c9053b
added changes
the-tushar-meesho Dec 18, 2025
42b47fc
added changes
the-tushar-meesho Dec 18, 2025
23194aa
added changes
the-tushar-meesho Dec 18, 2025
861c2a7
addedc change
the-tushar-meesho Dec 18, 2025
11e6ccd
added
the-tushar-meesho Dec 18, 2025
0812c4e
added changes
the-tushar-meesho Dec 18, 2025
9d38a93
added changes
the-tushar-meesho Dec 18, 2025
19c9018
added changes
the-tushar-meesho Dec 18, 2025
49416e5
added large request grpc call
the-tushar-meesho Dec 19, 2025
2172d8f
remove formatting bug from rust
the-tushar-meesho Dec 19, 2025
ca30fc7
refactor unrefactor change form rust
the-tushar-meesho Dec 19, 2025
7038846
update feature schema
the-tushar-meesho Dec 19, 2025
b248827
remove worker_count from rust and reformat code
the-tushar-meesho Dec 19, 2025
55afafe
remove function name to main
the-tushar-meesho Dec 19, 2025
7a4e41f
edit some changes to make equal to go
the-tushar-meesho Dec 19, 2025
4a9ca01
added arc cloning changes
the-tushar-meesho Dec 19, 2025
273e040
made rust and go identical
the-tushar-meesho Dec 21, 2025
d150276
convert Vec<Arc<str>> to Vec<String>
the-tushar-meesho Dec 21, 2025
4860077
fixed keys_schema rust
the-tushar-meesho Dec 21, 2025
45071dd
change log
the-tushar-meesho Dec 21, 2025
8ac99b2
added changes
the-tushar-meesho Dec 21, 2025
4082556
added setting all the data at the time of calling
the-tushar-meesho Dec 23, 2025
5d9426f
reverted and make fg object in main
the-tushar-meesho Dec 23, 2025
6f15167
remove bug
the-tushar-meesho Dec 23, 2025
e3910ee
added rust profiling
the-tushar-meesho Dec 23, 2025
14618f6
rust profiling
the-tushar-meesho Dec 23, 2025
337f946
fix pprof changes
the-tushar-meesho Dec 23, 2025
9ec3c22
fix
the-tushar-meesho Dec 23, 2025
3f55c32
added
the-tushar-meesho Dec 23, 2025
3802a7b
added pprof fix
the-tushar-meesho Dec 23, 2025
55c20db
pprof fix
the-tushar-meesho Dec 23, 2025
3dbfd18
added changes
the-tushar-meesho Dec 23, 2025
584fa5a
added changes
the-tushar-meesho Dec 23, 2025
17e2e67
added
the-tushar-meesho Dec 23, 2025
c40ff0f
added changes
the-tushar-meesho Dec 23, 2025
539bf23
added
the-tushar-meesho Dec 23, 2025
24c40b9
added changes
the-tushar-meesho Dec 23, 2025
c1b570b
added pprof api
the-tushar-meesho Dec 23, 2025
04cc45b
added fix pprof
the-tushar-meesho Dec 23, 2025
88f8ec2
added pprof
the-tushar-meesho Dec 23, 2025
5cfe7e8
added pprof changes
the-tushar-meesho Dec 23, 2025
3aa8ac8
added pprof
the-tushar-meesho Dec 23, 2025
b75cde7
added pprof
the-tushar-meesho Dec 23, 2025
658419c
added pprof
the-tushar-meesho Dec 23, 2025
84f3b04
added pprof
the-tushar-meesho Dec 23, 2025
e207a9e
added pprof
the-tushar-meesho Dec 23, 2025
de15150
added retrieve features optimization
the-tushar-meesho Dec 23, 2025
bd6d8f4
added pprof
the-tushar-meesho Dec 23, 2025
4dd9c38
remove pprof for analysis
the-tushar-meesho Dec 23, 2025
ff338fc
change request
the-tushar-meesho Dec 23, 2025
bcf3272
added similar changes in go also
the-tushar-meesho Dec 24, 2025
8115ae7
remove thread ristriction
the-tushar-meesho Dec 24, 2025
c15f0ba
added client pooling
the-tushar-meesho Dec 26, 2025
8164fce
remove pooling and increase concurrency_limit
the-tushar-meesho Dec 26, 2025
9da6de7
increase _window_size and connection_window_size
the-tushar-meesho Dec 26, 2025
c4b9ede
increase concurrency_limit
the-tushar-meesho Dec 26, 2025
8ad3cc1
remove concurrency kimit and window size
the-tushar-meesho Dec 26, 2025
7f7486a
added hyper and remove axum
the-tushar-meesho Dec 28, 2025
9a2e8cc
remove worker thread
the-tushar-meesho Dec 28, 2025
8dbf784
zero copy
the-tushar-meesho Dec 28, 2025
8728155
refactor: replace Axum with raw Hyper and optimize zero-copy transfor…
the-tushar-meesho Dec 28, 2025
8740bef
remove pprof
the-tushar-meesho Dec 28, 2025
d3c3674
change main.rs body conversion
the-tushar-meesho Dec 29, 2025
c757d44
fix error
the-tushar-meesho Dec 29, 2025
4764eed
added _inner_data
the-tushar-meesho Dec 29, 2025
afcce92
added pprof changes
the-tushar-meesho Dec 29, 2025
e274c40
correct jemalloc_pprof package name
the-tushar-meesho Dec 29, 2025
7545c42
added pprof version update
the-tushar-meesho Dec 29, 2025
125d1d5
added pprof correction
the-tushar-meesho Dec 29, 2025
5cf9f65
added pprof correction
the-tushar-meesho Dec 29, 2025
c57290d
added pprof correction
the-tushar-meesho Dec 29, 2025
827b760
added jemallocator
the-tushar-meesho Dec 29, 2025
62e6e01
added pprof heap correction
the-tushar-meesho Dec 29, 2025
dfcd12d
added frame graph correction
the-tushar-meesho Dec 29, 2025
5228f32
added pprof
the-tushar-meesho Dec 29, 2025
b0a2989
optimized rust code
the-tushar-meesho Dec 29, 2025
ea6d90b
increased concurrency to 10000
the-tushar-meesho Dec 29, 2025
a5cf0b0
changed code remove pprof
the-tushar-meesho Dec 29, 2025
dc5e0f3
change for to 8080
the-tushar-meesho Dec 29, 2025
5ffc0dd
added zero copy code
the-tushar-meesho Dec 29, 2025
ba888e6
mutable fix
the-tushar-meesho Dec 29, 2025
20b2f24
use hyper over axum
the-tushar-meesho Dec 29, 2025
a587532
remove cpu utilizatiob
the-tushar-meesho Dec 29, 2025
429d059
added
the-tushar-meesho Dec 29, 2025
af45d1a
make correct
the-tushar-meesho Dec 29, 2025
fa5a827
added changes
the-tushar-meesho Dec 29, 2025
d305607
added performance fixes
the-tushar-meesho Dec 29, 2025
470f5b7
correct
the-tushar-meesho Dec 29, 2025
ecf6cdb
added bottleneck fix
the-tushar-meesho Dec 29, 2025
7c4677c
added some more to reduce cpu utilization
the-tushar-meesho Dec 30, 2025
5df15ee
added change
the-tushar-meesho Dec 30, 2025
24e7fda
increased go rps for http
the-tushar-meesho Dec 30, 2025
607a1e9
go connection pool changes
the-tushar-meesho Dec 30, 2025
cd255c1
increase keep alove
the-tushar-meesho Dec 30, 2025
9ed22a0
added channel changes
the-tushar-meesho Dec 30, 2025
fab1a8d
added best version of rust
the-tushar-meesho Dec 30, 2025
31c7418
added best version rust
the-tushar-meesho Dec 30, 2025
5c62f73
added go caller connection pull
the-tushar-meesho Dec 30, 2025
8591162
added go fix
the-tushar-meesho Dec 30, 2025
803c731
added go
the-tushar-meesho Dec 30, 2025
505fc48
added keep alive true
the-tushar-meesho Dec 30, 2025
da6bbc4
issue with port connection
the-tushar-meesho Dec 30, 2025
04e575e
increase timeout
the-tushar-meesho Dec 30, 2025
1c60c70
ADDED 3X
the-tushar-meesho Dec 30, 2025
3df6ed7
added stable go-caller
the-tushar-meesho Dec 30, 2025
02feea2
removed pooling
the-tushar-meesho Dec 31, 2025
92422f4
added go changes
the-tushar-meesho Dec 31, 2025
93e1b27
go test version 5
the-tushar-meesho Dec 31, 2025
eda81ce
hide response
the-tushar-meesho Dec 31, 2025
38b4d4b
added go and rust return response changes
the-tushar-meesho Dec 31, 2025
4090a19
remove profiling and added Server stopped gracefully
the-tushar-meesho Dec 31, 2025
99736c5
reduce time out to 5
the-tushar-meesho Dec 31, 2025
024e1d7
added connection pool rust
the-tushar-meesho Jan 1, 2026
0d17039
make clinet mutable
the-tushar-meesho Jan 1, 2026
c73e597
remove pooling and increase concurrency
the-tushar-meesho Jan 1, 2026
220eac6
remove concurrency
the-tushar-meesho Jan 1, 2026
51cf96d
added k6 latency test script
the-tushar-meesho Jan 2, 2026
a3e7aa3
added http3 changes
the-tushar-meesho Jan 5, 2026
cb071a4
fix tonic-h3 dependency
the-tushar-meesho Jan 5, 2026
36aa8fb
fix http3 issue
the-tushar-meesho Jan 5, 2026
1b7d16d
added go channel pool
the-tushar-meesho Jan 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 175 additions & 94 deletions go-caller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,137 +2,218 @@ package main

import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

retrieve "github.com/Meesho/BharatMLStack/go-sdk/pkg/proto/onfs/retrieve"
"github.com/gin-gonic/gin"
retrieve "github.com/Meesho/BharatMLStack/go-sdk/pkg/proto/onfs/retrieve" // adjust path
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)

// ApiResponse matches your Rust ApiResponse struct
type ApiResponse struct {
Success bool `json:"success"`
Data *string `json:"data,omitempty"`
Error *string `json:"error,omitempty"`
Message string `json:"message"`
// Request body structures for retrieve_features endpoint
type RetrieveFeaturesRequest struct {
EntityLabel string `json:"entity_label" binding:"required"`
FeatureGroups []FeatureGroupRequest `json:"feature_groups" binding:"required"`
KeysSchema []string `json:"keys_schema" binding:"required"`
Keys []KeysRequest `json:"keys" binding:"required"`
}
type FeatureGroupRequest struct {
Label string `json:"label" binding:"required"`
FeatureLabels []string `json:"feature_labels" binding:"required"`
}
type KeysRequest struct {
Cols []string `json:"cols" binding:"required"`
}

// AppState stores gRPC client
type AppState struct {
client retrieve.FeatureServiceClient
const (
// CONNECTION_POOL_SIZE defines the number of gRPC connections in the pool
// Each HTTP/2 connection supports ~100 concurrent streams
// With 16 connections: 16 * 100 = 1,600 concurrent streams capacity
CONNECTION_POOL_SIZE = 16
)

// ClientPool manages a pool of gRPC clients for load distribution
type ClientPool struct {
clients []retrieve.FeatureServiceClient
conns []*grpc.ClientConn
counter uint64 // Atomic counter for round-robin selection
}

// retrieveFeatures handles HTTP request
func (s *AppState) retrieveFeatures(c *gin.Context) {
authToken := "atishay"
callerID := "test-3"
// NewClientPool creates a new pool of gRPC connections and clients
func NewClientPool(address string) (*ClientPool, error) {
clients := make([]retrieve.FeatureServiceClient, 0, CONNECTION_POOL_SIZE)
conns := make([]*grpc.ClientConn, 0, CONNECTION_POOL_SIZE)

for i := 0; i < CONNECTION_POOL_SIZE; i++ {
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}),
grpc.WithInitialWindowSize(2*1024*1024), // 2MB stream window
grpc.WithInitialConnWindowSize(4*1024*1024), // 4MB connection window
)
if err != nil {
// Cleanup already created connections on error
for _, c := range conns {
_ = c.Close()
}
return nil, err
}
conns = append(conns, conn)
clients = append(clients, retrieve.NewFeatureServiceClient(conn))
}

result, err := s.retrieveFeaturesInternal(authToken, callerID)
if err != nil {
log.Printf("❌ gRPC Error: %v", err)
errMsg := err.Error()
c.JSON(http.StatusInternalServerError, ApiResponse{
Success: false,
Data: nil,
Error: &errMsg,
Message: "Failed to retrieve features",
})
return
return &ClientPool{
clients: clients,
conns: conns,
}, nil
}

// Next returns the next client from the pool using round-robin distribution
func (p *ClientPool) Next() retrieve.FeatureServiceClient {
idx := atomic.AddUint64(&p.counter, 1) - 1
return p.clients[idx%uint64(len(p.clients))]
}

// Close closes all connections in the pool
func (p *ClientPool) Close() error {
var wg sync.WaitGroup
errChan := make(chan error, len(p.conns))

for i := range p.conns {
wg.Add(1)
go func(c *grpc.ClientConn) {
defer wg.Done()
if err := c.Close(); err != nil {
errChan <- err
}
}(p.conns[i])
}

wg.Wait()
close(errChan)

// Collect any errors
var errs []error
for err := range errChan {
errs = append(errs, err)
}

data := fmt.Sprintf("%v", result)
c.JSON(http.StatusOK, ApiResponse{
Success: true,
Data: &data,
Error: nil,
Message: "Features retrieved successfully",
})
if len(errs) > 0 {
return errs[0] // Return first error
}
return nil
}

// retrieveFeaturesInternal calls gRPC backend
func (s *AppState) retrieveFeaturesInternal(authToken, callerID string) (*retrieve.Result, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// AppState stores gRPC client pool and metadata
type AppState struct {
pool *ClientPool
metadata metadata.MD
}

// Attach metadata
md := metadata.New(map[string]string{
"online-feature-store-auth-token": authToken,
"online-feature-store-caller-id": callerID,
})
ctx = metadata.NewOutgoingContext(ctx, md)
func (s *AppState) handler(c *gin.Context) {
var requestBody RetrieveFeaturesRequest
if err := c.ShouldBindJSON(&requestBody); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

// Build gRPC request
ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, s.metadata)

// Convert request body to protobuf Query
featureGroups := make([]*retrieve.FeatureGroup, 0, len(requestBody.FeatureGroups))
for _, fg := range requestBody.FeatureGroups {
featureGroups = append(featureGroups, &retrieve.FeatureGroup{
Label: fg.Label,
FeatureLabels: fg.FeatureLabels,
})
}
keys := make([]*retrieve.Keys, 0, len(requestBody.Keys))
for _, k := range requestBody.Keys {
keys = append(keys, &retrieve.Keys{
Cols: k.Cols,
})
}
req := &retrieve.Query{
EntityLabel: "catalog",
FeatureGroups: []*retrieve.FeatureGroup{
{
Label: "derived_fp32",
FeatureLabels: []string{
"clicks_by_views_3_days",
},
},
},
KeysSchema: []string{"catalog_id"},
Keys: []*retrieve.Keys{
{Cols: []string{"176"}},
{Cols: []string{"179"}},
},
EntityLabel: requestBody.EntityLabel,
FeatureGroups: featureGroups,
KeysSchema: requestBody.KeysSchema,
Keys: keys,
}

log.Println("πŸ“‘ Retrieving features...")
resp, err := s.client.RetrieveFeatures(ctx, req)
client := s.pool.Next()
resp, err := client.RetrieveFeatures(ctx, req)
if err != nil {
return nil, err
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
return resp, nil
c.JSON(http.StatusOK, gin.H{"status": "success", "response": resp})
}

func main() {
log.Println("Connecting to feature store...")
log.Printf("Starting go-caller with connection pool (size: %d)", CONNECTION_POOL_SIZE)
gin.SetMode(gin.ReleaseMode)

// gRPC channel
conn, err := grpc.Dial(
"online-feature-store-api.int.meesho.int:80",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
pool, err := NewClientPool("online-feature-store-api.int.meesho.int:80")
if err != nil {
log.Fatalf("Failed to connect gRPC: %v", err)
log.Fatalf("Failed to create gRPC connection pool: %v", err)
}
defer conn.Close()

client := retrieve.NewFeatureServiceClient(conn)
state := &AppState{client: client}

// Gin server
router := gin.Default()

// Allow CORS permissive (similar to Rust)
router.Use(func(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS")
c.Writer.Header().Set("Access-Control-Allow-Headers", "*")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(http.StatusNoContent)
return
defer func() {
log.Println("Closing gRPC connection pool...")
if err := pool.Close(); err != nil {
log.Printf("Error closing gRPC connection pool: %v", err)
}
c.Next()
})
}()

state := &AppState{
pool: pool,
metadata: metadata.MD{
"online-feature-store-auth-token": []string{"atishay"},
"online-feature-store-caller-id": []string{"test-3"},
},
}

router.POST("/retrieve-features", state.retrieveFeatures)
r := gin.New()
r.POST("/retrieve-features", state.handler)

port := "8081"
if os.Getenv("PORT") != "" {
port = os.Getenv("PORT")
srv := &http.Server{
Addr: ":8081",
Handler: r,
}

log.Printf("πŸš€ Starting go-caller on http://0.0.0.0:%s\n", port)
if err := router.Run("0.0.0.0:" + port); err != nil {
log.Fatalf("Failed to start server: %v", err)
// Graceful shutdown
go func() {
log.Println("πŸš€ Go gRPC Client running on http://0.0.0.0:8081")
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Failed to start server: %v", err)
}
}()

// Wait for interrupt signal to gracefully shutdown the server
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatalf("Server forced to shutdown: %v", err)
}
log.Println("Server exited")
}
Loading