CloudWatch

CloudWatch is a common challenge for log ingestion. A CloudWatch record is not a single record, but a group (batch) of records:

 {
    "@type": "event",
    "logEvents": [
      {
        "id": "38373430536198500175371505576435783808419451459190915072",
        "message": "2 509304988160 eni-0f6428b026d2cdcf8 172.169.2.193 10.54.1.52 34451 3306 6 1 40 1720724137 1720724166 REJECT OK",
        "timestamp": 1720724137000
      },...
    ],
    "logGroup": "fluency-test-lab-project-vpc/flowlog",
    "logStream": "eni-0f6428b8888888888-all",
    "messageType": "DATA_MESSAGE",
    "owner": "59999999999",
    "subscriptionFilters": [
      "passToKinesis"
    ]
  },

The `logEvents` field is an array and can be quite large. Most likely, this is not the type of record we want to ingest into our application: we want one record per log event.

Create a Receiver

A receiver is code attached to the data source to correct structural issues. In this case, we want to explode the record: take this one record and turn it into the many records (one per log event) that will be sent to the router.

To do this, first go to the menu, choose Platform->Processors, and then select the Receivers tab. There you will find the Add Receivers button; click it.

Give it a name, such as CloudWatch, and begin editing.

This is a common CloudWatch code pattern for a receiver:

// Data input format: ({ obj, size, source }) or ( doc )
// Receiver: explode one CloudWatch Logs subscription record into one record
// per log event.
//
// Each element of obj.logEvents gets the envelope fields (logStream, logGroup,
// owner, subscriptionFilters) copied onto it, so every exploded record still
// says where it came from. Returns the list of per-event records, which the
// pipe then processes individually. Throws if the envelope has no logEvents.
function main({obj, size, source}) {
    // CloudWatch Logs also sends CONTROL_MESSAGE records to check that the
    // destination is reachable; they carry no real log data (their single
    // event is a "CWL CONTROL MESSAGE" health check), so emit no records.
    // NOTE(review): confirm the pipe treats an empty array as "no records".
    if (obj.messageType == "CONTROL_MESSAGE") {
        return []
    }

    let logEvents = obj.logEvents

    // Validate the field.
    if !logEvents {
       throw "no logEvents field"
    }

    // Explode the record by looping over its events.
    let list = []
    for i, logEvent = range logEvents {
      // Copy the parent (envelope) attributes into the child, since the child is what we send.
      logEvent.logStream = obj.logStream
      logEvent.logGroup = obj.logGroup
      logEvent.owner = obj.owner
      logEvent.subscriptionFilters = obj.subscriptionFilters
      list = append(list, logEvent)
    }
    // Send the array into the pipe; the pipe processes each element of the
    // array as an individual record.
    return list
}

Create a Processor to Parse the Record

At this point we have individual records entering the router.

However, the message component in this case is space-delimited:
"message": "2 509304988160 eni-0f6428b026d2cdcf8 35.203.210.72 10.54.1.52 54423 12542 6 1 44 1720724137 1720724166 REJECT OK",

Refer to the parsing section, which covers the code pattern used here.

function main({obj, size}) {
    // Parse one exploded CloudWatch VPC flow log record (obj.message) into obj["@fields"].
    if (!obj["@timestamp"]) {
        let t = new Time()
        obj["@timestamp"] = t.UnixMilli()
    }
    obj["@type"] = "event"
    obj["@parser"] = "CloudWatch"
    // First match wins. IPs accept IPv4 or IPv6; pattern 2 covers NODATA/SKIPDATA records, whose traffic fields are "-".
    let patterns = [
    { "name": "AWS CloudWatch Network Log",
      "pattern": `^(?P<version>\d+) (?P<account_id>\d+) (?P<interface_id>eni-[a-zA-Z0-9]+) (?P<src_ip>[0-9a-fA-F.:]+) (?P<dest_ip>[0-9a-fA-F.:]+) (?P<src_port>\d+) (?P<dest_port>\d+) (?P<protocol>\d+) (?P<packets>\d+) (?P<bytes>\d+) (?P<start_time>\d+) (?P<end_time>\d+) (?P<action>REJECT|ACCEPT) (?P<log_status>OK|NODATA|SKIPDATA)$` },
    { "name": "AWS CloudWatch Network Log (No Data)",
      "pattern": `^(?P<version>\d+) (?P<account_id>\d+) (?P<interface_id>eni-[a-zA-Z0-9]+) - - - - - - - (?P<start_time>\d+) (?P<end_time>\d+) - (?P<log_status>NODATA|SKIPDATA)$` }
    ]

    let result = checkPatterns(patterns, obj.message)
    if (result) {
        // No longer need the message field
        delete(obj, "message")
        obj["@fields"] = result
    } else {
        // No pattern matched: keep the raw message and tag the record as a parse failure.
        obj["@parser"] = "CloudWatch_Failed"
        return "error"
    }

    // Parsed successfully.
    return "pass"
}

// Try each pattern, in order, against message.
//
// patterns: list of { name, pattern } objects; pattern is a regular expression
//           with named capture groups.
// Returns the regexp() result of the first pattern that matches, with
// parserName set to that pattern's name; returns undefined if none match.
function checkPatterns(patterns, message) {
  for let i = 0; i < len(patterns); i++ {
    let result = regexp(patterns[i].pattern, message)
    if (result) {
      result.parserName = patterns[i].name
      // return leaves the loop immediately, so no break is needed.
      return result
    }
  }

  return undefined
}

The result of the processor is:

{
  "obj": {
    "@fields": {
      "account_id": "509304988160",
      "action": "REJECT",
      "bytes": "40",
      "dest_ip": "10.54.1.244",
      "dest_port": "1454",
      "end_time": "1720884244",
      "interface_id": "eni-0739b0dca912ae857",
      "log_status": "OK",
      "packets": "1",
      "parserName": "AWS CloudWatch Network Log",
      "protocol": "6",
      "src_ip": "92.63.197.210",
      "src_port": "56404",
      "start_time": "1720884220",
      "version": "2"
    },
    "@parser": "CloudWatch",
    "@timestamp": 1720884294701,
    "@type": "event",
    "id": "38377000506392116553116065564028538754339991284160856066",
    "logGroup": "fluency-test-lab-project-vpc/flowlog",
    "logStream": "eni-0739b0dca912ae857-all",
    "owner": "509304988160",
    "subscriptionFilters": [
      "passToKinesis"
    ],
    "timestamp": 1720884220000
  },
  "props": {},
  "size": 369,
  "source": ""
}

The message is parsed and the results are placed in the @fields attribute. This separates the metadata that delivered the message from the message itself.

Also, the `message` property is deleted on line twenty (20) of the processor, `delete(obj, "message")`, once parsing succeeds, to keep the result clean.