System Functions
Calls made from an FPL process to Ingext
Platform REST API: call REST API
- Platform_REST_Call(integration, request)
call REST API via "RESTAPI" integration.
integration: { url, path, method, timeout, skipVerify, headers, authentication}
support HTTP Basic, Bearer authentication
request: {method, path, headers, parameters, body, debug}
// Call the REST API through the "Terplab" integration; only the request
// path, body and debug flag are set here.
let response = Platform_REST_Call("Terplab", {
path: "/api/ds/behavior_incident_update",
debug: true,
body: {
kargs: {
id: "username_foo",
status:"resolved"
}
}
})
// Print the returned response with the %v format verb.
printf("%v", response)
Platform Metric API: read/write metrics to Prometheus database (local or remote)
- Platform_Metric_Counter(name, labels, increment)
Write Counter metric to Prometheus database
"undefined" or "null" label will be ignored
// NOTE(review): obj and size are not defined in this snippet — presumably
// provided by the surrounding processor; confirm.
let customer = obj["@customer"]
// Labels attached to both counters below; an undefined or null label is ignored.
let labels = {
namespace:"fluency",
app:"import",
eventType:"Office365",
customer: customer
}
// Increment the event counter by 1 and the byte counter by size.
Platform_Metric_Counter("fluency_import_count", labels,1)
Platform_Metric_Counter("fluency_import_bytes", labels,size)
- Platform_Metric_QueryBuild(options)
build a promQL query
options: {metric, select, duration, stat, groupBy, aggregate, sort, limit}
* for the select option, the select labels are:
** =: Select labels that are exactly equal to the provided string.
** !=: Select labels that are not equal to the provided string.
** =~: Select labels that regex-match the provided string.
** !~: Select labels that do not regex-match the provided string.
- Platform_Metric_Query(query, time)
return an FPL table
- Platform_Metric_QueryRange(query, from, to, step)
return an FPL stream
// Example: query the top 3 components by increase of platform_component_bytes,
// then fetch an hourly range for just those components.
function main() {
// let query = `sum by(component) (increase(platform_component_bytes[5m]))`
// Build a promQL query from the options below (sort "topk" with limit 3).
let query = Platform_Metric_QueryBuild({
metric: "platform_component_bytes",
duration: "1h",
stat: "increase",
aggregate:"sum",
groupBy: "component",
sort: "topk",
limit: 3
})
// Platform_Metric_Query returns an FPL table.
let table = Platform_Metric_Query(query, "@h")
// let keys = []
// Collect the component value of every row.
let keys = table.Map((row) => {
return row.component
})
// Build a regex-match (=~) label selector from the components joined with "|".
let select = sprintf(`component=~"%s"`, keys.Join("|"))
printf("%s",select)
// Same query as above, restricted by the selector and without sort/limit.
let query2 = Platform_Metric_QueryBuild({
metric: "platform_component_bytes",
select: select,
duration: "1h",
stat: "increase",
aggregate:"sum",
groupBy: "component"
})
// Platform_Metric_QueryRange returns an FPL stream; here from "-24h@h" to "@h" with a "1h" step.
let stream = Platform_Metric_QueryRange(query2, "-24h@h", "@h", "1h")
//return {table}
//let query = `sum by(eventType) (increase(fluency_import_bytes[1h]))`
//let table = Platform_Metric_Query(query, "@h")
//let stream = Platform_Metric_QueryRange(query, "-48h@h", "@h", "1h")
return {table, stream}
}
- Platform_Metric_Sort({metric, select, groupBy, from, to, sort, limit})
return top/bottom N rows
metric: metric name (must be a counter type)
select: metric label select
groupBy: groupBy field(s), string or list of strings
from/to: time range in relative or absolute time format
sort: "topk" or "bottomk"
limit: number of rows
- Platform_Metric_Sort_Histogram({metric, select, groupBy, from, to, interval, sort, limit})
return top/bottom N metrics
metric: metric name (must be a counter type)
select: metric label select
groupBy: groupBy field(s), string or list of strings
from/to: time range in relative or absolute time format
sort: "topk" or "bottomk"
limit: number of rows
interval: histogram interval "1h", "1d", "1w", "1m"
// Example: top-10 table and matching hourly histogram of fluency_import_bytes,
// grouped by importSource. from/to default to "-24h@h" and "@h".
function main({from="-24h@h", to="@h"}) {
let groupBy="importSource"
let options = {
metric: "fluency_import_bytes",
from: from,
to: to,
groupBy: groupBy,
sort: "topk",
limit: 10
}
// promQL: topk(10, sum by (importSource) (increase(fluency_import_bytes[24h])))
let table = Platform_Metric_Sort(options)
// Reuse the same options and add the histogram interval.
options.interval= "1h"
// promQL: (sum by (importSource) (increase(fluency_import_bytes{importSource="foo" or importSource="bar"}[1h]))) [24h:1h]
let histogram = Platform_Metric_Sort_Histogram(options)
return {table, histogram}
}
- Platform_Metric_Alert_Counter_Stop(options)
alert if a counter stops increasing for some time
options: {metric, select, groupBy, duration, lookback, interval, history}
metric: metric name (must be a counter type)
select: metric label select
groupBy: groupBy field(s), string or list of strings
duration: detection threshold. default is "10m"
lookback: lookback offset. default is "1h"
interval: polling interval. default is "1m"
history: alert record duration, default is "1h"
if no alert found, return undefined.
else return alerts.
// Example: watch platform_component_total grouped by id, using the documented
// default values for duration, lookback, interval and history.
let options = {
metric: `platform_component_total`,
groupBy: "id",
duration: "10m",
lookback: "1h",
interval: "1m",
history: "1h"
}
// Returns undefined when no alert is found, otherwise the alerts.
let alerts = Platform_Metric_Alert_Counter_Stop(options)
if alerts {
// NOTE(review): the meaning of the Emit arguments is not documented here —
// presumably name, description, severity and a duration in seconds; confirm.
alerts.Emit("Component_Stop", "component stopped for 10 minutes", "warn", 3600)
}
Platform API
- sleep(delayInMillisecond)
sleep(1000) // sleep for one second
- Platform_LoadComponent()
** return all components (datasource, datasink, router and pipe)
// create a key value map for component id => name translation
let idMap = {}
let components = Platform_LoadComponent()
// The callback takes two arguments; the first one is unused here (named _).
components.Each( (_, c) => {
idMap[c.id] = c.name
})
- Platform_Site_GetInfo()
return site information: {siteURL, account, multiTenant}
- Platform_Site_GetTenants()
return tenant list: [{name, displayName, description}]
- Platform_Grok_Check(grokName)
return true if grok handle exists
- Platform_Grok_Register(grokName)
register a grok handle
- Platform_Grok_Parse(grokName, pattern, input)
grok parse. return a map of hits
- Platform_Grok_Add_Pattern(grokName, patternName, pattern)
add a pattern to grok handle
// Register the "default" grok handle only if it does not exist yet.
if !Platform_Grok_Check("default") {
Platform_Grok_Register("default")
}
// Parse a sample log line with the %{COMMONAPACHELOG} pattern; the result is a map of hits.
let m = Platform_Grok_Parse("default", "%{COMMONAPACHELOG}", `127.0.0.1 - - [23/Apr/2014:22:58:32 +0200] "GET /index.php HTTP/1.1" 404 207`)
if m {
printf("%v", m)
}
return "pass"
- Platform_Cache_Check(cacheName)
return true if cache exists
- Platform_Cache_Register(cacheName, options)
register a cache
return true if success
return false if cache is already registered
options: {expire: 0}
cache expire time in seconds, default is 0 (never expire)
- Platform_Cache_DeRegister(cacheName)
deregister a cache
return true if success
return false if cache is not found
- Platform_Cache_Set(cacheName, key, value)
Set a key value pair to cache
- Platform_Cache_SetMultiple(cacheName, keys, values)
Set multiple key value pairs to cache
- Platform_Cache_Get(cacheName, key)
Get a value from cache. return undefined if key not found
- Platform_Cache_Delete(cacheName, key)
Delete a key from the cache. (do nothing if key not found)
- Platform_Cache_Replace(newName, currentName)
Rename cache "$currentName" to "$newName".
newName must be different from currentName
// Register "cache1" with a 3600-second expiry unless it already exists.
let exist = Platform_Cache_Check("cache1")
if !exist {
Platform_Cache_Register("cache1", {expire: 3600})
}
// Store one pair, then several at once (keys and values presumably match by position — confirm).
Platform_Cache_Set("cache1", "foo", "bar")
Platform_Cache_SetMultiple("cache1", ["k1", "k2"], ["v1", "v2"])
// Get returns undefined when the key is not found.
let value = Platform_Cache_Get("cache1", "foo")
printf("value: %s", value)
- Platform_Channel(channel, eventEnvelop)
send event to a channel
event will be sent to all rules in this channel.
runtime exceptions will be ignored
- Platform_Sink(sink, eventEnvelop)
send event to one data sink
// processor S3Passthrough
// send event to S3 data sink without a direct connection from router pipe to the sink
function main({obj, size}) {
// Forward the event envelope ({obj, size}) to the "BehaviorEventBackup" data sink.
Platform_Sink("BehaviorEventBackup", {obj, size})
// NOTE(review): the effect of returning "abort" is not described here —
// presumably it stops further processing of this event; confirm.
return "abort"
}
- Platform_Notification_Email(options)
send email notification
options: {to, cc, bcc, subject, html, text}
to: email address or list of email addresses
cc: email address or list of email addresses
bcc: email address or list of email addresses
subject: email subject
html: email body in html format
text: email body in text format
// Render the HTML body and the subject from the event, then send the email.
// NOTE(review): event and config are not defined in this snippet — presumably
// provided by the enclosing action; confirm.
// The body template is named bodyTemplate so that it does not shadow the
// template() function called below.
let bodyTemplate = `<p>Time: {{ .time }}</p><p>Alert: <b>{{.name}}</b> ({{ .description }})</p>`
let subjectTemplate = `Fluency Platform Alert: {{.name}} - {{ .action }}: {{.displayName}}`
let html = htmlTemplate(bodyTemplate, event)
let subject = template(subjectTemplate, event)
let options = {
  to: config.to,
  cc: config.cc,
  subject,
  html
}
Platform_Notification_Email(options)
- Platform_Notification_Slack(integrationName, options)
send slack notification
integrationName: slack integration name
options: {channel, message}
channel: slack channel name
** message: slack message
// Render the Slack message from the event and post it to the "#fluency_grid" channel.
// NOTE(review): event and config are not defined in this snippet — presumably
// provided by the enclosing action; confirm.
// The template text is named messageTemplate so that it does not shadow the
// template() function it is passed to.
let messageTemplate = ` Alert: *{{.name}}*
Description: *{{.description}}*
Severity: *{{.severity}}*
Action: *{{.action}}*
Source: *{{.source}}*`
let message = template(messageTemplate, event)
let integrationName = config.integrationName
let options = {
  channel: "#fluency_grid",
  message,
}
Platform_Notification_Slack(integrationName, options)
- Platform_Notification_PagerDuty(integrationName, options)
send PagerDuty notification
integrationName: pagerduty integration name
options: {event_action, dedup_key, payload:{summary, source, severity, component, group, class, eventTime}, details}
- Platform_Notification_ServiceNow(integrationName, options)
call ServiceNow API
integrationName: serviceNow integration name
options: {action, key, entry:{}}
action: "add" | "update"
- Platform_EntityinfoCheck(entity, key)
check if one key exists in one entity table
let hit = Fluency_EntityinfoCheck("HOME_NET", "20.0.0.1")
if hit {
printf("home net")
} else {
printf("internet")
}
- Platform_Action(action, doc, config)
** call a pre-defined FPL action
// Document handed to the pre-defined action.
let doc = {
  time: "2024-01-01",
  name: "alert1",
  severity: "error",
  action: "drop",
  source: "fpl"
}
// Action configuration. The address is a placeholder: the original value was
// lost to email obfuscation in the published page.
let config = {
  to: "user@example.com"
}
Platform_Action("PlatformAlertEmail", doc, config)
- Platform_Action_Endpoint(endpoint, doc)
** call a pre-defined FPL action endpoint
// Document handed to the pre-defined action endpoint.
let doc = {
time: "2024-01-01",
name: "alert1",
severity: "error",
action: "drop",
source: "fpl"
}
// "FluencySupport" is the endpoint argument.
Platform_Action_Endpoint("FluencySupport", doc)
- Platform_EntityinfoLookup(entity, keyCol, key, valueCol)
check value from one column based on key column value
return an object {exist, value}
let categoryID = "%%12547"
let {exist, value} = fluencyEntityinfoLookup("AD_EventID_4719_CategoryId", "Id", categoryID, "Description")
if exist {
printf("value %s", value)
}
Platform Asset API: Asset management
- Platform_Asset_Refresh(plugin, entries)
plugin is the asset plugin name: "AD", "SentinelOne", "Qualys"
entries is a list of asset objects: {name, fqdn, agentID, instanceID, model, platform, machineType, category, os, publicIP, privateIP, location, osVersion, vendor, uuid, serialNumber, region, vpc, account, sites, flags, tags}
name is the required field
fqdn is the fully qualified domain name (optional)
cronjob to populate the asset table from Qualys plugin
each refresh will trigger a rebuild of the asset table
// Cron job entry point: load the Qualys hosts and refresh the asset table from them.
function main() {
let table = loadQualysDevices()
// Turn the table rows into a list of asset entries.
let list = table.Map( (row) => row)
Platform_Asset_Refresh("Qualys", list)
return {table}
}
// Load Qualys "host" resources and aggregate them into one row per fqdn.
// Returns the table produced by Fluency_ResourceLoad.
function loadQualysDevices() {
  // "*" is the third argument (written "\*" in the published page, a markdown
  // escape left over from extraction); presumably it selects all customers — confirm.
  let table = Fluency_ResourceLoad("Qualys", "host", "*", (obj, customer) => {
    let fields = obj["@qualysHost"]
    // Rename vendor-specific field names to the asset field names.
    let {created, name, fqdn, model, manufacturer:vendor, os, type:machineType, address:privateIP} = fields
    let timestamp = obj["@timestamp"]
    return {
      aggregate: {
        groupBy: {fqdn},
        columns: {
          // NOTE(review): argmax presumably keeps the columns of the latest
          // record per group — confirm against the aggregate documentation.
          argmax: {
            created,
            name,
            model,
            privateIP,
            os,
            machineType,
            vendor,
            customer,
            timestamp
          }
        }
      }
    }
  })
  return table
}
- Platform_Asset_Lookup(name)
return an asset object if the name matches the asset name, fqdn, agentID or instanceID.
call this function in FPL event parser to get asset information.
- Platform_Asset_Register({name, fqdn, category, machineType, groups, flags...})
register an asset
// get deviceName from the parsed event
let asset = Platform_Asset_Lookup(deviceName)
if (!asset) {
  // register a new asset. the provider will be set to "FPL"
  // The result is stored back into asset so the printf below has an object
  // to read (presumably Platform_Asset_Register returns the registered
  // asset — confirm).
  asset = Platform_Asset_Register({
    name: deviceName,
    groups: ["FPL-detect: FortiGate NGFW"],
    machineType: "FortiGate NGFW",
    category: "Firewall"
  })
}
printf("asset name %s", asset.name)
Platform EntityProvider API: UEBA entity lookup
- the default entity info : {id, obj, entity}
id is the entity key: EDR agent uuid, device name or username. Must be unique for each integration
obj is the entity object from the vendor
entity is the normalized fields for UEBA correlation: {agentID, username, asset, ADAsset, ADUser, privateIP, publicIP}
typical use case is to run Platform_EntityProvider_Refresh as an hourly cron job, then run Platform_EntityProvider_Lookup in an FPL parser or rule.
- Platform_EntityProvider_Lookup(plugin, customer, key)
// NOTE(review): agentID, newObj and envelop are not defined in this snippet —
// presumably set earlier in the parser; confirm.
let agentInfos = Platform_EntityProvider_Lookup("SentinelOne", "*", agentID)
if len(agentInfos) > 0 {
// Use the first match: obj is the vendor entity object, entity holds the
// normalized fields for UEBA correlation.
let agentInfo = agentInfos[0]
newObj.agent = agentInfo.obj
newObj.uuid = agentID
envelop.obj["entity"] = agentInfo.entity
} else {
// printf("agentID lookup missing: %s", agentID)
}
- Platform_EntityProvider_Refresh(plugin, customer, entries)
function main(doc) {
Platform_PluginLambda("SentinelOne", "\*", (customer) => {
let agents = Plugin_SentinelOne_LoadAgent()
let agentInfos = agents.Map( (_, obj) => {
// printf("uuid %s", obj.uuid)
let entity = {
agentID: obj.uuid,
username: obj.externalId,
asset: obj.computerName,
ADAsset: obj.activeDirectory?.computerDistinguishedName,
ADUser: obj.activeDirectory?.lastUserDistinguishedName
}
if obj.machineType == "server" {
entity.privateIP = obj.lastIpToMgmt
}
return {
id: obj.uuid,
obj: obj,
entity: entity
}
})
Platform_EntityProvider_Refresh("SentinelOne", customer, agentInfos)
return {}
})
return {}
}
Platform Import Device API
- Fluency_DeviceSearch(query, from, to, ()=>{})
** Search Fluency Import Device database
// Search with an empty query from "-7d@m" to "@m"; the callback flattens each
// result (including the nested device name and category) into one row.
let newDevices = Fluency_DeviceSearch("", "-7d@m", "@m", (obj) => {
let {name, group, device:{name:devName, category}, ips, createdOn} = obj
return {name, group, devName, category, ips, createdOn}
})
- Fluency_Device_Lookup(ipAddress)
Lookup device information from Fluency Device database
- Fluency_Device_LookupName(deviceName)
Lookup device information by name from Fluency Device database
- Fluency_Device_Add(device)
Add device information to Fluency Device database
- Fluency_Device_Update(ipAddress, newName)
assign ipAddress to a new name
- Fluency_Device_Delete(deviceName)
delete device by name
// Processor example: look up the event sender in the device database and add
// an entry for it when it is missing.
function main({obj, size}) {
let sender = obj["@sender"]
// Fluency_Device_Lookup takes an IP address, so @sender is presumably the sender's IP — confirm.
let deviceEntry = Fluency_Device_Lookup(sender)
if deviceEntry {
printf("%s", deviceEntry)
} else {
printf("device not found")
// "$name", "$group", "$subCategory" and "$category" appear to be placeholders
// to replace with real values — confirm.
deviceEntry = {
name:"$name",
description:"Added by FPL processor",
ips: [sender],
group:"$group",
device: {
name:"$subCategory",
category:"$category"
}
}
Fluency_Device_Add(deviceEntry)
}
// call platform metric api...
return "pass"
}
Parser API
- geoip(ip_address)
return an object with all the fields.
return an empty object if the address is not internet IP address
// Keep the whole result object ...
let info = geoip("8.8.8.8")
// OR
// ... or destructure only the fields that are needed.
let {city, country, isp} = geoip("8.8.8.8")
{
"city": "Mountain View",
"country": "United States",
"countryCode": "US",
"isp": "Google LLC",
"latitude": 37.4223,
"longitude": -122.085,
"org": "Level 3"
}
- decoder_CSV(csvText)
** decode CSV format
// The CSV line is wrapped in backticks so that its embedded double quotes
// stay part of the text.
let text = `"2023-09-25 14:53:35","field1", "field2"`
let fields = decoder_CSV(text)
// ["2023-09-25 14:53:35", "field1", "field2"]
- decoder_CEF(cefText)
decode CEF format
return an object with the following fields:
SignatureID
Name
Severity
Vendor
Product
Version
** Fields
// Decode a sample CEF:0 record with the documented decoder_CEF(cefText) function.
let cef = `CEF:0|Imperva Inc|Attack Analytics|0|1|SQL Injection attack by several IPs using an unknown bot |MINOR|msg=On host "www.google.com" start=1646830802431 end=1646831309201 cs4=CloudWAF cs4Label=ImpervaAAPlatform`
let m = decoder_CEF(cef)
///
{
"Fields": {
"msg": "On host \"www.google.com\"",
"ImpervaAAPlatform": "CloudWAF",
"start": "1646830802431"
},
"Name": "SQL Injection attack by several IPs using an unknown bot ",
"Product": "Attack Analytics",
"Severity": "MINOR",
"SignatureID": "1",
"Vendor": "Imperva Inc",
"Version": "0"
}
// CEF:2 format
let cef2 = `CEF:2|SentinelOne|Mgmt|ip=127.0.0.1|eventID=5126|eventDesc=SentinelOne: Device Control connected USB|eventSeverity=1|...`
- decoder_QuotedKeyValue(text)
decode quoted key value format k1="v1" k2="v2" ...
- decoder_MixedKeyValue(text)
decode key value pairs where some values are quoted k1=v1 k2="v2 v3"
Updated about 2 months ago