From 32336674d49557d4cbebe1346619bbe4fd17da96 Mon Sep 17 00:00:00 2001 From: Aurelijus Banelis Date: Fri, 5 Jan 2018 17:31:13 +0200 Subject: [PATCH] Happy path for SendMessageBatch No validation No tests No attributes --- README.md | 1 + app/gosqs/gosqs.go | 91 +++++++++++++++++++++++++++++++++++++++++--- app/router/router.go | 1 + app/sqs.go | 14 +++++++ 4 files changed, 101 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index da0b48e2a..4ddffb746 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Here is a list of the APIs: - [x] GetQueueAttributes (Always returns all attributes - depth and arn are set correctly others are mocked) - [x] GetQueueUrl - [x] SendMessage + - [ ] SendMessageBatch - [x] ReceiveMessage - [x] DeleteMessage - [x] PurgeQueue diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index 1639659d7..4eb982fb2 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -35,6 +35,11 @@ type Message struct { ReceiptTime time.Time } +type BatchMessage struct { + Message + Id string +} + type Queue struct { Name string URL string @@ -145,6 +150,80 @@ func SendMessage(w http.ResponseWriter, req *http.Request) { } } +func SendMessageBatch(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/xml") + messageBody := req.FormValue("MessageBody") + entries := extractSendBatchRequestEntries(req) + + queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String()) + + queueName := "" + if queueUrl == "" { + vars := mux.Vars(req) + queueName = vars["queueName"] + } else { + uriSegments := strings.Split(queueUrl, "/") + queueName = uriSegments[len(uriSegments)-1] + } + + if _, ok := SyncQueues.Queues[queueName]; !ok { + // Queue does not exists + createErrorResponse(w, req, "QueueNotFound") + return + } + + log.Println("Putting Batch Messages in Queue:", queueName) + resultEntries := []app.SendMessageBatchResultEntry{} + for _, entry := range entries { + entry.MD5OfMessageBody = common.GetMD5Hash(messageBody) + entry.Uuid, _ = common.NewUUID() + resultEntries = append(resultEntries, app.SendMessageBatchResultEntry{ + Id: entry.Id, + MessageId: entry.Uuid, + MD5OfMessageBody: entry.MD5OfMessageBody, + }) + } + SyncQueues.Lock() + for _, entry := range entries { + msg := entry.Message + SyncQueues.Queues[queueName].Messages = append(SyncQueues.Queues[queueName].Messages, msg) + common.LogMessage(fmt.Sprintf("%s: Queue: %s, Batch Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), queueName, msg.MessageBody)) + } + SyncQueues.Unlock() + + respStruct := app.SendMessageBatchResponse{ + Xmlns: "http://queue.amazonaws.com/doc/2012-11-05/", + Result: resultEntries, + Metadata: app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}, + } + enc := xml.NewEncoder(w) + enc.Indent(" ", " ") + if err := enc.Encode(respStruct); err != nil { + log.Printf("error: %v\n", err) + } +} + +func extractSendBatchRequestEntries(req *http.Request) []BatchMessage { + entries := []BatchMessage{} + + for i := 1; true; i++ { + Id := req.FormValue(fmt.Sprintf("SendMessageBatchRequestEntry.%d.Id", i)) + if Id == "" { + break + } + + messageBody := req.FormValue(fmt.Sprintf("SendMessageBatchRequestEntry.%d.MessageBody", i)) + entries = append(entries, BatchMessage{ + Id: Id, + Message: Message{ + MessageBody: []byte(messageBody), + }, + }) + } + + return entries +} + func ReceiveMessage(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/xml") @@ -236,10 +315,10 @@ func numberOfHiddenMessagesInQueue(queue Queue) int { } type DeleteEntry struct { - Id string + Id string ReceiptHandle string - Error string - Deleted bool + Error string + Deleted bool } func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) { @@ -306,9 +385,9 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) { for _, deleteEntry := range deleteEntries { if deleteEntry.Deleted == false { notFoundEntries = append(notFoundEntries, app.BatchResultErrorEntry{ - Code: "1", - Id: deleteEntry.Id, - Message: "Message not found", + Code: "1", + Id: deleteEntry.Id, + Message: "Message not found", SenderFault: true}) } } diff --git a/app/router/router.go b/app/router/router.go index 882c9146d..cc81a900f 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -28,6 +28,7 @@ var routingTable = map[string]http.HandlerFunc{ "GetQueueAttributes": sqs.GetQueueAttributes, "SetQueueAttributes": sqs.SetQueueAttributes, "SendMessage": sqs.SendMessage, + "SendMessageBatch": sqs.SendMessageBatch, "ReceiveMessage": sqs.ReceiveMessage, "DeleteMessage": sqs.DeleteMessage, "DeleteMessageBatch": sqs.DeleteMessageBatch, diff --git a/app/sqs.go b/app/sqs.go index 889777527..3e4ccc18f 100644 --- a/app/sqs.go +++ b/app/sqs.go @@ -36,6 +36,20 @@ type SendMessageResponse struct { Metadata ResponseMetadata `xml:"ResponseMetadata"` } +/*** Send Message Batch Response */ + +type SendMessageBatchResultEntry struct { + Id string `xml:"Id"` + MessageId string `xml:"MessageId"` + MD5OfMessageBody string `xml:"MD5OfMessageBody"` +} + +type SendMessageBatchResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result []SendMessageBatchResultEntry `xml:"SendMessageBatchResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + /*** Receive Message Response */ type ResultMessage struct {