System Functions

Calls made from an FPL process to Ingext

Platform REST API: call REST API

  • Platform_REST_Call(integration, request)
    call REST API via "RESTAPI" integration.
    integration: { url, path, method, timeout, skipVerify, headers, authentication}
    support HTTP Basic, Bearer authentication
    request: {method, path, headers, parameters, body, debug}
  // Build the request first, then send it through the "Terplab" integration.
  // debug is one of the documented request fields.
  let request = {
    path: "/api/ds/behavior_incident_update",
    debug: true,
    body: {
      kargs: {
        id: "username_foo",
        status: "resolved"
      }
    }
  }
  let response = Platform_REST_Call("Terplab", request)
  printf("%v", response)

Platform Metric API: read/write metrics to Prometheus database (local or remote)

  • Platform_Metric_Counter(name, labels, increment)
    Write Counter metric to Prometheus database
    "undefined" or "null" label will be ignored
// Both counters share one label set; obj and size come from the surrounding
// processor context (not shown in this snippet).
let labels = {
   namespace: "fluency",
   app: "import",
   eventType: "Office365",
   customer: obj["@customer"]
}
// Count one event, then add its size to the byte counter.
Platform_Metric_Counter("fluency_import_count", labels, 1)
Platform_Metric_Counter("fluency_import_bytes", labels, size)

  • Platform_Metric_QueryBuild(options)
    build a promQL query
    options: {metric, select, duration, stat, groupBy, aggregate, sort, limit}
    * for the select option, the label matching operators are:
    ** =: Select labels that are exactly equal to the provided string.
    ** !=: Select labels that are not equal to the provided string.
    ** =~: Select labels that regex-match the provided string.
    ** !~: Select labels that do not regex-match the provided string.
  • Platform_Metric_Query(query, time)
    ** return a fpl table
  • Platform_Metric_QueryRange(query, from, to, step)
    ** return a fpl stream
// Report the top 3 components by summed byte increase over one hour, plus a
// 24-hour range series (1h step) restricted to those same components.
// Returns {table, stream}: the instant-query table and the range-query stream.
function main() {
  // Instant query: topk(3) of sum-by-component increase over 1h.
  let topQuery = Platform_Metric_QueryBuild({
    metric: "platform_component_bytes",
    duration: "1h",
    stat: "increase",
    aggregate: "sum",
    groupBy: "component",
    sort: "topk",
    limit: 3
  })
  let table = Platform_Metric_Query(topQuery, "@h")

  // Join the returned component names into one regex-match label selector.
  let components = table.Map((row) => row.component)
  let selector = sprintf(`component=~"%s"`, components.Join("|"))
  printf("%s", selector)

  // Range query: same metric, limited to the components selected above.
  let rangeQuery = Platform_Metric_QueryBuild({
    metric: "platform_component_bytes",
    select: selector,
    duration: "1h",
    stat: "increase",
    aggregate: "sum",
    groupBy: "component"
  })
  let stream = Platform_Metric_QueryRange(rangeQuery, "-24h@h", "@h", "1h")

  return {table, stream}
}

  • Platform_Metric_Sort({metric, select, groupBy, from, to, sort, limit})
    return top/bottom N rows
    metric: metric name (must be a counter type)
    select: metric label select
    groupBy: groupBy field(s), string or list of strings
    from/to: time range in relative or absolute time format
    sort: "topk" or "bottomk"
    ** limit: number of rows
  • Platform_Metric_Sort_Histogram({metric, select, groupBy, from, to, interval, sort, limit})
    return top/bottom N metrics
    metric: metric name (must be a counter type)
    select: metric label select
    groupBy: groupBy field(s), string or list of strings
    from/to: time range in relative or absolute time format
    sort: "topk" or "bottomk"
    limit: number of rows
    interval: histogram interval "1h", "1d", "1w", "1m"
