Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

type EnvSubsciption struct {
QueueName string
LambdaArn string
Raw bool
}

Expand Down Expand Up @@ -96,17 +97,25 @@ func LoadYamlConfig(filename string, env string) []string {
newTopic.Subscriptions = make([]*sns.Subscription, 0, 0)

for _, subs := range topic.Subscriptions {
if _, ok := sqs.SyncQueues.Queues[subs.QueueName]; !ok {
//Queue does not exist yet, create it.
queueUrl := "http://" + envs[env].Host + ":" + ports[0] + "/queue/" + subs.QueueName
sqs.SyncQueues.Queues[subs.QueueName] = &sqs.Queue{Name: subs.QueueName, TimeoutSecs: 30, Arn: queueUrl, URL: queueUrl}
if subs.QueueName != "" {
if _, ok := sqs.SyncQueues.Queues[subs.QueueName]; !ok {
//Queue does not exist yet, create it.
queueUrl := "http://" + envs[env].Host + ":" + ports[0] + "/queue/" + subs.QueueName
sqs.SyncQueues.Queues[subs.QueueName] = &sqs.Queue{Name: subs.QueueName, TimeoutSecs: 30, Arn: queueUrl, URL: queueUrl}
}
qUrl := sqs.SyncQueues.Queues[subs.QueueName].URL
newSub := &sns.Subscription{EndPoint: qUrl, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw}
subArn, _ := common.NewUUID()
subArn = topicArn + ":" + subArn
newSub.SubscriptionArn = subArn
newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
} else if subs.LambdaArn != "" {
newSub := &sns.Subscription{EndPoint: subs.LambdaArn, Protocol: "lambda", TopicArn: topicArn, Raw: subs.Raw}
subArn, _ := common.NewUUID()
subArn = topicArn + ":" + subArn
newSub.SubscriptionArn = subArn
newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
}
qUrl := sqs.SyncQueues.Queues[subs.QueueName].URL
newSub := &sns.Subscription{EndPoint: qUrl, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw}
subArn, _ := common.NewUUID()
subArn = topicArn + ":" + subArn
newSub.SubscriptionArn = subArn
newTopic.Subscriptions = append(newTopic.Subscriptions, newSub)
}
sns.SyncTopics.Topics[topic.Name] = newTopic
}
Expand Down
39 changes: 39 additions & 0 deletions app/golambda/golambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package golambda

import (
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
)

type (
Message struct {
Records []Record
}

Record struct {
EventVersion string
EventSubscriptionArn string
EventSource string
Sns interface{} `json:",omitempty"`
}
)


func NewLambda(endpoint string) (*lambda.Lambda, error) {
creds := credentials.NewStaticCredentials("id", "secret", "token")

awsConfig := aws.NewConfig().
WithRegion("faux-region-1").
WithEndpoint(endpoint).
WithCredentials(creds)

s, err := session.NewSession(awsConfig)

if err != nil {
return nil, err
}

return lambda.New(s), nil
}
51 changes: 51 additions & 0 deletions app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/p4tin/goaws/app"
"github.com/p4tin/goaws/app/common"
sqs "github.com/p4tin/goaws/app/gosqs"
"github.com/p4tin/goaws/app/golambda"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/aws"
)

type SnsErrorType struct {
Expand Down Expand Up @@ -48,6 +51,7 @@ type (
const (
ProtocolSQS Protocol = "sqs"
ProtocolDefault Protocol = "default"
ProtocolLambda Protocol = "lambda"
)

const (
Expand Down Expand Up @@ -325,6 +329,53 @@ func Publish(w http.ResponseWriter, req *http.Request) {
} else {
common.LogMessage(fmt.Sprintf("%s: Queue %s does not exits, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName))
}
} else if Protocol(subs.Protocol) == ProtocolLambda {
m, err := CreateMessageBody(messageBody, subject, topicArn, subs.Protocol, messageStructure)

if err != nil {
createErrorResponse(w, req, err.Error())
return
}

msg := golambda.Message{
Records: []golambda.Record{
{
EventVersion: "1.0",
EventSubscriptionArn: subs.SubscriptionArn,
EventSource: "aws:sns",
Sns: json.RawMessage(m),
},
},
}

payload, err := json.Marshal(msg)

log.Printf("Payload generated for lambda: %s", payload)

input := &lambda.InvokeInput{
FunctionName: aws.String("MyFunction"),
InvocationType: aws.String("Event"),
Payload: payload,
}

// use arn as hostname
endpointParts := strings.Split(subs.EndPoint, ":")

svc, err := golambda.NewLambda("http://"+endpointParts[len(endpointParts)-1])

if err != nil {
log.Warnf("Error creating lambda service: %s", err.Error())
createErrorResponse(w, req, err.Error())
return
}

_, err = svc.Invoke(input)

if err != nil {
log.Warnf("Error invoking lambda: %s", err.Error())
createErrorResponse(w, req, err.Error())
return
}
}
}
} else {
Expand Down