From a33d8e58f40dd2ce0806f1d19b035ee9dccea125 Mon Sep 17 00:00:00 2001 From: Marius Grigaitis Date: Tue, 2 Jan 2018 19:06:07 +0200 Subject: [PATCH] Add initial lambda support --- app/conf/config.go | 29 +++++++++++++++-------- app/golambda/golambda.go | 39 ++++++++++++++++++++++++++++++ app/gosns/gosns.go | 51 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 10 deletions(-) create mode 100644 app/golambda/golambda.go diff --git a/app/conf/config.go b/app/conf/config.go index 89201a2d9..523b425cc 100644 --- a/app/conf/config.go +++ b/app/conf/config.go @@ -14,6 +14,7 @@ import ( type EnvSubsciption struct { QueueName string + LambdaArn string Raw bool } @@ -96,17 +97,25 @@ func LoadYamlConfig(filename string, env string) []string { newTopic.Subscriptions = make([]*sns.Subscription, 0, 0) for _, subs := range topic.Subscriptions { - if _, ok := sqs.SyncQueues.Queues[subs.QueueName]; !ok { - //Queue does not exist yet, create it. - queueUrl := "http://" + envs[env].Host + ":" + ports[0] + "/queue/" + subs.QueueName - sqs.SyncQueues.Queues[subs.QueueName] = &sqs.Queue{Name: subs.QueueName, TimeoutSecs: 30, Arn: queueUrl, URL: queueUrl} + if subs.QueueName != "" { + if _, ok := sqs.SyncQueues.Queues[subs.QueueName]; !ok { + //Queue does not exist yet, create it. + queueUrl := "http://" + envs[env].Host + ":" + ports[0] + "/queue/" + subs.QueueName + sqs.SyncQueues.Queues[subs.QueueName] = &sqs.Queue{Name: subs.QueueName, TimeoutSecs: 30, Arn: queueUrl, URL: queueUrl} + } + qUrl := sqs.SyncQueues.Queues[subs.QueueName].URL + newSub := &sns.Subscription{EndPoint: qUrl, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw} + subArn, _ := common.NewUUID() + subArn = topicArn + ":" + subArn + newSub.SubscriptionArn = subArn + newTopic.Subscriptions = append(newTopic.Subscriptions, newSub) + } else if subs.LambdaArn != "" { + newSub := &sns.Subscription{EndPoint: subs.LambdaArn, Protocol: "lambda", TopicArn: topicArn, Raw: subs.Raw} + subArn, _ := common.NewUUID() + subArn = topicArn + ":" + subArn + newSub.SubscriptionArn = subArn + newTopic.Subscriptions = append(newTopic.Subscriptions, newSub) } - qUrl := sqs.SyncQueues.Queues[subs.QueueName].URL - newSub := &sns.Subscription{EndPoint: qUrl, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw} - subArn, _ := common.NewUUID() - subArn = topicArn + ":" + subArn - newSub.SubscriptionArn = subArn - newTopic.Subscriptions = append(newTopic.Subscriptions, newSub) } sns.SyncTopics.Topics[topic.Name] = newTopic } diff --git a/app/golambda/golambda.go b/app/golambda/golambda.go new file mode 100644 index 000000000..b5b0b60d0 --- /dev/null +++ b/app/golambda/golambda.go @@ -0,0 +1,39 @@ +package golambda + +import ( + "github.com/aws/aws-sdk-go/service/lambda" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" +) + +type ( + Message struct { + Records []Record + } + + Record struct { + EventVersion string + EventSubscriptionArn string + EventSource string + Sns interface{} `json:",omitempty"` + } +) + + +func NewLambda(endpoint string) (*lambda.Lambda, error) { + creds := credentials.NewStaticCredentials("id", "secret", "token") + + awsConfig := aws.NewConfig(). + WithRegion("faux-region-1"). + WithEndpoint(endpoint). + WithCredentials(creds) + + s, err := session.NewSession(awsConfig) + + if err != nil { + return nil, err + } + + return lambda.New(s), nil +} diff --git a/app/gosns/gosns.go b/app/gosns/gosns.go index 7c68f5d41..996231c6d 100644 --- a/app/gosns/gosns.go +++ b/app/gosns/gosns.go @@ -15,6 +15,9 @@ import ( "github.com/p4tin/goaws/app" "github.com/p4tin/goaws/app/common" sqs "github.com/p4tin/goaws/app/gosqs" + "github.com/p4tin/goaws/app/golambda" + "github.com/aws/aws-sdk-go/service/lambda" + "github.com/aws/aws-sdk-go/aws" ) type SnsErrorType struct { @@ -48,6 +51,7 @@ type ( const ( ProtocolSQS Protocol = "sqs" ProtocolDefault Protocol = "default" + ProtocolLambda Protocol = "lambda" ) const ( @@ -325,6 +329,53 @@ func Publish(w http.ResponseWriter, req *http.Request) { } else { common.LogMessage(fmt.Sprintf("%s: Queue %s does not exits, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName)) } + } else if Protocol(subs.Protocol) == ProtocolLambda { + m, err := CreateMessageBody(messageBody, subject, topicArn, subs.Protocol, messageStructure) + + if err != nil { + createErrorResponse(w, req, err.Error()) + return + } + + msg := golambda.Message{ + Records: []golambda.Record{ + { + EventVersion: "1.0", + EventSubscriptionArn: subs.SubscriptionArn, + EventSource: "aws:sns", + Sns: json.RawMessage(m), + }, + }, + } + + payload, err := json.Marshal(msg) + + log.Printf("Payload generated for lambda: %s", payload) + + input := &lambda.InvokeInput{ + FunctionName: aws.String("MyFunction"), + InvocationType: aws.String("Event"), + Payload: payload, + } + + // use arn as hostname + endpointParts := strings.Split(subs.EndPoint, ":") + + svc, err := golambda.NewLambda("http://"+endpointParts[len(endpointParts)-1]) + + if err != nil { + log.Warnf("Error creating lambda service: %s", err.Error()) + createErrorResponse(w, req, err.Error()) + return + } + + _, err = svc.Invoke(input) + + if err != nil { + log.Warnf("Error invoking lambda: %s", err.Error()) + createErrorResponse(w, req, err.Error()) + return + } } } } else {