// Rank import sources by imported bytes, as a table and as a 1h histogram.
// from/to: time range; defaults are "-24h@h" and "@h".
// Returns {table, histogram}.
function main({from="-24h@h", to="@h"}) {
  let options = {
    metric: "fluency_import_bytes",
    groupBy: "importSource",
    sort: "topk",
    limit: 10,
    from,
    to
  }
  // promQL: topk(10, sum by (importSource) (increase(fluency_import_bytes[24h])))
  let table = Platform_Metric_Sort(options)

  // The histogram call reuses the same options with an interval added.
  options.interval = "1h"
  // promQL: (sum by (importSource) (increase(fluency_import_bytes{importSource="foo" or importSource="bar"}[1h]))) [24h:1h]
  let histogram = Platform_Metric_Sort_Histogram(options)

  return {table, histogram}
}

  • Platform_Metric_Alert_Counter_Stop(options)
    alert if a counter stops increasing for some time
    options: {metric, select, groupBy, duration, lookback, interval, history}
    metric: metric name (must be a counter type)
    select: metric label select
    groupBy: groupBy field(s), string or list of strings
    duration: detection threshold. default is "10m"
    lookback: lookback offset. default is "1h"
    interval: polling interval. default is "1m"
    history: alert record duration. default is "1h"
    if no alert found, return undefined.
    else return alerts.
  // Watch the platform_component_total counter per component id, using the
  // documented default values for duration, lookback, interval and history.
  let options = {  
    metric: `platform_component_total`,  
    groupBy: "id",  
    duration: "10m",  
    lookback: "1h",  
    interval: "1m",  
    history: "1h"  
  }  
  // Returns undefined when no alert is found, otherwise the alerts.
  let alerts = Platform_Metric_Alert_Counter_Stop(options)  
  if alerts {  
      // NOTE(review): the arguments look like (name, description, severity, seconds) — confirm against the Emit documentation
      alerts.Emit("Component_Stop", "component stopped for 10 minutes", "warn", 3600)  
  }

Platform API

  • sleep(delayInMillisecond)
sleep(1000)  // the delay is given in milliseconds: sleep for one second
  • Platform_LoadComponent()
    ** return all components (datasource, datasink, router and pipe)
  // create a key value map for component id => name translation  
  let idMap = {}  
  // Platform_LoadComponent returns all components (datasource, datasink, router and pipe)
  let components = Platform_LoadComponent()  
  // the first callback argument is unused; c is one component
  components.Each( (_, c) => {  
    idMap[c.id] = c.name  
  })

  • Platform_Site_GetInfo()
    ** return site information // {siteURL, account, multiTenant}

  • Platform_Site_GetTenants()
    ** return tenant list // [{name, displayName, description}]

  • Platform_Grok_Check(grokName)
    ** return true if grok handle exists

  • Platform_Grok_Register(grokName)
    ** register a grok handle

  • Platform_Grok_Parse(grokName, pattern, input)
    ** grok parse. return a map of hits

  • Platform_Grok_Add_Pattern(grokName, patternName, pattern)
    ** add a pattern to grok handle

// Make sure the "default" grok handle exists before parsing with it.
let registered = Platform_Grok_Check("default")
if !registered {
   Platform_Grok_Register("default")
}

// Parse one Apache common-log line; the result is a map of hits.
let line = `127.0.0.1 - - [23/Apr/2014:22:58:32 +0200] "GET /index.php HTTP/1.1" 404 207`
let hits = Platform_Grok_Parse("default", "%{COMMONAPACHELOG}", line)
if hits {
  printf("%v", hits)
}

return "pass"
  • Platform_Cache_Check(cacheName)
    ** return true if cache exists
  • Platform_Cache_Register(cacheName, options)
    register a cache
    return true if success
    return false if cache is already registered
    options: {expire: 0}
    ** cache expire time in seconds, default is 0 (never expire)
  • Platform_Cache_DeRegister(cacheName)
    deregister a cache
    return true if success
    ** return false if cache is not found
  • Platform_Cache_Set(cacheName, key, value)
    ** Set a key value pair to cache
  • Platform_Cache_SetMultiple(cacheName, keys, values)
    ** Set multiple key value pairs to cache
  • Platform_Cache_Get(cacheName, key)
    ** Get a value from cache. return undefined if key not found
  • Platform_Cache_Delete(cacheName, key)
    ** Delete a key from the cache. (do nothing if key not found)
  • Platform_Cache_Replace(newName, currentName)
    Rename cache "$currentName" to "$newName".
    newName must be different from currentName
// Register "cache1" (entries expire after 3600 seconds) unless it already exists.
if !Platform_Cache_Check("cache1") {
  Platform_Cache_Register("cache1", {expire: 3600})
}

// Write one entry, then two entries in a single call.
Platform_Cache_Set("cache1", "foo", "bar")
Platform_Cache_SetMultiple("cache1", ["k1", "k2"], ["v1", "v2"])

// Read the first entry back.
let value = Platform_Cache_Get("cache1", "foo")
printf("value: %s", value)

  • Platform_Channel(channel, eventEnvelop)
    send event to a channel
    event will be sent to all rules in this channel.
    ** runtime exceptions will be ignored

  • Platform_Sink(sink, eventEnvelop)
    ** send event to one data sink

