CloudWatch
CloudWatch is a common source of log-ingestion issues. A CloudWatch record is not a single record, but instead a group of records:
{
"@type": "event",
"logEvents": [
{
"id": "38373430536198500175371505576435783808419451459190915072",
"message": "2 509304988160 eni-0f6428b026d2cdcf8 172.169.2.193 10.54.1.52 34451 3306 6 1 40 1720724137 1720724166 REJECT OK",
"timestamp": 1720724137000
},...
],
"logGroup": "fluency-test-lab-project-vpc/flowlog",
"logStream": "eni-0f6428b8888888888-all",
"messageType": "DATA_MESSAGE",
"owner": "59999999999",
"subscriptionFilters": [
"passToKinesis"
]
},
The logEvents
field is an array, and it can be quite large. Most likely, this is not the type of record we want to ingest into our application.
Create a Receiver
A receiver is code attached to the data source to correct structural issues. In this case, we want to explode the record by taking this one record and making the many records that will be sent to the router.
To do this, we first go into the menu and choose Platform->Processors
and then select the Receivers
tab. There, find the Add Receivers
button and click it.
Give it a name, like CloudWatch and begin editing.
This is a common CloudWatch code pattern for a receiver:
// Data input format: ({ obj, size, source }) or ( doc )
// Receiver entry point: explodes one CloudWatch record into one record per
// entry of obj.logEvents.
// Input: obj is the incoming record; size and source are accepted but not used here.
// Returns: a list of the logEvents entries, each carrying the parent's
// logStream, logGroup, owner, and subscriptionFilters.
// Throws: the string "no logEvents field" when obj.logEvents is missing or falsy.
function main({obj, size, source}) {
let logEvents = obj.logEvents
// Validate the field.
if !logEvents {
throw "no logEvents field"
}
// Explode the record by creating a loop.
let list = []
// The index i is not used in the loop body; only the element is needed.
for i, logEvent = range logEvents {
// Copy the parent attributes into the child, for this is what we will send.
// The attributes are assigned directly on logEvent.
logEvent.logStream = obj.logStream
logEvent.logGroup = obj.logGroup
logEvent.owner = obj.owner
logEvent.subscriptionFilters = obj.subscriptionFilters
list = append(list, logEvent)
}
// Now we send the array into the pipe. The pipe will process the elements of the
// array as individual records.
return list
}
Create a Processor to Parse the Record
At this point we have individual records entering the router.
But the message component in this case is a space-delimited string:
"message": "2 509304988160 eni-0f6428b026d2cdcf8 35.203.210.72 10.54.1.52 54423 12542 6 1 44 1720724137 1720724166 REJECT OK",
Refer to the parsing section. It covers the code pattern here.
// Processor entry point: tags the record and parses obj.message against the
// patterns listed below.
// On a match: the result returned by checkPatterns is stored in obj["@fields"],
// obj.message is removed, and "pass" is returned.
// On no match: obj["@parser"] is set to "CloudWatch_Failed" and "error" is returned.
function main({obj, size}) {
// Only stamp the record when it does not already carry an @timestamp.
// NOTE(review): new Time() presumably yields the current time — confirm.
if (!obj["@timestamp"]) {
let t = new Time()
obj["@timestamp"] = t.UnixMilli()
}
obj["@type"] = "event"
obj["@parser"] = "CloudWatch"
// Each entry pairs a display name with a regular expression that uses named
// capture groups, one group per space-separated field of the message.
let patterns = [
{
"name": "AWS CloudWatch Network Log",
"pattern": `^(?P<version>\d+) (?P<account_id>\d+) (?P<interface_id>eni-[a-zA-Z0-9]+) (?P<src_ip>\d+\.\d+\.\d+\.\d+) (?P<dest_ip>\d+\.\d+\.\d+\.\d+) (?P<src_port>\d+) (?P<dest_port>\d+) (?P<protocol>\d+) (?P<packets>\d+) (?P<bytes>\d+) (?P<start_time>\d+) (?P<end_time>\d+) (?P<action>REJECT|ACCEPT) (?P<log_status>OK|NODATA|SKIPDATA)$`
}
]
let result = checkPatterns(patterns, obj.message)
if (result) {
// No longer need the message field
delete(obj, "message")
obj["@fields"] = result
} else {
// Overwrite the parser tag so failed records can be told apart from parsed ones.
obj["@parser"] = "CloudWatch_Failed"
return "error"
}
return "pass"
}
// Tries each pattern in order against message and returns the first match.
// patterns: array of objects with a "name" and a "pattern" property; the
//           pattern is passed to regexp() as-is.
// message:  the value to match against each pattern.
// Returns:  the truthy result of regexp() for the first pattern that matches,
//           with parserName set to that pattern's name; undefined when no
//           pattern matches (including when patterns is empty).
function checkPatterns(patterns, message) {
    for let i = 0; i < len(patterns); i++ {
        let result = regexp(patterns[i].pattern, message)
        if (result) {
            // Record which pattern produced the match.
            result.parserName = patterns[i].name
            // Returning also ends the loop; no further patterns are tried.
            return result
        }
    }
    return undefined
}
The result of the processor is:
{
"obj": {
"@fields": {
"account_id": "509304988160",
"action": "REJECT",
"bytes": "40",
"dest_ip": "10.54.1.244",
"dest_port": "1454",
"end_time": "1720884244",
"interface_id": "eni-0739b0dca912ae857",
"log_status": "OK",
"packets": "1",
"parserName": "AWS CloudWatch Network Log",
"protocol": "6",
"src_ip": "92.63.197.210",
"src_port": "56404",
"start_time": "1720884220",
"version": "2"
},
"@parser": "CloudWatch",
"@timestamp": 1720884294701,
"@type": "event",
"id": "38377000506392116553116065564028538754339991284160856066",
"logGroup": "fluency-test-lab-project-vpc/flowlog",
"logStream": "eni-0739b0dca912ae857-all",
"owner": "509304988160",
"subscriptionFilters": [
"passToKinesis"
],
"timestamp": 1720884220000
},
"props": {},
"size": 369,
"source": ""
}
The message is parsed and the results are placed in the @fields
attribute. This separates the metadata that delivered the message from the message itself.
Also, the message
property was deleted in line twenty (20) — the delete(obj, "message") call — to make the result cleaner.
Updated 20 days ago