// processor S3Passthrough
// Hands each event to the "BehaviorEventBackup" S3 data sink by name, so no
// direct connection from the router pipe to the sink is needed, and then
// returns "abort".
function main({obj, size}) {
   let envelop = {obj: obj, size: size}
   Platform_Sink("BehaviorEventBackup", envelop)
   return "abort"
}

  • Platform_Notification_Email(options)
    send email notification
    options: {to, cc, bcc, subject, html, text}
    to: email address or list of email addresses
    cc: email address or list of email addresses
    bcc: email address or list of email addresses
    subject: email subject
    html: email body in html format
    text: email body in text format
   // The template sources get their own names so that they do not shadow the
   // template() function called below.
   let bodyTemplate = `<p>Time: {{ .time }}</p><p>Alert: <b>{{.name}}</b> ({{ .description }})</p>`
   let subjectTemplate = `Fluency Platform Alert: {{.name}} - {{ .action }}: {{.displayName}}`
   // Render the HTML body and the plain-text subject from the same event.
   let html = htmlTemplate(bodyTemplate, event)
   let subject = template(subjectTemplate, event)

   // Recipients come from the action config; event and config are provided
   // by the surrounding context (not shown in this snippet).
   let options = {
      to: config.to,
      cc: config.cc,
      subject,
      html
   }
   Platform_Notification_Email(options)

  • Platform_Notification_Slack(integrationName, options)
    send slack notification
    integrationName: slack integration name
    options: {channel, message}
    channel: slack channel name
    ** message: slack message
   // The template source is named messageTemplate so that it does not shadow
   // the template() function used to render it.
   let messageTemplate = `  Alert: *{{.name}}*
     Description: *{{.description}}*
     Severity: *{{.severity}}*
     Action: *{{.action}}*
    Source: *{{.source}}*`
   let message = template(messageTemplate, event)
   // event and config are provided by the surrounding context (not shown).
   let integrationName = config.integrationName
   let options = {
      channel: "#fluency_grid",
      message,
   }
   Platform_Notification_Slack(integrationName, options)

  • Platform_Notification_PagerDuty(integrationName, options)
    send PagerDuty notification
    integrationName: pagerduty integration name
    ** options: {event_action, dedup_key, payload:{summary, source, severity, component, group, class, eventTime}, details}
  • Platform_Notification_ServiceNow(integrationName, options)
    call ServiceNow API
    integrationName: serviceNow integration name
    options: {action, key, entry:{}}
    action: "add" | "update"
  • Platform_EntityinfoCheck(entity, key)
    ** check if one key exists in one entity table
// Check whether the address is a key in the "HOME_NET" entity table.
let hit = Platform_EntityinfoCheck("HOME_NET", "20.0.0.1")
if hit {
  printf("home net")
} else {
  printf("internet")
}

  • Platform_Action(action, doc, config)
    ** call a pre-defined FPL action
// Document passed to the action.
let doc = {
  time: "2024-01-01",
  name: "alert1",
  severity: "error",
  action: "drop",
  source: "fpl"
}
// Action configuration; the address is a placeholder, replace it with the
// real recipient.
let config = {
  to: "alerts@example.com"
}
Platform_Action("PlatformAlertEmail", doc, config)

  • Platform_Action_Endpoint(endpoint, doc)
    ** call a pre-defined FPL action endpoint
// Document passed to the pre-defined "FluencySupport" action endpoint.
let doc = {  
  time: "2024-01-01",  
  name: "alert1",  
  severity: "error",  
  action: "drop",  
  source: "fpl"  
}  
Platform_Action_Endpoint("FluencySupport", doc)

  • Platform_EntityinfoLookup(entity, keyCol, key, valueCol)
    check value from one column based on key column value
    return an object {exist, value}
 // Look up the "Description" column of the row whose "Id" column equals
 // categoryID; the call returns {exist, value}.
 let categoryID = "%%12547"
 let {exist, value} = Platform_EntityinfoLookup("AD_EventID_4719_CategoryId", "Id", categoryID, "Description")
 if exist {
    printf("value %s", value)
 }

Platform Asset API: Asset management

  • Platform_Asset_Refresh(plugin, entries)
    plugin is the asset plugin name: "AD", "SentinelOne", "Qualys"
    entries is a list of asset objects: {name, fqdn, agentID, instanceID, model, platform, machineType, category, os, publicIP, privateIP, location, osVersion, vendor, uuid, serialNumber, region, vpc, account, sites, flags, tags}
    name is the required field
    fqdn is the fully qualified domain name (optional)
    cronjob to populate the asset table from Qualys plugin
    each refresh will trigger a rebuild of the asset table
// Cronjob entry point: load the Qualys hosts and push them to the asset
// table through the "Qualys" asset plugin. Returns {table}.
function main() {  
    let table = loadQualysDevices()  
    // copy the table rows into a list of asset entries
    let list = table.Map( (row) => row)  
    Platform_Asset_Refresh("Qualys", list)  
    return {table}  
}

// Load Qualys "host" resources and aggregate them by fqdn.
// Returns the table produced by Fluency_ResourceLoad.
function loadQualysDevices() {
  // "*" is passed as the third argument (the original "\*" was a markdown
  // escape that leaked into the code).
  let table = Fluency_ResourceLoad("Qualys", "host", "*", (obj, customer) => {
    let fields = obj["@qualysHost"]
    // Rename vendor-specific fields to the asset field names
    // (manufacturer -> vendor, type -> machineType, address -> privateIP).
    let {created, name, fqdn, model, manufacturer:vendor, os, type:machineType, address:privateIP} = fields
    let timestamp = obj["@timestamp"]
    return {
      aggregate: {
        groupBy: {fqdn},
        columns: {
          // NOTE(review): argmax presumably keeps the columns of the newest row per fqdn — confirm
          argmax: {
            created,
            name,
            model,
            privateIP,
            os,
            machineType,
            vendor,
            customer,
            timestamp
          }
        }
      }
    }
  })
  return table
}

  • Platform_Asset_Lookup(name)
    return an asset object if the name matches the asset name, fqdn, agentID or instanceID.
    call this function in FPL event parser to get asset information.
  • Platform_Asset_Register({name, fqdn, category, machineType, groups, flags...})
    ** register an asset
  // get deviceName from the parsed event
  let asset = Platform_Asset_Lookup(deviceName)
  if (!asset) {
     // register a new asset. the provider will be set to "FPL"
     // The result is stored back into asset so the printf below never
     // dereferences an empty lookup result.
     // NOTE(review): assumes Platform_Asset_Register returns the registered asset entry — confirm
     asset = Platform_Asset_Register({
       name: deviceName,
       groups: ["FPL-detect: FortiGate NGFW"],
       machineType: "FortiGate NGFW",
       category: "Firewall"
     })
  }
  printf("asset name %s", asset.name)

Platform EntityProvider API: UEBA entity lookup

  • the default entity info : {id, obj, entity}
    id is the entity key: EDR agent uuid, device name or username. Must be unique for each integration
    obj is the entity object from the vendor
    entity is the normalized fields for UEBA correlation: {agentID, username, asset, ADAsset, ADUser, privateIP, publicIP}
    typical use case is to run Platform_EntityProvider_Refresh as an hourly cronjob, then run Platform_EntityProvider_Lookup in an FPL parser or rule.
  • Platform_EntityProvider_Lookup(plugin, customer, key)
  // Look up the SentinelOne entity for agentID; "*" is passed as the customer.
  // agentID, newObj and envelop come from the surrounding parser (not shown).
  let agentInfos = Platform_EntityProvider_Lookup("SentinelOne", "*", agentID)
  if len(agentInfos) > 0 {
     // use the first entry: obj is the vendor object, entity the normalized fields
     let agentInfo = agentInfos[0]
     newObj.agent = agentInfo.obj
     newObj.uuid = agentID
     envelop.obj["entity"] = agentInfo.entity
  } else {
     // no entity found for this agentID; uncomment to log the miss
     // printf("agentID lookup missing: %s", agentID)
  }
  • Platform_EntityProvider_Refresh(plugin, customer, entries)
// Cronjob entry point: load the SentinelOne agents and refresh the
// "SentinelOne" entity provider with one {id, obj, entity} entry per agent.
function main(doc) {
    // "*" is passed as the second argument (the original "\*" was a markdown
    // escape that leaked into the code).
    Platform_PluginLambda("SentinelOne", "*", (customer) => {
       let agents = Plugin_SentinelOne_LoadAgent()
       let agentInfos = agents.Map( (_, obj) => {
           // printf("uuid %s", obj.uuid)
           // Normalized fields used for UEBA correlation.
           let entity = {
              agentID: obj.uuid,
              username: obj.externalId,
              asset: obj.computerName,
              ADAsset: obj.activeDirectory?.computerDistinguishedName,
              ADUser: obj.activeDirectory?.lastUserDistinguishedName
           }
           // privateIP is only set for agents whose machineType is "server".
           if obj.machineType == "server" {
              entity.privateIP = obj.lastIpToMgmt
           }
           return {
             id: obj.uuid,
             obj: obj,
             entity: entity
           }
       })
       Platform_EntityProvider_Refresh("SentinelOne", customer, agentInfos)
       return {}
    })
    return {}
}

Platform Import Device API

  • Fluency_DeviceSearch(query, from, to, ()=>{})
    ** Search Fluency Import Device database
// Search the import device database with an empty query over "-7d@m" .. "@m".
// The callback flattens each result: device.name becomes devName and
// device.category becomes category.
let newDevices = Fluency_DeviceSearch("", "-7d@m", "@m", (obj) => {  
  let {name, group, device:{name:devName, category}, ips, createdOn} = obj  
  return {name, group, devName, category, ips, createdOn}  
})

  • Fluency_Device_Lookup(ipAddress)
    ** Lookup device information from Fluency Device database
  • Fluency_Device_LookupName(deviceName)
    ** Lookup device information by name from Fluency Device database
  • Fluency_Device_Add(device)
    ** Add device information to Fluency Device database
  • Fluency_Device_Update(ipAddress, newName)
    ** assign ipAddress to a new name
  • Fluency_Device_Delete(deviceName)
    ** delete device by name
// Processor entry point: look up the sending device by the event's "@sender"
// value and add a device entry when none is found. Always returns "pass".
function main({obj, size}) {

   let sender = obj["@sender"]  
   let deviceEntry = Fluency_Device_Lookup(sender)

   if deviceEntry {  
     printf("%s", deviceEntry)  
   } else {  
     printf("device not found")  
     // "$name", "$group", "$subCategory" and "$category" look like
     // placeholders to be replaced with real values — confirm before use
     deviceEntry = {  
       name:"$name",  
       description:"Added by FPL processor",  
       ips: [sender],  
       group:"$group",  
       device: {  
         name:"$subCategory",  
         category:"$category"  
       }  
     }  
     Fluency_Device_Add(deviceEntry)  
   }  
   // call platform metric api...

   return "pass"  
}

Parser API

  • geoip(ip_address)
    return an object with all the fields.
    return an empty object if the address is not an internet IP address
// keep the whole result object ...
let info = geoip("8.8.8.8")  
// OR  
// ... or destructure only the fields that are needed
let {city, country, isp} = geoip("8.8.8.8")

{  
  "city": "Mountain View",  
  "country": "United States",  
  "countryCode": "US",  
  "isp": "Google LLC",  
  "latitude": 37.4223,  
  "longitude": -122.085,  
  "org": "Level 3"  
}

  • decoder_CSV(csvText)
    ** decode CSV format
// The CSV line is written as a backtick string so that its quoted fields can
// contain double quotes.
let text = `"2023-09-25 14:53:35","field1", "field2"`
let fields = decoder_CSV(text)
// ["2023-09-25 14:53:35", "field1", "field2"]

  • decoder_CEF(cefText)
    decode CEF format
    return an object with the following fields:
    SignatureID
    Name
    Severity
    Vendor
    Product
    Version
    ** Fields
// Decode a CEF:0 message; the sample result follows the /// marker.
let cef = `CEF:0|Imperva Inc|Attack Analytics|0|1|SQL Injection attack by several IPs using an unknown bot |MINOR|msg=On host "www.google.com" start=1646830802431 end=1646831309201 cs4=CloudWAF cs4Label=ImpervaAAPlatform`
let m = decoder_CEF(cef)
///
{
  "Fields": {
    "msg": "On host \"www.google.com\"",
    "ImpervaAAPlatform": "CloudWAF",
    "start": "1646830802431"
  },
  "Name": "SQL Injection attack by several IPs using an unknown bot ",
  "Product": "Attack Analytics",
  "Severity": "MINOR",
  "SignatureID": "1",
  "Vendor": "Imperva Inc",
  "Version": "0"
}
// CEF:2 format
let cef2 = `CEF:2|SentinelOne|Mgmt|ip=127.0.0.1|eventID=5126|eventDesc=SentinelOne: Device Control connected USB|eventSeverity=1|...`

  • decoder_QuotedKeyValue(text)
    ** decode quoted key value format k1="v1" k2="v2" ...
  • decoder_MixedKeyValue(text)
    ** decode key value pairs where some values are quoted k1=v1 k2="v2 v3"